From 4efddc7b0a82da91abd7a68d22fe53325cef117c Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Mon, 24 Mar 2008 20:50:31 +0000 Subject: [PATCH] Fix the allgather and allgather_list functions to avoid deadlocks at large node/proc counts. Violated the RML rules here - we received the allgather buffer and then did an xcast, which causes a send to go out, and is then subsequently received by the sender. This fix breaks that pattern by forcing the recv to complete outside of the function itself - thus, the allgather and allgather_list always complete their recvs before returning or sending. Reorganize the grpcomm code a little to provide support for soon-to-come new grpcomm components. The revised organization puts what will be common code elements in the base to avoid duplication, while allowing components that don't need those functions to ignore them. This commit was SVN r17941. --- orte/mca/grpcomm/base/Makefile.am | 5 +- orte/mca/grpcomm/base/base.h | 23 + .../mca/grpcomm/base/grpcomm_base_allgather.c | 437 +++++++++ orte/mca/grpcomm/base/grpcomm_base_barrier.c | 182 ++++ orte/mca/grpcomm/base/grpcomm_base_close.c | 4 +- orte/mca/grpcomm/base/grpcomm_base_modex.c | 557 ++++++++++++ orte/mca/grpcomm/base/grpcomm_base_open.c | 1 - orte/mca/grpcomm/base/grpcomm_base_select.c | 121 ++- orte/mca/grpcomm/basic/grpcomm_basic.h | 16 +- .../grpcomm/basic/grpcomm_basic_component.c | 30 +- orte/mca/grpcomm/basic/grpcomm_basic_module.c | 845 ++---------------- orte/mca/grpcomm/cnos/grpcomm_cnos.h | 8 +- .../mca/grpcomm/cnos/grpcomm_cnos_component.c | 8 - orte/mca/grpcomm/cnos/grpcomm_cnos_module.c | 21 + orte/mca/grpcomm/grpcomm.h | 45 +- orte/mca/rml/rml_types.h | 29 +- orte/orted/orted_comm.c | 27 +- 17 files changed, 1401 insertions(+), 958 deletions(-) create mode 100644 orte/mca/grpcomm/base/grpcomm_base_allgather.c create mode 100644 orte/mca/grpcomm/base/grpcomm_base_barrier.c create mode 100644 orte/mca/grpcomm/base/grpcomm_base_modex.c diff --git
a/orte/mca/grpcomm/base/Makefile.am b/orte/mca/grpcomm/base/Makefile.am index f1336cc709..cea5eaa151 100644 --- a/orte/mca/grpcomm/base/Makefile.am +++ b/orte/mca/grpcomm/base/Makefile.am @@ -22,4 +22,7 @@ headers += \ libmca_grpcomm_la_SOURCES += \ base/grpcomm_base_close.c \ base/grpcomm_base_select.c \ - base/grpcomm_base_open.c \ No newline at end of file + base/grpcomm_base_open.c \ + base/grpcomm_base_allgather.c \ + base/grpcomm_base_barrier.c \ + base/grpcomm_base_modex.c diff --git a/orte/mca/grpcomm/base/base.h b/orte/mca/grpcomm/base/base.h index 6b233109ca..79d352d14a 100644 --- a/orte/mca/grpcomm/base/base.h +++ b/orte/mca/grpcomm/base/base.h @@ -28,6 +28,8 @@ #include "opal/class/opal_list.h" #include "opal/mca/mca.h" +#include "opal/threads/mutex.h" +#include "opal/threads/condition.h" #include "orte/mca/grpcomm/grpcomm.h" @@ -53,6 +55,27 @@ ORTE_DECLSPEC extern bool mca_grpcomm_base_selected; ORTE_DECLSPEC extern opal_list_t mca_grpcomm_base_components_available; ORTE_DECLSPEC extern orte_grpcomm_base_component_t mca_grpcomm_base_selected_component; + +/* + * Base functions + */ +ORTE_DECLSPEC int orte_grpcomm_base_allgather_list(opal_list_t *names, + opal_buffer_t *sbuf, + opal_buffer_t *rbuf); +ORTE_DECLSPEC int orte_grpcomm_base_allgather(opal_buffer_t *sbuf, + opal_buffer_t *rbuf); +ORTE_DECLSPEC int orte_grpcomm_base_barrier(void); +ORTE_DECLSPEC int orte_grpcomm_base_set_proc_attr(const char *attr_name, + const void *data, + size_t size); +ORTE_DECLSPEC int orte_grpcomm_base_get_proc_attr(const orte_process_name_t proc, + const char * attribute_name, void **val, + size_t *size); +ORTE_DECLSPEC int orte_grpcomm_base_modex(opal_list_t *procs); +ORTE_DECLSPEC int orte_grpcomm_base_purge_proc_attrs(void); +ORTE_DECLSPEC int orte_grpcomm_base_modex_init(void); +ORTE_DECLSPEC void orte_grpcomm_base_modex_finalize(void); + /* * external API functions will be documented in the mca/grpcomm/grpcomm.h file */ diff --git 
a/orte/mca/grpcomm/base/grpcomm_base_allgather.c b/orte/mca/grpcomm/base/grpcomm_base_allgather.c new file mode 100644 index 0000000000..f9ba674709 --- /dev/null +++ b/orte/mca/grpcomm/base/grpcomm_base_allgather.c @@ -0,0 +1,437 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2007 Sun Microsystems, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "orte_config.h" +#include "orte/constants.h" +#include "orte/types.h" + +#include +#ifdef HAVE_SYS_TIME_H +#include +#endif /* HAVE_SYS_TIME_H */ + +#include "opal/threads/condition.h" +#include "opal/util/output.h" + +#include "orte/util/proc_info.h" +#include "opal/dss/dss.h" +#include "orte/mca/errmgr/errmgr.h" +#include "orte/mca/odls/odls_types.h" +#include "orte/mca/rml/rml.h" +#include "orte/runtime/orte_globals.h" +#include "orte/util/name_fns.h" +#include "orte/orted/orted.h" +#include "orte/runtime/orte_wait.h" + +#include "orte/mca/grpcomm/base/base.h" + +static bool allgather_failed; +static orte_std_cntr_t allgather_num_recvd; +static opal_buffer_t *allgather_buf; + +static void allgather_server_recv(int status, orte_process_name_t* sender, + opal_buffer_t *buffer, + orte_rml_tag_t tag, void *cbdata) +{ + int rc; + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s allgather buffer received from %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(sender))); + + /* append this data to the allgather_buf */ + if (ORTE_SUCCESS != (rc = 
opal_dss.copy_payload(allgather_buf, buffer))) { + ORTE_ERROR_LOG(rc); + allgather_failed = true; + return; + } + + /* bump the counter */ + ++allgather_num_recvd; + + /* reissue the recv */ + rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER_SERVER, + ORTE_RML_NON_PERSISTENT, allgather_server_recv, NULL); + if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) { + ORTE_ERROR_LOG(rc); + allgather_failed = true; + } +} + +static void allgather_client_recv(int status, orte_process_name_t* sender, + opal_buffer_t *buffer, + orte_rml_tag_t tag, void *cbdata) +{ + int rc; + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s grpcomm:base: allgather buffer received", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* transfer the buffer */ + if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(allgather_buf, buffer))) { + ORTE_ERROR_LOG(rc); + allgather_failed = true; + } + + /* bump the counter */ + ++allgather_num_recvd; +} + +int orte_grpcomm_base_allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf) +{ + orte_process_name_t name; + int rc; + + OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, + "%s grpcomm: entering allgather", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* everything happens within my jobid */ + name.jobid = ORTE_PROC_MY_NAME->jobid; + + /*** RANK != 0 ***/ + if (0 != ORTE_PROC_MY_NAME->vpid) { + /* everyone but rank=0 sends data */ + name.vpid = 0; + if (0 > orte_rml.send_buffer(&name, sbuf, ORTE_RML_TAG_ALLGATHER_SERVER, 0)) { + ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); + return ORTE_ERR_COMM_FAILURE; + } + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s allgather buffer sent", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* setup the buffer that will recv the results */ + allgather_buf = OBJ_NEW(opal_buffer_t); + + /* now receive the final result from rank=0. Be sure to do this in + * a manner that allows us to return without being in a recv! 
+ */ + allgather_num_recvd = 0; + allgather_failed = false; + rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER_CLIENT, + ORTE_RML_NON_PERSISTENT, allgather_client_recv, NULL); + if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) { + ORTE_ERROR_LOG(rc); + return rc; + } + + ORTE_PROGRESSED_WAIT(allgather_failed, allgather_num_recvd, 1); + + /* if the allgather failed, return an error */ + if (allgather_failed) { + ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); + OBJ_RELEASE(allgather_buf); + return ORTE_ERR_COMM_FAILURE; + } + + /* copy payload to the caller's buffer */ + if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(rbuf, allgather_buf))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(allgather_buf); + return rc; + } + OBJ_RELEASE(allgather_buf); + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s allgather buffer received", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + return ORTE_SUCCESS; + } + + + /*** RANK = 0 ***/ + /* seed the outgoing buffer with the num_procs so it can be unpacked */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(rbuf, &orte_process_info.num_procs, 1, ORTE_STD_CNTR))) { + ORTE_ERROR_LOG(rc); + return rc; + } + + /* put my own information into the outgoing buffer */ + if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(rbuf, sbuf))) { + ORTE_ERROR_LOG(rc); + return rc; + } + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s allgather collecting buffers", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* setup the recv conditions */ + allgather_failed = false; + allgather_num_recvd = 0; + + /* setup the buffer that will recv the results */ + allgather_buf = OBJ_NEW(opal_buffer_t); + + /* post the non-blocking recv */ + rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER_SERVER, + ORTE_RML_NON_PERSISTENT, allgather_server_recv, NULL); + if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) { + ORTE_ERROR_LOG(rc); + return rc; + } + + ORTE_PROGRESSED_WAIT(allgather_failed, allgather_num_recvd, 
(orte_std_cntr_t)orte_process_info.num_procs-1); + + /* cancel the lingering recv */ + if (ORTE_SUCCESS != (rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER_SERVER))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(allgather_buf); + return rc; + } + + /* if the allgather failed, say so */ + if (allgather_failed) { + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s allgather failed!", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + OBJ_RELEASE(allgather_buf); + return ORTE_ERROR; + } + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s allgather xcasting collected data", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* copy the received info to the caller's buffer */ + if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(rbuf, allgather_buf))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(allgather_buf); + return rc; + } + OBJ_RELEASE(allgather_buf); + + /* xcast the results */ + orte_grpcomm.xcast(ORTE_PROC_MY_NAME->jobid, rbuf, ORTE_RML_TAG_ALLGATHER_CLIENT); + + /* xcast automatically ensures that the sender -always- gets a copy + * of the message. This is required to ensure proper operation of the + * launch system as the HNP -must- get a copy itself. So we have to + * post our own receive here so that we don't leave a message rattling + * around in our RML + */ + /* setup the buffer that will recv the results */ + allgather_buf = OBJ_NEW(opal_buffer_t); + + /* receive the echo'd message. Be sure to do this in + * a manner that allows us to return without being in a recv! 
+ */ + allgather_num_recvd = 0; + allgather_failed = false; + rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER_CLIENT, + ORTE_RML_NON_PERSISTENT, allgather_client_recv, NULL); + if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) { + ORTE_ERROR_LOG(rc); + return rc; + } + + ORTE_PROGRESSED_WAIT(allgather_failed, allgather_num_recvd, 1); + + /* if the allgather failed, return an error */ + if (allgather_failed) { + ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); + OBJ_RELEASE(allgather_buf); + return ORTE_ERR_COMM_FAILURE; + } + + /* don't need the received buffer - we already have what we need in rbuf */ + OBJ_DESTRUCT(allgather_buf); + + OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, + "%s grpcomm: allgather completed", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + return ORTE_SUCCESS; +} + + + +static orte_std_cntr_t allgather_num_sent; +static void allgather_send_cb(int status, orte_process_name_t* sender, + opal_buffer_t *buffer, + orte_rml_tag_t tag, void *cbdata) +{ + /* increment the count */ + ++allgather_num_sent; +} + + +int orte_grpcomm_base_allgather_list(opal_list_t *names, opal_buffer_t *sbuf, opal_buffer_t *rbuf) +{ + opal_list_item_t *item; + orte_namelist_t *peer, *root; + orte_std_cntr_t num_peers; + int rc; + + OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, + "%s grpcomm: entering allgather_list", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* the first entry on the list is the "root" that collects + * all the data - everyone else just sends and gets back + * the results + */ + root = (orte_namelist_t*)opal_list_get_first(names); + + /*** NON-ROOT ***/ + if (OPAL_EQUAL != opal_dss.compare(&root->name, ORTE_PROC_MY_NAME, ORTE_NAME)) { + /* everyone but root sends data */ + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s allgather_list: sending my data to %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&root->name))); + + if (0 > orte_rml.send_buffer(&root->name, sbuf, ORTE_RML_TAG_ALLGATHER_SERVER, 0)) 
{ + ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); + return ORTE_ERR_COMM_FAILURE; + } + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s allgather_list: buffer sent", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* setup the buffer that will recv the results */ + allgather_buf = OBJ_NEW(opal_buffer_t); + + /* now receive the final result from rank=0. Be sure to do this in + * a manner that allows us to return without being in a recv! + */ + allgather_num_recvd = 0; + allgather_failed = false; + rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER_CLIENT, + ORTE_RML_NON_PERSISTENT, allgather_client_recv, NULL); + if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) { + ORTE_ERROR_LOG(rc); + return rc; + } + + ORTE_PROGRESSED_WAIT(allgather_failed, allgather_num_recvd, 1); + + /* if the allgather failed, return an error */ + if (allgather_failed) { + ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); + OBJ_RELEASE(allgather_buf); + return ORTE_ERR_COMM_FAILURE; + } + + /* copy payload to the caller's buffer */ + if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(rbuf, allgather_buf))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(allgather_buf); + return rc; + } + OBJ_RELEASE(allgather_buf); + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s allgather_list: buffer received", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + return ORTE_SUCCESS; + } + + + /*** ROOT ***/ + /* count how many peers are participating, including myself */ + num_peers = (orte_std_cntr_t)opal_list_get_size(names); + + /* seed the outgoing buffer with the num_procs so it can be unpacked */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(rbuf, &num_peers, 1, ORTE_STD_CNTR))) { + ORTE_ERROR_LOG(rc); + return rc; + } + + /* put my own information into the outgoing buffer */ + if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(rbuf, sbuf))) { + ORTE_ERROR_LOG(rc); + return rc; + } + + /* setup the recv conditions */ + allgather_failed = false; + allgather_num_recvd = 0; + + /* setup the buffer that 
will recv the results */ + allgather_buf = OBJ_NEW(opal_buffer_t); + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s allgather_list: waiting to recv %ld inputs", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (long)num_peers-1)); + + /* post the non-blocking recv */ + rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER_SERVER, + ORTE_RML_NON_PERSISTENT, allgather_server_recv, NULL); + if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) { + ORTE_ERROR_LOG(rc); + return rc; + } + + ORTE_PROGRESSED_WAIT(allgather_failed, allgather_num_recvd, num_peers-1); + + /* cancel the lingering recv */ + if (ORTE_SUCCESS != (rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER_SERVER))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(allgather_buf); + return rc; + } + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s allgather_list: received all data", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* copy the received info to the caller's buffer */ + if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(rbuf, allgather_buf))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(allgather_buf); + return rc; + } + OBJ_RELEASE(allgather_buf); + + /* broadcast the results */ + allgather_num_sent = 0; + for (item = opal_list_get_first(names); + item != opal_list_get_end(names); + item = opal_list_get_next(item)) { + peer = (orte_namelist_t*)item; + + /* skip myself */ + if (OPAL_EQUAL == opal_dss.compare(&root->name, &peer->name, ORTE_NAME)) { + continue; + } + + /* transmit the buffer to this process */ + if (0 > orte_rml.send_buffer_nb(&peer->name, rbuf, ORTE_RML_TAG_ALLGATHER_CLIENT, + 0, allgather_send_cb, 0)) { + ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); + return ORTE_ERR_COMM_FAILURE; + } + } + + ORTE_PROGRESSED_WAIT(false, allgather_num_sent, num_peers-1); + + OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, + "%s grpcomm: allgather_list completed", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + return ORTE_SUCCESS; +} diff --git 
a/orte/mca/grpcomm/base/grpcomm_base_barrier.c b/orte/mca/grpcomm/base/grpcomm_base_barrier.c new file mode 100644 index 0000000000..bb49885f47 --- /dev/null +++ b/orte/mca/grpcomm/base/grpcomm_base_barrier.c @@ -0,0 +1,182 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2007 Sun Microsystems, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "orte_config.h" +#include "orte/constants.h" +#include "orte/types.h" + +#include +#ifdef HAVE_SYS_TIME_H +#include +#endif /* HAVE_SYS_TIME_H */ + +#include "opal/threads/condition.h" +#include "opal/util/output.h" +#include "opal/util/bit_ops.h" + +#include "opal/class/opal_hash_table.h" +#include "orte/util/proc_info.h" +#include "opal/dss/dss.h" +#include "orte/mca/errmgr/errmgr.h" +#include "orte/mca/odls/odls_types.h" +#include "orte/mca/rml/rml.h" +#include "orte/runtime/orte_globals.h" +#include "orte/util/name_fns.h" +#include "orte/orted/orted.h" +#include "orte/runtime/orte_wait.h" + +#include "orte/mca/grpcomm/base/base.h" + +static orte_std_cntr_t barrier_num_recvd; +static bool barrier_failed; + +static void barrier_server_recv(int status, orte_process_name_t* sender, + opal_buffer_t *buffer, + orte_rml_tag_t tag, void *cbdata) +{ + int rc; + + /* bump counter */ + ++barrier_num_recvd; + /* reissue the recv */ + rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_BARRIER_SERVER, + ORTE_RML_NON_PERSISTENT, barrier_server_recv, NULL); + if (rc != ORTE_SUCCESS 
&& rc != ORTE_ERR_NOT_IMPLEMENTED) { + ORTE_ERROR_LOG(rc); + barrier_failed = true; + } +} + +static void barrier_recv(int status, orte_process_name_t* sender, + opal_buffer_t *buffer, + orte_rml_tag_t tag, void *cbdata) +{ + /* bump counter */ + ++barrier_num_recvd; +} + +int orte_grpcomm_base_barrier(void) +{ + orte_process_name_t name; + orte_std_cntr_t i=0; + opal_buffer_t buf; + int rc; + + OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, + "%s grpcomm: entering barrier", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* everything happens within the same jobid */ + name.jobid = ORTE_PROC_MY_NAME->jobid; + + /*** RANK != 0 ***/ + if (0 != ORTE_PROC_MY_NAME->vpid) { + /* All non-root send & receive near-zero-length message. */ + name.vpid = 0; + OBJ_CONSTRUCT(&buf, opal_buffer_t); + opal_dss.pack(&buf, &i, 1, ORTE_STD_CNTR); /* put something meaningless here */ + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s sending barrier", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + rc = orte_rml.send_buffer(&name,&buf,ORTE_RML_TAG_BARRIER_SERVER,0); + if (rc < 0) { + ORTE_ERROR_LOG(rc); + return rc; + } + OBJ_DESTRUCT(&buf); + + /* now receive the release from rank=0. Be sure to do this in + * a manner that allows us to return without being in a recv! 
+ */ + barrier_num_recvd = 0; + rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_BARRIER_CLIENT, + ORTE_RML_NON_PERSISTENT, barrier_recv, NULL); + if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) { + ORTE_ERROR_LOG(rc); + return rc; + } + + ORTE_PROGRESSED_WAIT(false, barrier_num_recvd, 1); + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s received barrier release", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + return ORTE_SUCCESS; + } + + + /*** RANK = 0 ***/ + /* setup to recv the barrier messages from all peers */ + barrier_num_recvd = 0; + barrier_failed = false; + + /* post the non-blocking recv */ + rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_BARRIER_SERVER, + ORTE_RML_NON_PERSISTENT, barrier_server_recv, NULL); + if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) { + ORTE_ERROR_LOG(rc); + return rc; + } + + ORTE_PROGRESSED_WAIT(barrier_failed, barrier_num_recvd, (orte_std_cntr_t)orte_process_info.num_procs-1); + + /* if the barrier failed, say so */ + if (barrier_failed) { + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s barrier failed!", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + return ORTE_ERROR; + } + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s barrier xcasting release", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* xcast the release */ + OBJ_CONSTRUCT(&buf, opal_buffer_t); + opal_dss.pack(&buf, &i, 1, ORTE_STD_CNTR); /* put something meaningless here */ + orte_grpcomm.xcast(ORTE_PROC_MY_NAME->jobid, &buf, ORTE_RML_TAG_BARRIER_CLIENT); + OBJ_DESTRUCT(&buf); + + /* xcast automatically ensures that the sender -always- gets a copy + * of the message. This is required to ensure proper operation of the + * launch system as the HNP -must- get a copy itself. 
So we have to + * post our own receive here so that we don't leave a message rattling + * around in our RML + */ + barrier_num_recvd = 0; + rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_BARRIER_CLIENT, + ORTE_RML_NON_PERSISTENT, barrier_recv, NULL); + if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) { + ORTE_ERROR_LOG(rc); + return rc; + } + + ORTE_PROGRESSED_WAIT(false, barrier_num_recvd, 1); + + OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, + "%s grpcomm: barrier completed", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + return ORTE_SUCCESS; +} + diff --git a/orte/mca/grpcomm/base/grpcomm_base_close.c b/orte/mca/grpcomm/base/grpcomm_base_close.c index 52831bf03e..fbd02452f0 100644 --- a/orte/mca/grpcomm/base/grpcomm_base_close.c +++ b/orte/mca/grpcomm/base/grpcomm_base_close.c @@ -30,10 +30,10 @@ int orte_grpcomm_base_close(void) /* If we have a selected component and module, then finalize it */ if (mca_grpcomm_base_selected) { - mca_grpcomm_base_selected_component.grpcomm_finalize(); + orte_grpcomm.finalize(); } - /* Close all remaining available components (may be one if this is a + /* Close all remaining available components (may be one if this is a OpenRTE program, or [possibly] multiple if this is ompi_info) */ mca_base_components_close(orte_grpcomm_base_output, diff --git a/orte/mca/grpcomm/base/grpcomm_base_modex.c b/orte/mca/grpcomm/base/grpcomm_base_modex.c new file mode 100644 index 0000000000..9f1fcc9590 --- /dev/null +++ b/orte/mca/grpcomm/base/grpcomm_base_modex.c @@ -0,0 +1,557 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. 
 + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2007 Sun Microsystems, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "orte_config.h" +#include "orte/constants.h" +#include "orte/types.h" + +#include +#ifdef HAVE_SYS_TIME_H +#include +#endif /* HAVE_SYS_TIME_H */ + +#include "opal/threads/condition.h" +#include "opal/util/output.h" +#include "opal/util/bit_ops.h" + +#include "opal/class/opal_hash_table.h" +#include "orte/util/proc_info.h" +#include "opal/dss/dss.h" +#include "orte/mca/errmgr/errmgr.h" +#include "orte/mca/odls/odls_types.h" +#include "orte/mca/rml/rml.h" +#include "orte/runtime/orte_globals.h" +#include "orte/util/name_fns.h" +#include "orte/orted/orted.h" +#include "orte/runtime/orte_wait.h" + +#include "orte/mca/grpcomm/base/base.h" + + +/*************** MODEX SECTION **************/ + +/** + * MODEX DESIGN + * + * Modex data is always associated with a given orte process name, in + * an opal hash table. The hash table is necessary because modex data is + * received for entire jobids and when working with + * dynamic processes, it is possible we will receive data for a + * process not yet in the ompi_proc_all() list of processes. This + * information must be kept for later use, because if accept/connect + * causes the proc to be added to the ompi_proc_all() list, it could + * cause a connection storm. Therefore, we use an + * opal_hash_table_t backing store to contain all modex information. + * + * While we could add the now discovered proc into the ompi_proc_all() + * list, this has some problems, in that we don't have the + * architecture and hostname information needed to properly fill in + * the ompi_proc_t structure and we don't want to cause RML + * communication to get it when we don't really need to know anything + * about the remote proc.
+ * + * All data put into the modex (or received from the modex) is + * associated with a given proc,attr_name pair. The data structures + * to maintain this data look something like: + * + * opal_hash_table_t modex_data -> list of attr_proc_t objects + * + * +-----------------------------+ + * | modex_proc_data_t | + * | - opal_list_item_t | + * +-----------------------------+ + * | opal_mutex_t modex_lock | + * | bool modex_received_data | 1 + * | opal_list_t modules | ---------+ + * +-----------------------------+ | + * * | + * +--------------------------------+ <--------+ + * | modex_module_data_t | + * | - opal_list_item_t | + * +--------------------------------+ + * | mca_base_component_t component | + * | void *module_data | + * | size_t module_data_size | + * +--------------------------------+ + * + */ + +/* Local "globals" */ +static orte_std_cntr_t num_entries; +static opal_buffer_t *modex_buffer; +static opal_hash_table_t *modex_data; +static opal_mutex_t mutex; +static opal_condition_t cond; + +/** + * Modex data for a particular orte process + * + * Locking infrastructure and list of module data for a given orte + * process name. The name association is maintained in the + * modex_data hash table. + */ +struct modex_proc_data_t { + /** Structure can be put on lists (including in hash tables) */ + opal_list_item_t super; + /* Lock held whenever the modex data for this proc is being + modified */ + opal_mutex_t modex_lock; + /* True if modex data has ever been received from this process, + false otherwise. */ + bool modex_received_data; + /* List of modex_module_data_t structures containing all data + received from this process, sorted by component name. 
*/ + opal_list_t modex_module_data; +}; +typedef struct modex_proc_data_t modex_proc_data_t; + +static void +modex_construct(modex_proc_data_t * modex) +{ + OBJ_CONSTRUCT(&modex->modex_lock, opal_mutex_t); + modex->modex_received_data = false; + OBJ_CONSTRUCT(&modex->modex_module_data, opal_list_t); +} + +static void +modex_destruct(modex_proc_data_t * modex) +{ + OBJ_DESTRUCT(&modex->modex_module_data); + OBJ_DESTRUCT(&modex->modex_lock); +} + +OBJ_CLASS_INSTANCE(modex_proc_data_t, opal_object_t, + modex_construct, modex_destruct); + + + +/** + * Data for a particular attribute + * + * Container for data for a particular module,attribute pair. This + * structure should be contained in the modex_module_data list in an + * modex_proc_data_t structure to maintain an association with a + * given proc. The list is then searched for a matching attribute + * name. + * + * While searching the list or reading from (or writing to) this + * structure, the lock in the proc_data_t should be held. + */ +struct modex_attr_data_t { + /** Structure can be put on lists */ + opal_list_item_t super; + /** Attribute name */ + char * attr_name; + /** Binary blob of data associated with this proc,component pair */ + void *attr_data; + /** Size (in bytes) of module_data */ + size_t attr_data_size; +}; +typedef struct modex_attr_data_t modex_attr_data_t; + +static void +modex_attr_construct(modex_attr_data_t * module) +{ + module->attr_name = NULL; + module->attr_data = NULL; + module->attr_data_size = 0; +} + +static void +modex_attr_destruct(modex_attr_data_t * module) +{ + if (NULL != module->attr_name) { + free(module->attr_name); + } + if (NULL != module->attr_data) { + free(module->attr_data); + } +} + +OBJ_CLASS_INSTANCE(modex_attr_data_t, + opal_list_item_t, + modex_attr_construct, + modex_attr_destruct); + + +/** + * Find data for a given attribute in a given modex_proc_data_t + * container. + * + * The proc_data's modex_lock must be held during this + * search. 
+ */ +static modex_attr_data_t * +modex_lookup_attr_data(modex_proc_data_t *proc_data, + const char *attr_name, + bool create_if_not_found) +{ + modex_attr_data_t *attr_data = NULL; + for (attr_data = (modex_attr_data_t *) opal_list_get_first(&proc_data->modex_module_data); + attr_data != (modex_attr_data_t *) opal_list_get_end(&proc_data->modex_module_data); + attr_data = (modex_attr_data_t *) opal_list_get_next(attr_data)) { + if (0 == strcmp(attr_name, attr_data->attr_name)) { + return attr_data; + } + } + + if (create_if_not_found) { + attr_data = OBJ_NEW(modex_attr_data_t); + if (NULL == attr_data) return NULL; + + attr_data->attr_name = strdup(attr_name); + opal_list_append(&proc_data->modex_module_data, &attr_data->super); + + return attr_data; + } + + return NULL; +} + + +/** +* Find modex_proc_data_t container associated with given + * orte_process_name_t. + * + * The global lock should *NOT* be held when + * calling this function. + */ +static modex_proc_data_t* +modex_lookup_orte_proc(const orte_process_name_t *orte_proc) +{ + modex_proc_data_t *proc_data = NULL; + + OPAL_THREAD_LOCK(&mutex); + opal_hash_table_get_value_uint64(modex_data, + orte_util_hash_name(orte_proc), (void**)&proc_data); + if (NULL == proc_data) { + /* The proc clearly exists, so create a modex structure + for it */ + proc_data = OBJ_NEW(modex_proc_data_t); + if (NULL == proc_data) { + opal_output(0, "grpcomm_basic_modex_lookup_orte_proc: unable to allocate modex_proc_data_t\n"); + OPAL_THREAD_UNLOCK(&mutex); + return NULL; + } + opal_hash_table_set_value_uint64(modex_data, + orte_util_hash_name(orte_proc), proc_data); + } + OPAL_THREAD_UNLOCK(&mutex); + + return proc_data; +} + +int orte_grpcomm_base_modex_init(void) +{ + OBJ_CONSTRUCT(&mutex, opal_mutex_t); + OBJ_CONSTRUCT(&cond, opal_condition_t); + + modex_data = OBJ_NEW(opal_hash_table_t); + opal_hash_table_init(modex_data, 256); + num_entries = 0; + modex_buffer = OBJ_NEW(opal_buffer_t); + + return ORTE_SUCCESS; +} + +void 
orte_grpcomm_base_modex_finalize(void) +{ + OBJ_DESTRUCT(&mutex); + OBJ_DESTRUCT(&cond); + + opal_hash_table_remove_all(modex_data); + OBJ_RELEASE(modex_data); + + OBJ_RELEASE(modex_buffer); +} + +int orte_grpcomm_base_set_proc_attr(const char *attr_name, + const void *data, + size_t size) +{ + int rc; + + OPAL_THREAD_LOCK(&mutex); + + /* Pack the attribute name information into the local buffer */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(modex_buffer, &attr_name, 1, OPAL_STRING))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* pack the size */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(modex_buffer, &size, 1, OPAL_SIZE))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* Pack the actual data into the buffer */ + if (0 != size) { + if (ORTE_SUCCESS != (rc = opal_dss.pack(modex_buffer, (void *) data, size, OPAL_BYTE))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + } + + /* track the number of entries */ + ++num_entries; + +cleanup: + OPAL_THREAD_UNLOCK(&mutex); + + return rc; +} + +int orte_grpcomm_base_get_proc_attr(const orte_process_name_t proc, + const char * attribute_name, void **val, + size_t *size) +{ + modex_proc_data_t *proc_data; + modex_attr_data_t *attr_data; + + proc_data = modex_lookup_orte_proc(&proc); + if (NULL == proc_data) return ORTE_ERR_NOT_FOUND; + + OPAL_THREAD_LOCK(&proc_data->modex_lock); + + /* look up attribute */ + attr_data = modex_lookup_attr_data(proc_data, attribute_name, false); + + /* copy the data out to the user */ + if ((NULL == attr_data) || + (attr_data->attr_data_size == 0)) { + opal_output(0, "grpcomm_basic_get_proc_attr: no attr avail or zero byte size"); + *val = NULL; + *size = 0; + } else { + void *copy = malloc(attr_data->attr_data_size); + + if (copy == NULL) { + OPAL_THREAD_UNLOCK(&proc_data->modex_lock); + return ORTE_ERR_OUT_OF_RESOURCE; + } + memcpy(copy, attr_data->attr_data, attr_data->attr_data_size); + *val = copy; + *size = attr_data->attr_data_size; + } + OPAL_THREAD_UNLOCK(&proc_data->modex_lock); + + 
return ORTE_SUCCESS; +} + + +int orte_grpcomm_base_modex(opal_list_t *procs) +{ + opal_buffer_t buf, rbuf; + orte_std_cntr_t i, j, num_procs, num_recvd_entries; + void *bytes = NULL; + orte_std_cntr_t cnt; + orte_process_name_t proc_name; + modex_proc_data_t *proc_data; + modex_attr_data_t *attr_data; + int rc; + + OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, + "%s grpcomm: modex entered", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* setup the buffer that will actually be sent */ + OBJ_CONSTRUCT(&buf, opal_buffer_t); + OBJ_CONSTRUCT(&rbuf, opal_buffer_t); + + /* put our process name in the buffer so it can be unpacked later */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output, + "%s modex: reporting %ld entries", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (long)num_entries)); + + /* put the number of entries into the buffer */ + OPAL_THREAD_LOCK(&mutex); + if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &num_entries, 1, ORTE_STD_CNTR))) { + ORTE_ERROR_LOG(rc); + OPAL_THREAD_UNLOCK(&mutex); + goto cleanup; + } + + /* if there are entries, non-destructively copy the data across */ + if (0 < num_entries) { + if (ORTE_SUCCESS != (opal_dss.copy_payload(&buf, modex_buffer))) { + ORTE_ERROR_LOG(rc); + OPAL_THREAD_UNLOCK(&mutex); + goto cleanup; + } + } + OPAL_THREAD_UNLOCK(&mutex); + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s modex: executing allgather", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* exchange the buffer with the list of peers (if provided) or all my peers */ + if (NULL == procs) { + if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather(&buf, &rbuf))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + } else { + if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather_list(procs, &buf, &rbuf))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + } + + OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, + "%s modex: processing modex 
info", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + /* process the results */ + /* extract the number of procs that put data in the buffer */ + cnt=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_procs, &cnt, ORTE_STD_CNTR))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output, + "%s modex: received data for %ld procs", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (long)num_procs)); + + /* process the buffer */ + for (i=0; i < num_procs; i++) { + /* unpack the process name */ + cnt=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &proc_name, &cnt, ORTE_NAME))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + /* look up the modex data structure */ + proc_data = modex_lookup_orte_proc(&proc_name); + if (proc_data == NULL) { + /* report the error */ + opal_output(0, "grpcomm_basic_modex: received modex info for unknown proc %s\n", + ORTE_NAME_PRINT(&proc_name)); + rc = ORTE_ERR_NOT_FOUND; + goto cleanup; + } + + /* unpack the number of entries for this proc */ + cnt=1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_recvd_entries, &cnt, ORTE_STD_CNTR))) { + ORTE_ERROR_LOG(rc); + goto cleanup; + } + + OPAL_THREAD_LOCK(&proc_data->modex_lock); + + /* + * Extract the attribute names and values + */ + for (j = 0; j < num_recvd_entries; j++) { + size_t num_bytes; + char *attr_name; + + cnt = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &attr_name, &cnt, OPAL_STRING))) { + ORTE_ERROR_LOG(rc); + OPAL_THREAD_UNLOCK(&proc_data->modex_lock); + goto cleanup; + } + + cnt = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_bytes, &cnt, OPAL_SIZE))) { + ORTE_ERROR_LOG(rc); + OPAL_THREAD_UNLOCK(&proc_data->modex_lock); + goto cleanup; + } + if (num_bytes != 0) { + if (NULL == (bytes = malloc(num_bytes))) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + rc = ORTE_ERR_OUT_OF_RESOURCE; + OPAL_THREAD_UNLOCK(&proc_data->modex_lock); + goto cleanup; + } + cnt = (orte_std_cntr_t) num_bytes; + if (ORTE_SUCCESS != 
(rc = opal_dss.unpack(&rbuf, bytes, &cnt, OPAL_BYTE))) { + ORTE_ERROR_LOG(rc); + OPAL_THREAD_UNLOCK(&proc_data->modex_lock); + goto cleanup; + } + num_bytes = cnt; + } else { + bytes = NULL; + } + + /* + * Lookup the corresponding modex structure + */ + if (NULL == (attr_data = modex_lookup_attr_data(proc_data, + attr_name, true))) { + opal_output(0, "grpcomm_basic_modex: modex_lookup_attr_data failed\n"); + OPAL_THREAD_UNLOCK(&proc_data->modex_lock); + rc = ORTE_ERR_NOT_FOUND; + goto cleanup; + } + if (NULL != attr_data->attr_data) { + /* some pre-existing value must be here - release it */ + free(attr_data->attr_data); + } + attr_data->attr_data = bytes; + attr_data->attr_data_size = num_bytes; + proc_data->modex_received_data = true; + } + OPAL_THREAD_UNLOCK(&proc_data->modex_lock); + } + +cleanup: + OBJ_DESTRUCT(&buf); + OBJ_DESTRUCT(&rbuf); + + OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, + "%s grpcomm: modex completed", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + + return rc; +} + +int orte_grpcomm_base_purge_proc_attrs(void) +{ + /* + * Purge the attributes + */ + opal_hash_table_remove_all(modex_data); + OBJ_RELEASE(modex_data); + modex_data = OBJ_NEW(opal_hash_table_t); + opal_hash_table_init(modex_data, 256); + + /* + * Clear the modex buffer + */ + OBJ_RELEASE(modex_buffer); + num_entries = 0; + modex_buffer = OBJ_NEW(opal_buffer_t); + + return ORTE_SUCCESS; +} diff --git a/orte/mca/grpcomm/base/grpcomm_base_open.c b/orte/mca/grpcomm/base/grpcomm_base_open.c index bec635ab09..877c99aa6e 100644 --- a/orte/mca/grpcomm/base/grpcomm_base_open.c +++ b/orte/mca/grpcomm/base/grpcomm_base_open.c @@ -46,7 +46,6 @@ opal_list_t mca_grpcomm_base_components_available; orte_grpcomm_base_component_t mca_grpcomm_base_selected_component; - /** * Function for finding and opening either all MCA components, or the one * that was specifically requested via a MCA parameter. 
diff --git a/orte/mca/grpcomm/base/grpcomm_base_select.c b/orte/mca/grpcomm/base/grpcomm_base_select.c index 4fb69bc44b..096afc36c9 100644 --- a/orte/mca/grpcomm/base/grpcomm_base_select.c +++ b/orte/mca/grpcomm/base/grpcomm_base_select.c @@ -31,71 +31,62 @@ */ int orte_grpcomm_base_select(void) { - opal_list_item_t *item; - mca_base_component_list_item_t *cli; - orte_grpcomm_base_component_t *component, *best_component = NULL; - orte_grpcomm_base_module_t *module, *best_module = NULL; - int priority, best_priority = -1; - - /* Iterate through all the available components */ - - for (item = opal_list_get_first(&mca_grpcomm_base_components_available); - item != opal_list_get_end(&mca_grpcomm_base_components_available); - item = opal_list_get_next(item)) { - cli = (mca_base_component_list_item_t *) item; - component = (orte_grpcomm_base_component_t *) cli->cli_component; - - /* Call the component's init function and see if it wants to be - selected */ - - module = component->grpcomm_init(&priority); - - /* If we got a non-NULL module back, then the component wants to - be selected. 
So save its multi/hidden values and save the - module with the highest priority */ - - if (NULL != module) { - /* If this is the best one, save it */ - - if (priority > best_priority) { - - /* If there was a previous best one, finalize */ - - if (NULL != best_component) { - best_component->grpcomm_finalize(); + opal_list_item_t *item; + mca_base_component_list_item_t *cli; + orte_grpcomm_base_component_t *component, *best_component = NULL; + orte_grpcomm_base_module_t *module, *best_module = NULL; + int priority, best_priority = -1; + + /* Iterate through all the available components */ + + for (item = opal_list_get_first(&mca_grpcomm_base_components_available); + item != opal_list_get_end(&mca_grpcomm_base_components_available); + item = opal_list_get_next(item)) { + cli = (mca_base_component_list_item_t *) item; + component = (orte_grpcomm_base_component_t *) cli->cli_component; + + /* Call the component's init function and see if it wants to be + * selected + */ + + module = component->grpcomm_init(&priority); + + /* If we got a non-NULL module back, then the component wants to + * be selected. 
So save the module with the highest priority + */ + + if (NULL != module) { + /* If this is the best one, save it */ + + if (priority > best_priority) { + + /* Save the new best one */ + + best_module = module; + best_component = component; + + /* update the best priority */ + best_priority = priority; + } + } - - /* Save the new best one */ - - best_module = module; - best_component = component; - - /* update the best priority */ - best_priority = priority; - } - - /* If it's not the best one, finalize it */ - - else { - component->grpcomm_finalize(); - } } - } - - /* If we didn't find one to select, barf */ - - if (NULL == best_component) { - return ORTE_ERROR; - } - - /* We have happiness -- save the component and module for later - usage */ - - orte_grpcomm = *best_module; - mca_grpcomm_base_selected_component = *best_component; - mca_grpcomm_base_selected = true; - - /* all done */ - - return ORTE_SUCCESS; + + /* If we didn't find one to select, barf */ + + if (NULL == best_component) { + return ORTE_ERROR; + } + + /* We have happiness */ + orte_grpcomm = *best_module; + if (ORTE_SUCCESS != orte_grpcomm.init()) { + /* ouch! 
*/ + return ORTE_ERROR; + } + mca_grpcomm_base_selected = true; + + /* all done */ + + return ORTE_SUCCESS; } diff --git a/orte/mca/grpcomm/basic/grpcomm_basic.h b/orte/mca/grpcomm/basic/grpcomm_basic.h index 2a52614b48..be402dfb8d 100644 --- a/orte/mca/grpcomm/basic/grpcomm_basic.h +++ b/orte/mca/grpcomm/basic/grpcomm_basic.h @@ -40,18 +40,12 @@ BEGIN_C_DECLS typedef struct { orte_vpid_t xcast_linear_xover; orte_vpid_t xcast_binomial_xover; - orte_std_cntr_t num_active; - opal_mutex_t mutex; - opal_condition_t cond; - opal_hash_table_t modex_data; - opal_buffer_t modex_buffer; - orte_std_cntr_t modex_num_entries; } orte_grpcomm_basic_globals_t; extern orte_grpcomm_basic_globals_t orte_grpcomm_basic; /* - * Module open / close + * Component open / close */ int orte_grpcomm_basic_open(void); int orte_grpcomm_basic_close(void); @@ -59,13 +53,7 @@ orte_grpcomm_base_module_t* orte_grpcomm_basic_init(int *priority); /* - * Startup / Shutdown - */ -int orte_grpcomm_basic_module_init(void); -int orte_grpcomm_basic_finalize(void); - -/* - * xcast interfaces + * Grpcomm interfaces */ ORTE_MODULE_DECLSPEC extern orte_grpcomm_base_component_t mca_grpcomm_basic_component; diff --git a/orte/mca/grpcomm/basic/grpcomm_basic_component.c b/orte/mca/grpcomm/basic/grpcomm_basic_component.c index 8cd7ce4b96..de6302b4a7 100644 --- a/orte/mca/grpcomm/basic/grpcomm_basic_component.c +++ b/orte/mca/grpcomm/basic/grpcomm_basic_component.c @@ -68,8 +68,7 @@ orte_grpcomm_base_component_t mca_grpcomm_basic_component = { /* The component is checkpoint ready */ MCA_BASE_METADATA_PARAM_CHECKPOINT }, - orte_grpcomm_basic_init, /* component init */ - orte_grpcomm_basic_finalize /* component shutdown */ + orte_grpcomm_basic_init /* component init */ }; /* @@ -113,7 +112,6 @@ int orte_grpcomm_basic_open(void) return ORTE_SUCCESS; } -/* Close the component */ int orte_grpcomm_basic_close(void) { return ORTE_SUCCESS; @@ -121,34 +119,8 @@ int orte_grpcomm_basic_close(void) orte_grpcomm_base_module_t* 
orte_grpcomm_basic_init(int *priority) { - /* initialize globals */ - OBJ_CONSTRUCT(&orte_grpcomm_basic.mutex, opal_mutex_t); - OBJ_CONSTRUCT(&orte_grpcomm_basic.cond, opal_condition_t); - orte_grpcomm_basic.num_active = 0; - - OBJ_CONSTRUCT(&orte_grpcomm_basic.modex_data, opal_hash_table_t); - OBJ_CONSTRUCT(&orte_grpcomm_basic.modex_buffer, opal_buffer_t); - orte_grpcomm_basic.modex_num_entries = 0; - - opal_hash_table_init(&orte_grpcomm_basic.modex_data, 256); - /* we are the default, so set a low priority so we can be overridden */ *priority = 1; return &orte_grpcomm_basic_module; } - -/* - * finalize routine - */ -int orte_grpcomm_basic_finalize(void) -{ - OBJ_DESTRUCT(&orte_grpcomm_basic.mutex); - OBJ_DESTRUCT(&orte_grpcomm_basic.cond); - - opal_hash_table_remove_all(&orte_grpcomm_basic.modex_data); - OBJ_DESTRUCT(&orte_grpcomm_basic.modex_data); - - OBJ_DESTRUCT(&orte_grpcomm_basic.modex_buffer); - return ORTE_SUCCESS; -} diff --git a/orte/mca/grpcomm/basic/grpcomm_basic_module.c b/orte/mca/grpcomm/basic/grpcomm_basic_module.c index af8baf0d41..b5321a0613 100644 --- a/orte/mca/grpcomm/basic/grpcomm_basic_module.c +++ b/orte/mca/grpcomm/basic/grpcomm_basic_module.c @@ -58,13 +58,58 @@ static int xcast_direct(orte_jobid_t job, opal_buffer_t *buffer, orte_rml_tag_t tag); +/* Static API's */ +static int init(void); +static void finalize(void); +static int xcast(orte_jobid_t job, + opal_buffer_t *buffer, + orte_rml_tag_t tag); +static int next_recips(opal_list_t *names, orte_grpcomm_mode_t mode); + +/* Module def */ +orte_grpcomm_base_module_t orte_grpcomm_basic_module = { + init, + finalize, + xcast, + orte_grpcomm_base_allgather, + orte_grpcomm_base_allgather_list, + orte_grpcomm_base_barrier, + next_recips, + orte_grpcomm_base_set_proc_attr, + orte_grpcomm_base_get_proc_attr, + orte_grpcomm_base_modex, + orte_grpcomm_base_purge_proc_attrs +}; + + +/** + * Initialize the module + */ +static int init(void) +{ + int rc; + + if (ORTE_SUCCESS != (rc = 
orte_grpcomm_base_modex_init())) { + ORTE_ERROR_LOG(rc); + } + return rc; +} + +/** + * Finalize the module + */ +static void finalize(void) +{ + orte_grpcomm_base_modex_finalize(); +} + + /** * A "broadcast-like" function to a job's processes. * @param jobid The job whose processes are to receive the message * @param buffer The data to broadcast */ -/* Blocking version */ static int xcast(orte_jobid_t job, opal_buffer_t *buffer, orte_rml_tag_t tag) @@ -344,12 +389,6 @@ static int xcast_linear(orte_jobid_t job, /* if we are a daemon or the HNP, get the number of daemons out there */ range = orte_process_info.num_procs; - /* we have to account for all of the messages we are about to send - * because the non-blocking send can come back almost immediately - before - * we would get the chance to increment the num_active. This causes us - * to not correctly wakeup and reset the xcast_in_progress flag - */ - /* send the message to each daemon as fast as we can */ dummy.jobid = ORTE_PROC_MY_HNP->jobid; for (i=0; i < range; i++) { @@ -462,7 +501,7 @@ static int xcast_direct(orte_jobid_t job, { int rc; orte_process_name_t peer; - orte_vpid_t i, num_targets; + orte_vpid_t i, num_targets=0; opal_buffer_t *buf=NULL, *bfr=buffer; orte_daemon_cmd_flag_t command; orte_rml_tag_t target=tag; @@ -676,303 +715,6 @@ CLEANUP: return rc; } -static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf) -{ - orte_process_name_t name; - int rc; - orte_vpid_t i; - opal_buffer_t tmpbuf; - - OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, - "%s grpcomm: entering allgather", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - /* everything happens within my jobid */ - name.jobid = ORTE_PROC_MY_NAME->jobid; - - if (0 != ORTE_PROC_MY_NAME->vpid) { - /* everyone but rank=0 sends data */ - name.vpid = 0; - if (0 > orte_rml.send_buffer(&name, sbuf, ORTE_RML_TAG_ALLGATHER, 0)) { - ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); - return ORTE_ERR_COMM_FAILURE; - } - OPAL_OUTPUT_VERBOSE((2, 
orte_grpcomm_base_output, - "%s allgather buffer sent", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - /* now receive the final result from rank=0 */ - if (0 > orte_rml.recv_buffer(ORTE_NAME_WILDCARD, rbuf, ORTE_RML_TAG_ALLGATHER, 0)) { - ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); - return ORTE_ERR_COMM_FAILURE; - } - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s allgather buffer received", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - return ORTE_SUCCESS; - } - - /* seed the outgoing buffer with the num_procs so it can be unpacked */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(rbuf, &orte_process_info.num_procs, 1, ORTE_STD_CNTR))) { - ORTE_ERROR_LOG(rc); - return rc; - } - - /* put my own information into the outgoing buffer */ - if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(rbuf, sbuf))) { - ORTE_ERROR_LOG(rc); - return rc; - } - - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s allgather collecting buffers", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - /* rank=0 receives everyone else's data */ - for (i=1; i < orte_process_info.num_procs; i++) { - name.vpid = i; - OBJ_CONSTRUCT(&tmpbuf, opal_buffer_t); - if (0 > orte_rml.recv_buffer(&name, &tmpbuf, ORTE_RML_TAG_ALLGATHER, 0)) { - ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); - return ORTE_ERR_COMM_FAILURE; - } - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s allgather buffer %ld received", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (long)i)); - /* append this data to the rbuf */ - if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(rbuf, &tmpbuf))) { - ORTE_ERROR_LOG(rc); - return rc; - } - /* clear out the tmpbuf */ - OBJ_DESTRUCT(&tmpbuf); - } - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s allgather xcasting collected data", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - /* xcast the results */ - xcast(ORTE_PROC_MY_NAME->jobid, rbuf, ORTE_RML_TAG_ALLGATHER); - - /* xcast automatically ensures that the sender -always- gets a copy - * of the message. 
This is required to ensure proper operation of the - * launch system as the HNP -must- get a copy itself. So we have to - * post our own receive here so that we don't leave a message rattling - * around in our RML - */ - OBJ_CONSTRUCT(&tmpbuf, opal_buffer_t); - if (0 > (rc = orte_rml.recv_buffer(ORTE_NAME_WILDCARD, &tmpbuf, ORTE_RML_TAG_ALLGATHER, 0))) { - ORTE_ERROR_LOG(rc); - return rc; - } - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s allgather buffer received", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - /* don't need the received buffer - we already have what we need */ - OBJ_DESTRUCT(&tmpbuf); - - OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, - "%s grpcomm: allgather completed", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - return ORTE_SUCCESS; -} - -static int allgather_list(opal_list_t *names, opal_buffer_t *sbuf, opal_buffer_t *rbuf) -{ - opal_list_item_t *item; - orte_namelist_t *peer, *root; - orte_std_cntr_t i, num_peers; - opal_buffer_t tmpbuf; - int rc; - - OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, - "%s grpcomm: entering allgather_list", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - /* the first entry on the list is the "root" that collects - * all the data - everyone else just sends and gets back - * the results - */ - root = (orte_namelist_t*)opal_list_get_first(names); - - if (OPAL_EQUAL != opal_dss.compare(&root->name, ORTE_PROC_MY_NAME, ORTE_NAME)) { - /* everyone but root sends data */ - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s allgather_list: sending my data to %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&root->name))); - if (0 > (rc = orte_rml.send_buffer(&root->name, sbuf, ORTE_RML_TAG_ALLGATHER_LIST, 0))) { - ORTE_ERROR_LOG(rc); - return rc; - } - /* now receive the final result */ - if (0 > (rc = orte_rml.recv_buffer(&root->name, rbuf, ORTE_RML_TAG_ALLGATHER_LIST, 0))) { - ORTE_ERROR_LOG(rc); - return rc; - } - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s allgather_list: received 
result", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - return ORTE_SUCCESS; - } - - /* count how many peers are participating, including myself */ - num_peers = (orte_std_cntr_t)opal_list_get_size(names); - - /* seed the outgoing buffer with the num_procs so it can be unpacked */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(rbuf, &num_peers, 1, ORTE_STD_CNTR))) { - ORTE_ERROR_LOG(rc); - return rc; - } - - /* put my own information into the outgoing buffer */ - if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(rbuf, sbuf))) { - ORTE_ERROR_LOG(rc); - return rc; - } - - /* root receives everyone else's data */ - for (i=1; i < num_peers; i++) { - /* receive the buffer from this process */ - OBJ_CONSTRUCT(&tmpbuf, opal_buffer_t); - if (0 > orte_rml.recv_buffer(ORTE_NAME_WILDCARD, &tmpbuf, ORTE_RML_TAG_ALLGATHER_LIST, 0)) { - ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); - return ORTE_ERR_COMM_FAILURE; - } - /* append this data to the rbuf */ - if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(rbuf, &tmpbuf))) { - ORTE_ERROR_LOG(rc); - return rc; - } - /* clear out the tmpbuf */ - OBJ_DESTRUCT(&tmpbuf); - } - - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s allgather_list: received all data", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - /* broadcast the results */ - for (item = opal_list_get_first(names); - item != opal_list_get_end(names); - item = opal_list_get_next(item)) { - peer = (orte_namelist_t*)item; - - /* skip myself */ - if (OPAL_EQUAL == opal_dss.compare(&root->name, &peer->name, ORTE_NAME)) { - continue; - } - - /* transmit the buffer to this process */ - if (0 > orte_rml.send_buffer(&peer->name, rbuf, ORTE_RML_TAG_ALLGATHER_LIST, 0)) { - ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE); - return ORTE_ERR_COMM_FAILURE; - } - } - - OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, - "%s grpcomm: allgather_list completed", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - return ORTE_SUCCESS; -} - - -static int barrier(void) -{ - orte_process_name_t name; - orte_vpid_t i; - 
opal_buffer_t buf; - int rc; - - OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, - "%s grpcomm: entering barrier", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - /* everything happens within the same jobid */ - name.jobid = ORTE_PROC_MY_NAME->jobid; - - /* All non-root send & receive zero-length message. */ - if (0 != ORTE_PROC_MY_NAME->vpid) { - name.vpid = 0; - OBJ_CONSTRUCT(&buf, opal_buffer_t); - i=0; - opal_dss.pack(&buf, &i, 1, ORTE_STD_CNTR); /* put something meaningless here */ - - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s sending barrier", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - rc = orte_rml.send_buffer(&name,&buf,ORTE_RML_TAG_BARRIER,0); - if (rc < 0) { - ORTE_ERROR_LOG(rc); - return rc; - } - OBJ_DESTRUCT(&buf); - - /* get the release from rank=0 */ - OBJ_CONSTRUCT(&buf, opal_buffer_t); - rc = orte_rml.recv_buffer(ORTE_NAME_WILDCARD,&buf,ORTE_RML_TAG_BARRIER,0); - if (rc < 0) { - ORTE_ERROR_LOG(rc); - return rc; - } - OBJ_DESTRUCT(&buf); - - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s received barrier release", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - return ORTE_SUCCESS; - } - - for (i = 1; i < orte_process_info.num_procs; i++) { - name.vpid = (orte_vpid_t)i; - OBJ_CONSTRUCT(&buf, opal_buffer_t); - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s barrier %ld received", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (long)i)); - rc = orte_rml.recv_buffer(&name,&buf,ORTE_RML_TAG_BARRIER,0); - if (rc < 0) { - ORTE_ERROR_LOG(rc); - return rc; - } - OBJ_DESTRUCT(&buf); - } - - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s barrier xcasting release", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - /* xcast the release */ - OBJ_CONSTRUCT(&buf, opal_buffer_t); - opal_dss.pack(&buf, &i, 1, ORTE_STD_CNTR); /* put something meaningless here */ - xcast(ORTE_PROC_MY_NAME->jobid, &buf, ORTE_RML_TAG_BARRIER); - OBJ_DESTRUCT(&buf); - - /* xcast automatically ensures that the sender -always- gets a copy - * of the message. 
This is required to ensure proper operation of the - * launch system as the HNP -must- get a copy itself. So we have to - * post our own receive here so that we don't leave a message rattling - * around in our RML - */ - OBJ_CONSTRUCT(&buf, opal_buffer_t); - if (0 > (rc = orte_rml.recv_buffer(ORTE_NAME_WILDCARD, &buf, ORTE_RML_TAG_BARRIER, 0))) { - ORTE_ERROR_LOG(rc); - return rc; - } - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s barrier release received", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - OBJ_DESTRUCT(&buf); - - OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, - "%s grpcomm: barrier completed", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - return ORTE_SUCCESS; -} - static int chain_recips(opal_list_t *names) { orte_namelist_t *target; @@ -1023,15 +765,15 @@ static int linear_recips(opal_list_t *names) orte_vpid_t i; /* if we are not the HNP, we just return - only - * the HNP sends in this mode - */ + * the HNP sends in this mode + */ if (!orte_process_info.hnp) { return ORTE_SUCCESS; } /* if we are the HNP, then just add the names of - * all daemons to the list - */ + * all daemons to the list + */ for (i=1; i < orte_process_info.num_procs; i++) { if (NULL == (target = OBJ_NEW(orte_namelist_t))) { return ORTE_ERR_OUT_OF_RESOURCE; @@ -1067,488 +809,3 @@ static int next_recips(opal_list_t *names, orte_grpcomm_mode_t mode) } -/*************** MODEX SECTION **************/ - -/** - * MODEX DESIGN - * - * Modex data is always associated with a given orte process name, in - * an opal hash table. The hash table is necessary because modex data is - * received for entire jobids and when working with - * dynamic processes, it is possible we will receive data for a - * process not yet in the ompi_proc_all() list of processes. This - * information must be kept for later use, because if accept/connect - * causes the proc to be added to the ompi_proc_all() list, it could - * cause a connection storm. 
Therefore, we use an - * opal_hast_table_t backing store to contain all modex information. - * - * While we could add the now discovered proc into the ompi_proc_all() - * list, this has some problems, in that we don't have the - * architecture and hostname information needed to properly fill in - * the ompi_proc_t structure and we don't want to cause RML - * communication to get it when we don't really need to know anything - * about the remote proc. - * - * All data put into the modex (or received from the modex) is - * associated with a given proc,attr_name pair. The data structures - * to maintain this data look something like: - * - * opal_hash_table_t modex_data -> list of attr_proc_t objects - * - * +-----------------------------+ - * | modex_proc_data_t | - * | - opal_list_item_t | - * +-----------------------------+ - * | opal_mutex_t modex_lock | - * | bool modex_received_data | 1 - * | opal_list_t modules | ---------+ - * +-----------------------------+ | - * * | - * +--------------------------------+ <--------+ - * | modex_module_data_t | - * | - opal_list_item_t | - * +--------------------------------+ - * | mca_base_component_t component | - * | void *module_data | - * | size_t module_data_size | - * +--------------------------------+ - * - */ - - -/** - * Modex data for a particular orte process - * - * Locking infrastructure and list of module data for a given orte - * process name. The name association is maintained in the - * modex_data hash table. - */ -struct modex_proc_data_t { - /** Structure can be put on lists (including in hash tables) */ - opal_list_item_t super; - /* Lock held whenever the modex data for this proc is being - modified */ - opal_mutex_t modex_lock; - /* True if modex data has ever been received from this process, - false otherwise. */ - bool modex_received_data; - /* List of modex_module_data_t structures containing all data - received from this process, sorted by component name. 
*/ - opal_list_t modex_module_data; -}; -typedef struct modex_proc_data_t modex_proc_data_t; - -static void -modex_construct(modex_proc_data_t * modex) -{ - OBJ_CONSTRUCT(&modex->modex_lock, opal_mutex_t); - modex->modex_received_data = false; - OBJ_CONSTRUCT(&modex->modex_module_data, opal_list_t); -} - -static void -modex_destruct(modex_proc_data_t * modex) -{ - OBJ_DESTRUCT(&modex->modex_module_data); - OBJ_DESTRUCT(&modex->modex_lock); -} - -OBJ_CLASS_INSTANCE(modex_proc_data_t, opal_object_t, - modex_construct, modex_destruct); - - - -/** - * Data for a particular attribute - * - * Container for data for a particular module,attribute pair. This - * structure should be contained in the modex_module_data list in an - * modex_proc_data_t structure to maintain an association with a - * given proc. The list is then searched for a matching attribute - * name. - * - * While searching the list or reading from (or writing to) this - * structure, the lock in the proc_data_t should be held. - */ -struct modex_attr_data_t { - /** Structure can be put on lists */ - opal_list_item_t super; - /** Attribute name */ - char * attr_name; - /** Binary blob of data associated with this proc,component pair */ - void *attr_data; - /** Size (in bytes) of module_data */ - size_t attr_data_size; -}; -typedef struct modex_attr_data_t modex_attr_data_t; - -static void -modex_attr_construct(modex_attr_data_t * module) -{ - module->attr_name = NULL; - module->attr_data = NULL; - module->attr_data_size = 0; -} - -static void -modex_attr_destruct(modex_attr_data_t * module) -{ - if (NULL != module->attr_name) { - free(module->attr_name); - } - if (NULL != module->attr_data) { - free(module->attr_data); - } -} - -OBJ_CLASS_INSTANCE(modex_attr_data_t, - opal_list_item_t, - modex_attr_construct, - modex_attr_destruct); - - -/** - * Find data for a given attribute in a given modex_proc_data_t - * container. - * - * The proc_data's modex_lock must be held during this - * search. 
- */ -static modex_attr_data_t * -modex_lookup_attr_data(modex_proc_data_t *proc_data, - const char *attr_name, - bool create_if_not_found) -{ - modex_attr_data_t *attr_data = NULL; - for (attr_data = (modex_attr_data_t *) opal_list_get_first(&proc_data->modex_module_data); - attr_data != (modex_attr_data_t *) opal_list_get_end(&proc_data->modex_module_data); - attr_data = (modex_attr_data_t *) opal_list_get_next(attr_data)) { - if (0 == strcmp(attr_name, attr_data->attr_name)) { - return attr_data; - } - } - - if (create_if_not_found) { - attr_data = OBJ_NEW(modex_attr_data_t); - if (NULL == attr_data) return NULL; - - attr_data->attr_name = strdup(attr_name); - opal_list_append(&proc_data->modex_module_data, &attr_data->super); - - return attr_data; - } - - return NULL; -} - - -/** -* Find modex_proc_data_t container associated with given - * orte_process_name_t. - * - * The global lock should *NOT* be held when - * calling this function. - */ -static modex_proc_data_t* -modex_lookup_orte_proc(const orte_process_name_t *orte_proc) -{ - modex_proc_data_t *proc_data = NULL; - - OPAL_THREAD_LOCK(&orte_grpcomm_basic.mutex); - opal_hash_table_get_value_uint64(&orte_grpcomm_basic.modex_data, - orte_util_hash_name(orte_proc), (void**)&proc_data); - if (NULL == proc_data) { - /* The proc clearly exists, so create a modex structure - for it */ - proc_data = OBJ_NEW(modex_proc_data_t); - if (NULL == proc_data) { - opal_output(0, "grpcomm_basic_modex_lookup_orte_proc: unable to allocate modex_proc_data_t\n"); - OPAL_THREAD_UNLOCK(&orte_grpcomm_basic.mutex); - return NULL; - } - opal_hash_table_set_value_uint64(&orte_grpcomm_basic.modex_data, - orte_util_hash_name(orte_proc), proc_data); - } - OPAL_THREAD_UNLOCK(&orte_grpcomm_basic.mutex); - - return proc_data; -} - - -static int set_proc_attr(const char *attr_name, - const void *data, - size_t size) -{ - int rc; - - OPAL_THREAD_LOCK(&orte_grpcomm_basic.mutex); - - /* Pack the attribute name information into the local buffer 
*/ - if (ORTE_SUCCESS != (rc = opal_dss.pack(&orte_grpcomm_basic.modex_buffer, &attr_name, 1, OPAL_STRING))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - /* pack the size */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(&orte_grpcomm_basic.modex_buffer, &size, 1, OPAL_SIZE))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - /* Pack the actual data into the buffer */ - if (0 != size) { - if (ORTE_SUCCESS != (rc = opal_dss.pack(&orte_grpcomm_basic.modex_buffer, (void *) data, size, OPAL_BYTE))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - } - - /* track the number of entries */ - ++orte_grpcomm_basic.modex_num_entries; - -cleanup: - OPAL_THREAD_UNLOCK(&orte_grpcomm_basic.mutex); - - return rc; -} - -static int get_proc_attr(const orte_process_name_t proc, - const char * attribute_name, void **val, - size_t *size) -{ - modex_proc_data_t *proc_data; - modex_attr_data_t *attr_data; - - proc_data = modex_lookup_orte_proc(&proc); - if (NULL == proc_data) return ORTE_ERR_NOT_FOUND; - - OPAL_THREAD_LOCK(&proc_data->modex_lock); - - /* look up attribute */ - attr_data = modex_lookup_attr_data(proc_data, attribute_name, false); - - /* copy the data out to the user */ - if ((NULL == attr_data) || - (attr_data->attr_data_size == 0)) { - opal_output(0, "grpcomm_basic_get_proc_attr: no attr avail or zero byte size"); - *val = NULL; - *size = 0; - } else { - void *copy = malloc(attr_data->attr_data_size); - - if (copy == NULL) { - OPAL_THREAD_UNLOCK(&proc_data->modex_lock); - return ORTE_ERR_OUT_OF_RESOURCE; - } - memcpy(copy, attr_data->attr_data, attr_data->attr_data_size); - *val = copy; - *size = attr_data->attr_data_size; - } - OPAL_THREAD_UNLOCK(&proc_data->modex_lock); - - return ORTE_SUCCESS; -} - - -static int modex(opal_list_t *procs) -{ - opal_buffer_t buf, rbuf; - orte_std_cntr_t i, j, num_procs, num_entries; - void *bytes = NULL; - orte_std_cntr_t cnt; - orte_process_name_t proc_name; - modex_proc_data_t *proc_data; - modex_attr_data_t *attr_data; - int rc; - - 
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, - "%s grpcomm: modex entered", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - /* setup the buffer that will actually be sent */ - OBJ_CONSTRUCT(&buf, opal_buffer_t); - OBJ_CONSTRUCT(&rbuf, opal_buffer_t); - - /* put our process name in the buffer so it can be unpacked later */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - /* put the number of entries into the buffer */ - OPAL_THREAD_LOCK(&orte_grpcomm_basic.mutex); - if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &orte_grpcomm_basic.modex_num_entries, 1, ORTE_STD_CNTR))) { - ORTE_ERROR_LOG(rc); - OPAL_THREAD_UNLOCK(&orte_grpcomm_basic.mutex); - goto cleanup; - } - - /* if there are entries, non-destructively copy the data across */ - if (0 < orte_grpcomm_basic.modex_num_entries) { - if (ORTE_SUCCESS != (opal_dss.copy_payload(&buf, &orte_grpcomm_basic.modex_buffer))) { - ORTE_ERROR_LOG(rc); - OPAL_THREAD_UNLOCK(&orte_grpcomm_basic.mutex); - goto cleanup; - } - } - OPAL_THREAD_UNLOCK(&orte_grpcomm_basic.mutex); - - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s modex: executing allgather", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - /* exchange the buffer with the list of peers (if provided) or all my peers */ - if (NULL == procs) { - if (ORTE_SUCCESS != (rc = allgather(&buf, &rbuf))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - } else { - if (ORTE_SUCCESS != (rc = allgather_list(procs, &buf, &rbuf))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - } - - OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output, - "%s modex: processing modex info", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - /* process the results */ - /* extract the number of procs that put data in the buffer */ - cnt=1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_procs, &cnt, ORTE_STD_CNTR))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - /* process the buffer */ - for (i=0; i < num_procs; i++) { - /* unpack 
the process name */ - cnt=1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &proc_name, &cnt, ORTE_NAME))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - /* look up the modex data structure */ - proc_data = modex_lookup_orte_proc(&proc_name); - if (proc_data == NULL) { - /* report the error */ - opal_output(0, "grpcomm_basic_modex: received modex info for unknown proc %s\n", - ORTE_NAME_PRINT(&proc_name)); - rc = ORTE_ERR_NOT_FOUND; - goto cleanup; - } - - /* unpack the number of entries for this proc */ - cnt=1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_entries, &cnt, ORTE_STD_CNTR))) { - ORTE_ERROR_LOG(rc); - goto cleanup; - } - - OPAL_THREAD_LOCK(&proc_data->modex_lock); - - /* - * Extract the attribute names and values - */ - for (j = 0; j < num_entries; j++) { - size_t num_bytes; - char *attr_name; - - cnt = 1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &attr_name, &cnt, OPAL_STRING))) { - ORTE_ERROR_LOG(rc); - OPAL_THREAD_UNLOCK(&proc_data->modex_lock); - goto cleanup; - } - - cnt = 1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_bytes, &cnt, OPAL_SIZE))) { - ORTE_ERROR_LOG(rc); - OPAL_THREAD_UNLOCK(&proc_data->modex_lock); - goto cleanup; - } - if (num_bytes != 0) { - if (NULL == (bytes = malloc(num_bytes))) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - rc = ORTE_ERR_OUT_OF_RESOURCE; - OPAL_THREAD_UNLOCK(&proc_data->modex_lock); - goto cleanup; - } - cnt = (orte_std_cntr_t) num_bytes; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, bytes, &cnt, OPAL_BYTE))) { - ORTE_ERROR_LOG(rc); - OPAL_THREAD_UNLOCK(&proc_data->modex_lock); - goto cleanup; - } - num_bytes = cnt; - } else { - bytes = NULL; - } - - /* - * Lookup the corresponding modex structure - */ - if (NULL == (attr_data = modex_lookup_attr_data(proc_data, - attr_name, true))) { - opal_output(0, "grpcomm_basic_modex: modex_lookup_attr_data failed\n"); - OPAL_THREAD_UNLOCK(&proc_data->modex_lock); - rc = ORTE_ERR_NOT_FOUND; - goto cleanup; - } - if (NULL != 
attr_data->attr_data) { - /* some pre-existing value must be here - release it */ - free(attr_data->attr_data); - } - attr_data->attr_data = bytes; - attr_data->attr_data_size = num_bytes; - proc_data->modex_received_data = true; - } - OPAL_THREAD_UNLOCK(&proc_data->modex_lock); - } - -cleanup: - OBJ_DESTRUCT(&buf); - OBJ_DESTRUCT(&rbuf); - - OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output, - "%s grpcomm: modex completed", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - return rc; -} - -static int purge_proc_attrs(void) -{ - /* - * Purge the attributes - */ - opal_hash_table_remove_all(&orte_grpcomm_basic.modex_data); - OBJ_DESTRUCT(&orte_grpcomm_basic.modex_data); - OBJ_CONSTRUCT(&orte_grpcomm_basic.modex_data, opal_hash_table_t); - opal_hash_table_init(&orte_grpcomm_basic.modex_data, 256); - - /* - * Clear the modex buffer - */ - OBJ_DESTRUCT(&orte_grpcomm_basic.modex_buffer); - OBJ_CONSTRUCT(&orte_grpcomm_basic.modex_buffer, opal_buffer_t); - orte_grpcomm_basic.modex_num_entries = 0; - - return ORTE_SUCCESS; -} - -orte_grpcomm_base_module_t orte_grpcomm_basic_module = { - xcast, - allgather, - allgather_list, - barrier, - next_recips, - set_proc_attr, - get_proc_attr, - modex, - purge_proc_attrs -}; - diff --git a/orte/mca/grpcomm/cnos/grpcomm_cnos.h b/orte/mca/grpcomm/cnos/grpcomm_cnos.h index 4ab4ee9d36..520235fade 100644 --- a/orte/mca/grpcomm/cnos/grpcomm_cnos.h +++ b/orte/mca/grpcomm/cnos/grpcomm_cnos.h @@ -41,13 +41,7 @@ orte_grpcomm_base_module_t* orte_grpcomm_cnos_init(int *priority); /* - * Startup / Shutdown - */ -int orte_grpcomm_cnos_module_init(void); -int orte_grpcomm_cnos_finalize(void); - -/* - * xcast interfaces + * Grpcomm interfaces */ ORTE_MODULE_DECLSPEC extern orte_grpcomm_base_component_t mca_grpcomm_cnos_component; diff --git a/orte/mca/grpcomm/cnos/grpcomm_cnos_component.c b/orte/mca/grpcomm/cnos/grpcomm_cnos_component.c index e7812f8320..c41b48b4b6 100644 --- a/orte/mca/grpcomm/cnos/grpcomm_cnos_component.c +++ 
b/orte/mca/grpcomm/cnos/grpcomm_cnos_component.c @@ -59,7 +59,6 @@ orte_grpcomm_base_component_t mca_grpcomm_cnos_component = { MCA_BASE_METADATA_PARAM_CHECKPOINT }, orte_grpcomm_cnos_init, /* component init */ - orte_grpcomm_cnos_finalize /* component shutdown */ }; /* @@ -86,10 +85,3 @@ orte_grpcomm_base_module_t* orte_grpcomm_cnos_init(int *priority) return &orte_grpcomm_cnos_module; } -/* - * finalize routine - */ -int orte_grpcomm_cnos_finalize(void) -{ - return ORTE_SUCCESS; -} diff --git a/orte/mca/grpcomm/cnos/grpcomm_cnos_module.c b/orte/mca/grpcomm/cnos/grpcomm_cnos_module.c index 034c588a62..b766c34bef 100644 --- a/orte/mca/grpcomm/cnos/grpcomm_cnos_module.c +++ b/orte/mca/grpcomm/cnos/grpcomm_cnos_module.c @@ -37,6 +37,9 @@ #endif /* API functions */ +static int init(void); +static void finalize(void); + static int xcast(orte_jobid_t job, opal_buffer_t *buffer, orte_rml_tag_t tag); @@ -62,6 +65,8 @@ static int modex(opal_list_t *procs); static int purge_proc_attrs(void); orte_grpcomm_base_module_t orte_grpcomm_cnos_module = { + init, + finalize, xcast, allgather, allgather_list, @@ -73,6 +78,22 @@ orte_grpcomm_base_module_t orte_grpcomm_cnos_module = { purge_proc_attrs }; +/** + * Init the module + */ +static int init(void) +{ + return ORTE_SUCCESS; +} + +/** + * Finalize module + */ +static void finalize(void) +{ + return; +} + /** * A "broadcast-like" function to a job's processes. diff --git a/orte/mca/grpcomm/grpcomm.h b/orte/mca/grpcomm/grpcomm.h index 011dd60d7e..8e37202090 100644 --- a/orte/mca/grpcomm/grpcomm.h +++ b/orte/mca/grpcomm/grpcomm.h @@ -52,6 +52,12 @@ BEGIN_C_DECLS * Component functions - all MUST be provided! 
*/ +/* initialize the selected module */ +typedef int (*orte_grpcomm_base_module_init_fn_t)(void); + +/* finalize the selected module */ +typedef void (*orte_grpcomm_base_module_finalize_fn_t)(void); + /* Send a message to all members of a job - blocking */ typedef int (*orte_grpcomm_base_module_xcast_fn_t)(orte_jobid_t job, opal_buffer_t *buffer, @@ -67,42 +73,45 @@ typedef int (*orte_grpcomm_base_module_allgather_list_fn_t)(opal_list_t *names, typedef int (*orte_grpcomm_base_module_barrier_fn_t)(void); /* for collectives, return next recipients in the chain */ -typedef int (*orte_gprcomm_base_next_recipients_fn_t)(opal_list_t *list, orte_grpcomm_mode_t mode); +typedef int (*orte_gprcomm_base_module_next_recipients_fn_t)(opal_list_t *list, orte_grpcomm_mode_t mode); /** DATA EXCHANGE FUNCTIONS - SEE ompi/runtime/ompi_module_exchange.h FOR A DESCRIPTION * OF HOW THIS ALL WORKS */ /* send an attribute buffer */ -typedef int (*orte_grpcomm_base_modex_set_proc_attr_fn_t)(const char* attr_name, +typedef int (*orte_grpcomm_base_module_modex_set_proc_attr_fn_t)(const char* attr_name, const void *buffer, size_t size); /* get an attribute buffer */ -typedef int (*orte_grpcomm_base_modex_get_proc_attr_fn_t)(const orte_process_name_t name, +typedef int (*orte_grpcomm_base_module_modex_get_proc_attr_fn_t)(const orte_process_name_t name, const char* attr_name, void **buffer, size_t *size); /* perform a modex operation */ -typedef int (*orte_grpcomm_base_modex_fn_t)(opal_list_t *procs); +typedef int (*orte_grpcomm_base_module_modex_fn_t)(opal_list_t *procs); /* purge the internal attr table */ -typedef int (*orte_grpcomm_base_purge_proc_attrs_fn_t)(void); +typedef int (*orte_grpcomm_base_module_purge_proc_attrs_fn_t)(void); /* * Ver 2.0 */ struct orte_grpcomm_base_module_2_0_0_t { - orte_grpcomm_base_module_xcast_fn_t xcast; - orte_grpcomm_base_module_allgather_fn_t allgather; - orte_grpcomm_base_module_allgather_list_fn_t allgather_list; - orte_grpcomm_base_module_barrier_fn_t 
barrier; - orte_gprcomm_base_next_recipients_fn_t next_recipients; - /* modex support functions */ - orte_grpcomm_base_modex_set_proc_attr_fn_t set_proc_attr; - orte_grpcomm_base_modex_get_proc_attr_fn_t get_proc_attr; - orte_grpcomm_base_modex_fn_t modex; - orte_grpcomm_base_purge_proc_attrs_fn_t purge_proc_attrs; + orte_grpcomm_base_module_init_fn_t init; + orte_grpcomm_base_module_finalize_fn_t finalize; + /* collective operations */ + orte_grpcomm_base_module_xcast_fn_t xcast; + orte_grpcomm_base_module_allgather_fn_t allgather; + orte_grpcomm_base_module_allgather_list_fn_t allgather_list; + orte_grpcomm_base_module_barrier_fn_t barrier; + orte_gprcomm_base_module_next_recipients_fn_t next_recipients; + /* modex functions */ + orte_grpcomm_base_module_modex_set_proc_attr_fn_t set_proc_attr; + orte_grpcomm_base_module_modex_get_proc_attr_fn_t get_proc_attr; + orte_grpcomm_base_module_modex_fn_t modex; + orte_grpcomm_base_module_purge_proc_attrs_fn_t purge_proc_attrs; }; typedef struct orte_grpcomm_base_module_2_0_0_t orte_grpcomm_base_module_2_0_0_t; @@ -113,11 +122,6 @@ typedef orte_grpcomm_base_module_2_0_0_t orte_grpcomm_base_module_t; */ typedef orte_grpcomm_base_module_t* (*orte_grpcomm_base_component_init_fn_t)(int *priority); -/** - * Finalize the selected module - */ -typedef int (*orte_grpcomm_base_component_finalize_fn_t)(void); - /* * the standard component data structure @@ -128,7 +132,6 @@ struct orte_grpcomm_base_component_2_0_0_t { mca_base_component_data_1_0_0_t grpcomm_data; orte_grpcomm_base_component_init_fn_t grpcomm_init; - orte_grpcomm_base_component_finalize_fn_t grpcomm_finalize; }; typedef struct orte_grpcomm_base_component_2_0_0_t orte_grpcomm_base_component_2_0_0_t; typedef orte_grpcomm_base_component_2_0_0_t orte_grpcomm_base_component_t; diff --git a/orte/mca/rml/rml_types.h b/orte/mca/rml/rml_types.h index 77eb145f57..cd1eba6989 100644 --- a/orte/mca/rml/rml_types.h +++ b/orte/mca/rml/rml_types.h @@ -66,31 +66,32 @@ BEGIN_C_DECLS 
#define ORTE_RML_TAG_RML_ROUTE 13 -#define ORTE_RML_TAG_ALLGATHER 14 -#define ORTE_RML_TAG_ALLGATHER_LIST 15 -#define ORTE_RML_TAG_BARRIER 16 +#define ORTE_RML_TAG_ALLGATHER_SERVER 14 +#define ORTE_RML_TAG_ALLGATHER_CLIENT 15 +#define ORTE_RML_TAG_BARRIER_SERVER 16 +#define ORTE_RML_TAG_BARRIER_CLIENT 17 -#define ORTE_RML_TAG_INIT_ROUTES 17 -#define ORTE_RML_TAG_UPDATE_ROUTES 18 -#define ORTE_RML_TAG_SYNC 19 +#define ORTE_RML_TAG_INIT_ROUTES 18 +#define ORTE_RML_TAG_UPDATE_ROUTES 19 +#define ORTE_RML_TAG_SYNC 20 /* For FileM Base */ -#define ORTE_RML_TAG_FILEM_BASE 20 -#define ORTE_RML_TAG_FILEM_BASE_RESP 21 +#define ORTE_RML_TAG_FILEM_BASE 21 +#define ORTE_RML_TAG_FILEM_BASE_RESP 22 /* For FileM RSH Component */ -#define ORTE_RML_TAG_FILEM_RSH 22 +#define ORTE_RML_TAG_FILEM_RSH 23 /* For SnapC Framework */ -#define ORTE_RML_TAG_SNAPC 23 -#define ORTE_RML_TAG_SNAPC_FULL 24 +#define ORTE_RML_TAG_SNAPC 24 +#define ORTE_RML_TAG_SNAPC_FULL 25 /* For tools */ -#define ORTE_RML_TAG_TOOL 25 +#define ORTE_RML_TAG_TOOL 26 /* support data store/lookup */ -#define ORTE_RML_TAG_DATA_SERVER 26 -#define ORTE_RML_TAG_DATA_CLIENT 27 +#define ORTE_RML_TAG_DATA_SERVER 27 +#define ORTE_RML_TAG_DATA_CLIENT 28 /* support for shared memory collectives */ #define OMPI_RML_TAG_COLL_SM2_BACK_FILE_CREATED 40 diff --git a/orte/orted/orted_comm.c b/orte/orted/orted_comm.c index 10d539103d..40f1d1adba 100644 --- a/orte/orted/orted_comm.c +++ b/orte/orted/orted_comm.c @@ -108,6 +108,9 @@ static void send_relay(int fd, short event, void *data) "%s orte:daemon:send_relay", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + /* setup a list of next recipients */ + OBJ_CONSTRUCT(&recips, opal_list_t); + /* we pass the relay_mode in the mev "tag" field. This is a bit * of a hack as the two sizes may not exactly match. 
However, since * the rml_tag is an int32, it is doubtful we will ever see a @@ -115,8 +118,28 @@ static void send_relay(int fd, short event, void *data) */ relay_mode = (orte_grpcomm_mode_t)tag; - /* setup a list of next recipients */ - OBJ_CONSTRUCT(&recips, opal_list_t); + /* if the mode is linear and we are the HNP, don't ask for + * next recipients as this will generate a potentially very + * long list! Instead, just look over the known daemons + */ + if (ORTE_GRPCOMM_LINEAR == relay_mode && orte_process_info.hnp) { + orte_process_name_t dummy; + orte_vpid_t i; + + /* send the message to each daemon as fast as we can - but + * not to us! + */ + dummy.jobid = ORTE_PROC_MY_HNP->jobid; + for (i=1; i < orte_process_info.num_procs; i++) { + dummy.vpid = i; + if (0 > (ret = orte_rml.send_buffer(&dummy, buffer, ORTE_RML_TAG_DAEMON, 0))) { + ORTE_ERROR_LOG(ret); + goto CLEANUP; + } + } + goto CLEANUP; + } + /* ask the active grpcomm module for the next recipients */ if (ORTE_SUCCESS != (ret = orte_grpcomm.next_recipients(&recips, relay_mode))) { ORTE_ERROR_LOG(ret);