1
1

BAD grpcomm now has the ability to execute the modex offline. The MPI process

prepares the send buffer and posts the collective order to the local daemon. It
then registers the callback and returns from the modex exchange. It will only
wait for the modex to complete when modex_recv is called. Meanwhile, the
daemon will do the allgather.

This commit was SVN r21543.
Этот коммит содержится в:
George Bosilca 2009-06-26 20:32:31 +00:00
родитель 6239e714a6
Коммит f06198a999

Просмотреть файл

@ -52,6 +52,9 @@ static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf);
static int barrier(void); static int barrier(void);
static int onesided_barrier(void); static int onesided_barrier(void);
static int modex(opal_list_t *procs); static int modex(opal_list_t *procs);
/* Module-local get_proc_attr: checks the allgather_complete flag (waiting on
 * it if still false) and then forwards to orte_grpcomm_base_get_proc_attr. */
static int orte_grpcomm_bad_get_proc_attr(const orte_process_name_t proc,
const char * attribute_name, void **val,
size_t *size);
/* Module def */ /* Module def */
orte_grpcomm_base_module_t orte_grpcomm_bad_module = { orte_grpcomm_base_module_t orte_grpcomm_bad_module = {
@ -63,7 +66,7 @@ orte_grpcomm_base_module_t orte_grpcomm_bad_module = {
barrier, barrier,
onesided_barrier, onesided_barrier,
orte_grpcomm_base_set_proc_attr, orte_grpcomm_base_set_proc_attr,
orte_grpcomm_base_get_proc_attr, orte_grpcomm_bad_get_proc_attr,
modex, modex,
orte_grpcomm_base_purge_proc_attrs orte_grpcomm_base_purge_proc_attrs
}; };
@ -488,8 +491,37 @@ static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
return ORTE_SUCCESS; return ORTE_SUCCESS;
} }
/*
 * get_proc_attr entry point of the "bad" grpcomm module.
 *
 * Before looking anything up, check the allgather_complete flag. If it is
 * still false, hand it to ORTE_PROGRESSED_WAIT (presumably this drives
 * progress until the flag becomes true -- confirm against the macro
 * definition). The flag is set to true by allgather_recv_modex().
 *
 * The lookup itself is delegated unchanged to
 * orte_grpcomm_base_get_proc_attr(), and its return code is handed back
 * to the caller as-is.
 */
static int
orte_grpcomm_bad_get_proc_attr(const orte_process_name_t proc,
                               const char * attribute_name, void **val,
                               size_t *size)
{
    int rc;

    if (!allgather_complete) {
        ORTE_PROGRESSED_WAIT(allgather_complete, 0, 1);
    }

    rc = orte_grpcomm_base_get_proc_attr(proc, attribute_name, val, size);
    return rc;
}
/*** MODEX SECTION ***/ /*** MODEX SECTION ***/
/*
 * Receive callback for the modex allgather result.
 *
 * Unpacks the incoming buffer through orte_grpcomm_base_modex_unpack() and
 * logs any error it reports; a failed unpack does not stop the rest of the
 * callback from running. The object passed in through cbdata is treated as
 * an opal_buffer_t and released here. Finally, allgather_complete is set to
 * true.
 *
 * status, sender and tag are not used by this callback.
 */
static void allgather_recv_modex(int status, orte_process_name_t* sender,
                                 opal_buffer_t *buffer,
                                 orte_rml_tag_t tag, void *cbdata)
{
    opal_buffer_t *posted_buf = (opal_buffer_t*)cbdata;
    int rc;

    OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
                         "%s grpcomm:bad modex received",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));

    rc = orte_grpcomm_base_modex_unpack(buffer, true);
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
    }

    /* the buffer handed over as callback data is no longer needed */
    OBJ_RELEASE(posted_buf);

    /* mark the modex as finished */
    allgather_complete = true;
}
static int modex(opal_list_t *procs) static int modex(opal_list_t *procs)
{ {
int rc; int rc;
@ -499,9 +531,83 @@ static int modex(opal_list_t *procs)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
if (NULL == procs) { if (NULL == procs) {
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_peer_modex(true))) { /* The modex will be realized in the background by the daemons. The processes will
* only be informed when all data has been collected from all processes. The get_attr
* call does the blocking; it will not return until the data has been received.
*/
opal_buffer_t *buf, *rbuf;
orte_grpcomm_coll_t coll_type = ORTE_GRPCOMM_ALLGATHER;
bool modex_reqd = true;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:bad:peer:modex: performing modex",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* setup the buffer that will actually be sent */
buf = OBJ_NEW(opal_buffer_t);
rbuf = OBJ_NEW(opal_buffer_t);
/* tell the daemon we are doing an allgather */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &coll_type, 1, ORTE_GRPCOMM_COLL_T))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc;
}
/* put our process name in the buffer so it can be unpacked later */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
} }
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &orte_process_info.arch, 1, OPAL_UINT32))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* pack the entries we have received */
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_pack_modex_entries(buf, &modex_reqd))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s grpcomm:bad:peer:modex: executing non-blocking allgather",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* send to local daemon */
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_DAEMON, buf, ORTE_RML_TAG_DAEMON_COLLECTIVE, 0))) {
ORTE_ERROR_LOG(rc);
return rc;
}
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s grpcomm:bad allgather buffer sent",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* now receive the final result. Be sure to do this in
* a manner that allows us to return without being in a recv!
*/
allgather_complete = false;
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER,
ORTE_RML_NON_PERSISTENT, allgather_recv_modex, (void*)rbuf);
if (rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
return rc;
}
rbuf = NULL; /* make sure we don't release it yet */
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:bad: modex posted",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
cleanup:
if( NULL != buf ) {
OBJ_RELEASE(buf);
}
if( NULL != rbuf ) {
OBJ_RELEASE(rbuf);
}
return rc;
} else { } else {
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_full_modex(procs, true))) { if (ORTE_SUCCESS != (rc = orte_grpcomm_base_full_modex(procs, true))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);