diff --git a/orte/mca/grpcomm/bad/grpcomm_bad_module.c b/orte/mca/grpcomm/bad/grpcomm_bad_module.c
index 3e6bdf8a2b..da74fb2c8e 100644
--- a/orte/mca/grpcomm/bad/grpcomm_bad_module.c
+++ b/orte/mca/grpcomm/bad/grpcomm_bad_module.c
@@ -52,6 +52,9 @@ static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf);
 static int barrier(void);
 static int onesided_barrier(void);
 static int modex(opal_list_t *procs);
+static int orte_grpcomm_bad_get_proc_attr(const orte_process_name_t proc,
+                                          const char * attribute_name, void **val,
+                                          size_t *size);
 
 /* Module def */
 orte_grpcomm_base_module_t orte_grpcomm_bad_module = {
@@ -63,7 +66,7 @@ orte_grpcomm_base_module_t orte_grpcomm_bad_module = {
     barrier,
     onesided_barrier,
     orte_grpcomm_base_set_proc_attr,
-    orte_grpcomm_base_get_proc_attr,
+    orte_grpcomm_bad_get_proc_attr,
     modex,
     orte_grpcomm_base_purge_proc_attrs
 };
@@ -488,8 +491,49 @@ static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
     return ORTE_SUCCESS;
 }
 
+/*
+ * Fetch a modex attribute for a peer. If allgather_complete is still false
+ * (modex() clears it before posting its background receive), wait on the
+ * flag with ORTE_PROGRESSED_WAIT first, then delegate the lookup to
+ * orte_grpcomm_base_get_proc_attr() and return its result unchanged.
+ */
+static int
+orte_grpcomm_bad_get_proc_attr(const orte_process_name_t proc,
+                               const char * attribute_name, void **val,
+                               size_t *size)
+{
+    if( false == allgather_complete ) {
+        ORTE_PROGRESSED_WAIT(allgather_complete, 0, 1);
+    }
+    return orte_grpcomm_base_get_proc_attr(proc, attribute_name, val, size);
+}
 
 /*** MODEX SECTION ***/
+/*
+ * RML callback posted by modex(): unpacks the collected modex data from
+ * 'buffer', releases the buffer handed over through 'cbdata', and raises
+ * allgather_complete so that orte_grpcomm_bad_get_proc_attr() can proceed.
+ * An unpack failure is logged; the completion flag is still set.
+ */
+static void allgather_recv_modex(int status, orte_process_name_t* sender,
+                                 opal_buffer_t *buffer,
+                                 orte_rml_tag_t tag, void *cbdata)
+{
+    opal_buffer_t *allgather_buf = (opal_buffer_t*)cbdata;
+    int rc;
+
+    OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
+                         "%s grpcomm:bad modex received",
+                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
+
+    if( ORTE_SUCCESS != (rc = orte_grpcomm_base_modex_unpack(buffer, true)) ) {
+        ORTE_ERROR_LOG(rc);
+    }
+    OBJ_RELEASE(allgather_buf);
+
+    allgather_complete = true;
+}
+
 static int modex(opal_list_t *procs)
 {
     int rc;
@@ -499,9 +543,84 @@ static int modex(opal_list_t *procs)
                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 
     if (NULL == procs) {
-        if (ORTE_SUCCESS != (rc = orte_grpcomm_base_peer_modex(true))) {
+        /* The modex will be realized in the background by the daemons. The processes will
+         * only be informed when all data has been collected from all processes. The get_attr
+         * will realize the blocking, it will not return until the data has been received.
+         */
+        opal_buffer_t *buf, *rbuf;
+        orte_grpcomm_coll_t coll_type = ORTE_GRPCOMM_ALLGATHER;
+        bool modex_reqd = true;
+
+        OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
+                             "%s grpcomm:bad:peer:modex: performing modex",
+                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
+
+        /* setup the buffer that will actually be sent */
+        buf = OBJ_NEW(opal_buffer_t);
+        rbuf = OBJ_NEW(opal_buffer_t);
+
+        /* tell the daemon we are doing an allgather */
+        if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &coll_type, 1, ORTE_GRPCOMM_COLL_T))) {
             ORTE_ERROR_LOG(rc);
+            goto cleanup;
+        }
+
+        /* put our process name in the buffer so it can be unpacked later */
+        if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
+            ORTE_ERROR_LOG(rc);
+            goto cleanup;
         }
+
+        if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &orte_process_info.arch, 1, OPAL_UINT32))) {
+            ORTE_ERROR_LOG(rc);
+            goto cleanup;
+        }
+
+        /* pack the entries we have received */
+        if (ORTE_SUCCESS != (rc = orte_grpcomm_base_pack_modex_entries(buf, &modex_reqd))) {
+            ORTE_ERROR_LOG(rc);
+            goto cleanup;
+        }
+
+        OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
+                             "%s grpcomm:bad:peer:modex: executing non-blocking allgather",
+                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
+
+        /* send to local daemon */
+        if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_DAEMON, buf, ORTE_RML_TAG_DAEMON_COLLECTIVE, 0))) {
+            ORTE_ERROR_LOG(rc);
+            goto cleanup;
+        }
+
+        OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
+                             "%s grpcomm:bad allgather buffer sent",
+                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
+
+        /* now receive the final result. Be sure to do this in
+         * a manner that allows us to return without being in a recv!
+         */
+        allgather_complete = false;
+        rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER,
+                                     ORTE_RML_NON_PERSISTENT, allgather_recv_modex, (void*)rbuf);
+        if (rc != ORTE_SUCCESS) {
+            ORTE_ERROR_LOG(rc);
+            goto cleanup;
+        }
+        rbuf = NULL; /* make sure we don't release it yet */
+
+        OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
+                             "%s grpcomm:bad: modex posted",
+                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
+        /* all exits from this branch pass through here so buf/rbuf are never leaked */
+    cleanup:
+        if( NULL != buf ) {
+            OBJ_RELEASE(buf);
+        }
+        if( NULL != rbuf ) {
+            OBJ_RELEASE(rbuf);
+        }
+
+        return rc;
     } else {
         if (ORTE_SUCCESS != (rc = orte_grpcomm_base_full_modex(procs, true))) {
             ORTE_ERROR_LOG(rc);