diff --git a/orte/mca/grpcomm/base/Makefile.am b/orte/mca/grpcomm/base/Makefile.am index f1336cc709..cea5eaa151 100644 --- a/orte/mca/grpcomm/base/Makefile.am +++ b/orte/mca/grpcomm/base/Makefile.am @@ -22,4 +22,7 @@ headers += \ libmca_grpcomm_la_SOURCES += \ base/grpcomm_base_close.c \ base/grpcomm_base_select.c \ - base/grpcomm_base_open.c \ No newline at end of file + base/grpcomm_base_open.c \ + base/grpcomm_base_allgather.c \ + base/grpcomm_base_barrier.c \ + base/grpcomm_base_modex.c diff --git a/orte/mca/grpcomm/base/base.h b/orte/mca/grpcomm/base/base.h index 6b233109ca..79d352d14a 100644 --- a/orte/mca/grpcomm/base/base.h +++ b/orte/mca/grpcomm/base/base.h @@ -28,6 +28,8 @@ #include "opal/class/opal_list.h" #include "opal/mca/mca.h" +#include "opal/threads/mutex.h" +#include "opal/threads/condition.h" #include "orte/mca/grpcomm/grpcomm.h" @@ -53,6 +55,27 @@ ORTE_DECLSPEC extern bool mca_grpcomm_base_selected; ORTE_DECLSPEC extern opal_list_t mca_grpcomm_base_components_available; ORTE_DECLSPEC extern orte_grpcomm_base_component_t mca_grpcomm_base_selected_component; + +/* + * Base functions + */ +ORTE_DECLSPEC int orte_grpcomm_base_allgather_list(opal_list_t *names, + opal_buffer_t *sbuf, + opal_buffer_t *rbuf); +ORTE_DECLSPEC int orte_grpcomm_base_allgather(opal_buffer_t *sbuf, + opal_buffer_t *rbuf); +ORTE_DECLSPEC int orte_grpcomm_base_barrier(void); +ORTE_DECLSPEC int orte_grpcomm_base_set_proc_attr(const char *attr_name, + const void *data, + size_t size); +ORTE_DECLSPEC int orte_grpcomm_base_get_proc_attr(const orte_process_name_t proc, + const char * attribute_name, void **val, + size_t *size); +ORTE_DECLSPEC int orte_grpcomm_base_modex(opal_list_t *procs); +ORTE_DECLSPEC int orte_grpcomm_base_purge_proc_attrs(void); +ORTE_DECLSPEC int orte_grpcomm_base_modex_init(void); +ORTE_DECLSPEC void orte_grpcomm_base_modex_finalize(void); + /* * external API functions will be documented in the mca/grpcomm/grpcomm.h file */ diff --git 
a/orte/mca/grpcomm/base/grpcomm_base_allgather.c b/orte/mca/grpcomm/base/grpcomm_base_allgather.c new file mode 100644 index 0000000000..f9ba674709 --- /dev/null +++ b/orte/mca/grpcomm/base/grpcomm_base_allgather.c @@ -0,0 +1,437 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2007 Sun Microsystems, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "orte_config.h" +#include "orte/constants.h" +#include "orte/types.h" + +#include +#ifdef HAVE_SYS_TIME_H +#include +#endif /* HAVE_SYS_TIME_H */ + +#include "opal/threads/condition.h" +#include "opal/util/output.h" + +#include "orte/util/proc_info.h" +#include "opal/dss/dss.h" +#include "orte/mca/errmgr/errmgr.h" +#include "orte/mca/odls/odls_types.h" +#include "orte/mca/rml/rml.h" +#include "orte/runtime/orte_globals.h" +#include "orte/util/name_fns.h" +#include "orte/orted/orted.h" +#include "orte/runtime/orte_wait.h" + +#include "orte/mca/grpcomm/base/base.h" + +static bool allgather_failed; +static orte_std_cntr_t allgather_num_recvd; +static opal_buffer_t *allgather_buf; + +static void allgather_server_recv(int status, orte_process_name_t* sender, + opal_buffer_t *buffer, + orte_rml_tag_t tag, void *cbdata) +{ + int rc; + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s allgather buffer received from %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(sender))); + + /* append this data to the allgather_buf */ + if (ORTE_SUCCESS != (rc = 
opal_dss.copy_payload(allgather_buf, buffer))) { + ORTE_ERROR_LOG(rc); + allgather_failed = true; + return; + } + + /* bump the counter */ + ++allgather_num_recvd; + + /* reissue the recv */ + rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER_SERVER, + ORTE_RML_NON_PERSISTENT, allgather_server_recv, NULL); + if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) { + ORTE_ERROR_LOG(rc); + allgather_failed = true; + } +} + +static void allgather_client_recv(int status, orte_process_name_t* sender, + opal_buffer_t *buffer, + orte_rml_tag_t tag, void *cbdata) +{ + int rc; + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s grpcomm:base: allgather buffer received", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* transfer the buffer */ + if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(allgather_buf, buffer))) { + ORTE_ERROR_LOG(rc); + allgather_failed = true; + } + + /* bump the counter */ + ++allgather_num_recvd; +} + +int orte_grpcomm_base_allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf) +{ + orte_process_name_t name; + int rc; + + OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, + "%s grpcomm: entering allgather", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* everything happens within my jobid */ + name.jobid = ORTE_PROC_MY_NAME->jobid; + + /*** RANK != 0 ***/ + if (0 != ORTE_PROC_MY_NAME->vpid) { + /* everyone but rank=0 sends data */ + name.vpid = 0; + if (0 > orte_rml.send_buffer(&name, sbuf, ORTE_RML_TAG_ALLGATHER_SERVER, 0)) { + ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); + return ORTE_ERR_COMM_FAILURE; + } + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s allgather buffer sent", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* setup the buffer that will recv the results */ + allgather_buf = OBJ_NEW(opal_buffer_t); + + /* now receive the final result from rank=0. Be sure to do this in + * a manner that allows us to return without being in a recv! 
+ */ + allgather_num_recvd = 0; + allgather_failed = false; + rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER_CLIENT, + ORTE_RML_NON_PERSISTENT, allgather_client_recv, NULL); + if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) { + ORTE_ERROR_LOG(rc); + return rc; + } + + ORTE_PROGRESSED_WAIT(allgather_failed, allgather_num_recvd, 1); + + /* if the allgather failed, return an error */ + if (allgather_failed) { + ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); + OBJ_RELEASE(allgather_buf); + return ORTE_ERR_COMM_FAILURE; + } + + /* copy payload to the caller's buffer */ + if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(rbuf, allgather_buf))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(allgather_buf); + return rc; + } + OBJ_RELEASE(allgather_buf); + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s allgather buffer received", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + return ORTE_SUCCESS; + } + + + /*** RANK = 0 ***/ + /* seed the outgoing buffer with the num_procs so it can be unpacked */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(rbuf, &orte_process_info.num_procs, 1, ORTE_STD_CNTR))) { + ORTE_ERROR_LOG(rc); + return rc; + } + + /* put my own information into the outgoing buffer */ + if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(rbuf, sbuf))) { + ORTE_ERROR_LOG(rc); + return rc; + } + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s allgather collecting buffers", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* setup the recv conditions */ + allgather_failed = false; + allgather_num_recvd = 0; + + /* setup the buffer that will recv the results */ + allgather_buf = OBJ_NEW(opal_buffer_t); + + /* post the non-blocking recv */ + rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER_SERVER, + ORTE_RML_NON_PERSISTENT, allgather_server_recv, NULL); + if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) { + ORTE_ERROR_LOG(rc); + return rc; + } + + ORTE_PROGRESSED_WAIT(allgather_failed, allgather_num_recvd, 
(orte_std_cntr_t)orte_process_info.num_procs-1); + + /* cancel the lingering recv */ + if (ORTE_SUCCESS != (rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER_SERVER))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(allgather_buf); + return rc; + } + + /* if the allgather failed, say so */ + if (allgather_failed) { + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s allgather failed!", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + OBJ_RELEASE(allgather_buf); + return ORTE_ERROR; + } + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s allgather xcasting collected data", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* copy the received info to the caller's buffer */ + if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(rbuf, allgather_buf))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(allgather_buf); + return rc; + } + OBJ_RELEASE(allgather_buf); + + /* xcast the results */ + orte_grpcomm.xcast(ORTE_PROC_MY_NAME->jobid, rbuf, ORTE_RML_TAG_ALLGATHER_CLIENT); + + /* xcast automatically ensures that the sender -always- gets a copy + * of the message. This is required to ensure proper operation of the + * launch system as the HNP -must- get a copy itself. So we have to + * post our own receive here so that we don't leave a message rattling + * around in our RML + */ + /* setup the buffer that will recv the results */ + allgather_buf = OBJ_NEW(opal_buffer_t); + + /* receive the echo'd message. Be sure to do this in + * a manner that allows us to return without being in a recv! 
+ */ + allgather_num_recvd = 0; + allgather_failed = false; + rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER_CLIENT, + ORTE_RML_NON_PERSISTENT, allgather_client_recv, NULL); + if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) { + ORTE_ERROR_LOG(rc); + return rc; + } + + ORTE_PROGRESSED_WAIT(allgather_failed, allgather_num_recvd, 1); + + /* if the allgather failed, return an error */ + if (allgather_failed) { + ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); + OBJ_RELEASE(allgather_buf); + return ORTE_ERR_COMM_FAILURE; + } + + /* don't need the received buffer - we already have what we need in rbuf */ + OBJ_RELEASE(allgather_buf); + + OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, + "%s grpcomm: allgather completed", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + return ORTE_SUCCESS; +} + + + +static orte_std_cntr_t allgather_num_sent; +static void allgather_send_cb(int status, orte_process_name_t* sender, + opal_buffer_t *buffer, + orte_rml_tag_t tag, void *cbdata) +{ + /* increment the count */ + ++allgather_num_sent; +} + + +int orte_grpcomm_base_allgather_list(opal_list_t *names, opal_buffer_t *sbuf, opal_buffer_t *rbuf) +{ + opal_list_item_t *item; + orte_namelist_t *peer, *root; + orte_std_cntr_t num_peers; + int rc; + + OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, + "%s grpcomm: entering allgather_list", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* the first entry on the list is the "root" that collects + * all the data - everyone else just sends and gets back + * the results + */ + root = (orte_namelist_t*)opal_list_get_first(names); + + /*** NON-ROOT ***/ + if (OPAL_EQUAL != opal_dss.compare(&root->name, ORTE_PROC_MY_NAME, ORTE_NAME)) { + /* everyone but root sends data */ + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s allgather_list: sending my data to %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&root->name))); + + if (0 > orte_rml.send_buffer(&root->name, sbuf, ORTE_RML_TAG_ALLGATHER_SERVER, 0)) 
{ + ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); + return ORTE_ERR_COMM_FAILURE; + } + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s allgather_list: buffer sent", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* setup the buffer that will recv the results */ + allgather_buf = OBJ_NEW(opal_buffer_t); + + /* now receive the final result from rank=0. Be sure to do this in + * a manner that allows us to return without being in a recv! + */ + allgather_num_recvd = 0; + allgather_failed = false; + rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER_CLIENT, + ORTE_RML_NON_PERSISTENT, allgather_client_recv, NULL); + if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) { + ORTE_ERROR_LOG(rc); + return rc; + } + + ORTE_PROGRESSED_WAIT(allgather_failed, allgather_num_recvd, 1); + + /* if the allgather failed, return an error */ + if (allgather_failed) { + ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); + OBJ_RELEASE(allgather_buf); + return ORTE_ERR_COMM_FAILURE; + } + + /* copy payload to the caller's buffer */ + if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(rbuf, allgather_buf))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(allgather_buf); + return rc; + } + OBJ_RELEASE(allgather_buf); + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s allgather_list: buffer received", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + return ORTE_SUCCESS; + } + + + /*** ROOT ***/ + /* count how many peers are participating, including myself */ + num_peers = (orte_std_cntr_t)opal_list_get_size(names); + + /* seed the outgoing buffer with the num_procs so it can be unpacked */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(rbuf, &num_peers, 1, ORTE_STD_CNTR))) { + ORTE_ERROR_LOG(rc); + return rc; + } + + /* put my own information into the outgoing buffer */ + if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(rbuf, sbuf))) { + ORTE_ERROR_LOG(rc); + return rc; + } + + /* setup the recv conditions */ + allgather_failed = false; + allgather_num_recvd = 0; + + /* setup the buffer that 
will recv the results */ + allgather_buf = OBJ_NEW(opal_buffer_t); + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s allgather_list: waiting to recv %ld inputs", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (long)num_peers-1)); + + /* post the non-blocking recv */ + rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER_SERVER, + ORTE_RML_NON_PERSISTENT, allgather_server_recv, NULL); + if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) { + ORTE_ERROR_LOG(rc); + return rc; + } + + ORTE_PROGRESSED_WAIT(allgather_failed, allgather_num_recvd, num_peers-1); + + /* cancel the lingering recv */ + if (ORTE_SUCCESS != (rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER_SERVER))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(allgather_buf); + return rc; + } + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s allgather_list: received all data", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* copy the received info to the caller's buffer */ + if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(rbuf, allgather_buf))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(allgather_buf); + return rc; + } + OBJ_RELEASE(allgather_buf); + + /* broadcast the results */ + allgather_num_sent = 0; + for (item = opal_list_get_first(names); + item != opal_list_get_end(names); + item = opal_list_get_next(item)) { + peer = (orte_namelist_t*)item; + + /* skip myself */ + if (OPAL_EQUAL == opal_dss.compare(&root->name, &peer->name, ORTE_NAME)) { + continue; + } + + /* transmit the buffer to this process */ + if (0 > orte_rml.send_buffer_nb(&peer->name, rbuf, ORTE_RML_TAG_ALLGATHER_CLIENT, + 0, allgather_send_cb, 0)) { + ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); + return ORTE_ERR_COMM_FAILURE; + } + } + + ORTE_PROGRESSED_WAIT(false, allgather_num_sent, num_peers-1); + + OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, + "%s grpcomm: allgather_list completed", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + return ORTE_SUCCESS; +} diff --git 
a/orte/mca/grpcomm/base/grpcomm_base_barrier.c b/orte/mca/grpcomm/base/grpcomm_base_barrier.c new file mode 100644 index 0000000000..bb49885f47 --- /dev/null +++ b/orte/mca/grpcomm/base/grpcomm_base_barrier.c @@ -0,0 +1,182 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2007 Sun Microsystems, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "orte_config.h" +#include "orte/constants.h" +#include "orte/types.h" + +#include +#ifdef HAVE_SYS_TIME_H +#include +#endif /* HAVE_SYS_TIME_H */ + +#include "opal/threads/condition.h" +#include "opal/util/output.h" +#include "opal/util/bit_ops.h" + +#include "opal/class/opal_hash_table.h" +#include "orte/util/proc_info.h" +#include "opal/dss/dss.h" +#include "orte/mca/errmgr/errmgr.h" +#include "orte/mca/odls/odls_types.h" +#include "orte/mca/rml/rml.h" +#include "orte/runtime/orte_globals.h" +#include "orte/util/name_fns.h" +#include "orte/orted/orted.h" +#include "orte/runtime/orte_wait.h" + +#include "orte/mca/grpcomm/base/base.h" + +static orte_std_cntr_t barrier_num_recvd; +static bool barrier_failed; + +static void barrier_server_recv(int status, orte_process_name_t* sender, + opal_buffer_t *buffer, + orte_rml_tag_t tag, void *cbdata) +{ + int rc; + + /* bump counter */ + ++barrier_num_recvd; + /* reissue the recv */ + rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_BARRIER_SERVER, + ORTE_RML_NON_PERSISTENT, barrier_server_recv, NULL); + if (rc != ORTE_SUCCESS 
&& rc != ORTE_ERR_NOT_IMPLEMENTED) { + ORTE_ERROR_LOG(rc); + barrier_failed = true; + } +} + +static void barrier_recv(int status, orte_process_name_t* sender, + opal_buffer_t *buffer, + orte_rml_tag_t tag, void *cbdata) +{ + /* bump counter */ + ++barrier_num_recvd; +} + +int orte_grpcomm_base_barrier(void) +{ + orte_process_name_t name; + orte_std_cntr_t i=0; + opal_buffer_t buf; + int rc; + + OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, + "%s grpcomm: entering barrier", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* everything happens within the same jobid */ + name.jobid = ORTE_PROC_MY_NAME->jobid; + + /*** RANK != 0 ***/ + if (0 != ORTE_PROC_MY_NAME->vpid) { + /* All non-root send & receive near-zero-length message. */ + name.vpid = 0; + OBJ_CONSTRUCT(&buf, opal_buffer_t); + opal_dss.pack(&buf, &i, 1, ORTE_STD_CNTR); /* put something meaningless here */ + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s sending barrier", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + rc = orte_rml.send_buffer(&name,&buf,ORTE_RML_TAG_BARRIER_SERVER,0); + if (rc < 0) { + ORTE_ERROR_LOG(rc); + return rc; + } + OBJ_DESTRUCT(&buf); + + /* now receive the release from rank=0. Be sure to do this in + * a manner that allows us to return without being in a recv! 
+ */ + barrier_num_recvd = 0; + rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_BARRIER_CLIENT, + ORTE_RML_NON_PERSISTENT, barrier_recv, NULL); + if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) { + ORTE_ERROR_LOG(rc); + return rc; + } + + ORTE_PROGRESSED_WAIT(false, barrier_num_recvd, 1); + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s received barrier release", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + return ORTE_SUCCESS; + } + + + /*** RANK = 0 ***/ + /* setup to recv the barrier messages from all peers */ + barrier_num_recvd = 0; + barrier_failed = false; + + /* post the non-blocking recv */ + rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_BARRIER_SERVER, + ORTE_RML_NON_PERSISTENT, barrier_server_recv, NULL); + if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) { + ORTE_ERROR_LOG(rc); + return rc; + } + + ORTE_PROGRESSED_WAIT(barrier_failed, barrier_num_recvd, (orte_std_cntr_t)orte_process_info.num_procs-1); + + /* if the barrier failed, say so */ + if (barrier_failed) { + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s barrier failed!", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + return ORTE_ERROR; + } + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s barrier xcasting release", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* xcast the release */ + OBJ_CONSTRUCT(&buf, opal_buffer_t); + opal_dss.pack(&buf, &i, 1, ORTE_STD_CNTR); /* put something meaningless here */ + orte_grpcomm.xcast(ORTE_PROC_MY_NAME->jobid, &buf, ORTE_RML_TAG_BARRIER_CLIENT); + OBJ_DESTRUCT(&buf); + + /* xcast automatically ensures that the sender -always- gets a copy + * of the message. This is required to ensure proper operation of the + * launch system as the HNP -must- get a copy itself. 
So we have to + * post our own receive here so that we don't leave a message rattling + * around in our RML + */ + barrier_num_recvd = 0; + rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_BARRIER_CLIENT, + ORTE_RML_NON_PERSISTENT, barrier_recv, NULL); + if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) { + ORTE_ERROR_LOG(rc); + return rc; + } + + ORTE_PROGRESSED_WAIT(false, barrier_num_recvd, 1); + + OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, + "%s grpcomm: barrier completed", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + return ORTE_SUCCESS; +} + diff --git a/orte/mca/grpcomm/base/grpcomm_base_close.c b/orte/mca/grpcomm/base/grpcomm_base_close.c index 52831bf03e..fbd02452f0 100644 --- a/orte/mca/grpcomm/base/grpcomm_base_close.c +++ b/orte/mca/grpcomm/base/grpcomm_base_close.c @@ -30,10 +30,10 @@ int orte_grpcomm_base_close(void) /* If we have a selected component and module, then finalize it */ if (mca_grpcomm_base_selected) { - mca_grpcomm_base_selected_component.grpcomm_finalize(); + orte_grpcomm.finalize(); } - /* Close all remaining available components (may be one if this is a + /* Close all remaining available components (may be one if this is a OpenRTE program, or [possibly] multiple if this is ompi_info) */ mca_base_components_close(orte_grpcomm_base_output, diff --git a/orte/mca/grpcomm/base/grpcomm_base_modex.c b/orte/mca/grpcomm/base/grpcomm_base_modex.c new file mode 100644 index 0000000000..9f1fcc9590 --- /dev/null +++ b/orte/mca/grpcomm/base/grpcomm_base_modex.c @@ -0,0 +1,557 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. 
+ * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2007 Sun Microsystems, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "orte_config.h" +#include "orte/constants.h" +#include "orte/types.h" + +#include +#ifdef HAVE_SYS_TIME_H +#include +#endif /* HAVE_SYS_TIME_H */ + +#include "opal/threads/condition.h" +#include "opal/util/output.h" +#include "opal/util/bit_ops.h" + +#include "opal/class/opal_hash_table.h" +#include "orte/util/proc_info.h" +#include "opal/dss/dss.h" +#include "orte/mca/errmgr/errmgr.h" +#include "orte/mca/odls/odls_types.h" +#include "orte/mca/rml/rml.h" +#include "orte/runtime/orte_globals.h" +#include "orte/util/name_fns.h" +#include "orte/orted/orted.h" +#include "orte/runtime/orte_wait.h" + +#include "orte/mca/grpcomm/base/base.h" + + +/*************** MODEX SECTION **************/ + +/** + * MODEX DESIGN + * + * Modex data is always associated with a given orte process name, in + * an opal hash table. The hash table is necessary because modex data is + * received for entire jobids and when working with + * dynamic processes, it is possible we will receive data for a + * process not yet in the ompi_proc_all() list of processes. This + * information must be kept for later use, because if accept/connect + * causes the proc to be added to the ompi_proc_all() list, it could + * cause a connection storm. Therefore, we use an + * opal_hast_table_t backing store to contain all modex information. + * + * While we could add the now discovered proc into the ompi_proc_all() + * list, this has some problems, in that we don't have the + * architecture and hostname information needed to properly fill in + * the ompi_proc_t structure and we don't want to cause RML + * communication to get it when we don't really need to know anything + * about the remote proc. 
+ * + * All data put into the modex (or received from the modex) is + * associated with a given proc,attr_name pair. The data structures + * to maintain this data look something like: + * + * opal_hash_table_t modex_data -> list of attr_proc_t objects + * + * +-----------------------------+ + * | modex_proc_data_t | + * | - opal_list_item_t | + * +-----------------------------+ + * | opal_mutex_t modex_lock | + * | bool modex_received_data | 1 + * | opal_list_t modules | ---------+ + * +-----------------------------+ | + * * | + * +--------------------------------+ <--------+ + * | modex_module_data_t | + * | - opal_list_item_t | + * +--------------------------------+ + * | mca_base_component_t component | + * | void *module_data | + * | size_t module_data_size | + * +--------------------------------+ + * + */ + +/* Local "globals" */ +static orte_std_cntr_t num_entries; +static opal_buffer_t *modex_buffer; +static opal_hash_table_t *modex_data; +static opal_mutex_t mutex; +static opal_condition_t cond; + +/** + * Modex data for a particular orte process + * + * Locking infrastructure and list of module data for a given orte + * process name. The name association is maintained in the + * modex_data hash table. + */ +struct modex_proc_data_t { + /** Structure can be put on lists (including in hash tables) */ + opal_list_item_t super; + /* Lock held whenever the modex data for this proc is being + modified */ + opal_mutex_t modex_lock; + /* True if modex data has ever been received from this process, + false otherwise. */ + bool modex_received_data; + /* List of modex_module_data_t structures containing all data + received from this process, sorted by component name. 
*/ + opal_list_t modex_module_data; +}; +typedef struct modex_proc_data_t modex_proc_data_t; + +static void +modex_construct(modex_proc_data_t * modex) +{ + OBJ_CONSTRUCT(&modex->modex_lock, opal_mutex_t); + modex->modex_received_data = false; + OBJ_CONSTRUCT(&modex->modex_module_data, opal_list_t); +} + +static void +modex_destruct(modex_proc_data_t * modex) +{ + OBJ_DESTRUCT(&modex->modex_module_data); + OBJ_DESTRUCT(&modex->modex_lock); +} + +OBJ_CLASS_INSTANCE(modex_proc_data_t, opal_object_t, + modex_construct, modex_destruct); + + + +/** + * Data for a particular attribute + * + * Container for data for a particular module,attribute pair. This + * structure should be contained in the modex_module_data list in an + * modex_proc_data_t structure to maintain an association with a + * given proc. The list is then searched for a matching attribute + * name. + * + * While searching the list or reading from (or writing to) this + * structure, the lock in the proc_data_t should be held. + */ +struct modex_attr_data_t { + /** Structure can be put on lists */ + opal_list_item_t super; + /** Attribute name */ + char * attr_name; + /** Binary blob of data associated with this proc,component pair */ + void *attr_data; + /** Size (in bytes) of module_data */ + size_t attr_data_size; +}; +typedef struct modex_attr_data_t modex_attr_data_t; + +static void +modex_attr_construct(modex_attr_data_t * module) +{ + module->attr_name = NULL; + module->attr_data = NULL; + module->attr_data_size = 0; +} + +static void +modex_attr_destruct(modex_attr_data_t * module) +{ + if (NULL != module->attr_name) { + free(module->attr_name); + } + if (NULL != module->attr_data) { + free(module->attr_data); + } +} + +OBJ_CLASS_INSTANCE(modex_attr_data_t, + opal_list_item_t, + modex_attr_construct, + modex_attr_destruct); + + +/** + * Find data for a given attribute in a given modex_proc_data_t + * container. + * + * The proc_data's modex_lock must be held during this + * search. 
+ */ +static modex_attr_data_t * +modex_lookup_attr_data(modex_proc_data_t *proc_data, + const char *attr_name, + bool create_if_not_found) +{ + modex_attr_data_t *attr_data = NULL; + for (attr_data = (modex_attr_data_t *) opal_list_get_first(&proc_data->modex_module_data); + attr_data != (modex_attr_data_t *) opal_list_get_end(&proc_data->modex_module_data); + attr_data = (modex_attr_data_t *) opal_list_get_next(attr_data)) { + if (0 == strcmp(attr_name, attr_data->attr_name)) { + return attr_data; + } + } + + if (create_if_not_found) { + attr_data = OBJ_NEW(modex_attr_data_t); + if (NULL == attr_data) return NULL; + + attr_data->attr_name = strdup(attr_name); + opal_list_append(&proc_data->modex_module_data, &attr_data->super); + + return attr_data; + } + + return NULL; +} + + +/** +* Find modex_proc_data_t container associated with given + * orte_process_name_t. + * + * The global lock should *NOT* be held when + * calling this function. + */ +static modex_proc_data_t* +modex_lookup_orte_proc(const orte_process_name_t *orte_proc) +{ + modex_proc_data_t *proc_data = NULL; + + OPAL_THREAD_LOCK(&mutex); + opal_hash_table_get_value_uint64(modex_data, + orte_util_hash_name(orte_proc), (void**)&proc_data); + if (NULL == proc_data) { + /* The proc clearly exists, so create a modex structure + for it */ + proc_data = OBJ_NEW(modex_proc_data_t); + if (NULL == proc_data) { + opal_output(0, "grpcomm_basic_modex_lookup_orte_proc: unable to allocate modex_proc_data_t\n"); + OPAL_THREAD_UNLOCK(&mutex); + return NULL; + } + opal_hash_table_set_value_uint64(modex_data, + orte_util_hash_name(orte_proc), proc_data); + } + OPAL_THREAD_UNLOCK(&mutex); + + return proc_data; +} + +int orte_grpcomm_base_modex_init(void) +{ + OBJ_CONSTRUCT(&mutex, opal_mutex_t); + OBJ_CONSTRUCT(&cond, opal_condition_t); + + modex_data = OBJ_NEW(opal_hash_table_t); + opal_hash_table_init(modex_data, 256); + num_entries = 0; + modex_buffer = OBJ_NEW(opal_buffer_t); + + return ORTE_SUCCESS; +} + +void 
orte_grpcomm_base_modex_finalize(void) +{ + OBJ_DESTRUCT(&mutex); + OBJ_DESTRUCT(&cond); + + opal_hash_table_remove_all(modex_data); + OBJ_RELEASE(modex_data); + + OBJ_RELEASE(modex_buffer); +} + +int orte_grpcomm_base_set_proc_attr(const char *attr_name, + const void *data, + size_t size) +{ + int rc; + + OPAL_THREAD_LOCK(&mutex); + + /* Pack the attribute name information into the local buffer */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(modex_buffer, &attr_name, 1, OPAL_STRING))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* pack the size */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(modex_buffer, &size, 1, OPAL_SIZE))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* Pack the actual data into the buffer */ + if (0 != size) { + if (ORTE_SUCCESS != (rc = opal_dss.pack(modex_buffer, (void *) data, size, OPAL_BYTE))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + } + + /* track the number of entries */ + ++num_entries; + +cleanup: + OPAL_THREAD_UNLOCK(&mutex); + + return rc; +} + +int orte_grpcomm_base_get_proc_attr(const orte_process_name_t proc, + const char * attribute_name, void **val, + size_t *size) +{ + modex_proc_data_t *proc_data; + modex_attr_data_t *attr_data; + + proc_data = modex_lookup_orte_proc(&proc); + if (NULL == proc_data) return ORTE_ERR_NOT_FOUND; + + OPAL_THREAD_LOCK(&proc_data->modex_lock); + + /* look up attribute */ + attr_data = modex_lookup_attr_data(proc_data, attribute_name, false); + + /* copy the data out to the user */ + if ((NULL == attr_data) || + (attr_data->attr_data_size == 0)) { + opal_output(0, "grpcomm_basic_get_proc_attr: no attr avail or zero byte size"); + *val = NULL; + *size = 0; + } else { + void *copy = malloc(attr_data->attr_data_size); + + if (copy == NULL) { + OPAL_THREAD_UNLOCK(&proc_data->modex_lock); + return ORTE_ERR_OUT_OF_RESOURCE; + } + memcpy(copy, attr_data->attr_data, attr_data->attr_data_size); + *val = copy; + *size = attr_data->attr_data_size; + } + OPAL_THREAD_UNLOCK(&proc_data->modex_lock); + + 
return ORTE_SUCCESS; +} + + +int orte_grpcomm_base_modex(opal_list_t *procs) +{ + opal_buffer_t buf, rbuf; + orte_std_cntr_t i, j, num_procs, num_recvd_entries; + void *bytes = NULL; + orte_std_cntr_t cnt; + orte_process_name_t proc_name; + modex_proc_data_t *proc_data; + modex_attr_data_t *attr_data; + int rc; + + OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, + "%s grpcomm: modex entered", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* setup the buffer that will actually be sent */ + OBJ_CONSTRUCT(&buf, opal_buffer_t); + OBJ_CONSTRUCT(&rbuf, opal_buffer_t); + + /* put our process name in the buffer so it can be unpacked later */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output, + "%s modex: reporting %ld entries", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (long)num_entries)); + + /* put the number of entries into the buffer */ + OPAL_THREAD_LOCK(&mutex); + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &num_entries, 1, ORTE_STD_CNTR))) { + ORTE_ERROR_LOG(rc); + OPAL_THREAD_UNLOCK(&mutex); + goto cleanup; + } + + /* if there are entries, non-destructively copy the data across */ + if (0 < num_entries) { + if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(&buf, modex_buffer))) { + ORTE_ERROR_LOG(rc); + OPAL_THREAD_UNLOCK(&mutex); + goto cleanup; + } + } + OPAL_THREAD_UNLOCK(&mutex); + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s modex: executing allgather", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* exchange the buffer with the list of peers (if provided) or all my peers */ + if (NULL == procs) { + if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather(&buf, &rbuf))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + } else { + if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather_list(procs, &buf, &rbuf))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + } + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s modex: processing modex 
info", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* process the results */ + /* extract the number of procs that put data in the buffer */ + cnt=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_procs, &cnt, ORTE_STD_CNTR))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output, + "%s modex: received data for %ld procs", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (long)num_procs)); + + /* process the buffer */ + for (i=0; i < num_procs; i++) { + /* unpack the process name */ + cnt=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &proc_name, &cnt, ORTE_NAME))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* look up the modex data structure */ + proc_data = modex_lookup_orte_proc(&proc_name); + if (proc_data == NULL) { + /* report the error */ + opal_output(0, "grpcomm_basic_modex: received modex info for unknown proc %s\n", + ORTE_NAME_PRINT(&proc_name)); + rc = ORTE_ERR_NOT_FOUND; + goto cleanup; + } + + /* unpack the number of entries for this proc */ + cnt=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_recvd_entries, &cnt, ORTE_STD_CNTR))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + OPAL_THREAD_LOCK(&proc_data->modex_lock); + + /* + * Extract the attribute names and values + */ + for (j = 0; j < num_recvd_entries; j++) { + size_t num_bytes; + char *attr_name; + + cnt = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &attr_name, &cnt, OPAL_STRING))) { + ORTE_ERROR_LOG(rc); + OPAL_THREAD_UNLOCK(&proc_data->modex_lock); + goto cleanup; + } + + cnt = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_bytes, &cnt, OPAL_SIZE))) { + ORTE_ERROR_LOG(rc); + OPAL_THREAD_UNLOCK(&proc_data->modex_lock); + goto cleanup; + } + if (num_bytes != 0) { + if (NULL == (bytes = malloc(num_bytes))) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + rc = ORTE_ERR_OUT_OF_RESOURCE; + OPAL_THREAD_UNLOCK(&proc_data->modex_lock); + goto cleanup; + } + cnt = (orte_std_cntr_t) num_bytes; + if (ORTE_SUCCESS != 
(rc = opal_dss.unpack(&rbuf, bytes, &cnt, OPAL_BYTE))) { + ORTE_ERROR_LOG(rc); + OPAL_THREAD_UNLOCK(&proc_data->modex_lock); + goto cleanup; + } + num_bytes = cnt; + } else { + bytes = NULL; + } + + /* + * Lookup the corresponding modex structure + */ + if (NULL == (attr_data = modex_lookup_attr_data(proc_data, + attr_name, true))) { + opal_output(0, "grpcomm_basic_modex: modex_lookup_attr_data failed\n"); + OPAL_THREAD_UNLOCK(&proc_data->modex_lock); + rc = ORTE_ERR_NOT_FOUND; + goto cleanup; + } + if (NULL != attr_data->attr_data) { + /* some pre-existing value must be here - release it */ + free(attr_data->attr_data); + } + attr_data->attr_data = bytes; + attr_data->attr_data_size = num_bytes; + proc_data->modex_received_data = true; + } + OPAL_THREAD_UNLOCK(&proc_data->modex_lock); + } + +cleanup: + OBJ_DESTRUCT(&buf); + OBJ_DESTRUCT(&rbuf); + + OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, + "%s grpcomm: modex completed", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + return rc; +} + +int orte_grpcomm_base_purge_proc_attrs(void) +{ + /* + * Purge the attributes + */ + opal_hash_table_remove_all(modex_data); + OBJ_RELEASE(modex_data); + modex_data = OBJ_NEW(opal_hash_table_t); + opal_hash_table_init(modex_data, 256); + + /* + * Clear the modex buffer + */ + OBJ_RELEASE(modex_buffer); + num_entries = 0; + modex_buffer = OBJ_NEW(opal_buffer_t); + + return ORTE_SUCCESS; +} diff --git a/orte/mca/grpcomm/base/grpcomm_base_open.c b/orte/mca/grpcomm/base/grpcomm_base_open.c index bec635ab09..877c99aa6e 100644 --- a/orte/mca/grpcomm/base/grpcomm_base_open.c +++ b/orte/mca/grpcomm/base/grpcomm_base_open.c @@ -46,7 +46,6 @@ opal_list_t mca_grpcomm_base_components_available; orte_grpcomm_base_component_t mca_grpcomm_base_selected_component; - /** * Function for finding and opening either all MCA components, or the one * that was specifically requested via a MCA parameter. 
diff --git a/orte/mca/grpcomm/base/grpcomm_base_select.c b/orte/mca/grpcomm/base/grpcomm_base_select.c index 4fb69bc44b..096afc36c9 100644 --- a/orte/mca/grpcomm/base/grpcomm_base_select.c +++ b/orte/mca/grpcomm/base/grpcomm_base_select.c @@ -31,71 +31,62 @@ */ int orte_grpcomm_base_select(void) { - opal_list_item_t *item; - mca_base_component_list_item_t *cli; - orte_grpcomm_base_component_t *component, *best_component = NULL; - orte_grpcomm_base_module_t *module, *best_module = NULL; - int priority, best_priority = -1; - - /* Iterate through all the available components */ - - for (item = opal_list_get_first(&mca_grpcomm_base_components_available); - item != opal_list_get_end(&mca_grpcomm_base_components_available); - item = opal_list_get_next(item)) { - cli = (mca_base_component_list_item_t *) item; - component = (orte_grpcomm_base_component_t *) cli->cli_component; - - /* Call the component's init function and see if it wants to be - selected */ - - module = component->grpcomm_init(&priority); - - /* If we got a non-NULL module back, then the component wants to - be selected. 
So save its multi/hidden values and save the - module with the highest priority */ - - if (NULL != module) { - /* If this is the best one, save it */ - - if (priority > best_priority) { - - /* If there was a previous best one, finalize */ - - if (NULL != best_component) { - best_component->grpcomm_finalize(); + opal_list_item_t *item; + mca_base_component_list_item_t *cli; + orte_grpcomm_base_component_t *component, *best_component = NULL; + orte_grpcomm_base_module_t *module, *best_module = NULL; + int priority, best_priority = -1; + + /* Iterate through all the available components */ + + for (item = opal_list_get_first(&mca_grpcomm_base_components_available); + item != opal_list_get_end(&mca_grpcomm_base_components_available); + item = opal_list_get_next(item)) { + cli = (mca_base_component_list_item_t *) item; + component = (orte_grpcomm_base_component_t *) cli->cli_component; + + /* Call the component's init function and see if it wants to be + * selected + */ + + module = component->grpcomm_init(&priority); + + /* If we got a non-NULL module back, then the component wants to + * be selected. 
So save the module with the highest priority + */ + + if (NULL != module) { + /* If this is the best one, save it */ + + if (priority > best_priority) { + + /* Save the new best one */ + + best_module = module; + best_component = component; + + /* update the best priority */ + best_priority = priority; + } + } - - /* Save the new best one */ - - best_module = module; - best_component = component; - - /* update the best priority */ - best_priority = priority; - } - - /* If it's not the best one, finalize it */ - - else { - component->grpcomm_finalize(); - } } - } - - /* If we didn't find one to select, barf */ - - if (NULL == best_component) { - return ORTE_ERROR; - } - - /* We have happiness -- save the component and module for later - usage */ - - orte_grpcomm = *best_module; - mca_grpcomm_base_selected_component = *best_component; - mca_grpcomm_base_selected = true; - - /* all done */ - - return ORTE_SUCCESS; + + /* If we didn't find one to select, barf */ + + if (NULL == best_component) { + return ORTE_ERROR; + } + + /* We have happiness */ + orte_grpcomm = *best_module; + if (ORTE_SUCCESS != orte_grpcomm.init()) { + /* ouch! 
*/ + return ORTE_ERROR; + } + mca_grpcomm_base_selected = true; + + /* all done */ + + return ORTE_SUCCESS; } diff --git a/orte/mca/grpcomm/basic/grpcomm_basic.h b/orte/mca/grpcomm/basic/grpcomm_basic.h index 2a52614b48..be402dfb8d 100644 --- a/orte/mca/grpcomm/basic/grpcomm_basic.h +++ b/orte/mca/grpcomm/basic/grpcomm_basic.h @@ -40,18 +40,12 @@ BEGIN_C_DECLS typedef struct { orte_vpid_t xcast_linear_xover; orte_vpid_t xcast_binomial_xover; - orte_std_cntr_t num_active; - opal_mutex_t mutex; - opal_condition_t cond; - opal_hash_table_t modex_data; - opal_buffer_t modex_buffer; - orte_std_cntr_t modex_num_entries; } orte_grpcomm_basic_globals_t; extern orte_grpcomm_basic_globals_t orte_grpcomm_basic; /* - * Module open / close + * Component open / close */ int orte_grpcomm_basic_open(void); int orte_grpcomm_basic_close(void); @@ -59,13 +53,7 @@ orte_grpcomm_base_module_t* orte_grpcomm_basic_init(int *priority); /* - * Startup / Shutdown - */ -int orte_grpcomm_basic_module_init(void); -int orte_grpcomm_basic_finalize(void); - -/* - * xcast interfaces + * Grpcomm interfaces */ ORTE_MODULE_DECLSPEC extern orte_grpcomm_base_component_t mca_grpcomm_basic_component; diff --git a/orte/mca/grpcomm/basic/grpcomm_basic_component.c b/orte/mca/grpcomm/basic/grpcomm_basic_component.c index 8cd7ce4b96..de6302b4a7 100644 --- a/orte/mca/grpcomm/basic/grpcomm_basic_component.c +++ b/orte/mca/grpcomm/basic/grpcomm_basic_component.c @@ -68,8 +68,7 @@ orte_grpcomm_base_component_t mca_grpcomm_basic_component = { /* The component is checkpoint ready */ MCA_BASE_METADATA_PARAM_CHECKPOINT }, - orte_grpcomm_basic_init, /* component init */ - orte_grpcomm_basic_finalize /* component shutdown */ + orte_grpcomm_basic_init /* component init */ }; /* @@ -113,7 +112,6 @@ int orte_grpcomm_basic_open(void) return ORTE_SUCCESS; } -/* Close the component */ int orte_grpcomm_basic_close(void) { return ORTE_SUCCESS; @@ -121,34 +119,8 @@ int orte_grpcomm_basic_close(void) orte_grpcomm_base_module_t* 
orte_grpcomm_basic_init(int *priority) { - /* initialize globals */ - OBJ_CONSTRUCT(&orte_grpcomm_basic.mutex, opal_mutex_t); - OBJ_CONSTRUCT(&orte_grpcomm_basic.cond, opal_condition_t); - orte_grpcomm_basic.num_active = 0; - - OBJ_CONSTRUCT(&orte_grpcomm_basic.modex_data, opal_hash_table_t); - OBJ_CONSTRUCT(&orte_grpcomm_basic.modex_buffer, opal_buffer_t); - orte_grpcomm_basic.modex_num_entries = 0; - - opal_hash_table_init(&orte_grpcomm_basic.modex_data, 256); - /* we are the default, so set a low priority so we can be overridden */ *priority = 1; return &orte_grpcomm_basic_module; } - -/* - * finalize routine - */ -int orte_grpcomm_basic_finalize(void) -{ - OBJ_DESTRUCT(&orte_grpcomm_basic.mutex); - OBJ_DESTRUCT(&orte_grpcomm_basic.cond); - - opal_hash_table_remove_all(&orte_grpcomm_basic.modex_data); - OBJ_DESTRUCT(&orte_grpcomm_basic.modex_data); - - OBJ_DESTRUCT(&orte_grpcomm_basic.modex_buffer); - return ORTE_SUCCESS; -} diff --git a/orte/mca/grpcomm/basic/grpcomm_basic_module.c b/orte/mca/grpcomm/basic/grpcomm_basic_module.c index af8baf0d41..b5321a0613 100644 --- a/orte/mca/grpcomm/basic/grpcomm_basic_module.c +++ b/orte/mca/grpcomm/basic/grpcomm_basic_module.c @@ -58,13 +58,58 @@ static int xcast_direct(orte_jobid_t job, opal_buffer_t *buffer, orte_rml_tag_t tag); +/* Static API's */ +static int init(void); +static void finalize(void); +static int xcast(orte_jobid_t job, + opal_buffer_t *buffer, + orte_rml_tag_t tag); +static int next_recips(opal_list_t *names, orte_grpcomm_mode_t mode); + +/* Module def */ +orte_grpcomm_base_module_t orte_grpcomm_basic_module = { + init, + finalize, + xcast, + orte_grpcomm_base_allgather, + orte_grpcomm_base_allgather_list, + orte_grpcomm_base_barrier, + next_recips, + orte_grpcomm_base_set_proc_attr, + orte_grpcomm_base_get_proc_attr, + orte_grpcomm_base_modex, + orte_grpcomm_base_purge_proc_attrs +}; + + +/** + * Initialize the module + */ +static int init(void) +{ + int rc; + + if (ORTE_SUCCESS != (rc = 
orte_grpcomm_base_modex_init())) { + ORTE_ERROR_LOG(rc); + } + return rc; +} + +/** + * Finalize the module + */ +static void finalize(void) +{ + orte_grpcomm_base_modex_finalize(); +} + + /** * A "broadcast-like" function to a job's processes. * @param jobid The job whose processes are to receive the message * @param buffer The data to broadcast */ -/* Blocking version */ static int xcast(orte_jobid_t job, opal_buffer_t *buffer, orte_rml_tag_t tag) @@ -344,12 +389,6 @@ static int xcast_linear(orte_jobid_t job, /* if we are a daemon or the HNP, get the number of daemons out there */ range = orte_process_info.num_procs; - /* we have to account for all of the messages we are about to send - * because the non-blocking send can come back almost immediately - before - * we would get the chance to increment the num_active. This causes us - * to not correctly wakeup and reset the xcast_in_progress flag - */ - /* send the message to each daemon as fast as we can */ dummy.jobid = ORTE_PROC_MY_HNP->jobid; for (i=0; i < range; i++) { @@ -462,7 +501,7 @@ static int xcast_direct(orte_jobid_t job, { int rc; orte_process_name_t peer; - orte_vpid_t i, num_targets; + orte_vpid_t i, num_targets=0; opal_buffer_t *buf=NULL, *bfr=buffer; orte_daemon_cmd_flag_t command; orte_rml_tag_t target=tag; @@ -676,303 +715,6 @@ CLEANUP: return rc; } -static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf) -{ - orte_process_name_t name; - int rc; - orte_vpid_t i; - opal_buffer_t tmpbuf; - - OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, - "%s grpcomm: entering allgather", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - /* everything happens within my jobid */ - name.jobid = ORTE_PROC_MY_NAME->jobid; - - if (0 != ORTE_PROC_MY_NAME->vpid) { - /* everyone but rank=0 sends data */ - name.vpid = 0; - if (0 > orte_rml.send_buffer(&name, sbuf, ORTE_RML_TAG_ALLGATHER, 0)) { - ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); - return ORTE_ERR_COMM_FAILURE; - } - OPAL_OUTPUT_VERBOSE((2, 
orte_grpcomm_base_output, - "%s allgather buffer sent", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - /* now receive the final result from rank=0 */ - if (0 > orte_rml.recv_buffer(ORTE_NAME_WILDCARD, rbuf, ORTE_RML_TAG_ALLGATHER, 0)) { - ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); - return ORTE_ERR_COMM_FAILURE; - } - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s allgather buffer received", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - return ORTE_SUCCESS; - } - - /* seed the outgoing buffer with the num_procs so it can be unpacked */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(rbuf, &orte_process_info.num_procs, 1, ORTE_STD_CNTR))) { - ORTE_ERROR_LOG(rc); - return rc; - } - - /* put my own information into the outgoing buffer */ - if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(rbuf, sbuf))) { - ORTE_ERROR_LOG(rc); - return rc; - } - - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s allgather collecting buffers", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - /* rank=0 receives everyone else's data */ - for (i=1; i < orte_process_info.num_procs; i++) { - name.vpid = i; - OBJ_CONSTRUCT(&tmpbuf, opal_buffer_t); - if (0 > orte_rml.recv_buffer(&name, &tmpbuf, ORTE_RML_TAG_ALLGATHER, 0)) { - ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); - return ORTE_ERR_COMM_FAILURE; - } - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s allgather buffer %ld received", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (long)i)); - /* append this data to the rbuf */ - if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(rbuf, &tmpbuf))) { - ORTE_ERROR_LOG(rc); - return rc; - } - /* clear out the tmpbuf */ - OBJ_DESTRUCT(&tmpbuf); - } - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s allgather xcasting collected data", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - /* xcast the results */ - xcast(ORTE_PROC_MY_NAME->jobid, rbuf, ORTE_RML_TAG_ALLGATHER); - - /* xcast automatically ensures that the sender -always- gets a copy - * of the message. 
This is required to ensure proper operation of the - * launch system as the HNP -must- get a copy itself. So we have to - * post our own receive here so that we don't leave a message rattling - * around in our RML - */ - OBJ_CONSTRUCT(&tmpbuf, opal_buffer_t); - if (0 > (rc = orte_rml.recv_buffer(ORTE_NAME_WILDCARD, &tmpbuf, ORTE_RML_TAG_ALLGATHER, 0))) { - ORTE_ERROR_LOG(rc); - return rc; - } - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s allgather buffer received", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - /* don't need the received buffer - we already have what we need */ - OBJ_DESTRUCT(&tmpbuf); - - OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, - "%s grpcomm: allgather completed", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - return ORTE_SUCCESS; -} - -static int allgather_list(opal_list_t *names, opal_buffer_t *sbuf, opal_buffer_t *rbuf) -{ - opal_list_item_t *item; - orte_namelist_t *peer, *root; - orte_std_cntr_t i, num_peers; - opal_buffer_t tmpbuf; - int rc; - - OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, - "%s grpcomm: entering allgather_list", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - /* the first entry on the list is the "root" that collects - * all the data - everyone else just sends and gets back - * the results - */ - root = (orte_namelist_t*)opal_list_get_first(names); - - if (OPAL_EQUAL != opal_dss.compare(&root->name, ORTE_PROC_MY_NAME, ORTE_NAME)) { - /* everyone but root sends data */ - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s allgather_list: sending my data to %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&root->name))); - if (0 > (rc = orte_rml.send_buffer(&root->name, sbuf, ORTE_RML_TAG_ALLGATHER_LIST, 0))) { - ORTE_ERROR_LOG(rc); - return rc; - } - /* now receive the final result */ - if (0 > (rc = orte_rml.recv_buffer(&root->name, rbuf, ORTE_RML_TAG_ALLGATHER_LIST, 0))) { - ORTE_ERROR_LOG(rc); - return rc; - } - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s allgather_list: received 
result", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - return ORTE_SUCCESS; - } - - /* count how many peers are participating, including myself */ - num_peers = (orte_std_cntr_t)opal_list_get_size(names); - - /* seed the outgoing buffer with the num_procs so it can be unpacked */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(rbuf, &num_peers, 1, ORTE_STD_CNTR))) { - ORTE_ERROR_LOG(rc); - return rc; - } - - /* put my own information into the outgoing buffer */ - if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(rbuf, sbuf))) { - ORTE_ERROR_LOG(rc); - return rc; - } - - /* root receives everyone else's data */ - for (i=1; i < num_peers; i++) { - /* receive the buffer from this process */ - OBJ_CONSTRUCT(&tmpbuf, opal_buffer_t); - if (0 > orte_rml.recv_buffer(ORTE_NAME_WILDCARD, &tmpbuf, ORTE_RML_TAG_ALLGATHER_LIST, 0)) { - ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); - return ORTE_ERR_COMM_FAILURE; - } - /* append this data to the rbuf */ - if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(rbuf, &tmpbuf))) { - ORTE_ERROR_LOG(rc); - return rc; - } - /* clear out the tmpbuf */ - OBJ_DESTRUCT(&tmpbuf); - } - - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s allgather_list: received all data", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - /* broadcast the results */ - for (item = opal_list_get_first(names); - item != opal_list_get_end(names); - item = opal_list_get_next(item)) { - peer = (orte_namelist_t*)item; - - /* skip myself */ - if (OPAL_EQUAL == opal_dss.compare(&root->name, &peer->name, ORTE_NAME)) { - continue; - } - - /* transmit the buffer to this process */ - if (0 > orte_rml.send_buffer(&peer->name, rbuf, ORTE_RML_TAG_ALLGATHER_LIST, 0)) { - ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); - return ORTE_ERR_COMM_FAILURE; - } - } - - OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, - "%s grpcomm: allgather_list completed", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - return ORTE_SUCCESS; -} - - -static int barrier(void) -{ - orte_process_name_t name; - orte_vpid_t i; - 
opal_buffer_t buf; - int rc; - - OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, - "%s grpcomm: entering barrier", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - /* everything happens within the same jobid */ - name.jobid = ORTE_PROC_MY_NAME->jobid; - - /* All non-root send & receive zero-length message. */ - if (0 != ORTE_PROC_MY_NAME->vpid) { - name.vpid = 0; - OBJ_CONSTRUCT(&buf, opal_buffer_t); - i=0; - opal_dss.pack(&buf, &i, 1, ORTE_STD_CNTR); /* put something meaningless here */ - - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s sending barrier", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - rc = orte_rml.send_buffer(&name,&buf,ORTE_RML_TAG_BARRIER,0); - if (rc < 0) { - ORTE_ERROR_LOG(rc); - return rc; - } - OBJ_DESTRUCT(&buf); - - /* get the release from rank=0 */ - OBJ_CONSTRUCT(&buf, opal_buffer_t); - rc = orte_rml.recv_buffer(ORTE_NAME_WILDCARD,&buf,ORTE_RML_TAG_BARRIER,0); - if (rc < 0) { - ORTE_ERROR_LOG(rc); - return rc; - } - OBJ_DESTRUCT(&buf); - - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s received barrier release", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - return ORTE_SUCCESS; - } - - for (i = 1; i < orte_process_info.num_procs; i++) { - name.vpid = (orte_vpid_t)i; - OBJ_CONSTRUCT(&buf, opal_buffer_t); - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s barrier %ld received", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (long)i)); - rc = orte_rml.recv_buffer(&name,&buf,ORTE_RML_TAG_BARRIER,0); - if (rc < 0) { - ORTE_ERROR_LOG(rc); - return rc; - } - OBJ_DESTRUCT(&buf); - } - - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s barrier xcasting release", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - /* xcast the release */ - OBJ_CONSTRUCT(&buf, opal_buffer_t); - opal_dss.pack(&buf, &i, 1, ORTE_STD_CNTR); /* put something meaningless here */ - xcast(ORTE_PROC_MY_NAME->jobid, &buf, ORTE_RML_TAG_BARRIER); - OBJ_DESTRUCT(&buf); - - /* xcast automatically ensures that the sender -always- gets a copy - * of the message. 
This is required to ensure proper operation of the - * launch system as the HNP -must- get a copy itself. So we have to - * post our own receive here so that we don't leave a message rattling - * around in our RML - */ - OBJ_CONSTRUCT(&buf, opal_buffer_t); - if (0 > (rc = orte_rml.recv_buffer(ORTE_NAME_WILDCARD, &buf, ORTE_RML_TAG_BARRIER, 0))) { - ORTE_ERROR_LOG(rc); - return rc; - } - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s barrier release received", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - OBJ_DESTRUCT(&buf); - - OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, - "%s grpcomm: barrier completed", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - return ORTE_SUCCESS; -} - static int chain_recips(opal_list_t *names) { orte_namelist_t *target; @@ -1023,15 +765,15 @@ static int linear_recips(opal_list_t *names) orte_vpid_t i; /* if we are not the HNP, we just return - only - * the HNP sends in this mode - */ + * the HNP sends in this mode + */ if (!orte_process_info.hnp) { return ORTE_SUCCESS; } /* if we are the HNP, then just add the names of - * all daemons to the list - */ + * all daemons to the list + */ for (i=1; i < orte_process_info.num_procs; i++) { if (NULL == (target = OBJ_NEW(orte_namelist_t))) { return ORTE_ERR_OUT_OF_RESOURCE; @@ -1067,488 +809,3 @@ static int next_recips(opal_list_t *names, orte_grpcomm_mode_t mode) } -/*************** MODEX SECTION **************/ - -/** - * MODEX DESIGN - * - * Modex data is always associated with a given orte process name, in - * an opal hash table. The hash table is necessary because modex data is - * received for entire jobids and when working with - * dynamic processes, it is possible we will receive data for a - * process not yet in the ompi_proc_all() list of processes. This - * information must be kept for later use, because if accept/connect - * causes the proc to be added to the ompi_proc_all() list, it could - * cause a connection storm. 
Therefore, we use an - * opal_hast_table_t backing store to contain all modex information. - * - * While we could add the now discovered proc into the ompi_proc_all() - * list, this has some problems, in that we don't have the - * architecture and hostname information needed to properly fill in - * the ompi_proc_t structure and we don't want to cause RML - * communication to get it when we don't really need to know anything - * about the remote proc. - * - * All data put into the modex (or received from the modex) is - * associated with a given proc,attr_name pair. The data structures - * to maintain this data look something like: - * - * opal_hash_table_t modex_data -> list of attr_proc_t objects - * - * +-----------------------------+ - * | modex_proc_data_t | - * | - opal_list_item_t | - * +-----------------------------+ - * | opal_mutex_t modex_lock | - * | bool modex_received_data | 1 - * | opal_list_t modules | ---------+ - * +-----------------------------+ | - * * | - * +--------------------------------+ <--------+ - * | modex_module_data_t | - * | - opal_list_item_t | - * +--------------------------------+ - * | mca_base_component_t component | - * | void *module_data | - * | size_t module_data_size | - * +--------------------------------+ - * - */ - - -/** - * Modex data for a particular orte process - * - * Locking infrastructure and list of module data for a given orte - * process name. The name association is maintained in the - * modex_data hash table. - */ -struct modex_proc_data_t { - /** Structure can be put on lists (including in hash tables) */ - opal_list_item_t super; - /* Lock held whenever the modex data for this proc is being - modified */ - opal_mutex_t modex_lock; - /* True if modex data has ever been received from this process, - false otherwise. */ - bool modex_received_data; - /* List of modex_module_data_t structures containing all data - received from this process, sorted by component name. 
*/ - opal_list_t modex_module_data; -}; -typedef struct modex_proc_data_t modex_proc_data_t; - -static void -modex_construct(modex_proc_data_t * modex) -{ - OBJ_CONSTRUCT(&modex->modex_lock, opal_mutex_t); - modex->modex_received_data = false; - OBJ_CONSTRUCT(&modex->modex_module_data, opal_list_t); -} - -static void -modex_destruct(modex_proc_data_t * modex) -{ - OBJ_DESTRUCT(&modex->modex_module_data); - OBJ_DESTRUCT(&modex->modex_lock); -} - -OBJ_CLASS_INSTANCE(modex_proc_data_t, opal_object_t, - modex_construct, modex_destruct); - - - -/** - * Data for a particular attribute - * - * Container for data for a particular module,attribute pair. This - * structure should be contained in the modex_module_data list in an - * modex_proc_data_t structure to maintain an association with a - * given proc. The list is then searched for a matching attribute - * name. - * - * While searching the list or reading from (or writing to) this - * structure, the lock in the proc_data_t should be held. - */ -struct modex_attr_data_t { - /** Structure can be put on lists */ - opal_list_item_t super; - /** Attribute name */ - char * attr_name; - /** Binary blob of data associated with this proc,component pair */ - void *attr_data; - /** Size (in bytes) of module_data */ - size_t attr_data_size; -}; -typedef struct modex_attr_data_t modex_attr_data_t; - -static void -modex_attr_construct(modex_attr_data_t * module) -{ - module->attr_name = NULL; - module->attr_data = NULL; - module->attr_data_size = 0; -} - -static void -modex_attr_destruct(modex_attr_data_t * module) -{ - if (NULL != module->attr_name) { - free(module->attr_name); - } - if (NULL != module->attr_data) { - free(module->attr_data); - } -} - -OBJ_CLASS_INSTANCE(modex_attr_data_t, - opal_list_item_t, - modex_attr_construct, - modex_attr_destruct); - - -/** - * Find data for a given attribute in a given modex_proc_data_t - * container. - * - * The proc_data's modex_lock must be held during this - * search. 
- */ -static modex_attr_data_t * -modex_lookup_attr_data(modex_proc_data_t *proc_data, - const char *attr_name, - bool create_if_not_found) -{ - modex_attr_data_t *attr_data = NULL; - for (attr_data = (modex_attr_data_t *) opal_list_get_first(&proc_data->modex_module_data); - attr_data != (modex_attr_data_t *) opal_list_get_end(&proc_data->modex_module_data); - attr_data = (modex_attr_data_t *) opal_list_get_next(attr_data)) { - if (0 == strcmp(attr_name, attr_data->attr_name)) { - return attr_data; - } - } - - if (create_if_not_found) { - attr_data = OBJ_NEW(modex_attr_data_t); - if (NULL == attr_data) return NULL; - - attr_data->attr_name = strdup(attr_name); - opal_list_append(&proc_data->modex_module_data, &attr_data->super); - - return attr_data; - } - - return NULL; -} - - -/** -* Find modex_proc_data_t container associated with given - * orte_process_name_t. - * - * The global lock should *NOT* be held when - * calling this function. - */ -static modex_proc_data_t* -modex_lookup_orte_proc(const orte_process_name_t *orte_proc) -{ - modex_proc_data_t *proc_data = NULL; - - OPAL_THREAD_LOCK(&orte_grpcomm_basic.mutex); - opal_hash_table_get_value_uint64(&orte_grpcomm_basic.modex_data, - orte_util_hash_name(orte_proc), (void**)&proc_data); - if (NULL == proc_data) { - /* The proc clearly exists, so create a modex structure - for it */ - proc_data = OBJ_NEW(modex_proc_data_t); - if (NULL == proc_data) { - opal_output(0, "grpcomm_basic_modex_lookup_orte_proc: unable to allocate modex_proc_data_t\n"); - OPAL_THREAD_UNLOCK(&orte_grpcomm_basic.mutex); - return NULL; - } - opal_hash_table_set_value_uint64(&orte_grpcomm_basic.modex_data, - orte_util_hash_name(orte_proc), proc_data); - } - OPAL_THREAD_UNLOCK(&orte_grpcomm_basic.mutex); - - return proc_data; -} - - -static int set_proc_attr(const char *attr_name, - const void *data, - size_t size) -{ - int rc; - - OPAL_THREAD_LOCK(&orte_grpcomm_basic.mutex); - - /* Pack the attribute name information into the local buffer 
*/ - if (ORTE_SUCCESS != (rc = opal_dss.pack(&orte_grpcomm_basic.modex_buffer, &attr_name, 1, OPAL_STRING))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - /* pack the size */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(&orte_grpcomm_basic.modex_buffer, &size, 1, OPAL_SIZE))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - /* Pack the actual data into the buffer */ - if (0 != size) { - if (ORTE_SUCCESS != (rc = opal_dss.pack(&orte_grpcomm_basic.modex_buffer, (void *) data, size, OPAL_BYTE))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - } - - /* track the number of entries */ - ++orte_grpcomm_basic.modex_num_entries; - -cleanup: - OPAL_THREAD_UNLOCK(&orte_grpcomm_basic.mutex); - - return rc; -} - -static int get_proc_attr(const orte_process_name_t proc, - const char * attribute_name, void **val, - size_t *size) -{ - modex_proc_data_t *proc_data; - modex_attr_data_t *attr_data; - - proc_data = modex_lookup_orte_proc(&proc); - if (NULL == proc_data) return ORTE_ERR_NOT_FOUND; - - OPAL_THREAD_LOCK(&proc_data->modex_lock); - - /* look up attribute */ - attr_data = modex_lookup_attr_data(proc_data, attribute_name, false); - - /* copy the data out to the user */ - if ((NULL == attr_data) || - (attr_data->attr_data_size == 0)) { - opal_output(0, "grpcomm_basic_get_proc_attr: no attr avail or zero byte size"); - *val = NULL; - *size = 0; - } else { - void *copy = malloc(attr_data->attr_data_size); - - if (copy == NULL) { - OPAL_THREAD_UNLOCK(&proc_data->modex_lock); - return ORTE_ERR_OUT_OF_RESOURCE; - } - memcpy(copy, attr_data->attr_data, attr_data->attr_data_size); - *val = copy; - *size = attr_data->attr_data_size; - } - OPAL_THREAD_UNLOCK(&proc_data->modex_lock); - - return ORTE_SUCCESS; -} - - -static int modex(opal_list_t *procs) -{ - opal_buffer_t buf, rbuf; - orte_std_cntr_t i, j, num_procs, num_entries; - void *bytes = NULL; - orte_std_cntr_t cnt; - orte_process_name_t proc_name; - modex_proc_data_t *proc_data; - modex_attr_data_t *attr_data; - int rc; - - 
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, - "%s grpcomm: modex entered", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - /* setup the buffer that will actually be sent */ - OBJ_CONSTRUCT(&buf, opal_buffer_t); - OBJ_CONSTRUCT(&rbuf, opal_buffer_t); - - /* put our process name in the buffer so it can be unpacked later */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - /* put the number of entries into the buffer */ - OPAL_THREAD_LOCK(&orte_grpcomm_basic.mutex); - if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &orte_grpcomm_basic.modex_num_entries, 1, ORTE_STD_CNTR))) { - ORTE_ERROR_LOG(rc); - OPAL_THREAD_UNLOCK(&orte_grpcomm_basic.mutex); - goto cleanup; - } - - /* if there are entries, non-destructively copy the data across */ - if (0 < orte_grpcomm_basic.modex_num_entries) { - if (ORTE_SUCCESS != (opal_dss.copy_payload(&buf, &orte_grpcomm_basic.modex_buffer))) { - ORTE_ERROR_LOG(rc); - OPAL_THREAD_UNLOCK(&orte_grpcomm_basic.mutex); - goto cleanup; - } - } - OPAL_THREAD_UNLOCK(&orte_grpcomm_basic.mutex); - - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s modex: executing allgather", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - /* exchange the buffer with the list of peers (if provided) or all my peers */ - if (NULL == procs) { - if (ORTE_SUCCESS != (rc = allgather(&buf, &rbuf))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - } else { - if (ORTE_SUCCESS != (rc = allgather_list(procs, &buf, &rbuf))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - } - - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s modex: processing modex info", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - /* process the results */ - /* extract the number of procs that put data in the buffer */ - cnt=1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_procs, &cnt, ORTE_STD_CNTR))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - /* process the buffer */ - for (i=0; i < num_procs; i++) { - /* unpack 
the process name */ - cnt=1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &proc_name, &cnt, ORTE_NAME))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - /* look up the modex data structure */ - proc_data = modex_lookup_orte_proc(&proc_name); - if (proc_data == NULL) { - /* report the error */ - opal_output(0, "grpcomm_basic_modex: received modex info for unknown proc %s\n", - ORTE_NAME_PRINT(&proc_name)); - rc = ORTE_ERR_NOT_FOUND; - goto cleanup; - } - - /* unpack the number of entries for this proc */ - cnt=1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_entries, &cnt, ORTE_STD_CNTR))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - OPAL_THREAD_LOCK(&proc_data->modex_lock); - - /* - * Extract the attribute names and values - */ - for (j = 0; j < num_entries; j++) { - size_t num_bytes; - char *attr_name; - - cnt = 1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &attr_name, &cnt, OPAL_STRING))) { - ORTE_ERROR_LOG(rc); - OPAL_THREAD_UNLOCK(&proc_data->modex_lock); - goto cleanup; - } - - cnt = 1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_bytes, &cnt, OPAL_SIZE))) { - ORTE_ERROR_LOG(rc); - OPAL_THREAD_UNLOCK(&proc_data->modex_lock); - goto cleanup; - } - if (num_bytes != 0) { - if (NULL == (bytes = malloc(num_bytes))) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - rc = ORTE_ERR_OUT_OF_RESOURCE; - OPAL_THREAD_UNLOCK(&proc_data->modex_lock); - goto cleanup; - } - cnt = (orte_std_cntr_t) num_bytes; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, bytes, &cnt, OPAL_BYTE))) { - ORTE_ERROR_LOG(rc); - OPAL_THREAD_UNLOCK(&proc_data->modex_lock); - goto cleanup; - } - num_bytes = cnt; - } else { - bytes = NULL; - } - - /* - * Lookup the corresponding modex structure - */ - if (NULL == (attr_data = modex_lookup_attr_data(proc_data, - attr_name, true))) { - opal_output(0, "grpcomm_basic_modex: modex_lookup_attr_data failed\n"); - OPAL_THREAD_UNLOCK(&proc_data->modex_lock); - rc = ORTE_ERR_NOT_FOUND; - goto cleanup; - } - if (NULL != 
attr_data->attr_data) { - /* some pre-existing value must be here - release it */ - free(attr_data->attr_data); - } - attr_data->attr_data = bytes; - attr_data->attr_data_size = num_bytes; - proc_data->modex_received_data = true; - } - OPAL_THREAD_UNLOCK(&proc_data->modex_lock); - } - -cleanup: - OBJ_DESTRUCT(&buf); - OBJ_DESTRUCT(&rbuf); - - OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, - "%s grpcomm: modex completed", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - return rc; -} - -static int purge_proc_attrs(void) -{ - /* - * Purge the attributes - */ - opal_hash_table_remove_all(&orte_grpcomm_basic.modex_data); - OBJ_DESTRUCT(&orte_grpcomm_basic.modex_data); - OBJ_CONSTRUCT(&orte_grpcomm_basic.modex_data, opal_hash_table_t); - opal_hash_table_init(&orte_grpcomm_basic.modex_data, 256); - - /* - * Clear the modex buffer - */ - OBJ_DESTRUCT(&orte_grpcomm_basic.modex_buffer); - OBJ_CONSTRUCT(&orte_grpcomm_basic.modex_buffer, opal_buffer_t); - orte_grpcomm_basic.modex_num_entries = 0; - - return ORTE_SUCCESS; -} - -orte_grpcomm_base_module_t orte_grpcomm_basic_module = { - xcast, - allgather, - allgather_list, - barrier, - next_recips, - set_proc_attr, - get_proc_attr, - modex, - purge_proc_attrs -}; - diff --git a/orte/mca/grpcomm/cnos/grpcomm_cnos.h b/orte/mca/grpcomm/cnos/grpcomm_cnos.h index 4ab4ee9d36..520235fade 100644 --- a/orte/mca/grpcomm/cnos/grpcomm_cnos.h +++ b/orte/mca/grpcomm/cnos/grpcomm_cnos.h @@ -41,13 +41,7 @@ orte_grpcomm_base_module_t* orte_grpcomm_cnos_init(int *priority); /* - * Startup / Shutdown - */ -int orte_grpcomm_cnos_module_init(void); -int orte_grpcomm_cnos_finalize(void); - -/* - * xcast interfaces + * Grpcomm interfaces */ ORTE_MODULE_DECLSPEC extern orte_grpcomm_base_component_t mca_grpcomm_cnos_component; diff --git a/orte/mca/grpcomm/cnos/grpcomm_cnos_component.c b/orte/mca/grpcomm/cnos/grpcomm_cnos_component.c index e7812f8320..c41b48b4b6 100644 --- a/orte/mca/grpcomm/cnos/grpcomm_cnos_component.c +++ 
b/orte/mca/grpcomm/cnos/grpcomm_cnos_component.c @@ -59,7 +59,6 @@ orte_grpcomm_base_component_t mca_grpcomm_cnos_component = { MCA_BASE_METADATA_PARAM_CHECKPOINT }, orte_grpcomm_cnos_init, /* component init */ - orte_grpcomm_cnos_finalize /* component shutdown */ }; /* @@ -86,10 +85,3 @@ orte_grpcomm_base_module_t* orte_grpcomm_cnos_init(int *priority) return &orte_grpcomm_cnos_module; } -/* - * finalize routine - */ -int orte_grpcomm_cnos_finalize(void) -{ - return ORTE_SUCCESS; -} diff --git a/orte/mca/grpcomm/cnos/grpcomm_cnos_module.c b/orte/mca/grpcomm/cnos/grpcomm_cnos_module.c index 034c588a62..b766c34bef 100644 --- a/orte/mca/grpcomm/cnos/grpcomm_cnos_module.c +++ b/orte/mca/grpcomm/cnos/grpcomm_cnos_module.c @@ -37,6 +37,9 @@ #endif /* API functions */ +static int init(void); +static void finalize(void); + static int xcast(orte_jobid_t job, opal_buffer_t *buffer, orte_rml_tag_t tag); @@ -62,6 +65,8 @@ static int modex(opal_list_t *procs); static int purge_proc_attrs(void); orte_grpcomm_base_module_t orte_grpcomm_cnos_module = { + init, + finalize, xcast, allgather, allgather_list, @@ -73,6 +78,22 @@ orte_grpcomm_base_module_t orte_grpcomm_cnos_module = { purge_proc_attrs }; +/** + * Init the module + */ +static int init(void) +{ + return ORTE_SUCCESS; +} + +/** + * Finalize module + */ +static void finalize(void) +{ + return; +} + /** * A "broadcast-like" function to a job's processes. diff --git a/orte/mca/grpcomm/grpcomm.h b/orte/mca/grpcomm/grpcomm.h index 011dd60d7e..8e37202090 100644 --- a/orte/mca/grpcomm/grpcomm.h +++ b/orte/mca/grpcomm/grpcomm.h @@ -52,6 +52,12 @@ BEGIN_C_DECLS * Component functions - all MUST be provided! 
*/ +/* initialize the selected module */ +typedef int (*orte_grpcomm_base_module_init_fn_t)(void); + +/* finalize the selected module */ +typedef void (*orte_grpcomm_base_module_finalize_fn_t)(void); + /* Send a message to all members of a job - blocking */ typedef int (*orte_grpcomm_base_module_xcast_fn_t)(orte_jobid_t job, opal_buffer_t *buffer, @@ -67,42 +73,45 @@ typedef int (*orte_grpcomm_base_module_allgather_list_fn_t)(opal_list_t *names, typedef int (*orte_grpcomm_base_module_barrier_fn_t)(void); /* for collectives, return next recipients in the chain */ -typedef int (*orte_gprcomm_base_next_recipients_fn_t)(opal_list_t *list, orte_grpcomm_mode_t mode); +typedef int (*orte_gprcomm_base_module_next_recipients_fn_t)(opal_list_t *list, orte_grpcomm_mode_t mode); /** DATA EXCHANGE FUNCTIONS - SEE ompi/runtime/ompi_module_exchange.h FOR A DESCRIPTION * OF HOW THIS ALL WORKS */ /* send an attribute buffer */ -typedef int (*orte_grpcomm_base_modex_set_proc_attr_fn_t)(const char* attr_name, +typedef int (*orte_grpcomm_base_module_modex_set_proc_attr_fn_t)(const char* attr_name, const void *buffer, size_t size); /* get an attribute buffer */ -typedef int (*orte_grpcomm_base_modex_get_proc_attr_fn_t)(const orte_process_name_t name, +typedef int (*orte_grpcomm_base_module_modex_get_proc_attr_fn_t)(const orte_process_name_t name, const char* attr_name, void **buffer, size_t *size); /* perform a modex operation */ -typedef int (*orte_grpcomm_base_modex_fn_t)(opal_list_t *procs); +typedef int (*orte_grpcomm_base_module_modex_fn_t)(opal_list_t *procs); /* purge the internal attr table */ -typedef int (*orte_grpcomm_base_purge_proc_attrs_fn_t)(void); +typedef int (*orte_grpcomm_base_module_purge_proc_attrs_fn_t)(void); /* * Ver 2.0 */ struct orte_grpcomm_base_module_2_0_0_t { - orte_grpcomm_base_module_xcast_fn_t xcast; - orte_grpcomm_base_module_allgather_fn_t allgather; - orte_grpcomm_base_module_allgather_list_fn_t allgather_list; - orte_grpcomm_base_module_barrier_fn_t 
barrier; - orte_gprcomm_base_next_recipients_fn_t next_recipients; - /* modex support functions */ - orte_grpcomm_base_modex_set_proc_attr_fn_t set_proc_attr; - orte_grpcomm_base_modex_get_proc_attr_fn_t get_proc_attr; - orte_grpcomm_base_modex_fn_t modex; - orte_grpcomm_base_purge_proc_attrs_fn_t purge_proc_attrs; + orte_grpcomm_base_module_init_fn_t init; + orte_grpcomm_base_module_finalize_fn_t finalize; + /* collective operations */ + orte_grpcomm_base_module_xcast_fn_t xcast; + orte_grpcomm_base_module_allgather_fn_t allgather; + orte_grpcomm_base_module_allgather_list_fn_t allgather_list; + orte_grpcomm_base_module_barrier_fn_t barrier; + orte_gprcomm_base_module_next_recipients_fn_t next_recipients; + /* modex functions */ + orte_grpcomm_base_module_modex_set_proc_attr_fn_t set_proc_attr; + orte_grpcomm_base_module_modex_get_proc_attr_fn_t get_proc_attr; + orte_grpcomm_base_module_modex_fn_t modex; + orte_grpcomm_base_module_purge_proc_attrs_fn_t purge_proc_attrs; }; typedef struct orte_grpcomm_base_module_2_0_0_t orte_grpcomm_base_module_2_0_0_t; @@ -113,11 +122,6 @@ typedef orte_grpcomm_base_module_2_0_0_t orte_grpcomm_base_module_t; */ typedef orte_grpcomm_base_module_t* (*orte_grpcomm_base_component_init_fn_t)(int *priority); -/** - * Finalize the selected module - */ -typedef int (*orte_grpcomm_base_component_finalize_fn_t)(void); - /* * the standard component data structure @@ -128,7 +132,6 @@ struct orte_grpcomm_base_component_2_0_0_t { mca_base_component_data_1_0_0_t grpcomm_data; orte_grpcomm_base_component_init_fn_t grpcomm_init; - orte_grpcomm_base_component_finalize_fn_t grpcomm_finalize; }; typedef struct orte_grpcomm_base_component_2_0_0_t orte_grpcomm_base_component_2_0_0_t; typedef orte_grpcomm_base_component_2_0_0_t orte_grpcomm_base_component_t; diff --git a/orte/mca/rml/rml_types.h b/orte/mca/rml/rml_types.h index 77eb145f57..cd1eba6989 100644 --- a/orte/mca/rml/rml_types.h +++ b/orte/mca/rml/rml_types.h @@ -66,31 +66,32 @@ BEGIN_C_DECLS 
#define ORTE_RML_TAG_RML_ROUTE 13 -#define ORTE_RML_TAG_ALLGATHER 14 -#define ORTE_RML_TAG_ALLGATHER_LIST 15 -#define ORTE_RML_TAG_BARRIER 16 +#define ORTE_RML_TAG_ALLGATHER_SERVER 14 +#define ORTE_RML_TAG_ALLGATHER_CLIENT 15 +#define ORTE_RML_TAG_BARRIER_SERVER 16 +#define ORTE_RML_TAG_BARRIER_CLIENT 17 -#define ORTE_RML_TAG_INIT_ROUTES 17 -#define ORTE_RML_TAG_UPDATE_ROUTES 18 -#define ORTE_RML_TAG_SYNC 19 +#define ORTE_RML_TAG_INIT_ROUTES 18 +#define ORTE_RML_TAG_UPDATE_ROUTES 19 +#define ORTE_RML_TAG_SYNC 20 /* For FileM Base */ -#define ORTE_RML_TAG_FILEM_BASE 20 -#define ORTE_RML_TAG_FILEM_BASE_RESP 21 +#define ORTE_RML_TAG_FILEM_BASE 21 +#define ORTE_RML_TAG_FILEM_BASE_RESP 22 /* For FileM RSH Component */ -#define ORTE_RML_TAG_FILEM_RSH 22 +#define ORTE_RML_TAG_FILEM_RSH 23 /* For SnapC Framework */ -#define ORTE_RML_TAG_SNAPC 23 -#define ORTE_RML_TAG_SNAPC_FULL 24 +#define ORTE_RML_TAG_SNAPC 24 +#define ORTE_RML_TAG_SNAPC_FULL 25 /* For tools */ -#define ORTE_RML_TAG_TOOL 25 +#define ORTE_RML_TAG_TOOL 26 /* support data store/lookup */ -#define ORTE_RML_TAG_DATA_SERVER 26 -#define ORTE_RML_TAG_DATA_CLIENT 27 +#define ORTE_RML_TAG_DATA_SERVER 27 +#define ORTE_RML_TAG_DATA_CLIENT 28 /* support for shared memory collectives */ #define OMPI_RML_TAG_COLL_SM2_BACK_FILE_CREATED 40 diff --git a/orte/orted/orted_comm.c b/orte/orted/orted_comm.c index 10d539103d..40f1d1adba 100644 --- a/orte/orted/orted_comm.c +++ b/orte/orted/orted_comm.c @@ -108,6 +108,9 @@ static void send_relay(int fd, short event, void *data) "%s orte:daemon:send_relay", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + /* setup a list of next recipients */ + OBJ_CONSTRUCT(&recips, opal_list_t); + /* we pass the relay_mode in the mev "tag" field. This is a bit * of a hack as the two sizes may not exactly match. 
However, since * the rml_tag is an int32, it is doubtful we will ever see a @@ -115,8 +118,28 @@ static void send_relay(int fd, short event, void *data) */ relay_mode = (orte_grpcomm_mode_t)tag; - /* setup a list of next recipients */ - OBJ_CONSTRUCT(&recips, opal_list_t); + /* if the mode is linear and we are the HNP, don't ask for + * next recipients as this will generate a potentially very + * long list! Instead, just look over the known daemons + */ + if (ORTE_GRPCOMM_LINEAR == relay_mode && orte_process_info.hnp) { + orte_process_name_t dummy; + orte_vpid_t i; + + /* send the message to each daemon as fast as we can - but + * not to us! + */ + dummy.jobid = ORTE_PROC_MY_HNP->jobid; + for (i=1; i < orte_process_info.num_procs; i++) { + dummy.vpid = i; + if (0 > (ret = orte_rml.send_buffer(&dummy, buffer, ORTE_RML_TAG_DAEMON, 0))) { + ORTE_ERROR_LOG(ret); + goto CLEANUP; + } + } + goto CLEANUP; + } + /* ask the active grpcomm module for the next recipients */ if (ORTE_SUCCESS != (ret = orte_grpcomm.next_recipients(&recips, relay_mode))) { ORTE_ERROR_LOG(ret);