From 851279fc9f86a412a94c4f165a54ed2ba935cc29 Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Thu, 10 Apr 2008 15:35:11 +0000 Subject: [PATCH] Consolidate the daemon wireup message into the launch message. The daemons don't need their contact info prior to the launch message anyway. This not only eliminates a job-wide communication from the startup procedure, but it also resolves a race condition reported when operating across highly distributed (i.e., cross-country) networks. In such scenarios, it proved possible for a daemon to receive its launch message -before- it had received the contact info message, even though the latter had been sent first! This eliminates that problem... This commit was SVN r18126. --- orte/mca/odls/base/odls_base_default_fns.c | 72 ++++++++++++++++++++- orte/mca/plm/base/plm_base_launch_support.c | 18 ------ orte/mca/routed/tree/routed_tree.c | 8 --- orte/mca/routed/unity/routed_unity.c | 8 --- 4 files changed, 71 insertions(+), 35 deletions(-) diff --git a/orte/mca/odls/base/odls_base_default_fns.c b/orte/mca/odls/base/odls_base_default_fns.c index 894946f841..b18ff9bcb4 100644 --- a/orte/mca/odls/base/odls_base_default_fns.c +++ b/orte/mca/odls/base/odls_base_default_fns.c @@ -80,6 +80,46 @@ int orte_odls_base_default_get_add_procs_data(opal_buffer_t *data, orte_std_cntr_t i; orte_vpid_t j; orte_vpid_t invalid_vpid=ORTE_VPID_INVALID; + opal_buffer_t *wireup; + opal_byte_object_t bo, *boptr; + int32_t numbytes; + + /* get wireup info for daemons per the selected routing module */ + wireup = OBJ_NEW(opal_buffer_t); + if (ORTE_SUCCESS != (rc = orte_routed.get_wireup_info(ORTE_PROC_MY_NAME->jobid, wireup))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(wireup); + return rc; + } + /* if anything was inserted, put it in a byte object for xmission */ + if (0 < wireup->bytes_used) { + opal_dss.unload(wireup, (void**)&bo.bytes, &numbytes); + /* pack the number of bytes required by payload */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(data, &numbytes, 1, OPAL_INT32))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(wireup); + return rc; + } + /* pack the byte object */ + bo.size = numbytes; + boptr = &bo; + if (ORTE_SUCCESS != (rc = opal_dss.pack(data, &boptr, 1, OPAL_BYTE_OBJECT))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(wireup); + return rc; + } + /* release the data since it has now been copied into our buffer */ + free(bo.bytes); + } else { + /* pack numbytes=0 so the unpack routine remains sync'd to us */ + numbytes = 0; + if (ORTE_SUCCESS != (rc = opal_dss.pack(data, &numbytes, 1, OPAL_INT32))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(wireup); + return rc; + } + } + OBJ_RELEASE(wireup); /* get the job data pointer */ if (NULL == (jdata = orte_get_job_data_object(job))) { @@ -238,6 +278,9 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data, char *slot_str; bool node_oversubscribed; orte_odls_job_t *jobdat; + opal_buffer_t wireup; + opal_byte_object_t *bo; + int32_t numbytes; OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output, "%s odls:constructing child list", @@ -257,7 +300,34 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data, *node_included = false; *oversubscribed = false; *override_oversubscribed = false; - + + /* unpack the #bytes of daemon wireup info in the message */ + cnt=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &numbytes, &cnt, OPAL_INT32))) { + ORTE_ERROR_LOG(rc); + return rc; + } + /* any bytes there? */ + if (0 < numbytes) { + /* unpack the byte object */ + cnt=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &bo, &cnt, OPAL_BYTE_OBJECT))) { + ORTE_ERROR_LOG(rc); + return rc; + } + /* load it into a buffer */ + OBJ_CONSTRUCT(&wireup, opal_buffer_t); + opal_dss.load(&wireup, bo->bytes, bo->size); + /* pass it for processing */ + if (ORTE_SUCCESS != (rc = orte_routed.init_routes(ORTE_PROC_MY_NAME->jobid, &wireup))) { + ORTE_ERROR_LOG(rc); + OBJ_DESTRUCT(&wireup); + return rc; + } + /* done with the buffer - dump it */ + OBJ_DESTRUCT(&wireup); + } + /* unpack the jobid we are to launch */ cnt=1; if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, job, &cnt, ORTE_JOBID))) { diff --git a/orte/mca/plm/base/plm_base_launch_support.c b/orte/mca/plm/base/plm_base_launch_support.c index 1a12e2ab42..0cbeb73be8 100644 --- a/orte/mca/plm/base/plm_base_launch_support.c +++ b/orte/mca/plm/base/plm_base_launch_support.c @@ -301,7 +301,6 @@ CLEANUP: int orte_plm_base_daemon_callback(orte_std_cntr_t num_daemons) { int rc; - opal_buffer_t *wireup; OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output, "%s plm:base:daemon_callback", @@ -336,23 +335,6 @@ int orte_plm_base_daemon_callback(orte_std_cntr_t num_daemons) ORTE_ERROR_LOG(rc); } - /* get wireup info for daemons per the selected routing module */ - wireup = OBJ_NEW(opal_buffer_t); - if (ORTE_SUCCESS != (rc = orte_routed.get_wireup_info(ORTE_PROC_MY_NAME->jobid, wireup))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(wireup); - return rc; - } - /* if anything was inserted, send it out */ - if (0 < wireup->bytes_used) { - if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(ORTE_PROC_MY_NAME->jobid, wireup, ORTE_RML_TAG_RML_INFO_UPDATE))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(wireup); - return rc; - } - } - OBJ_RELEASE(wireup); - return ORTE_SUCCESS; } diff --git a/orte/mca/routed/tree/routed_tree.c b/orte/mca/routed/tree/routed_tree.c index e480c56144..6cc7ad5a5b 100644 --- a/orte/mca/routed/tree/routed_tree.c +++ b/orte/mca/routed/tree/routed_tree.c @@ -561,16 +561,8 @@ static int route_lost(const orte_process_name_t *route) static int get_wireup_info(orte_jobid_t job, opal_buffer_t *buf) { - orte_rml_cmd_flag_t command; int rc; - /* pack the update-RML command */ - command = ORTE_RML_UPDATE_CMD; - if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_RML_CMD))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(buf); - return rc; - } if (ORTE_SUCCESS != (rc = orte_rml_base_get_contact_info(ORTE_PROC_MY_NAME->jobid, buf))) { ORTE_ERROR_LOG(rc); OBJ_RELEASE(buf); diff --git a/orte/mca/routed/unity/routed_unity.c b/orte/mca/routed/unity/routed_unity.c index 91ff6b32c0..7ce803dea4 100644 --- a/orte/mca/routed/unity/routed_unity.c +++ b/orte/mca/routed/unity/routed_unity.c @@ -566,16 +566,8 @@ static int route_lost(const orte_process_name_t *route) static int get_wireup_info(orte_jobid_t job, opal_buffer_t *buf) { - orte_rml_cmd_flag_t command; int rc; - /* pack the update-RML command */ - command = ORTE_RML_UPDATE_CMD; - if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_RML_CMD))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(buf); - return rc; - } if (ORTE_SUCCESS != (rc = orte_rml_base_get_contact_info(ORTE_PROC_MY_NAME->jobid, buf))) { ORTE_ERROR_LOG(rc); OBJ_RELEASE(buf);