1
1
openmpi/orte/mca/grpcomm/basic/grpcomm_basic_module.c

1555 строки
54 KiB
C

/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007 Sun Microsystems, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include "orte/constants.h"
#include "orte/types.h"
#include <string.h>
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif /* HAVE_SYS_TIME_H */
#include "opal/threads/condition.h"
#include "opal/util/output.h"
#include "opal/util/bit_ops.h"
#include "opal/class/opal_hash_table.h"
#include "orte/util/proc_info.h"
#include "opal/dss/dss.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/odls/odls_types.h"
#include "orte/mca/rml/rml.h"
#include "orte/runtime/orte_globals.h"
#include "orte/util/name_fns.h"
#include "orte/orted/orted.h"
#include "orte/runtime/orte_wait.h"
#include "orte/mca/grpcomm/base/base.h"
#include "grpcomm_basic.h"
/* Local functions */
static int xcast_binomial_tree(orte_jobid_t job,
opal_buffer_t *buffer,
orte_rml_tag_t tag);
static int xcast_linear(orte_jobid_t job,
opal_buffer_t *buffer,
orte_rml_tag_t tag);
static int xcast_direct(orte_jobid_t job,
opal_buffer_t *buffer,
orte_rml_tag_t tag);
/**
* A "broadcast-like" function to a job's processes.
* @param jobid The job whose processes are to receive the message
* @param buffer The data to broadcast
*/
/* Blocking version */
static int xcast(orte_jobid_t job,
opal_buffer_t *buffer,
orte_rml_tag_t tag)
{
int rc = ORTE_SUCCESS;
struct timeval start, stop;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:xcast sent to job %s tag %ld",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(job), (long)tag));
/* if there is no message to send, then just return ok */
if (NULL == buffer) {
return ORTE_SUCCESS;
}
if (orte_timing) {
gettimeofday(&start, NULL);
}
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:xcast: num_procs %ld linear xover: %ld binomial xover: %ld",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(long)orte_process_info.num_procs,
(long)orte_grpcomm_basic.xcast_linear_xover,
(long)orte_grpcomm_basic.xcast_binomial_xover));
if (orte_process_info.num_procs < 2 || orte_abnormal_term_ordered) {
/* if there is only one proc in the system, then we must
* use the direct mode - there is no other option. Note that
* since the HNP is the one that typically does xcast sends,
* only one daemon means that the HNP is sending to
* itself. This is required as an HNP starts
* itself up
*
* NOTE: although we allow users to alter crossover points
* for selecting specific xcast modes, this required
* use-case behavior MUST always be retained or else
* singletons and HNP startup will fail!
*
* We also insist that the direct xcast mode be used when
* an orted has failed as we cannot rely on alternative
* methods to reach all orteds and/or procs
*/
rc = xcast_direct(job, buffer, tag);
goto DONE;
}
/* now use the crossover points to select the proper transmission
* mode. We have built-in default crossover points for this
* decision tree, but the user is free to alter them as
* they wish via MCA params
*/
if (orte_process_info.num_procs < orte_grpcomm_basic.xcast_linear_xover) {
rc = xcast_direct(job, buffer, tag);
} else if (orte_process_info.num_procs < orte_grpcomm_basic.xcast_binomial_xover) {
rc = xcast_linear(job, buffer, tag);
} else {
rc = xcast_binomial_tree(job, buffer, tag);
}
DONE:
if (orte_timing) {
gettimeofday(&stop, NULL);
opal_output(0, "%s grpcomm:xcast: time %ld usec", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(long int)((stop.tv_sec - start.tv_sec)*1000000 +
(stop.tv_usec - start.tv_usec)));
}
return rc;
}
static int xcast_binomial_tree(orte_jobid_t job,
opal_buffer_t *buffer,
orte_rml_tag_t tag)
{
orte_daemon_cmd_flag_t command;
orte_grpcomm_mode_t mode;
int rc;
opal_buffer_t *buf;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:entering xcast_binomial",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* binomial xcast can only go through the daemons as app procs are
* not allowed to relay messages.
* first, need to pack the msg and be sure to include routing info so it
* can properly be sent through the daemons
*/
buf = OBJ_NEW(opal_buffer_t);
/* tell the daemon to process and relay */
command = ORTE_DAEMON_PROCESS_AND_RELAY_CMD;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* tell the daemon the routing algorithm this xmission is using */
mode = ORTE_GRPCOMM_BINOMIAL;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &mode, 1, ORTE_GRPCOMM_MODE))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* if this isn't intended for the daemon command tag, then we better
* tell the daemon to deliver it to the procs, and what job is supposed
* to get it - this occurs when a caller just wants to send something
* to all the procs in a job. In that use-case, the caller doesn't know
* anything about inserting daemon commands or what routing algo might
* be used, so we have to help them out a little. Functions that are
* sending commands to the daemons themselves are smart enough to know
* what they need to do.
*/
if (ORTE_RML_TAG_DAEMON != tag) {
command = ORTE_DAEMON_MESSAGE_LOCAL_PROCS;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &job, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_RML_TAG))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
}
/* copy the payload into the new buffer - this is non-destructive, so our
* caller is still responsible for releasing any memory in the buffer they
* gave to us
*/
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(buf, buffer))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s grpcomm:xcast_binomial: buffer size %ld",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(long)buf->bytes_used));
/* all we need to do is send this to the HNP - the relay logic
* will ensure everyone else gets it!
*/
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s grpcomm:xcast_binomial: sending %s => %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(ORTE_PROC_MY_HNP)));
/* if I am the HNP, just set things up so the cmd processor gets called.
* We don't want to message ourselves as this can create circular logic
* in the RML. Instead, this macro will set a zero-time event which will
* cause the buffer to be processed by the cmd processor - probably will
* fire right away, but that's okay
* The macro makes a copy of the buffer, so it's okay to release it here
*/
if (orte_process_info.hnp) {
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, buf, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor);
} else {
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_DAEMON, 0))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
rc = ORTE_SUCCESS;
}
CLEANUP:
OBJ_RELEASE(buf);
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:xcast_binomial: completed",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return rc;
}
static int xcast_linear(orte_jobid_t job,
opal_buffer_t *buffer,
orte_rml_tag_t tag)
{
int rc;
opal_buffer_t *buf;
orte_daemon_cmd_flag_t command;
orte_vpid_t i, range;
orte_process_name_t dummy;
orte_grpcomm_mode_t mode;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:entering xcast_linear",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* since we have to pack some additional info into the buffer to be
* sent to the daemons, we create a new buffer into which we will
* put the intermediate payload - i.e., the info that goes to the
* daemon. This buffer will contain all the info needed by the
* daemon, plus the payload intended for the processes themselves
*/
buf = OBJ_NEW(opal_buffer_t);
/* if we are an application proc, then send this to our HNP so
* we don't try to talk to every daemon directly ourselves. This
* is necessary since we don't know how many daemons there are!
*/
if (!orte_process_info.hnp && !orte_process_info.daemon) {
/* we are an application proc */
/* tell the HNP to relay */
command = ORTE_DAEMON_PROCESS_AND_RELAY_CMD;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* tell the daemon the routing algorithm this xmission is using */
mode = ORTE_GRPCOMM_LINEAR;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &mode, 1, ORTE_GRPCOMM_MODE))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
}
/* if this isn't intended for the daemon command tag, then we better
* tell the daemon to deliver it to the procs, and what job is supposed
* to get it - this occurs when a caller just wants to send something
* to all the procs in a job. In that use-case, the caller doesn't know
* anything about inserting daemon commands or what routing algo might
* be used, so we have to help them out a little. Functions that are
* sending commands to the daemons themselves are smart enough to know
* what they need to do.
*/
if (ORTE_RML_TAG_DAEMON != tag) {
command = ORTE_DAEMON_MESSAGE_LOCAL_PROCS;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &job, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_RML_TAG))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
}
/* copy the payload into the new buffer - this is non-destructive, so our
* caller is still responsible for releasing any memory in the buffer they
* gave to us
*/
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(buf, buffer))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s grpcomm:xcast_linear: buffer size %ld",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(long)buf->bytes_used));
/* if we are not a daemon or the HNP, then just send this to the HNP */
if (!orte_process_info.hnp && !orte_process_info.daemon) {
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_DAEMON, 0))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
rc = ORTE_SUCCESS;
goto CLEANUP;
}
/* if we are a daemon or the HNP, get the number of daemons out there */
range = orte_process_info.num_procs;
/* we have to account for all of the messages we are about to send
* because the non-blocking send can come back almost immediately - before
* we would get the chance to increment the num_active. This causes us
* to not correctly wakeup and reset the xcast_in_progress flag
*/
/* send the message to each daemon as fast as we can */
dummy.jobid = ORTE_PROC_MY_HNP->jobid;
for (i=0; i < range; i++) {
dummy.vpid = i;
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s grpcomm:xcast_linear: %s => %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&dummy)));
/* if the target is the HNP and I am the HNP, then just setup to call the cmd processor */
if (0 == i && orte_process_info.hnp) {
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, buf, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor);
} else {
if (0 > (rc = orte_rml.send_buffer(&dummy, buf, ORTE_RML_TAG_DAEMON, 0))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
}
}
rc = ORTE_SUCCESS;
CLEANUP:
/* release the buffer */
OBJ_RELEASE(buf);
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:xcast_linear: completed",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return rc;
}
static int relay_via_hnp(orte_jobid_t job,
opal_buffer_t *buffer,
orte_rml_tag_t tag) {
opal_buffer_t *buf;
orte_daemon_cmd_flag_t command;
orte_grpcomm_mode_t mode;
int rc;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm: relaying buffer to HNP",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* since we have to pack some additional info into the buffer
* for this case, we create a new buffer into to contain all the
* info needed plus the payload
*/
buf = OBJ_NEW(opal_buffer_t);
/* start by telling the HNP to relay */
command = ORTE_DAEMON_PROCESS_AND_RELAY_CMD;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* default to the LINEAR mode since this is equivalent to
* DIRECT for daemons
*/
mode = ORTE_GRPCOMM_LINEAR;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &mode, 1, ORTE_GRPCOMM_MODE))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* if the target isn't the daemon tag, then we have to add the proper
* command so the daemon's know what to do
*/
if (ORTE_RML_TAG_DAEMON != tag) {
command = ORTE_DAEMON_MESSAGE_LOCAL_PROCS;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &job, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_RML_TAG))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
}
/* copy the payload into the new buffer - this is non-destructive, so our
* caller is still responsible for releasing any memory in the buffer they
* gave to us
*/
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(buf, buffer))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_DAEMON, 0))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
rc = ORTE_SUCCESS;
CLEANUP:
OBJ_RELEASE(buf);
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm: buffer relayed to HNP",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return rc;
}
static int xcast_direct(orte_jobid_t job,
opal_buffer_t *buffer,
orte_rml_tag_t tag)
{
int rc;
orte_process_name_t peer;
orte_vpid_t i, num_targets;
opal_buffer_t *buf=NULL, *bfr=buffer;
orte_daemon_cmd_flag_t command;
orte_rml_tag_t target=tag;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm: entering xcast_direct",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* if I am applicaton proc */
if (!orte_process_info.hnp &&
!orte_process_info.daemon &&
!orte_process_info.tool) {
/* if this is going to some job other
* than my own, then we have to send it via the HNP as I have
* no way of knowing how many procs are in the other job.
*/
if (ORTE_PROC_MY_NAME->jobid != job) {
if (ORTE_SUCCESS != (rc = relay_via_hnp(job, buffer, tag))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
}
/* if it is my jobid, then we can just send this ourselves -
* set the target tag
*/
target = tag;
/* set number of procs to the #procs in our job */
num_targets = orte_process_info.num_procs;
/* point to the right buffer */
bfr = buffer;
/* go to send it */
goto SEND;
}
/* if I am a daemon */
if (orte_process_info.daemon) {
/* if this is going to another job, then I have to relay
* it through the HNP as I have no idea how many procs
* are in that job
*/
if (ORTE_PROC_MY_NAME->jobid != job) {
if (ORTE_SUCCESS != (rc = relay_via_hnp(job, buffer, tag))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
}
/* if this is going to the daemon job to
* someplace other than the daemon cmd processor, then I need to add
* a command to the buffer so the recipient daemons know what to do
*/
if (ORTE_RML_TAG_DAEMON != tag) {
/* setup a buffer to handle the additional info */
buf = OBJ_NEW(opal_buffer_t);
/* add the proper command so the daemon's know what to do */
command = ORTE_DAEMON_MESSAGE_LOCAL_PROCS;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &job, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_RML_TAG))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* copy the payload into the new buffer - this is non-destructive, so our
* caller is still responsible for releasing any memory in the buffer they
* gave to us
*/
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(buf, buffer))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* point to correct buffer to be sent */
bfr = buf;
/* send this to the daemon tag so it gets processed correctly */
target = ORTE_RML_TAG_DAEMON;
/* set the number of targets to be the number of daemons */
num_targets = orte_process_info.num_procs;
/* send it */
goto SEND;
}
}
/* if I am the HNP */
if (orte_process_info.hnp) {
orte_job_t *jdata;
/* if this is going to the daemon job */
if (ORTE_PROC_MY_NAME->jobid == job) {
/* if this is going someplace other than the daemon cmd
* processor, then I need to add a command to the buffer
* so the recipient daemons know what to do
*/
if (ORTE_RML_TAG_DAEMON != tag) {
/* since we have to pack some additional info into the buffer
* for this case, we create a new buffer to contain all the
* info needed plus the payload
*/
buf = OBJ_NEW(opal_buffer_t);
/* if the target isn't the daemon tag, then we have to add the proper
* command so the daemon's know what to do
*/
command = ORTE_DAEMON_MESSAGE_LOCAL_PROCS;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &job, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_RML_TAG))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* copy the payload into the new buffer - this is non-destructive, so our
* caller is still responsible for releasing any memory in the buffer they
* gave to us
*/
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(buf, buffer))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* point to correct buffer to be sent */
bfr = buf;
/* send this to the daemon tag so it gets processed correctly */
target = ORTE_RML_TAG_DAEMON;
/* set the number of targets to be the number of daemons */
num_targets = orte_process_info.num_procs;
/* send it */
goto SEND;
} else {
/* if already going to the daemon tag, then just point to
* the right places and send it
*/
bfr = buffer;
target = tag;
num_targets = orte_process_info.num_procs;
goto SEND;
}
}
/* if this is going to any other job,
* then I need to know the number of procs in that job so I can
* send it
*/
if (NULL == (jdata = orte_get_job_data_object(job))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
rc = ORTE_ERR_NOT_FOUND;
goto CLEANUP;
}
/* set the number of targets */
num_targets = jdata->num_procs;
/* set the tag */
target = tag;
/* point to correct buffer to be sent */
bfr = buffer;
/* send it */
goto SEND;
}
SEND:
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s xcast_direct: buffer size %ld",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(long)buffer->bytes_used));
peer.jobid = job;
for(i=0; i<num_targets; i++) {
peer.vpid = i;
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s xcast_direct: %s => %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer)));
/* if I am the HNP, just set things up so the cmd processor gets called.
* We don't want to message ourselves as this can create circular logic
* in the RML. Instead, this macro will set a zero-time event which will
* cause the buffer to be processed by the cmd processor - probably will
* fire right away, but that's okay
* The macro makes a copy of the buffer, so it's okay to release it later
*/
if (peer.jobid == ORTE_PROC_MY_NAME->jobid &&
peer.vpid == ORTE_PROC_MY_NAME->vpid &&
orte_process_info.hnp) {
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, bfr, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor);
} else {
if (0 > (rc = orte_rml.send_buffer(&peer, bfr, target, 0))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
rc = ORTE_SUCCESS;
}
}
rc = ORTE_SUCCESS;
CLEANUP:
/* release buf if used */
if (NULL != buf) {
OBJ_RELEASE(buf);
}
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm: xcast_direct completed",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return rc;
}
static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
{
orte_process_name_t name;
int rc;
orte_vpid_t i;
opal_buffer_t tmpbuf;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm: entering allgather",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* everything happens within my jobid */
name.jobid = ORTE_PROC_MY_NAME->jobid;
if (0 != ORTE_PROC_MY_NAME->vpid) {
/* everyone but rank=0 sends data */
name.vpid = 0;
if (0 > orte_rml.send_buffer(&name, sbuf, ORTE_RML_TAG_ALLGATHER, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
return ORTE_ERR_COMM_FAILURE;
}
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s allgather buffer sent",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* now receive the final result from rank=0 */
if (0 > orte_rml.recv_buffer(ORTE_NAME_WILDCARD, rbuf, ORTE_RML_TAG_ALLGATHER, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
return ORTE_ERR_COMM_FAILURE;
}
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s allgather buffer received",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return ORTE_SUCCESS;
}
/* seed the outgoing buffer with the num_procs so it can be unpacked */
if (ORTE_SUCCESS != (rc = opal_dss.pack(rbuf, &orte_process_info.num_procs, 1, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* put my own information into the outgoing buffer */
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(rbuf, sbuf))) {
ORTE_ERROR_LOG(rc);
return rc;
}
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s allgather collecting buffers",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* rank=0 receives everyone else's data */
for (i=1; i < orte_process_info.num_procs; i++) {
name.vpid = i;
OBJ_CONSTRUCT(&tmpbuf, opal_buffer_t);
if (0 > orte_rml.recv_buffer(&name, &tmpbuf, ORTE_RML_TAG_ALLGATHER, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
return ORTE_ERR_COMM_FAILURE;
}
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s allgather buffer %ld received",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (long)i));
/* append this data to the rbuf */
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(rbuf, &tmpbuf))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* clear out the tmpbuf */
OBJ_DESTRUCT(&tmpbuf);
}
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s allgather xcasting collected data",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* xcast the results */
xcast(ORTE_PROC_MY_NAME->jobid, rbuf, ORTE_RML_TAG_ALLGATHER);
/* xcast automatically ensures that the sender -always- gets a copy
* of the message. This is required to ensure proper operation of the
* launch system as the HNP -must- get a copy itself. So we have to
* post our own receive here so that we don't leave a message rattling
* around in our RML
*/
OBJ_CONSTRUCT(&tmpbuf, opal_buffer_t);
if (0 > (rc = orte_rml.recv_buffer(ORTE_NAME_WILDCARD, &tmpbuf, ORTE_RML_TAG_ALLGATHER, 0))) {
ORTE_ERROR_LOG(rc);
return rc;
}
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s allgather buffer received",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* don't need the received buffer - we already have what we need */
OBJ_DESTRUCT(&tmpbuf);
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm: allgather completed",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return ORTE_SUCCESS;
}
static int allgather_list(opal_list_t *names, opal_buffer_t *sbuf, opal_buffer_t *rbuf)
{
opal_list_item_t *item;
orte_namelist_t *peer, *root;
orte_std_cntr_t i, num_peers;
opal_buffer_t tmpbuf;
int rc;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm: entering allgather_list",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* the first entry on the list is the "root" that collects
* all the data - everyone else just sends and gets back
* the results
*/
root = (orte_namelist_t*)opal_list_get_first(names);
if (OPAL_EQUAL != opal_dss.compare(&root->name, ORTE_PROC_MY_NAME, ORTE_NAME)) {
/* everyone but root sends data */
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s allgather_list: sending my data to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&root->name)));
if (0 > (rc = orte_rml.send_buffer(&root->name, sbuf, ORTE_RML_TAG_ALLGATHER_LIST, 0))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* now receive the final result */
if (0 > (rc = orte_rml.recv_buffer(&root->name, rbuf, ORTE_RML_TAG_ALLGATHER_LIST, 0))) {
ORTE_ERROR_LOG(rc);
return rc;
}
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s allgather_list: received result",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return ORTE_SUCCESS;
}
/* count how many peers are participating, including myself */
num_peers = (orte_std_cntr_t)opal_list_get_size(names);
/* seed the outgoing buffer with the num_procs so it can be unpacked */
if (ORTE_SUCCESS != (rc = opal_dss.pack(rbuf, &num_peers, 1, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* put my own information into the outgoing buffer */
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(rbuf, sbuf))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* root receives everyone else's data */
for (i=1; i < num_peers; i++) {
/* receive the buffer from this process */
OBJ_CONSTRUCT(&tmpbuf, opal_buffer_t);
if (0 > orte_rml.recv_buffer(ORTE_NAME_WILDCARD, &tmpbuf, ORTE_RML_TAG_ALLGATHER_LIST, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
return ORTE_ERR_COMM_FAILURE;
}
/* append this data to the rbuf */
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(rbuf, &tmpbuf))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* clear out the tmpbuf */
OBJ_DESTRUCT(&tmpbuf);
}
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s allgather_list: received all data",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* broadcast the results */
for (item = opal_list_get_first(names);
item != opal_list_get_end(names);
item = opal_list_get_next(item)) {
peer = (orte_namelist_t*)item;
/* skip myself */
if (OPAL_EQUAL == opal_dss.compare(&root->name, &peer->name, ORTE_NAME)) {
continue;
}
/* transmit the buffer to this process */
if (0 > orte_rml.send_buffer(&peer->name, rbuf, ORTE_RML_TAG_ALLGATHER_LIST, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
return ORTE_ERR_COMM_FAILURE;
}
}
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm: allgather_list completed",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return ORTE_SUCCESS;
}
static int barrier(void)
{
orte_process_name_t name;
orte_vpid_t i;
opal_buffer_t buf;
int rc;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm: entering barrier",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* everything happens within the same jobid */
name.jobid = ORTE_PROC_MY_NAME->jobid;
/* All non-root send & receive zero-length message. */
if (0 != ORTE_PROC_MY_NAME->vpid) {
name.vpid = 0;
OBJ_CONSTRUCT(&buf, opal_buffer_t);
i=0;
opal_dss.pack(&buf, &i, 1, ORTE_STD_CNTR); /* put something meaningless here */
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s sending barrier",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
rc = orte_rml.send_buffer(&name,&buf,ORTE_RML_TAG_BARRIER,0);
if (rc < 0) {
ORTE_ERROR_LOG(rc);
return rc;
}
OBJ_DESTRUCT(&buf);
/* get the release from rank=0 */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
rc = orte_rml.recv_buffer(ORTE_NAME_WILDCARD,&buf,ORTE_RML_TAG_BARRIER,0);
if (rc < 0) {
ORTE_ERROR_LOG(rc);
return rc;
}
OBJ_DESTRUCT(&buf);
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s received barrier release",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return ORTE_SUCCESS;
}
for (i = 1; i < orte_process_info.num_procs; i++) {
name.vpid = (orte_vpid_t)i;
OBJ_CONSTRUCT(&buf, opal_buffer_t);
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s barrier %ld received",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (long)i));
rc = orte_rml.recv_buffer(&name,&buf,ORTE_RML_TAG_BARRIER,0);
if (rc < 0) {
ORTE_ERROR_LOG(rc);
return rc;
}
OBJ_DESTRUCT(&buf);
}
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s barrier xcasting release",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* xcast the release */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
opal_dss.pack(&buf, &i, 1, ORTE_STD_CNTR); /* put something meaningless here */
xcast(ORTE_PROC_MY_NAME->jobid, &buf, ORTE_RML_TAG_BARRIER);
OBJ_DESTRUCT(&buf);
/* xcast automatically ensures that the sender -always- gets a copy
* of the message. This is required to ensure proper operation of the
* launch system as the HNP -must- get a copy itself. So we have to
* post our own receive here so that we don't leave a message rattling
* around in our RML
*/
OBJ_CONSTRUCT(&buf, opal_buffer_t);
if (0 > (rc = orte_rml.recv_buffer(ORTE_NAME_WILDCARD, &buf, ORTE_RML_TAG_BARRIER, 0))) {
ORTE_ERROR_LOG(rc);
return rc;
}
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s barrier release received",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
OBJ_DESTRUCT(&buf);
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm: barrier completed",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return ORTE_SUCCESS;
}
static int chain_recips(opal_list_t *names)
{
orte_namelist_t *target;
/* chain just sends to the next vpid up the line */
if (ORTE_PROC_MY_NAME->vpid < orte_process_info.num_procs-1) {
/* I am not at the end of the chain */
if (NULL == (target = OBJ_NEW(orte_namelist_t))) {
return ORTE_ERR_OUT_OF_RESOURCE;
}
target->name.jobid = ORTE_PROC_MY_NAME->jobid;
target->name.vpid = ORTE_PROC_MY_NAME->vpid + 1;
opal_list_append(names, &target->item);
}
return ORTE_SUCCESS;
}
static int binomial_recips(opal_list_t *names)
{
int i, bitmap, peer, size, rank, hibit, mask;
orte_namelist_t *target;
/* compute the bitmap */
bitmap = opal_cube_dim((int)orte_process_info.num_procs);
rank = (int)ORTE_PROC_MY_NAME->vpid;
size = (int)orte_process_info.num_procs;
hibit = opal_hibit(rank, bitmap);
--bitmap;
for (i = hibit + 1, mask = 1 << i; i <= bitmap; ++i, mask <<= 1) {
peer = rank | mask;
if (peer < size) {
if (NULL == (target = OBJ_NEW(orte_namelist_t))) {
return ORTE_ERR_OUT_OF_RESOURCE;
}
target->name.jobid = ORTE_PROC_MY_NAME->jobid;
target->name.vpid = (orte_vpid_t)peer;
opal_list_append(names, &target->item);
}
}
return ORTE_SUCCESS;
}
static int linear_recips(opal_list_t *names)
{
orte_namelist_t *target;
orte_vpid_t i;
/* if we are not the HNP, we just return - only
* the HNP sends in this mode
*/
if (!orte_process_info.hnp) {
return ORTE_SUCCESS;
}
/* if we are the HNP, then just add the names of
* all daemons to the list
*/
for (i=1; i < orte_process_info.num_procs; i++) {
if (NULL == (target = OBJ_NEW(orte_namelist_t))) {
return ORTE_ERR_OUT_OF_RESOURCE;
}
target->name.jobid = ORTE_PROC_MY_NAME->jobid;
target->name.vpid = i;
opal_list_append(names, &target->item);
}
return ORTE_SUCCESS;
}
static int next_recips(opal_list_t *names, orte_grpcomm_mode_t mode)
{
int rc;
/* check the mode to select the proper algo */
switch (mode) {
case ORTE_GRPCOMM_CHAIN:
rc = chain_recips(names);
break;
case ORTE_GRPCOMM_BINOMIAL:
rc = binomial_recips(names);
break;
case ORTE_GRPCOMM_LINEAR:
rc = linear_recips(names);
break;
default:
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
rc = ORTE_ERR_NOT_FOUND;
break;
}
return rc;
}
/*************** MODEX SECTION **************/
/**
* MODEX DESIGN
*
* Modex data is always associated with a given orte process name, in
* an opal hash table. The hash table is necessary because modex data is
* received for entire jobids and when working with
* dynamic processes, it is possible we will receive data for a
* process not yet in the ompi_proc_all() list of processes. This
* information must be kept for later use, because if accept/connect
* causes the proc to be added to the ompi_proc_all() list, it could
* cause a connection storm. Therefore, we use an
* opal_hast_table_t backing store to contain all modex information.
*
* While we could add the now discovered proc into the ompi_proc_all()
* list, this has some problems, in that we don't have the
* architecture and hostname information needed to properly fill in
* the ompi_proc_t structure and we don't want to cause RML
* communication to get it when we don't really need to know anything
* about the remote proc.
*
* All data put into the modex (or received from the modex) is
* associated with a given proc,attr_name pair. The data structures
* to maintain this data look something like:
*
* opal_hash_table_t modex_data -> list of attr_proc_t objects
*
* +-----------------------------+
* | modex_proc_data_t |
* | - opal_list_item_t |
* +-----------------------------+
* | opal_mutex_t modex_lock |
* | bool modex_received_data | 1
* | opal_list_t modules | ---------+
* +-----------------------------+ |
* * |
* +--------------------------------+ <--------+
* | modex_module_data_t |
* | - opal_list_item_t |
* +--------------------------------+
* | mca_base_component_t component |
* | void *module_data |
* | size_t module_data_size |
* +--------------------------------+
*
*/
/**
* Modex data for a particular orte process
*
* Locking infrastructure and list of module data for a given orte
* process name. The name association is maintained in the
* modex_data hash table.
*/
struct modex_proc_data_t {
/** Structure can be put on lists (including in hash tables) */
opal_list_item_t super;
/* Lock held whenever the modex data for this proc is being
modified */
opal_mutex_t modex_lock;
/* True if modex data has ever been received from this process,
false otherwise. */
bool modex_received_data;
/* List of modex_module_data_t structures containing all data
received from this process, sorted by component name. */
opal_list_t modex_module_data;
};
typedef struct modex_proc_data_t modex_proc_data_t;
static void
modex_construct(modex_proc_data_t * modex)
{
OBJ_CONSTRUCT(&modex->modex_lock, opal_mutex_t);
modex->modex_received_data = false;
OBJ_CONSTRUCT(&modex->modex_module_data, opal_list_t);
}
static void
modex_destruct(modex_proc_data_t * modex)
{
OBJ_DESTRUCT(&modex->modex_module_data);
OBJ_DESTRUCT(&modex->modex_lock);
}
OBJ_CLASS_INSTANCE(modex_proc_data_t, opal_object_t,
modex_construct, modex_destruct);
/**
* Data for a particular attribute
*
* Container for data for a particular module,attribute pair. This
* structure should be contained in the modex_module_data list in an
* modex_proc_data_t structure to maintain an association with a
* given proc. The list is then searched for a matching attribute
* name.
*
* While searching the list or reading from (or writing to) this
* structure, the lock in the proc_data_t should be held.
*/
struct modex_attr_data_t {
/** Structure can be put on lists */
opal_list_item_t super;
/** Attribute name */
char * attr_name;
/** Binary blob of data associated with this proc,component pair */
void *attr_data;
/** Size (in bytes) of module_data */
size_t attr_data_size;
};
typedef struct modex_attr_data_t modex_attr_data_t;
static void
modex_attr_construct(modex_attr_data_t * module)
{
module->attr_name = NULL;
module->attr_data = NULL;
module->attr_data_size = 0;
}
static void
modex_attr_destruct(modex_attr_data_t * module)
{
if (NULL != module->attr_name) {
free(module->attr_name);
}
if (NULL != module->attr_data) {
free(module->attr_data);
}
}
OBJ_CLASS_INSTANCE(modex_attr_data_t,
opal_list_item_t,
modex_attr_construct,
modex_attr_destruct);
/**
* Find data for a given attribute in a given modex_proc_data_t
* container.
*
* The proc_data's modex_lock must be held during this
* search.
*/
static modex_attr_data_t *
modex_lookup_attr_data(modex_proc_data_t *proc_data,
const char *attr_name,
bool create_if_not_found)
{
modex_attr_data_t *attr_data = NULL;
for (attr_data = (modex_attr_data_t *) opal_list_get_first(&proc_data->modex_module_data);
attr_data != (modex_attr_data_t *) opal_list_get_end(&proc_data->modex_module_data);
attr_data = (modex_attr_data_t *) opal_list_get_next(attr_data)) {
if (0 == strcmp(attr_name, attr_data->attr_name)) {
return attr_data;
}
}
if (create_if_not_found) {
attr_data = OBJ_NEW(modex_attr_data_t);
if (NULL == attr_data) return NULL;
attr_data->attr_name = strdup(attr_name);
opal_list_append(&proc_data->modex_module_data, &attr_data->super);
return attr_data;
}
return NULL;
}
/**
* Find modex_proc_data_t container associated with given
* orte_process_name_t.
*
* The global lock should *NOT* be held when
* calling this function.
*/
static modex_proc_data_t*
modex_lookup_orte_proc(const orte_process_name_t *orte_proc)
{
modex_proc_data_t *proc_data = NULL;
OPAL_THREAD_LOCK(&orte_grpcomm_basic.mutex);
opal_hash_table_get_value_uint64(&orte_grpcomm_basic.modex_data,
orte_util_hash_name(orte_proc), (void**)&proc_data);
if (NULL == proc_data) {
/* The proc clearly exists, so create a modex structure
for it */
proc_data = OBJ_NEW(modex_proc_data_t);
if (NULL == proc_data) {
opal_output(0, "grpcomm_basic_modex_lookup_orte_proc: unable to allocate modex_proc_data_t\n");
OPAL_THREAD_UNLOCK(&orte_grpcomm_basic.mutex);
return NULL;
}
opal_hash_table_set_value_uint64(&orte_grpcomm_basic.modex_data,
orte_util_hash_name(orte_proc), proc_data);
}
OPAL_THREAD_UNLOCK(&orte_grpcomm_basic.mutex);
return proc_data;
}
static int set_proc_attr(const char *attr_name,
const void *data,
size_t size)
{
int rc;
OPAL_THREAD_LOCK(&orte_grpcomm_basic.mutex);
/* Pack the attribute name information into the local buffer */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&orte_grpcomm_basic.modex_buffer, &attr_name, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* pack the size */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&orte_grpcomm_basic.modex_buffer, &size, 1, OPAL_SIZE))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* Pack the actual data into the buffer */
if (0 != size) {
if (ORTE_SUCCESS != (rc = opal_dss.pack(&orte_grpcomm_basic.modex_buffer, (void *) data, size, OPAL_BYTE))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
}
/* track the number of entries */
++orte_grpcomm_basic.modex_num_entries;
cleanup:
OPAL_THREAD_UNLOCK(&orte_grpcomm_basic.mutex);
return rc;
}
static int get_proc_attr(const orte_process_name_t proc,
const char * attribute_name, void **val,
size_t *size)
{
modex_proc_data_t *proc_data;
modex_attr_data_t *attr_data;
proc_data = modex_lookup_orte_proc(&proc);
if (NULL == proc_data) return ORTE_ERR_NOT_FOUND;
OPAL_THREAD_LOCK(&proc_data->modex_lock);
/* look up attribute */
attr_data = modex_lookup_attr_data(proc_data, attribute_name, false);
/* copy the data out to the user */
if ((NULL == attr_data) ||
(attr_data->attr_data_size == 0)) {
opal_output(0, "grpcomm_basic_get_proc_attr: no attr avail or zero byte size");
*val = NULL;
*size = 0;
} else {
void *copy = malloc(attr_data->attr_data_size);
if (copy == NULL) {
OPAL_THREAD_UNLOCK(&proc_data->modex_lock);
return ORTE_ERR_OUT_OF_RESOURCE;
}
memcpy(copy, attr_data->attr_data, attr_data->attr_data_size);
*val = copy;
*size = attr_data->attr_data_size;
}
OPAL_THREAD_UNLOCK(&proc_data->modex_lock);
return ORTE_SUCCESS;
}
static int modex(opal_list_t *procs)
{
opal_buffer_t buf, rbuf;
orte_std_cntr_t i, j, num_procs, num_entries;
void *bytes = NULL;
orte_std_cntr_t cnt;
orte_process_name_t proc_name;
modex_proc_data_t *proc_data;
modex_attr_data_t *attr_data;
int rc;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm: modex entered",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* setup the buffer that will actually be sent */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
OBJ_CONSTRUCT(&rbuf, opal_buffer_t);
/* put our process name in the buffer so it can be unpacked later */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* put the number of entries into the buffer */
OPAL_THREAD_LOCK(&orte_grpcomm_basic.mutex);
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &orte_grpcomm_basic.modex_num_entries, 1, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(rc);
OPAL_THREAD_UNLOCK(&orte_grpcomm_basic.mutex);
goto cleanup;
}
/* if there are entries, non-destructively copy the data across */
if (0 < orte_grpcomm_basic.modex_num_entries) {
if (ORTE_SUCCESS != (opal_dss.copy_payload(&buf, &orte_grpcomm_basic.modex_buffer))) {
ORTE_ERROR_LOG(rc);
OPAL_THREAD_UNLOCK(&orte_grpcomm_basic.mutex);
goto cleanup;
}
}
OPAL_THREAD_UNLOCK(&orte_grpcomm_basic.mutex);
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s modex: executing allgather",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* exchange the buffer with the list of peers (if provided) or all my peers */
if (NULL == procs) {
if (ORTE_SUCCESS != (rc = allgather(&buf, &rbuf))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
} else {
if (ORTE_SUCCESS != (rc = allgather_list(procs, &buf, &rbuf))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
}
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s modex: processing modex info",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* process the results */
/* extract the number of procs that put data in the buffer */
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_procs, &cnt, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* process the buffer */
for (i=0; i < num_procs; i++) {
/* unpack the process name */
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &proc_name, &cnt, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* look up the modex data structure */
proc_data = modex_lookup_orte_proc(&proc_name);
if (proc_data == NULL) {
/* report the error */
opal_output(0, "grpcomm_basic_modex: received modex info for unknown proc %s\n",
ORTE_NAME_PRINT(&proc_name));
rc = ORTE_ERR_NOT_FOUND;
goto cleanup;
}
/* unpack the number of entries for this proc */
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_entries, &cnt, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
OPAL_THREAD_LOCK(&proc_data->modex_lock);
/*
* Extract the attribute names and values
*/
for (j = 0; j < num_entries; j++) {
size_t num_bytes;
char *attr_name;
cnt = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &attr_name, &cnt, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
OPAL_THREAD_UNLOCK(&proc_data->modex_lock);
goto cleanup;
}
cnt = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_bytes, &cnt, OPAL_SIZE))) {
ORTE_ERROR_LOG(rc);
OPAL_THREAD_UNLOCK(&proc_data->modex_lock);
goto cleanup;
}
if (num_bytes != 0) {
if (NULL == (bytes = malloc(num_bytes))) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
rc = ORTE_ERR_OUT_OF_RESOURCE;
OPAL_THREAD_UNLOCK(&proc_data->modex_lock);
goto cleanup;
}
cnt = (orte_std_cntr_t) num_bytes;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, bytes, &cnt, OPAL_BYTE))) {
ORTE_ERROR_LOG(rc);
OPAL_THREAD_UNLOCK(&proc_data->modex_lock);
goto cleanup;
}
num_bytes = cnt;
} else {
bytes = NULL;
}
/*
* Lookup the corresponding modex structure
*/
if (NULL == (attr_data = modex_lookup_attr_data(proc_data,
attr_name, true))) {
opal_output(0, "grpcomm_basic_modex: modex_lookup_attr_data failed\n");
OPAL_THREAD_UNLOCK(&proc_data->modex_lock);
rc = ORTE_ERR_NOT_FOUND;
goto cleanup;
}
if (NULL != attr_data->attr_data) {
/* some pre-existing value must be here - release it */
free(attr_data->attr_data);
}
attr_data->attr_data = bytes;
attr_data->attr_data_size = num_bytes;
proc_data->modex_received_data = true;
}
OPAL_THREAD_UNLOCK(&proc_data->modex_lock);
}
cleanup:
OBJ_DESTRUCT(&buf);
OBJ_DESTRUCT(&rbuf);
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm: modex completed",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return rc;
}
static int purge_proc_attrs(void)
{
/*
* Purge the attributes
*/
opal_hash_table_remove_all(&orte_grpcomm_basic.modex_data);
OBJ_DESTRUCT(&orte_grpcomm_basic.modex_data);
OBJ_CONSTRUCT(&orte_grpcomm_basic.modex_data, opal_hash_table_t);
opal_hash_table_init(&orte_grpcomm_basic.modex_data, 256);
/*
* Clear the modex buffer
*/
OBJ_DESTRUCT(&orte_grpcomm_basic.modex_buffer);
OBJ_CONSTRUCT(&orte_grpcomm_basic.modex_buffer, opal_buffer_t);
orte_grpcomm_basic.modex_num_entries = 0;
return ORTE_SUCCESS;
}
orte_grpcomm_base_module_t orte_grpcomm_basic_module = {
xcast,
allgather,
allgather_list,
barrier,
next_recips,
set_proc_attr,
get_proc_attr,
modex,
purge_proc_attrs
};