diff --git a/ompi/runtime/ompi_mpi_finalize.c b/ompi/runtime/ompi_mpi_finalize.c index 557ba24052..bf98a99726 100644 --- a/ompi/runtime/ompi_mpi_finalize.c +++ b/ompi/runtime/ompi_mpi_finalize.c @@ -150,8 +150,8 @@ int ompi_mpi_finalize(void) /* * Wait for everyone to get here */ - if (ORTE_SUCCESS != (ret = orte_rml.xcast(NULL, NULL, 0, NULL, - orte_gpr.deliver_notify_msg, NULL))) { + if (ORTE_SUCCESS != (ret = orte_rml.xcast(ORTE_PROC_MY_NAME->jobid, false, + NULL, orte_gpr.deliver_notify_msg))) { ORTE_ERROR_LOG(ret); return ret; } @@ -291,8 +291,8 @@ int ompi_mpi_finalize(void) * the RTE while the smr is trying to do the update - which causes * an ugly race condition */ - if (ORTE_SUCCESS != (ret = orte_rml.xcast(NULL, NULL, 0, NULL, - orte_gpr.deliver_notify_msg, NULL))) { + if (ORTE_SUCCESS != (ret = orte_rml.xcast(ORTE_PROC_MY_NAME->jobid, false, + NULL, orte_gpr.deliver_notify_msg))) { ORTE_ERROR_LOG(ret); return ret; } diff --git a/ompi/runtime/ompi_mpi_init.c b/ompi/runtime/ompi_mpi_init.c index 2d8c93c0f5..ab2459e334 100644 --- a/ompi/runtime/ompi_mpi_init.c +++ b/ompi/runtime/ompi_mpi_init.c @@ -523,9 +523,13 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided) (ompistop.tv_usec - ompistart.tv_usec))); } - /* FIRST BARRIER - WAIT FOR MSG FROM RMGR_PROC_STAGE_GATE_MGR TO ARRIVE */ - if (ORTE_SUCCESS != (ret = orte_rml.xcast(NULL, NULL, 0, NULL, - orte_gpr.deliver_notify_msg, NULL))) { + /* FIRST BARRIER - WAIT FOR MSG FROM RMGR_PROC_STAGE_GATE_MGR TO ARRIVE. + * We pass a "process_first" flag of "true" to indicate that we need to + * process the STG1 message prior to sending it along the xcast chain + * as this message contains all the oob contact info we need! 
+ */ + if (ORTE_SUCCESS != (ret = orte_rml.xcast(ORTE_PROC_MY_NAME->jobid, true, + NULL, orte_gpr.deliver_notify_msg))) { ORTE_ERROR_LOG(ret); error = "ompi_mpi_init: failed to see all procs register\n"; goto error; @@ -640,8 +644,8 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided) /* Second barrier -- wait for message from RMGR_PROC_STAGE_GATE_MGR to arrive */ - if (ORTE_SUCCESS != (ret = orte_rml.xcast(NULL, NULL, 0, NULL, - orte_gpr.deliver_notify_msg, NULL))) { + if (ORTE_SUCCESS != (ret = orte_rml.xcast(ORTE_PROC_MY_NAME->jobid, false, + NULL, orte_gpr.deliver_notify_msg))) { ORTE_ERROR_LOG(ret); error = "ompi_mpi_init: failed to see all procs register\n"; goto error; diff --git a/orte/mca/gpr/proxy/gpr_proxy_compound_cmd.c b/orte/mca/gpr/proxy/gpr_proxy_compound_cmd.c index 5558fcd59f..3e5ee3ab79 100644 --- a/orte/mca/gpr/proxy/gpr_proxy_compound_cmd.c +++ b/orte/mca/gpr/proxy/gpr_proxy_compound_cmd.c @@ -114,11 +114,6 @@ int orte_gpr_proxy_exec_compound_cmd(void) OPAL_THREAD_LOCK(&orte_gpr_proxy_globals.wait_for_compound_mutex); rc = ORTE_SUCCESS; - if (orte_gpr_proxy_globals.timing) { - opal_output(0, "gpr_proxy [%ld]: compound cmd size %lu", (long)ORTE_PROC_MY_NAME->vpid, - (unsigned long)orte_gpr_proxy_globals.compound_cmd->bytes_used); - } - if (0 > orte_rml.send_buffer(orte_process_info.gpr_replica, orte_gpr_proxy_globals.compound_cmd, ORTE_RML_TAG_GPR, 0)) { ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); rc = ORTE_ERR_COMM_FAILURE; diff --git a/orte/mca/oob/base/base.h b/orte/mca/oob/base/base.h index 8a1ac8468a..00ae62ebc1 100644 --- a/orte/mca/oob/base/base.h +++ b/orte/mca/oob/base/base.h @@ -400,12 +400,10 @@ ORTE_DECLSPEC int mca_oob_recv_packed_nb( * continuing to forward data along the distribution tree. 
*/ -ORTE_DECLSPEC int mca_oob_xcast( - orte_process_name_t* root, - orte_process_name_t* peers, - orte_std_cntr_t num_peers, - orte_buffer_t* buffer, - orte_gpr_trigger_cb_fn_t cbfunc); +ORTE_DECLSPEC int mca_oob_xcast(orte_jobid_t job, + bool process_first, + orte_buffer_t* buffer, + orte_gpr_trigger_cb_fn_t cbfunc); /* * Callback on exception condition. diff --git a/orte/mca/oob/base/oob_base_xcast.c b/orte/mca/oob/base/oob_base_xcast.c index 01ec3f5b5f..8ade7f4604 100644 --- a/orte/mca/oob/base/oob_base_xcast.c +++ b/orte/mca/oob/base/oob_base_xcast.c @@ -26,6 +26,7 @@ #include "opal/threads/condition.h" #include "opal/util/output.h" +#include "opal/util/bit_ops.h" #include "orte/util/proc_info.h" #include "orte/dss/dss.h" @@ -38,139 +39,137 @@ /** - * A "broadcast-like" function over the specified set of peers. - * @param root The process acting as the root of the broadcast. - * @param peers The list of processes receiving the broadcast (excluding root). + * A "broadcast-like" function to a job's processes. + * @param job The job whose processes are to receive the message + * @param process_first Whether or not to process the message locally before sending it on * @param buffer The data to broadcast - only significant at root. * @param cbfunc Callback function on receipt of data - not significant at root. - * - * Note that the callback function is provided so that the data can be - * received and interpreted by the application prior to the broadcast - * continuing to forward data along the distribution tree. 
*/ +static opal_mutex_t xcastmutex; +static int xcast_bitmap, bitmap_save; +static bool bitmap_init = false; -struct mca_oob_xcast_t { - opal_object_t super; - opal_mutex_t mutex; - opal_condition_t cond; - orte_std_cntr_t counter; -}; -typedef struct mca_oob_xcast_t mca_oob_xcast_t; - -static void mca_oob_xcast_construct(mca_oob_xcast_t* xcast) -{ - OBJ_CONSTRUCT(&xcast->mutex, opal_mutex_t); - OBJ_CONSTRUCT(&xcast->cond, opal_condition_t); -} - -static void mca_oob_xcast_destruct(mca_oob_xcast_t* xcast) -{ - OBJ_DESTRUCT(&xcast->mutex); - OBJ_DESTRUCT(&xcast->cond); -} - -static OBJ_CLASS_INSTANCE( - mca_oob_xcast_t, - opal_object_t, - mca_oob_xcast_construct, - mca_oob_xcast_destruct); - -static void mca_oob_xcast_cb(int status, orte_process_name_t* peer, orte_buffer_t* buffer, int tag, void* cbdata) -{ - mca_oob_xcast_t* xcast = (mca_oob_xcast_t*)cbdata; - OPAL_THREAD_LOCK(&xcast->mutex); - if(--xcast->counter == 0) { - opal_condition_signal(&xcast->cond); - } - OPAL_THREAD_UNLOCK(&xcast->mutex); -} - - -int mca_oob_xcast( - orte_process_name_t* root, - orte_process_name_t* peers, - orte_std_cntr_t num_peers, - orte_buffer_t* buffer, - orte_gpr_trigger_cb_fn_t cbfunc) +int mca_oob_xcast(orte_jobid_t job, + bool process_first, + orte_buffer_t* buffer, + orte_gpr_trigger_cb_fn_t cbfunc) { orte_std_cntr_t i; int rc; int tag = ORTE_RML_TAG_XCAST; - int status; - orte_proc_state_t state; - struct timeval sendstart, sendstop; + int peer, size, rank, hibit, mask; + orte_buffer_t rbuf, sbuf; + orte_gpr_notify_message_t *msg; + orte_process_name_t target; - /* check to see if I am the root process name */ - if(NULL != root && ORTE_EQUAL == orte_dss.compare(root, orte_process_info.my_name, ORTE_NAME)) { - mca_oob_xcast_t *xcast = OBJ_NEW(mca_oob_xcast_t); - xcast->counter = num_peers; - - /* check for timing request - if so, we want to printout the size of the message being sent */ - if (orte_oob_base_timing) { - opal_output(0, "oob_xcast: message size is %lu bytes", 
(unsigned long)buffer->bytes_used); - gettimeofday(&sendstart, NULL); - } - - for(i=0; imutex); - while(xcast->counter > 0) { - opal_condition_wait(&xcast->cond, &xcast->mutex); - } - - /* check for timing request - if so, report comm time */ - if (orte_oob_base_timing) { - gettimeofday(&sendstop, NULL); - opal_output(0, "oob_xcast: time to send was %ld usec", - (long int)((sendstop.tv_sec - sendstart.tv_sec)*1000000 + - (sendstop.tv_usec - sendstart.tv_usec))); - } - - OPAL_THREAD_UNLOCK(&xcast->mutex); - OBJ_RELEASE(xcast); - - } else { - orte_buffer_t rbuf; - orte_gpr_notify_message_t *msg; - - OBJ_CONSTRUCT(&rbuf, orte_buffer_t); - rc = mca_oob_recv_packed(ORTE_NAME_WILDCARD, &rbuf, tag); - if(rc < 0) { - OBJ_DESTRUCT(&rbuf); + target.cellid = ORTE_PROC_MY_NAME->cellid; + target.jobid = job; + target.vpid = 0; + if (0 > (rc = mca_oob_send_packed(&target, buffer, tag, 0))) { + ORTE_ERROR_LOG(rc); + OPAL_THREAD_UNLOCK(&xcastmutex); + OBJ_DESTRUCT(&xcastmutex); return rc; } - if (cbfunc != NULL) { - msg = OBJ_NEW(orte_gpr_notify_message_t); - if (NULL == msg) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - return ORTE_ERR_OUT_OF_RESOURCE; - } - i=1; - if (ORTE_SUCCESS != (rc = orte_dss.unpack(&rbuf, &msg, &i, ORTE_GPR_NOTIFY_MSG))) { + + OPAL_THREAD_UNLOCK(&xcastmutex); + OBJ_DESTRUCT(&xcastmutex); + + return ORTE_SUCCESS; + + } + + + /* this process is one of the application procs - accordingly, it will + * receive the message from its "parent" in the broadcast tree, and + * then send it along to some set of children + */ + + /* compute the bitmap, if we haven't already done so */ + if (!bitmap_init) { + bitmap_save = opal_cube_dim((int)orte_process_info.num_procs); + bitmap_init = true; + } + + xcast_bitmap = bitmap_save; + rank = (int)(ORTE_PROC_MY_NAME->vpid); + size = (int)orte_process_info.num_procs; + + hibit = opal_hibit(rank, xcast_bitmap); + --xcast_bitmap; + + /* regardless of who we are, we first have to receive the message */ + OBJ_CONSTRUCT(&rbuf, 
orte_buffer_t); + if (0 > (rc = mca_oob_recv_packed(ORTE_NAME_WILDCARD, &rbuf, tag))) { + ORTE_ERROR_LOG(rc); + OBJ_DESTRUCT(&rbuf); + return rc; + } + + msg = OBJ_NEW(orte_gpr_notify_message_t); + if (NULL == msg) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + i=1; + if (ORTE_SUCCESS != (rc = orte_dss.unpack(&rbuf, &msg, &i, ORTE_GPR_NOTIFY_MSG))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(msg); + return rc; + } + OBJ_DESTRUCT(&rbuf); + + /* repack the message so we can send it on */ + OBJ_CONSTRUCT(&sbuf, orte_buffer_t); + if (ORTE_SUCCESS != (rc = orte_dss.pack(&sbuf, &msg, 1, ORTE_GPR_NOTIFY_MSG))) { + ORTE_ERROR_LOG(rc); + OBJ_DESTRUCT(&sbuf); + return rc; + } + + /* since the OOB contact info for our peers is in the STG1 message, we have to + * process it BEFORE we can relay the message to any "children" + */ + if (cbfunc != NULL && process_first) { + /* process the message */ + cbfunc(msg); + } + + /* send data to any children */ + target.cellid = ORTE_PROC_MY_NAME->cellid; + target.jobid = ORTE_PROC_MY_NAME->jobid; + + for (i = hibit + 1, mask = 1 << i; i <= xcast_bitmap; ++i, mask <<= 1) { + peer = rank | mask; + if (peer < size) { + target.vpid = (orte_vpid_t)peer; + if (0 > (rc = mca_oob_send_packed(&target, &sbuf, tag, 0))) { ORTE_ERROR_LOG(rc); OBJ_RELEASE(msg); return rc; } - cbfunc(msg); - OBJ_RELEASE(msg); } - OBJ_DESTRUCT(&rbuf); } + OBJ_DESTRUCT(&sbuf); + + /* if it wasn't the STG1 message, then process it here */ + if (cbfunc != NULL && !process_first) { + cbfunc(msg); + } + OBJ_RELEASE(msg); + return ORTE_SUCCESS; } diff --git a/orte/mca/oob/oob.h b/orte/mca/oob/oob.h index c61ddbe278..72eb23bd72 100644 --- a/orte/mca/oob/oob.h +++ b/orte/mca/oob/oob.h @@ -196,11 +196,10 @@ typedef int (*mca_oob_base_module_fini_fn_t)(void); /** * xcast function for sending common messages to all processes */ -typedef int (*mca_oob_base_module_xcast_fn_t)(orte_process_name_t* root, - orte_process_name_t* peers, - 
orte_std_cntr_t num_peers, - orte_buffer_t* buffer, - orte_gpr_trigger_cb_fn_t cbfunc); +typedef int (*mca_oob_base_module_xcast_fn_t)(orte_jobid_t job, + bool process_first, + orte_buffer_t* buffer, + orte_gpr_trigger_cb_fn_t cbfunc); /** * OOB Module diff --git a/orte/mca/rmgr/base/rmgr_base_stage_gate.c b/orte/mca/rmgr/base/rmgr_base_stage_gate.c index 813fca199d..7f65d93052 100644 --- a/orte/mca/rmgr/base/rmgr_base_stage_gate.c +++ b/orte/mca/rmgr/base/rmgr_base_stage_gate.c @@ -58,12 +58,8 @@ int orte_rmgr_base_proc_stage_gate_init(orte_jobid_t job) int orte_rmgr_base_proc_stage_gate_mgr(orte_gpr_notify_message_t *msg) { orte_buffer_t buffer; - orte_process_name_t *recipients=NULL; - orte_std_cntr_t n=0; int rc; orte_jobid_t job; - opal_list_t attrs; - opal_list_item_t *item; OPAL_TRACE(1); @@ -88,23 +84,7 @@ int orte_rmgr_base_proc_stage_gate_mgr(orte_gpr_notify_message_t *msg) } OPAL_TRACE_ARG1(1, job); - - /* need the list of peers for this job so we can send them the xcast. - * obtain this list from the name service's get_job_peers function - */ - OBJ_CONSTRUCT(&attrs, opal_list_t); - if (ORTE_SUCCESS != (rc = orte_rmgr.add_attribute(&attrs, ORTE_NS_USE_JOBID, ORTE_JOBID, - &job, ORTE_RMGR_ATTR_OVERRIDE))) { - ORTE_ERROR_LOG(rc); - OBJ_DESTRUCT(&attrs); - return rc; - } - - if (ORTE_SUCCESS != (rc = orte_ns.get_peers(&recipients, &n, &attrs))) { - ORTE_ERROR_LOG(rc); - goto CLEANUP; - } - + /* set the job state to the appropriate level */ if (orte_schema.check_std_trigger_name(msg->target, ORTE_ALL_LAUNCHED_TRIGGER)) { if (ORTE_SUCCESS != (rc = orte_smr.set_job_state(job, ORTE_JOB_STATE_LAUNCHED))) { @@ -148,19 +128,12 @@ int orte_rmgr_base_proc_stage_gate_mgr(orte_gpr_notify_message_t *msg) } /* send the message */ - if (ORTE_SUCCESS != (rc = orte_rml.xcast(orte_process_info.my_name, recipients, - n, &buffer, NULL, NULL))) { + if (ORTE_SUCCESS != (rc = orte_rml.xcast(job, false, &buffer, NULL))) { ORTE_ERROR_LOG(rc); } OBJ_DESTRUCT(&buffer); CLEANUP: - if 
(NULL != recipients) free(recipients); - - while (NULL != (item = opal_list_remove_first(&attrs))) { - OBJ_RELEASE(item); - } - OBJ_DESTRUCT(&attrs); return rc; } diff --git a/orte/mca/rml/rml.h b/orte/mca/rml/rml.h index d4366a50de..67366787ed 100644 --- a/orte/mca/rml/rml.h +++ b/orte/mca/rml/rml.h @@ -326,13 +326,10 @@ typedef int (*orte_rml_module_recv_cancel_fn_t)(orte_process_name_t* peer, orte_ * xcast function for sending common messages to all processes */ -typedef int (*orte_rml_module_xcast_fn_t)( - orte_process_name_t* root, - orte_process_name_t* peers, - size_t num_peers, - orte_buffer_t* buffer, - orte_gpr_trigger_cb_fn_t cbfunc, - void *user_tag); +typedef int (*orte_rml_module_xcast_fn_t)(orte_jobid_t job, + bool process_first, + orte_buffer_t* buffer, + orte_gpr_trigger_cb_fn_t cbfunc); /* * Callback on exception condition.