Complete implementation of the multicast-based grpcomm module
This commit was SVN r24548.
Этот коммит содержится в:
родитель
fa40f5d7c3
Коммит
795ca2cff2
@ -16,6 +16,7 @@
|
|||||||
|
|
||||||
#include "opal/dss/dss.h"
|
#include "opal/dss/dss.h"
|
||||||
#include "opal/runtime/opal.h"
|
#include "opal/runtime/opal.h"
|
||||||
|
#include "opal/runtime/opal_progress.h"
|
||||||
#include "opal/threads/mutex.h"
|
#include "opal/threads/mutex.h"
|
||||||
#include "opal/threads/condition.h"
|
#include "opal/threads/condition.h"
|
||||||
|
|
||||||
@ -49,9 +50,6 @@ static int xcast(orte_jobid_t job,
|
|||||||
static int mcast_allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf);
|
static int mcast_allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf);
|
||||||
static int mcast_barrier(void);
|
static int mcast_barrier(void);
|
||||||
static int modex(opal_list_t *procs);
|
static int modex(opal_list_t *procs);
|
||||||
static int get_proc_attr(const orte_process_name_t proc,
|
|
||||||
const char * attribute_name, void **val,
|
|
||||||
size_t *size);
|
|
||||||
|
|
||||||
/* Module def */
|
/* Module def */
|
||||||
orte_grpcomm_base_module_t orte_grpcomm_mcast_module = {
|
orte_grpcomm_base_module_t orte_grpcomm_mcast_module = {
|
||||||
@ -62,7 +60,7 @@ orte_grpcomm_base_module_t orte_grpcomm_mcast_module = {
|
|||||||
orte_grpcomm_base_allgather_list,
|
orte_grpcomm_base_allgather_list,
|
||||||
mcast_barrier,
|
mcast_barrier,
|
||||||
orte_grpcomm_base_set_proc_attr,
|
orte_grpcomm_base_set_proc_attr,
|
||||||
get_proc_attr,
|
orte_grpcomm_base_get_proc_attr,
|
||||||
modex,
|
modex,
|
||||||
orte_grpcomm_base_purge_proc_attrs
|
orte_grpcomm_base_purge_proc_attrs
|
||||||
};
|
};
|
||||||
@ -108,6 +106,13 @@ static int init(void)
|
|||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
||||||
|
ORTE_RML_TAG_DAEMON_COLLECTIVE,
|
||||||
|
ORTE_RML_NON_PERSISTENT,
|
||||||
|
orte_grpcomm_base_daemon_coll_recv,
|
||||||
|
NULL))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return ORTE_SUCCESS;
|
return ORTE_SUCCESS;
|
||||||
@ -158,7 +163,7 @@ static int xcast(orte_jobid_t job,
|
|||||||
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
||||||
|
|
||||||
/* insert the target tag */
|
/* insert the target tag */
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &tag, 1, ORTE_RML_TAG_T))) {
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &tag, 1, ORTE_RML_TAG))) {
|
||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
goto CLEANUP;
|
goto CLEANUP;
|
||||||
}
|
}
|
||||||
@ -180,7 +185,7 @@ static int xcast(orte_jobid_t job,
|
|||||||
* for processing
|
* for processing
|
||||||
*/
|
*/
|
||||||
n=1;
|
n=1;
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &rmltag, &n, ORTE_RML_TAG_T))) {
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &rmltag, &n, ORTE_RML_TAG))) {
|
||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
goto CLEANUP;
|
goto CLEANUP;
|
||||||
}
|
}
|
||||||
@ -355,47 +360,15 @@ static int modex(opal_list_t *procs)
|
|||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
if (NULL != procs) {
|
if (NULL != procs) {
|
||||||
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_full_modex(procs, false))) {
|
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_full_modex(procs, true))) {
|
||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
}
|
}
|
||||||
return rc;
|
return rc;
|
||||||
}
|
} else {
|
||||||
|
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_peer_modex(true))) {
|
||||||
if (OPAL_ENABLE_HETEROGENEOUS_SUPPORT) {
|
ORTE_ERROR_LOG(rc);
|
||||||
/* decide if we need to add the architecture to the modex. Check
|
|
||||||
* first to see if hetero is enabled - if not, then we clearly
|
|
||||||
* don't need to exchange arch's as they are all identical
|
|
||||||
*/
|
|
||||||
/* Case 1: If different apps in this job were built differently - e.g., some
|
|
||||||
* are built 32-bit while others are built 64-bit - then we need to modex
|
|
||||||
* regardless of any other consideration. The user is reqd to tell us via a
|
|
||||||
* cmd line option if this situation exists, which will result in an mca param
|
|
||||||
* being set for us, so all we need to do is check for the global boolean
|
|
||||||
* that corresponds to that param
|
|
||||||
*
|
|
||||||
* Case 2: the nodes are hetero, but the app binaries were built
|
|
||||||
* the same - i.e., either they are both 32-bit, or they are both 64-bit, but
|
|
||||||
* no mixing of the two. In this case, we include the info in the modex
|
|
||||||
*/
|
|
||||||
if (orte_hetero_apps || !orte_homogeneous_nodes) {
|
|
||||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
|
|
||||||
"%s grpcomm:mcast: modex is required",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
||||||
|
|
||||||
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_peer_modex(false))) {
|
|
||||||
ORTE_ERROR_LOG(rc);
|
|
||||||
}
|
|
||||||
return rc;
|
|
||||||
}
|
}
|
||||||
}
|
return rc;
|
||||||
|
|
||||||
/* no modex is required - see if the data was included in the launch message */
|
|
||||||
if (orte_send_profile) {
|
|
||||||
/* the info was provided in the nidmap - there is nothing more we have to do */
|
|
||||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
|
|
||||||
"%s grpcomm:mcast:modex using nidmap",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
||||||
return ORTE_SUCCESS;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
|
||||||
@ -405,57 +378,165 @@ static int modex(opal_list_t *procs)
|
|||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int get_proc_attr(const orte_process_name_t proc,
|
static void process_msg(opal_buffer_t *data)
|
||||||
const char * attribute_name, void **val,
|
|
||||||
size_t *size)
|
|
||||||
{
|
{
|
||||||
orte_nid_t *nid;
|
orte_jobid_t jobid;
|
||||||
|
orte_odls_job_t *jobdat;
|
||||||
|
orte_odls_child_t *child;
|
||||||
|
orte_std_cntr_t n;
|
||||||
opal_list_item_t *item;
|
opal_list_item_t *item;
|
||||||
orte_attr_t *attr;
|
int32_t num_contributors;
|
||||||
|
opal_buffer_t buf;
|
||||||
|
int rc;
|
||||||
|
int32_t numc;
|
||||||
|
orte_rml_tag_t rmltag;
|
||||||
|
opal_list_t daemons;
|
||||||
|
orte_process_name_t proc;
|
||||||
|
orte_vpid_t i, daemonvpid;
|
||||||
|
orte_namelist_t *nm;
|
||||||
|
|
||||||
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output,
|
||||||
|
"%s grpcomm:mcast:daemon_coll: daemon collective called",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||||
|
|
||||||
/* find this proc's node in the nidmap */
|
/* unpack the jobid using this collective */
|
||||||
if (NULL == (nid = orte_util_lookup_nid((orte_process_name_t*)&proc))) {
|
n = 1;
|
||||||
/* proc wasn't found - return error */
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &jobid, &n, ORTE_JOBID))) {
|
||||||
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output,
|
ORTE_ERROR_LOG(rc);
|
||||||
"%s grpcomm:mcast:get_proc_attr: no modex entry for proc %s",
|
return;
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
||||||
ORTE_NAME_PRINT(&proc)));
|
|
||||||
return ORTE_ERR_NOT_FOUND;
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* look for this attribute */
|
/* lookup the job record for it */
|
||||||
for (item = opal_list_get_first(&nid->attrs);
|
jobdat = NULL;
|
||||||
item != opal_list_get_end(&nid->attrs);
|
for (item = opal_list_get_first(&orte_local_jobdata);
|
||||||
|
item != opal_list_get_end(&orte_local_jobdata);
|
||||||
item = opal_list_get_next(item)) {
|
item = opal_list_get_next(item)) {
|
||||||
attr = (orte_attr_t*)item;
|
jobdat = (orte_odls_job_t*)item;
|
||||||
if (0 == strcmp(attr->name, attribute_name)) {
|
|
||||||
/* copy the data to the caller */
|
/* is this the specified job? */
|
||||||
void *copy = malloc(attr->size);
|
if (jobdat->jobid == jobid) {
|
||||||
|
break;
|
||||||
if (copy == NULL) {
|
|
||||||
return ORTE_ERR_OUT_OF_RESOURCE;
|
|
||||||
}
|
|
||||||
memcpy(copy, attr->bytes, attr->size);
|
|
||||||
*val = copy;
|
|
||||||
*size = attr->size;
|
|
||||||
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output,
|
|
||||||
"%s grpcomm:mcast:get_proc_attr: found %d bytes for attr %s on proc %s",
|
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)attr->size,
|
|
||||||
attribute_name, ORTE_NAME_PRINT(&proc)));
|
|
||||||
return ORTE_SUCCESS;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (NULL == jobdat) {
|
||||||
|
/* race condition - someone sent us a collective before we could
|
||||||
|
* parse the add_local_procs cmd. Just add the jobdat object
|
||||||
|
* and continue
|
||||||
|
*/
|
||||||
|
jobdat = OBJ_NEW(orte_odls_job_t);
|
||||||
|
jobdat->jobid = jobid;
|
||||||
|
opal_list_append(&orte_local_jobdata, &jobdat->super);
|
||||||
|
}
|
||||||
|
|
||||||
/* get here if attribute isn't found */
|
/* it may be possible to get here prior to having actually finished processing our
|
||||||
|
* local launch msg due to the race condition between different nodes and when
|
||||||
|
* they start their individual procs. Hence, we have to first ensure that we
|
||||||
|
* -have- finished processing the launch msg, or else we won't know whether
|
||||||
|
* or not to wait before sending this on
|
||||||
|
*/
|
||||||
|
OPAL_THREAD_LOCK(&jobdat->lock);
|
||||||
|
while (!jobdat->launch_msg_processed) {
|
||||||
|
opal_condition_wait(&jobdat->cond, &jobdat->lock);
|
||||||
|
}
|
||||||
|
OPAL_THREAD_UNLOCK(&jobdat->lock);
|
||||||
|
|
||||||
|
/* unpack the tag for this collective */
|
||||||
|
n = 1;
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &rmltag, &n, ORTE_RML_TAG))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* unpack the number of contributors in this data bucket */
|
||||||
|
n = 1;
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &num_contributors, &n, OPAL_INT32))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
jobdat->num_contributors += num_contributors;
|
||||||
|
|
||||||
|
/* xfer the data */
|
||||||
|
opal_dss.copy_payload(&jobdat->collection_bucket, data);
|
||||||
|
|
||||||
|
/* count the number of participants collected */
|
||||||
|
jobdat->num_collected++;
|
||||||
|
|
||||||
|
/* if we haven't already done so, figure out how many participants we
|
||||||
|
* should be expecting
|
||||||
|
*/
|
||||||
|
if (jobdat->num_participating < 0) {
|
||||||
|
/* initialize the counter */
|
||||||
|
jobdat->num_participating = 0;
|
||||||
|
/* create a list of all daemons in job */
|
||||||
|
OBJ_CONSTRUCT(&daemons, opal_list_t);
|
||||||
|
for (i=0; i < orte_process_info.num_procs; i++) {
|
||||||
|
nm = OBJ_NEW(orte_namelist_t);
|
||||||
|
nm->name.vpid = i;
|
||||||
|
opal_list_append(&daemons, &nm->item);
|
||||||
|
}
|
||||||
|
/* count the number of daemons that have procs for this job */
|
||||||
|
proc.jobid = jobid;
|
||||||
|
proc.vpid = 0;
|
||||||
|
while (proc.vpid < jobdat->num_procs && 0 < opal_list_get_size(&daemons)) {
|
||||||
|
/* get the daemon that hosts this proc */
|
||||||
|
daemonvpid = orte_ess.proc_get_daemon(&proc);
|
||||||
|
/* is it on list? */
|
||||||
|
for (item = opal_list_get_first(&daemons);
|
||||||
|
item != opal_list_get_end(&daemons);
|
||||||
|
item = opal_list_get_next(item)) {
|
||||||
|
nm = (orte_namelist_t*)item;
|
||||||
|
if (nm->name.vpid == daemonvpid) {
|
||||||
|
jobdat->num_participating++;
|
||||||
|
opal_list_remove_item(&daemons, item);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/* next proc */
|
||||||
|
proc.vpid++;
|
||||||
|
}
|
||||||
|
OBJ_DESTRUCT(&daemons);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (jobdat->num_collected != jobdat->num_participating) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output,
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output,
|
||||||
"%s grpcomm:mcast:get_proc_attr: no attr avail or zero byte size for proc %s attribute %s",
|
"%s grpcomm:mcast:daemon_coll: collection complete",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||||
ORTE_NAME_PRINT(&proc), attribute_name));
|
|
||||||
*val = NULL;
|
|
||||||
*size = 0;
|
|
||||||
|
|
||||||
return ORTE_SUCCESS;
|
/* setup a buffer to send the results back to the job members */
|
||||||
|
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
||||||
|
|
||||||
|
/* add any collected data */
|
||||||
|
numc = jobdat->num_contributors;
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &numc, 1, OPAL_INT32))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(&buf, &jobdat->collection_bucket))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
/* reset everything for next collective */
|
||||||
|
jobdat->num_contributors = 0;
|
||||||
|
jobdat->num_collected = 0;
|
||||||
|
OBJ_DESTRUCT(&jobdat->collection_bucket);
|
||||||
|
OBJ_CONSTRUCT(&jobdat->collection_bucket, opal_buffer_t);
|
||||||
|
/* send the buffer to each of my children from this job */
|
||||||
|
if (0 < jobdat->num_local_procs) {
|
||||||
|
for (item = opal_list_get_first(&orte_local_children);
|
||||||
|
item != opal_list_get_end(&orte_local_children);
|
||||||
|
item = opal_list_get_next(item)) {
|
||||||
|
child = (orte_odls_child_t*)item;
|
||||||
|
|
||||||
|
if (child->name->jobid == jobdat->jobid) {
|
||||||
|
if (0 > (rc = orte_rml.send_buffer(child->name, &buf, rmltag, 0))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void daemon_recv(int status,
|
static void daemon_recv(int status,
|
||||||
@ -469,42 +550,48 @@ static void daemon_recv(int status,
|
|||||||
orte_rml_tag_t rmltag;
|
orte_rml_tag_t rmltag;
|
||||||
int rc;
|
int rc;
|
||||||
|
|
||||||
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
|
||||||
|
"%s grpcomm:mcast: recvd mcast message",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||||
|
|
||||||
/* unpack the rml tag */
|
/* unpack the rml tag */
|
||||||
n=1;
|
n=1;
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &rmltag, &n, ORTE_RML_TAG_T))) {
|
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &rmltag, &n, ORTE_RML_TAG))) {
|
||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
switch (tag) {
|
switch (rmltag) {
|
||||||
case ORTE_RML_TAG_DAEMON:
|
case ORTE_RML_TAG_DAEMON:
|
||||||
/* this is a cmd, so deliver it */
|
/* this is a cmd, so deliver it */
|
||||||
ORTE_MESSAGE_EVENT(sender, buf, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor);
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
|
||||||
break;
|
"%s grpcomm:mcast: recvd command",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||||
|
ORTE_MESSAGE_EVENT(sender, buf, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor);
|
||||||
|
/* cycle the progress engine since we are not in the event lib */
|
||||||
|
opal_progress();
|
||||||
|
break;
|
||||||
|
|
||||||
case ORTE_RML_TAG_BARRIER:
|
case ORTE_RML_TAG_BARRIER:
|
||||||
OPAL_THREAD_LOCK(&barrier.lock);
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
|
||||||
/* the recv is the trigger */
|
"%s grpcomm:mcast: recvd barrier",
|
||||||
barrier.recvd = 1;
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||||
opal_condition_broadcast(&barrier.cond);
|
/* send barrier release to my children */
|
||||||
OPAL_THREAD_UNLOCK(&barrier.lock);
|
process_msg(buf);
|
||||||
|
break;
|
||||||
|
|
||||||
break;
|
case ORTE_RML_TAG_ALLGATHER:
|
||||||
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
|
||||||
case ORTE_RML_TAG_ALLGATHER:
|
"%s grpcomm:mcast: recvd allgather",
|
||||||
OPAL_THREAD_LOCK(&allgather.lock);
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||||
allgather.recvd += 1;
|
process_msg(buf);
|
||||||
/* xfer the data */
|
break;
|
||||||
opal_dss.copy_payload(&allgather.results, buf);
|
|
||||||
/* check for completion */
|
|
||||||
if (orte_process_info.num_procs <= allgather.recvd) {
|
|
||||||
opal_condition_broadcast(&allgather.cond);
|
|
||||||
}
|
|
||||||
OPAL_THREAD_UNLOCK(&allgather.lock);
|
|
||||||
break;
|
|
||||||
|
|
||||||
default:
|
default:
|
||||||
break;
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
|
||||||
|
"%s grpcomm:mcast: recvd unrecognized tag %d",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), tag));
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -547,7 +634,29 @@ void orte_grpcomm_mcast_daemon_coll(orte_process_name_t* sender, opal_buffer_t*
|
|||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
goto CLEANUP;
|
goto CLEANUP;
|
||||||
}
|
}
|
||||||
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &rmltag, 1, ORTE_RML_TAG))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
goto CLEANUP;
|
||||||
|
}
|
||||||
|
|
||||||
CLEANUP:
|
/* copy the rest of the data */
|
||||||
|
opal_dss.copy_payload(&buf, buffer);
|
||||||
|
|
||||||
|
/* now share it across all daemons */
|
||||||
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
|
||||||
|
"%s grpcomm:mcast: sending collective results",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||||
|
if (ORTE_SUCCESS != (rc = orte_rmcast.send_buffer(ORTE_RMCAST_SYS_CHANNEL,
|
||||||
|
ORTE_RMCAST_TAG_MSG, &buf))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* since I cannot recv my own mcast, process this myself */
|
||||||
|
daemon_recv(ORTE_SUCCESS, ORTE_RMCAST_SYS_CHANNEL, 0,
|
||||||
|
ORTE_RMCAST_TAG_MSG, ORTE_PROC_MY_NAME,
|
||||||
|
&buf, NULL);
|
||||||
|
|
||||||
|
CLEANUP:
|
||||||
|
OBJ_DESTRUCT(&buf);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -19,6 +19,8 @@
|
|||||||
#include "opal/mca/base/base.h"
|
#include "opal/mca/base/base.h"
|
||||||
#include "opal/mca/base/mca_base_param.h"
|
#include "opal/mca/base/mca_base_param.h"
|
||||||
|
|
||||||
|
#include "orte/mca/rmcast/rmcast.h"
|
||||||
|
|
||||||
#include "orte/mca/grpcomm/base/base.h"
|
#include "orte/mca/grpcomm/base/base.h"
|
||||||
#include "grpcomm_mcast.h"
|
#include "grpcomm_mcast.h"
|
||||||
|
|
||||||
@ -70,6 +72,14 @@ int orte_grpcomm_mcast_component_query(mca_base_module_t **module, int *priority
|
|||||||
return ORTE_ERROR;
|
return ORTE_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (NULL == orte_rmcast.recv_buffer_nb) {
|
||||||
|
/* check any API to ensure mcast support was built */
|
||||||
|
opal_output(0, "MCAST GRPCOMM WAS SPECIFIED, BUT MULTICAST SUPPORT WAS NOT BUILT");
|
||||||
|
*priority = 0;
|
||||||
|
*module = NULL;
|
||||||
|
return ORTE_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
*priority = 1000;
|
*priority = 1000;
|
||||||
*module = (mca_base_module_t *)&orte_grpcomm_mcast_module;
|
*module = (mca_base_module_t *)&orte_grpcomm_mcast_module;
|
||||||
return ORTE_SUCCESS;
|
return ORTE_SUCCESS;
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user