1
1

Fully implement the inbound binomial allgather for daemon-based collectives. Supports both modex and barrier operations.

Comm_spawn still uses the rank=0 method - shifting that algo to the daemons is under study.

This commit was SVN r18115.
Этот коммит содержится в:
Ralph Castain 2008-04-09 22:10:53 +00:00
родитель 7cb1e72f76
Коммит 3a0d09300b
39 изменённых файлов: 1017 добавлений и 2081 удалений

Просмотреть файл

@ -38,6 +38,7 @@
#include "ompi/mca/bml/base/base.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/grpcomm/grpcomm.h"
#include "orte/runtime/orte_globals.h"
#include "ompi/runtime/ompi_cr.h"
#include "ompi/runtime/ompi_module_exchange.h"

Просмотреть файл

@ -296,6 +296,8 @@ static int rte_init(char flags)
node = OBJ_NEW(orte_node_t);
node->name = strdup(orte_process_info.nodename);
node->index = opal_pointer_array_add(orte_node_pool, node);
/* record our node */
orte_hnpnode = node;
/* create and store a proc object for us */
proc = OBJ_NEW(orte_proc_t);

Просмотреть файл

@ -24,5 +24,4 @@ libmca_grpcomm_la_SOURCES += \
base/grpcomm_base_select.c \
base/grpcomm_base_open.c \
base/grpcomm_base_allgather.c \
base/grpcomm_base_barrier.c \
base/grpcomm_base_modex.c

Просмотреть файл

@ -62,9 +62,6 @@ ORTE_DECLSPEC extern orte_grpcomm_base_component_t mca_grpcomm_base_selected_com
ORTE_DECLSPEC int orte_grpcomm_base_allgather_list(opal_list_t *names,
opal_buffer_t *sbuf,
opal_buffer_t *rbuf);
ORTE_DECLSPEC int orte_grpcomm_base_allgather(opal_buffer_t *sbuf,
opal_buffer_t *rbuf);
ORTE_DECLSPEC int orte_grpcomm_base_barrier(void);
ORTE_DECLSPEC int orte_grpcomm_base_set_proc_attr(const char *attr_name,
const void *data,
size_t size);

Просмотреть файл

@ -25,6 +25,7 @@
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif /* HAVE_SYS_TIME_H */
#include <zlib.h>
#include "opal/threads/condition.h"
#include "opal/util/output.h"
@ -42,17 +43,9 @@
#include "orte/mca/grpcomm/base/base.h"
static bool allgather_failed;
static bool allgather_timer;
static orte_std_cntr_t allgather_num_recvd;
static opal_buffer_t *allgather_buf;
static void allgather_timer_recv(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata)
{
allgather_timer = true;
}
static void allgather_server_recv(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata)
@ -75,7 +68,7 @@ static void allgather_server_recv(int status, orte_process_name_t* sender,
++allgather_num_recvd;
/* reissue the recv */
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER_SERVER,
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER,
ORTE_RML_NON_PERSISTENT, allgather_server_recv, NULL);
if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) {
ORTE_ERROR_LOG(rc);
@ -103,238 +96,6 @@ static void allgather_client_recv(int status, orte_process_name_t* sender,
++allgather_num_recvd;
}
int orte_grpcomm_base_allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
{
int rc;
orte_daemon_cmd_flag_t command=ORTE_DAEMON_COLL_CMD;
struct timeval ompistart, ompistop;
opal_buffer_t coll;
orte_rml_tag_t target_tag=ORTE_RML_TAG_ALLGATHER_SERVER;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm: entering allgather",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* everyone sends data to their local daemon */
OBJ_CONSTRUCT(&coll, opal_buffer_t);
/* tell the daemon to collect the data */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&coll, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&coll);
return rc;
}
/* tell the daemon where it is eventually to be delivered */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&coll, &target_tag, 1, ORTE_RML_TAG))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&coll);
return rc;
}
/* add our data to it */
opal_dss.copy_payload(&coll, sbuf);
/* send to local daemon */
if (0 > orte_rml.send_buffer(ORTE_PROC_MY_DAEMON, &coll, ORTE_RML_TAG_DAEMON, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
OBJ_DESTRUCT(&coll);
return ORTE_ERR_COMM_FAILURE;
}
OBJ_DESTRUCT(&coll);
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s allgather buffer sent",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/*** RANK != 0 ***/
if (0 != ORTE_PROC_MY_NAME->vpid) {
/* setup the buffer that will recv the results */
allgather_buf = OBJ_NEW(opal_buffer_t);
/* now receive the final result from rank=0. Be sure to do this in
* a manner that allows us to return without being in a recv!
*/
allgather_num_recvd = 0;
allgather_failed = false;
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER_CLIENT,
ORTE_RML_NON_PERSISTENT, allgather_client_recv, NULL);
if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) {
ORTE_ERROR_LOG(rc);
return rc;
}
ORTE_PROGRESSED_WAIT(allgather_failed, allgather_num_recvd, 1);
/* if the allgather failed, return an error */
if (allgather_failed) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
OBJ_RELEASE(allgather_buf);
return ORTE_ERR_COMM_FAILURE;
}
/* copy payload to the caller's buffer */
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(rbuf, allgather_buf))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(allgather_buf);
return rc;
}
OBJ_RELEASE(allgather_buf);
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s allgather buffer received",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
if (orte_timing) {
/* if we are rank=N, send a message back to indicate
* the xcast completed for timing purposes
*/
opal_buffer_t buf;
orte_std_cntr_t i=0;
orte_process_name_t name;
if (ORTE_PROC_MY_NAME->vpid == orte_process_info.num_procs-1) {
name.jobid = ORTE_PROC_MY_NAME->jobid;
name.vpid = 0;
OBJ_CONSTRUCT(&buf, opal_buffer_t);
opal_dss.pack(&buf, &i, 1, ORTE_STD_CNTR); /* put something meaningless here */
orte_rml.send_buffer(&name,&buf,ORTE_RML_TAG_ALLGATHER_TIMER,0);
OBJ_DESTRUCT(&buf);
}
}
return ORTE_SUCCESS;
}
if (orte_timing) {
gettimeofday(&ompistart, NULL);
}
/*** RANK = 0 ***/
/* seed the outgoing buffer with the num_procs so it can be unpacked */
if (ORTE_SUCCESS != (rc = opal_dss.pack(rbuf, &orte_process_info.num_procs, 1, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* my info will be included in the collected buffers as part of
* the daemon's collective operation
*/
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s allgather collecting buffers from %ld daemons",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(long)orte_process_info.num_daemons));
/* setup the recv conditions */
allgather_failed = false;
allgather_num_recvd = 0;
/* setup the buffer that will recv the results */
allgather_buf = OBJ_NEW(opal_buffer_t);
/* post the non-blocking recv */
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER_SERVER,
ORTE_RML_NON_PERSISTENT, allgather_server_recv, NULL);
if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) {
ORTE_ERROR_LOG(rc);
return rc;
}
ORTE_PROGRESSED_WAIT(allgather_failed, allgather_num_recvd, orte_process_info.num_daemons);
/* cancel the lingering recv */
if (ORTE_SUCCESS != (rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER_SERVER))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(allgather_buf);
return rc;
}
if (orte_timing) {
gettimeofday(&ompistop, NULL);
opal_output(0, "allgather[%ld]: time to collect inbound data %ld usec",
(long)ORTE_PROC_MY_NAME->vpid,
(long int)((ompistop.tv_sec - ompistart.tv_sec)*1000000 +
(ompistop.tv_usec - ompistart.tv_usec)));
gettimeofday(&ompistart, NULL);
}
/* if the allgather failed, say so */
if (allgather_failed) {
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s allgather failed!",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
OBJ_RELEASE(allgather_buf);
return ORTE_ERROR;
}
/* copy the received info to the caller's buffer */
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(rbuf, allgather_buf))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(allgather_buf);
return rc;
}
OBJ_RELEASE(allgather_buf);
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s allgather xcasting collected data - buffer size %ld",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(long)rbuf->bytes_used));
/* xcast the results */
orte_grpcomm.xcast(ORTE_PROC_MY_NAME->jobid, rbuf, ORTE_RML_TAG_ALLGATHER_CLIENT);
if (orte_timing) {
/* setup a receive to hear when the rank=N proc has received the data
* release - in most xcast schemes, this will always be the final recvr
*/
allgather_timer = false;
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER_TIMER,
ORTE_RML_NON_PERSISTENT, allgather_timer_recv, NULL);
ORTE_PROGRESSED_WAIT(allgather_timer, 0, 1);
gettimeofday(&ompistop, NULL);
opal_output(0, "allgather[%ld]: time to complete outbound xcast %ld usec",
(long)ORTE_PROC_MY_NAME->vpid,
(long int)((ompistop.tv_sec - ompistart.tv_sec)*1000000 +
(ompistop.tv_usec - ompistart.tv_usec)));
}
/* xcast automatically ensures that the sender -always- gets a copy
* of the message. This is required to ensure proper operation of the
* launch system as the HNP -must- get a copy itself. So we have to
* post our own receive here so that we don't leave a message rattling
* around in our RML
*/
/* setup the buffer that will recv the results */
allgather_buf = OBJ_NEW(opal_buffer_t);
/* receive the echo'd message. Be sure to do this in
* a manner that allows us to return without being in a recv!
*/
allgather_num_recvd = 0;
allgather_failed = false;
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER_CLIENT,
ORTE_RML_NON_PERSISTENT, allgather_client_recv, NULL);
if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) {
ORTE_ERROR_LOG(rc);
return rc;
}
ORTE_PROGRESSED_WAIT(allgather_failed, allgather_num_recvd, 1);
/* if the allgather failed, return an error */
if (allgather_failed) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
OBJ_RELEASE(allgather_buf);
return ORTE_ERR_COMM_FAILURE;
}
/* don't need the received buffer - we already have what we need in rbuf */
OBJ_DESTRUCT(allgather_buf);
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm: allgather completed",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return ORTE_SUCCESS;
}
static orte_std_cntr_t allgather_num_sent;
static void allgather_send_cb(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
@ -370,7 +131,7 @@ int orte_grpcomm_base_allgather_list(opal_list_t *names, opal_buffer_t *sbuf, op
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&root->name)));
if (0 > orte_rml.send_buffer(&root->name, sbuf, ORTE_RML_TAG_ALLGATHER_SERVER, 0)) {
if (0 > orte_rml.send_buffer(&root->name, sbuf, ORTE_RML_TAG_ALLGATHER, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
return ORTE_ERR_COMM_FAILURE;
}
@ -387,7 +148,7 @@ int orte_grpcomm_base_allgather_list(opal_list_t *names, opal_buffer_t *sbuf, op
*/
allgather_num_recvd = 0;
allgather_failed = false;
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER_CLIENT,
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER,
ORTE_RML_NON_PERSISTENT, allgather_client_recv, NULL);
if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) {
ORTE_ERROR_LOG(rc);
@ -448,7 +209,7 @@ int orte_grpcomm_base_allgather_list(opal_list_t *names, opal_buffer_t *sbuf, op
(long)num_peers-1));
/* post the non-blocking recv */
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER_SERVER,
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER,
ORTE_RML_NON_PERSISTENT, allgather_server_recv, NULL);
if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) {
ORTE_ERROR_LOG(rc);
@ -458,7 +219,7 @@ int orte_grpcomm_base_allgather_list(opal_list_t *names, opal_buffer_t *sbuf, op
ORTE_PROGRESSED_WAIT(allgather_failed, allgather_num_recvd, num_peers-1);
/* cancel the lingering recv */
if (ORTE_SUCCESS != (rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER_SERVER))) {
if (ORTE_SUCCESS != (rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(allgather_buf);
return rc;
@ -489,7 +250,7 @@ int orte_grpcomm_base_allgather_list(opal_list_t *names, opal_buffer_t *sbuf, op
}
/* transmit the buffer to this process */
if (0 > orte_rml.send_buffer_nb(&peer->name, rbuf, ORTE_RML_TAG_ALLGATHER_CLIENT,
if (0 > orte_rml.send_buffer_nb(&peer->name, rbuf, ORTE_RML_TAG_ALLGATHER,
0, allgather_send_cb, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
return ORTE_ERR_COMM_FAILURE;

Просмотреть файл

@ -1,245 +0,0 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007 Sun Microsystems, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include "orte/constants.h"
#include "orte/types.h"
#include <string.h>
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif /* HAVE_SYS_TIME_H */
#include "opal/threads/condition.h"
#include "opal/util/output.h"
#include "opal/util/bit_ops.h"
#include "opal/class/opal_hash_table.h"
#include "orte/util/proc_info.h"
#include "opal/dss/dss.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/odls/odls_types.h"
#include "orte/mca/rml/rml.h"
#include "orte/runtime/orte_globals.h"
#include "orte/util/name_fns.h"
#include "orte/orted/orted.h"
#include "orte/runtime/orte_wait.h"
#include "orte/mca/grpcomm/base/base.h"
static orte_std_cntr_t barrier_num_recvd;
static bool barrier_failed;
static bool barrier_timer;
static void barrier_server_recv(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata)
{
int rc;
/* bump counter */
++barrier_num_recvd;
/* reissue the recv */
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_BARRIER_SERVER,
ORTE_RML_NON_PERSISTENT, barrier_server_recv, NULL);
if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) {
ORTE_ERROR_LOG(rc);
barrier_failed = true;
}
}
static void barrier_recv(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata)
{
/* bump counter */
++barrier_num_recvd;
}
static void barrier_timer_recv(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata)
{
barrier_timer = true;
}
int orte_grpcomm_base_barrier(void)
{
orte_std_cntr_t i=0;
opal_buffer_t buf;
orte_daemon_cmd_flag_t command=ORTE_DAEMON_COLL_CMD;
orte_rml_tag_t target_tag=ORTE_RML_TAG_BARRIER_SERVER;
int rc;
struct timeval ompistart, ompistop;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm: entering barrier",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* everyone sends barrier to local daemon */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
/* tell the daemon to collect the data */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
return rc;
}
/* tell the daemon where it is eventually to be delivered */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &target_tag, 1, ORTE_RML_TAG))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
return rc;
}
/* send to local daemon */
if (0 > orte_rml.send_buffer(ORTE_PROC_MY_DAEMON, &buf, ORTE_RML_TAG_DAEMON, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
OBJ_DESTRUCT(&buf);
return ORTE_ERR_COMM_FAILURE;
}
OBJ_DESTRUCT(&buf);
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s barrier sent",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/*** RANK != 0 ***/
if (0 != ORTE_PROC_MY_NAME->vpid) {
/* now receive the release from rank=0. Be sure to do this in
* a manner that allows us to return without being in a recv!
*/
barrier_num_recvd = 0;
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_BARRIER_CLIENT,
ORTE_RML_NON_PERSISTENT, barrier_recv, NULL);
if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) {
ORTE_ERROR_LOG(rc);
return rc;
}
ORTE_PROGRESSED_WAIT(false, barrier_num_recvd, 1);
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s received barrier release",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
if (orte_timing) {
/* if we are rank=N, send a message back to indicate
* the xcast completed for timing purposes
*/
orte_process_name_t name;
if (ORTE_PROC_MY_NAME->vpid == orte_process_info.num_procs-1) {
name.jobid = ORTE_PROC_MY_NAME->jobid;
name.vpid = 0;
OBJ_CONSTRUCT(&buf, opal_buffer_t);
opal_dss.pack(&buf, &i, 1, ORTE_STD_CNTR); /* put something meaningless here */
orte_rml.send_buffer(&name,&buf,ORTE_RML_TAG_BARRIER_TIMER,0);
OBJ_DESTRUCT(&buf);
}
}
return ORTE_SUCCESS;
}
if (orte_timing) {
gettimeofday(&ompistart, NULL);
}
/*** RANK = 0 ***/
/* setup to recv the barrier messages from all peers */
barrier_num_recvd = 0;
barrier_failed = false;
/* post the non-blocking recv */
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_BARRIER_SERVER,
ORTE_RML_NON_PERSISTENT, barrier_server_recv, NULL);
if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) {
ORTE_ERROR_LOG(rc);
return rc;
}
ORTE_PROGRESSED_WAIT(barrier_failed, barrier_num_recvd, orte_process_info.num_daemons);
/* cancel the lingering recv */
if (ORTE_SUCCESS != (rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_BARRIER_SERVER))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (orte_timing) {
gettimeofday(&ompistop, NULL);
opal_output(0, "barrier[%ld]: time to collect inbound data %ld usec",
(long)ORTE_PROC_MY_NAME->vpid,
(long int)((ompistop.tv_sec - ompistart.tv_sec)*1000000 +
(ompistop.tv_usec - ompistart.tv_usec)));
gettimeofday(&ompistart, NULL);
}
/* if the barrier failed, say so */
if (barrier_failed) {
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s barrier failed!",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return ORTE_ERROR;
}
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s barrier xcasting release",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* xcast the release */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
orte_grpcomm.xcast(ORTE_PROC_MY_NAME->jobid, &buf, ORTE_RML_TAG_BARRIER_CLIENT);
OBJ_DESTRUCT(&buf);
if (orte_timing) {
/* setup a receive to hear when the rank=N proc has received the barrier
* release - in most xcast schemes, this will always be the final recvr
*/
barrier_timer = false;
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_BARRIER_TIMER,
ORTE_RML_NON_PERSISTENT, barrier_timer_recv, NULL);
ORTE_PROGRESSED_WAIT(barrier_timer, 0, 1);
gettimeofday(&ompistop, NULL);
opal_output(0, "barrier[%ld]: time to complete outbound xcast %ld usec",
(long)ORTE_PROC_MY_NAME->vpid,
(long int)((ompistop.tv_sec - ompistart.tv_sec)*1000000 +
(ompistop.tv_usec - ompistart.tv_usec)));
}
/* xcast automatically ensures that the sender -always- gets a copy
* of the message. This is required to ensure proper operation of the
* launch system as the HNP -must- get a copy itself. So we have to
* post our own receive here so that we don't leave a message rattling
* around in our RML
*/
barrier_num_recvd = 0;
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_BARRIER_CLIENT,
ORTE_RML_NON_PERSISTENT, barrier_recv, NULL);
if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) {
ORTE_ERROR_LOG(rc);
return rc;
}
ORTE_PROGRESSED_WAIT(false, barrier_num_recvd, 1);
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm: barrier completed",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return ORTE_SUCCESS;
}

Просмотреть файл

@ -46,8 +46,8 @@
#include "grpcomm_basic.h"
/* set the default xovers */
#define XCAST_LINEAR_XOVER_DEFAULT 2
#define XCAST_BINOMIAL_XOVER_DEFAULT 16
#define XCAST_LINEAR_XOVER_DEFAULT 0
#define XCAST_BINOMIAL_XOVER_DEFAULT 0
/*

Просмотреть файл

@ -57,6 +57,14 @@ static int xcast_linear(orte_jobid_t job,
static int xcast_direct(orte_jobid_t job,
opal_buffer_t *buffer,
orte_rml_tag_t tag);
static int find_parent(int rank, int parent, int me, int num_procs,
int *num_children, opal_list_t *children);
/* Local global variables */
static orte_process_name_t my_parent;
static opal_list_t *my_children;
static int my_num_children;
/* Static API's */
static int init(void);
@ -64,16 +72,27 @@ static void finalize(void);
static int xcast(orte_jobid_t job,
opal_buffer_t *buffer,
orte_rml_tag_t tag);
static int next_recips(opal_list_t *names, orte_grpcomm_mode_t mode);
static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf);
static int barrier(void);
static opal_list_t* next_recips(orte_grpcomm_mode_t mode);
static int daemon_collective(orte_jobid_t jobid,
orte_std_cntr_t num_local_contributors,
orte_grpcomm_coll_t type,
opal_buffer_t *data,
orte_rmaps_dp_t flag,
opal_value_array_t *participants);
static int update_trees(void);
/* Module def */
orte_grpcomm_base_module_t orte_grpcomm_basic_module = {
init,
finalize,
xcast,
orte_grpcomm_base_allgather,
allgather,
orte_grpcomm_base_allgather_list,
orte_grpcomm_base_barrier,
barrier,
daemon_collective,
update_trees,
next_recips,
orte_grpcomm_base_set_proc_attr,
orte_grpcomm_base_get_proc_attr,
@ -89,6 +108,11 @@ static int init(void)
{
int rc;
/* setup the local global variables */
if (orte_process_info.hnp || orte_process_info.daemon) {
update_trees();
}
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_modex_init())) {
ORTE_ERROR_LOG(rc);
}
@ -100,9 +124,44 @@ static int init(void)
*/
static void finalize(void)
{
opal_list_item_t *item;
if (orte_process_info.hnp || orte_process_info.daemon) {
/* deconstruct the child list */
while (NULL != (item = opal_list_remove_first(my_children))) {
OBJ_RELEASE(item);
}
OBJ_RELEASE(my_children);
my_num_children = 0;
}
orte_grpcomm_base_modex_finalize();
}
static int update_trees(void)
{
opal_list_item_t *item;
if (NULL != my_children) {
while (NULL != (item = opal_list_remove_first(my_children))) {
OBJ_RELEASE(item);
}
} else {
my_children = OBJ_NEW(opal_list_t);
}
my_num_children = 0;
my_parent.jobid = ORTE_PROC_MY_NAME->jobid;
my_parent.vpid = find_parent(0, 0, ORTE_PROC_MY_NAME->vpid,
orte_process_info.num_procs,
&my_num_children, my_children);
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:basic update trees found %d children num_procs %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
my_num_children, orte_process_info.num_procs));
return ORTE_SUCCESS;
}
/**
* A "broadcast-like" function to a job's processes.
@ -132,21 +191,9 @@ static int xcast(orte_jobid_t job,
(long)orte_process_info.num_procs,
(long)orte_grpcomm_basic.xcast_linear_xover,
(long)orte_grpcomm_basic.xcast_binomial_xover));
if (orte_process_info.num_procs < 2 || orte_abnormal_term_ordered) {
/* if there is only one proc in the system, then we must
* use the direct mode - there is no other option. Note that
* since the HNP is the one that typically does xcast sends,
* only one daemon means that the HNP is sending to
* itself. This is required as an HNP starts
* itself up
*
* NOTE: although we allow users to alter crossover points
* for selecting specific xcast modes, this required
* use-case behavior MUST always be retained or else
* singletons and HNP startup will fail!
*
* We also insist that the direct xcast mode be used when
if (orte_abnormal_term_ordered) {
/* We insist that the direct xcast mode be used when
* an orted has failed as we cannot rely on alternative
* methods to reach all orteds and/or procs
*/
@ -266,6 +313,7 @@ static int xcast_binomial_tree(orte_jobid_t job,
if (orte_process_info.hnp) {
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, buf, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor);
} else {
/* otherwise, send it to the HNP for relay */
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_DAEMON, 0))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
@ -290,8 +338,6 @@ static int xcast_linear(orte_jobid_t job,
int rc;
opal_buffer_t *buf;
orte_daemon_cmd_flag_t command;
orte_vpid_t i, range;
orte_process_name_t dummy;
orte_grpcomm_mode_t mode;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
@ -309,16 +355,20 @@ static int xcast_linear(orte_jobid_t job,
/* if we are an application proc, then send this to our HNP so
* we don't try to talk to every daemon directly ourselves. This
* is necessary since we don't know how many daemons there are!
*
* Likewise, a daemon who is not the HNP will also let the HNP
* act as the relay to avoid opening unnecessary connections
* and rattling messages around the system if daemons are not
* fully connected
*/
if (!orte_process_info.hnp && !orte_process_info.daemon) {
/* we are an application proc */
if (!orte_process_info.hnp) {
/* tell the HNP to relay */
command = ORTE_DAEMON_PROCESS_AND_RELAY_CMD;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* tell the daemon the routing algorithm this xmission is using */
/* tell the HNP the routing algorithm this xmission is using */
mode = ORTE_GRPCOMM_LINEAR;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &mode, 1, ORTE_GRPCOMM_MODE))) {
ORTE_ERROR_LOG(rc);
@ -365,40 +415,23 @@ static int xcast_linear(orte_jobid_t job,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(long)buf->bytes_used));
/* if we are not a daemon or the HNP, then just send this to the HNP */
if (!orte_process_info.hnp && !orte_process_info.daemon) {
/* if I am the HNP, just set things up so the cmd processor gets called.
* We don't want to message ourselves as this can create circular logic
* in the RML. Instead, this macro will set a zero-time event which will
* cause the buffer to be processed by the cmd processor - probably will
* fire right away, but that's okay
* The macro makes a copy of the buffer, so it's okay to release it here
*/
if (orte_process_info.hnp) {
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, buf, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor);
} else {
/* otherwise, send it to the HNP for relay */
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_DAEMON, 0))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
rc = ORTE_SUCCESS;
goto CLEANUP;
}
/* if we are a daemon or the HNP, get the number of daemons out there */
range = orte_process_info.num_procs;
/* send the message to each daemon as fast as we can */
dummy.jobid = ORTE_PROC_MY_HNP->jobid;
for (i=0; i < range; i++) {
dummy.vpid = i;
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s grpcomm:xcast_linear: %s => %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&dummy)));
/* if the target is the HNP and I am the HNP, then just setup to call the cmd processor */
if (0 == i && orte_process_info.hnp) {
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, buf, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor);
} else {
if (0 > (rc = orte_rml.send_buffer(&dummy, buf, ORTE_RML_TAG_DAEMON, 0))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
}
}
rc = ORTE_SUCCESS;
CLEANUP:
/* release the buffer */
@ -704,97 +737,672 @@ CLEANUP:
return rc;
}
static int chain_recips(opal_list_t *names)
static opal_list_t* next_recips(orte_grpcomm_mode_t mode)
{
orte_namelist_t *target;
/* chain just sends to the next vpid up the line */
if (ORTE_PROC_MY_NAME->vpid < orte_process_info.num_procs-1) {
/* I am not at the end of the chain */
if (NULL == (target = OBJ_NEW(orte_namelist_t))) {
return ORTE_ERR_OUT_OF_RESOURCE;
}
target->name.jobid = ORTE_PROC_MY_NAME->jobid;
target->name.vpid = ORTE_PROC_MY_NAME->vpid + 1;
opal_list_append(names, &target->item);
/* check the mode to select the proper algo */
switch (mode) {
case ORTE_GRPCOMM_CHAIN:
ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED);
return NULL;
break;
case ORTE_GRPCOMM_BINOMIAL:
return my_children;
break;
case ORTE_GRPCOMM_LINEAR:
ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED);
return NULL;
break;
default:
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return NULL;
break;
}
return ORTE_SUCCESS;
}
static int binomial_recips(opal_list_t *names)
static bool barrier_recvd;
static bool barrier_timer;
static void barrier_recv(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata)
{
int i, bitmap, peer, size, rank, hibit, mask;
orte_namelist_t *target;
/* flag as recvd */
barrier_recvd = true;
}
static void barrier_timer_recv(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata)
{
barrier_timer = true;
}
static int find_parent(int rank, int parent, int me, int num_procs,
int *num_children, opal_list_t *children)
{
int i, bitmap, peer, hibit, mask, found;
orte_namelist_t *child;
/* compute the bitmap */
bitmap = opal_cube_dim((int)orte_process_info.num_procs);
rank = (int)ORTE_PROC_MY_NAME->vpid;
size = (int)orte_process_info.num_procs;
/* is this me? */
if (me == rank) {
bitmap = opal_cube_dim(num_procs);
hibit = opal_hibit(rank, bitmap);
--bitmap;
for (i = hibit + 1, mask = 1 << i; i <= bitmap; ++i, mask <<= 1) {
peer = rank | mask;
if (peer < num_procs) {
if (NULL != children) {
child = OBJ_NEW(orte_namelist_t);
child->name.jobid = ORTE_PROC_MY_NAME->jobid;
child->name.vpid = peer;
OPAL_OUTPUT_VERBOSE((3, orte_grpcomm_base_output,
"%s grpcomm:basic find-parent found child %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&child->name)));
opal_list_append(children, &child->item);
}
(*num_children)++;
}
}
OPAL_OUTPUT_VERBOSE((3, orte_grpcomm_base_output,
"%s grpcomm:basic find-parent found parent %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
parent));
return parent;
}
/* find the children of this rank */
bitmap = opal_cube_dim(num_procs);
hibit = opal_hibit(rank, bitmap);
--bitmap;
for (i = hibit + 1, mask = 1 << i; i <= bitmap; ++i, mask <<= 1) {
peer = rank | mask;
if (peer < size) {
if (NULL == (target = OBJ_NEW(orte_namelist_t))) {
return ORTE_ERR_OUT_OF_RESOURCE;
if (peer < num_procs) {
/* execute compute on this child */
if (0 <= (found = find_parent(peer, rank, me, num_procs, num_children, children))) {
return found;
}
target->name.jobid = ORTE_PROC_MY_NAME->jobid;
target->name.vpid = (orte_vpid_t)peer;
opal_list_append(names, &target->item);
}
}
return ORTE_SUCCESS;
return -1;
}
static int linear_recips(opal_list_t *names)
static int barrier(void)
{
orte_namelist_t *target;
orte_vpid_t i;
opal_buffer_t buf;
orte_daemon_cmd_flag_t command=ORTE_DAEMON_COLL_CMD;
orte_grpcomm_coll_t coll_type=ORTE_GRPCOMM_BARRIER;
int rc;
struct timeval ompistart, ompistop;
/* if we are not the HNP, we just return - only
* the HNP sends in this mode
*/
if (!orte_process_info.hnp) {
return ORTE_SUCCESS;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:basic entering barrier",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
if (orte_timing && ORTE_PROC_MY_NAME->vpid == 0) {
gettimeofday(&ompistart, NULL);
}
/* if we are the HNP, then just add the names of
* all daemons to the list
*/
for (i=1; i < orte_process_info.num_procs; i++) {
if (NULL == (target = OBJ_NEW(orte_namelist_t))) {
return ORTE_ERR_OUT_OF_RESOURCE;
/* everyone sends barrier to local daemon */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
/* tell the daemon to collect the data */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
return rc;
}
/* tell the daemon we are doing a barrier */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &coll_type, 1, ORTE_GRPCOMM_COLL_T))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
return rc;
}
/* send to local daemon */
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_DAEMON, &buf, ORTE_RML_TAG_DAEMON, 0))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
return rc;
}
OBJ_DESTRUCT(&buf);
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s grpcomm:basic barrier sent",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* now receive the release. Be sure to do this in
* a manner that allows us to return without being in a recv!
*/
barrier_recvd = false;
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_BARRIER,
ORTE_RML_NON_PERSISTENT, barrier_recv, NULL);
if (rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
return rc;
}
ORTE_PROGRESSED_WAIT(barrier_recvd, 0, 1);
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s grpcomm:basic received barrier release",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
if (orte_timing) {
if (ORTE_PROC_MY_NAME->vpid == 0) {
/* setup a receive to hear when the rank=N proc has received the data
* release - in most xcast schemes, this will always be the final recvr
*/
barrier_timer = false;
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_COLLECTIVE_TIMER,
ORTE_RML_NON_PERSISTENT, barrier_timer_recv, NULL);
if (rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
return rc;
}
ORTE_PROGRESSED_WAIT(barrier_timer, 0, 1);
gettimeofday(&ompistop, NULL);
opal_output(0, "%s time to complete barrier %ld usec",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(long int)((ompistop.tv_sec - ompistart.tv_sec)*1000000 +
(ompistop.tv_usec - ompistart.tv_usec)));
} else if (ORTE_PROC_MY_NAME->vpid == orte_process_info.num_procs-1) {
/* if we are rank=N, send a message back to indicate
* the xcast completed for timing purposes
*/
orte_process_name_t name;
name.jobid = ORTE_PROC_MY_NAME->jobid;
name.vpid = 0;
OBJ_CONSTRUCT(&buf, opal_buffer_t);
if (0 > (rc = orte_rml.send_buffer(&name,&buf,ORTE_RML_TAG_COLLECTIVE_TIMER,0))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
return rc;
}
rc = ORTE_SUCCESS;
OBJ_DESTRUCT(&buf);
}
target->name.jobid = ORTE_PROC_MY_NAME->jobid;
target->name.vpid = i;
opal_list_append(names, &target->item);
}
return ORTE_SUCCESS;
}
static int next_recips(opal_list_t *names, orte_grpcomm_mode_t mode)
static opal_buffer_t *allgather_buf;
static orte_std_cntr_t allgather_complete;
static void allgather_recv(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata)
{
int rc;
/* check the mode to select the proper algo */
switch (mode) {
case ORTE_GRPCOMM_CHAIN:
rc = chain_recips(names);
break;
case ORTE_GRPCOMM_BINOMIAL:
rc = binomial_recips(names);
break;
case ORTE_GRPCOMM_LINEAR:
rc = linear_recips(names);
break;
default:
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
rc = ORTE_ERR_NOT_FOUND;
break;
/* xfer the data */
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(allgather_buf, buffer))) {
ORTE_ERROR_LOG(rc);
}
allgather_complete = true;
}
static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
{
int rc;
orte_daemon_cmd_flag_t command=ORTE_DAEMON_COLL_CMD;
struct timeval ompistart, ompistop;
opal_buffer_t coll;
orte_grpcomm_coll_t coll_type=ORTE_GRPCOMM_ALLGATHER;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:basic entering allgather",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
if (orte_timing && ORTE_PROC_MY_NAME->vpid == 0) {
gettimeofday(&ompistart, NULL);
}
/* everyone sends data to their local daemon */
OBJ_CONSTRUCT(&coll, opal_buffer_t);
/* tell the daemon to collect the data */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&coll, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&coll);
return rc;
}
/* tell the daemon we are doing an allgather */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&coll, &coll_type, 1, ORTE_GRPCOMM_COLL_T))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&coll);
return rc;
}
/* add our data to it */
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(&coll, sbuf))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&coll);
return rc;
}
/* send to local daemon */
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_DAEMON, &coll, ORTE_RML_TAG_DAEMON, 0))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&coll);
return rc;
}
OBJ_DESTRUCT(&coll);
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s grpcomm:basic allgather buffer sent",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* setup the buffer that will recv the results */
allgather_buf = OBJ_NEW(opal_buffer_t);
/* now receive the final result. Be sure to do this in
* a manner that allows us to return without being in a recv!
*/
allgather_complete = false;
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER,
ORTE_RML_NON_PERSISTENT, allgather_recv, NULL);
if (rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
return rc;
}
ORTE_PROGRESSED_WAIT(allgather_complete, 0, 1);
/* copy payload to the caller's buffer */
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(rbuf, allgather_buf))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(allgather_buf);
return rc;
}
OBJ_RELEASE(allgather_buf);
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s allgather buffer received",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
if (orte_timing) {
if (ORTE_PROC_MY_NAME->vpid == 0) {
/* setup a receive to hear when the rank=N proc has received the data
* release - in most xcast schemes, this will always be the final recvr
*/
barrier_timer = false;
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_COLLECTIVE_TIMER,
ORTE_RML_NON_PERSISTENT, barrier_timer_recv, NULL);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
return rc;
}
ORTE_PROGRESSED_WAIT(barrier_timer, 0, 1);
gettimeofday(&ompistop, NULL);
opal_output(0, "%s allgather: time to complete %ld usec",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(long int)((ompistop.tv_sec - ompistart.tv_sec)*1000000 +
(ompistop.tv_usec - ompistart.tv_usec)));
} else if (ORTE_PROC_MY_NAME->vpid == orte_process_info.num_procs-1) {
/* if we are rank=N, send a message back to indicate
* the xcast completed for timing purposes
*/
orte_process_name_t name;
opal_buffer_t buf;
name.jobid = ORTE_PROC_MY_NAME->jobid;
name.vpid = 0;
OBJ_CONSTRUCT(&buf, opal_buffer_t);
if (0 > (rc = orte_rml.send_buffer(&name,&buf,ORTE_RML_TAG_COLLECTIVE_TIMER,0))) {
ORTE_ERROR_LOG(rc);
return rc;
}
rc = ORTE_SUCCESS;
OBJ_DESTRUCT(&buf);
}
}
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:basic allgather completed",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return ORTE_SUCCESS;
}
static orte_std_cntr_t collective_num_recvd;
static bool collective_failed;
static opal_buffer_t *collection;
static orte_std_cntr_t num_contributors;
static void collective_recv(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata)
{
int rc;
orte_std_cntr_t contributors, cnt;
/* bump counter */
++collective_num_recvd;
/* extract the #contributors */
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &contributors, &cnt, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(rc);
}
num_contributors += contributors;
/* xfer the data */
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(collection, buffer))) {
ORTE_ERROR_LOG(rc);
collective_failed = true;
}
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s grpcomm:basic collective recv - got %d bytes from %s with %d contributors",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(int)(buffer->bytes_used-sizeof(orte_std_cntr_t)),
ORTE_NAME_PRINT(sender), (int)contributors));
/* reissue the recv */
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON_COLLECTIVE,
ORTE_RML_NON_PERSISTENT, collective_recv, NULL);
if (rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
collective_failed = true;
}
}
static int daemon_leader(orte_jobid_t jobid,
orte_std_cntr_t num_local_contributors,
orte_grpcomm_coll_t type,
opal_buffer_t *data,
orte_rmaps_dp_t flag,
opal_value_array_t *participants)
{
int rc;
opal_buffer_t buf;
int num_children;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:basic daemon_collective - I am the leader!",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
if (ORTE_RMAPS_ALL_DAEMONS == flag) {
/* if everyone is participating, then I must be the HNP,
* so the #children is just the #children determined for
* my outgoing xcast
*/
num_children = my_num_children;
} else if (ORTE_RMAPS_ALL_EXCEPT_HNP == flag) {
/* if the HNP has no local procs, then it won't
* know that a collective is underway, so that means
* I must be rank=1. The number of messages I must get
* therefore consists of both my children + all other
* children of rank=0 as they will redirect their messages
* to me
*/
num_children = 0;
/* find #children for rank=0 */
find_parent(0, 0, 0, orte_process_info.num_procs, &num_children, NULL);
/* I am one of those children, so we should get num_children-1 of
* my peers sending to me, plus my own children
*/
num_children = num_children - 1 + my_num_children;
} else if (ORTE_RMAPS_DAEMON_SUBSET == flag) {
/* for this first cut, all members will send to me direct,
* so the #children I should hear from is just the
* size of the value array - 1
*/
num_children = opal_value_array_get_size(participants) - 1;
} else {
/* no idea */
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
return ORTE_ERR_BAD_PARAM;
}
/* setup to recv the messages from my children */
collective_num_recvd = 0;
collective_failed = false;
collection = OBJ_NEW(opal_buffer_t);
num_contributors = num_local_contributors; /* seed with the number I added */
/* ensure my data gets included in the outcome */
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(collection, data))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(collection);
return rc;
}
/* if we have children, get their messages */
if (0 < num_children) {
/* post the non-blocking recv */
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON_COLLECTIVE,
ORTE_RML_NON_PERSISTENT, collective_recv, NULL);
if (rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(collection);
return rc;
}
ORTE_PROGRESSED_WAIT(collective_failed, collective_num_recvd, num_children);
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:basic daemon_collective - leader has received collective from %d children",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_children));
/* cancel the lingering recv */
if (ORTE_SUCCESS != (rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON_COLLECTIVE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(collection);
return rc;
}
}
OBJ_CONSTRUCT(&buf, opal_buffer_t);
if (ORTE_GRPCOMM_BARRIER == type) {
if (ORTE_SUCCESS != (rc = xcast(jobid, &buf, ORTE_RML_TAG_BARRIER))) {
ORTE_ERROR_LOG(rc);
}
} else if (ORTE_GRPCOMM_ALLGATHER == type) {
/* send the data */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &num_contributors, 1, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(&buf, collection))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = xcast(jobid, &buf, ORTE_RML_TAG_ALLGATHER))) {
ORTE_ERROR_LOG(rc);
}
} else {
/* no other collectives currently supported! */
ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED);
rc = ORTE_ERR_NOT_IMPLEMENTED;
}
cleanup:
OBJ_RELEASE(collection);
OBJ_DESTRUCT(&buf);
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:basic daemon_collective - leader has completed collective",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return rc;
}
static int daemon_collective(orte_jobid_t jobid,
orte_std_cntr_t num_local_contributors,
orte_grpcomm_coll_t type,
opal_buffer_t *data,
orte_rmaps_dp_t flag,
opal_value_array_t *participants)
{
orte_process_name_t lead, parent;
orte_vpid_t *vptr;
int num_children;
opal_buffer_t buf;
int rc;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:basic daemon_collective entered with dp flag %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)flag));
parent.jobid = ORTE_PROC_MY_NAME->jobid;
lead.jobid = ORTE_PROC_MY_NAME->jobid;
/* if the participation is full, then the HNP is the lead */
if (ORTE_RMAPS_ALL_DAEMONS == flag) {
lead.vpid = ORTE_PROC_MY_HNP->vpid;
} else if (ORTE_RMAPS_ALL_EXCEPT_HNP == flag) {
/* if the HNP has no local procs, then it won't
* know that a collective is underway, so let
* rank=1 be the lead
*/
lead.vpid = 1;
} else if (ORTE_RMAPS_DAEMON_SUBSET == flag) {
/* let the first proc in the array be the lead */
vptr = (orte_vpid_t*)opal_value_array_get_item(participants, 0);
lead.vpid = *vptr;
} else {
/* no idea */
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
return ORTE_ERR_BAD_PARAM;
}
/* if I am the lead, do my own thing */
if (ORTE_PROC_MY_NAME->vpid == lead.vpid) {
return daemon_leader(jobid, num_local_contributors, type, data, flag, participants);
}
/* I am NOT the lead, so I first must figure out how many children
* I need to collect messages from and who my parent will be
*/
if (ORTE_RMAPS_ALL_DAEMONS == flag) {
/* everyone is participating, so my parent and
* num_children can be as initially computed
*/
parent.vpid = my_parent.vpid;
num_children = my_num_children;
} else if (ORTE_RMAPS_ALL_EXCEPT_HNP == flag) {
/* if the HNP has no local procs, then it won't
* know that a collective is underway, so we need
* to send to rank=1 if our parent would have been
* rank=0. Our num_children, though,
* remains unchanged
*/
if (0 == my_parent.vpid) {
parent.vpid = 1;
} else {
/* just send as normal */
parent.vpid = my_parent.vpid;
}
num_children = my_num_children;
} else if (ORTE_RMAPS_DAEMON_SUBSET == flag) {
/* regardless of mode, we always send direct */
num_children = 0;
parent.vpid = lead.vpid;
} else {
/* no idea */
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
return ORTE_ERR_BAD_PARAM;
}
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:basic daemon_collective preparing to receive from %d children",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
num_children));
/* setup for collecting data */
collection = OBJ_NEW(opal_buffer_t);
num_contributors = num_local_contributors; /* seed with the number I added */
/* ensure my data gets included in the outcome */
opal_dss.copy_payload(collection, data);
/* if num_children > 0, setup recv's to wait until we hear from
* them all - the recv will look just like that for the leader,
* collecting data and #contributors
*/
if (0 < num_children) {
/* setup to recv the messages from my children */
collective_num_recvd = 0;
collective_failed = false;
/* post the non-blocking recv */
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON_COLLECTIVE,
ORTE_RML_NON_PERSISTENT, collective_recv, NULL);
if (rc != ORTE_SUCCESS) {
ORTE_ERROR_LOG(rc);
return rc;
}
ORTE_PROGRESSED_WAIT(collective_failed, collective_num_recvd, num_children);
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:basic daemon_collective - I have received collective from children",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* cancel the lingering recv */
if (ORTE_SUCCESS != (rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON_COLLECTIVE))) {
ORTE_ERROR_LOG(rc);
return rc;
}
}
/* construct and send message to our parent */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
/* insert #contributors */
opal_dss.pack(&buf, &num_contributors, 1, ORTE_STD_CNTR);
if (ORTE_GRPCOMM_BARRIER == type) {
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:basic daemon_collective sending barrier to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&parent)));
if (0 > (rc = orte_rml.send_buffer(&parent,&buf,ORTE_RML_TAG_DAEMON_COLLECTIVE,0))) {
ORTE_ERROR_LOG(rc);
return rc;
}
rc= ORTE_SUCCESS;
} else if (ORTE_GRPCOMM_ALLGATHER == type) {
/* xfer the data */
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(&buf, collection))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* send the data */
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:basic daemon_collective sending allgather data to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&parent)));
if (0 > (rc = orte_rml.send_buffer(&parent,&buf,ORTE_RML_TAG_DAEMON_COLLECTIVE,0))) {
ORTE_ERROR_LOG(rc);
return rc;
}
rc = ORTE_SUCCESS;
} else {
/* we don't currently support any other collectives */
ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED);
rc = ORTE_ERR_NOT_IMPLEMENTED;
}
OBJ_DESTRUCT(&buf);
OBJ_RELEASE(collection);
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:basic daemon_collective completed",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return rc;
}

Просмотреть файл

@ -44,9 +44,9 @@ static int xcast(orte_jobid_t job,
opal_buffer_t *buffer,
orte_rml_tag_t tag);
static int orte_grpcomm_cnos_barrier(void);
static int orte_grpcomm_cnos_barrier(orte_jobid_t jobid);
static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf);
static int allgather(orte_jobid_t jobid, opal_buffer_t *sbuf, opal_buffer_t *rbuf);
static int allgather_list(opal_list_t *names, opal_buffer_t *sbuf, opal_buffer_t *rbuf);
@ -110,7 +110,7 @@ static int xcast(orte_jobid_t job,
}
static int
orte_grpcomm_cnos_barrier(void)
orte_grpcomm_cnos_barrier(orte_jobid_t jobid)
{
#if OMPI_GRPCOMM_CNOS_HAVE_BARRIER
cnos_barrier();
@ -119,7 +119,7 @@ orte_grpcomm_cnos_barrier(void)
return ORTE_SUCCESS;
}
static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
static int allgather(orte_jobid_t jobid, opal_buffer_t *sbuf, opal_buffer_t *rbuf)
{
int rc;
orte_std_cntr_t zero=0;

Просмотреть файл

@ -1,43 +0,0 @@
#
# Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
# University Research and Technology
# Corporation. All rights reserved.
# Copyright (c) 2004-2005 The University of Tennessee and The University
# of Tennessee Research Foundation. All rights
# reserved.
# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
# University of Stuttgart. All rights reserved.
# Copyright (c) 2004-2005 The Regents of the University of California.
# All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
sources = \
grpcomm_exp.h \
grpcomm_exp.c \
grpcomm_exp_component.c
# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
# (for static builds).
if OMPI_BUILD_grpcomm_exp_DSO
component_noinst =
component_install = mca_grpcomm_exp.la
else
component_noinst = libmca_grpcomm_exp.la
component_install =
endif
mcacomponentdir = $(pkglibdir)
mcacomponent_LTLIBRARIES = $(component_install)
mca_grpcomm_exp_la_SOURCES = $(sources)
mca_grpcomm_exp_la_LDFLAGS = -module -avoid-version
noinst_LTLIBRARIES = $(component_noinst)
libmca_grpcomm_exp_la_SOURCES =$(sources)
libmca_grpcomm_exp_la_LDFLAGS = -module -avoid-version

Просмотреть файл

@ -1,13 +0,0 @@
# -*- shell-script -*-
#
# Copyright (c) 2007 Sandia National Laboratories. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# MCA_grpcomm_exp_CONFIG([action-if-found], [action-if-not-found])
# -----------------------------------------------------------
AC_DEFUN([MCA_grpcomm_exp_CONFIG], [$1])

Просмотреть файл

@ -1,30 +0,0 @@
# -*- shell-script -*-
#
# Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
# University Research and Technology
# Corporation. All rights reserved.
# Copyright (c) 2004-2005 The University of Tennessee and The University
# of Tennessee Research Foundation. All rights
# reserved.
# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
# University of Stuttgart. All rights reserved.
# Copyright (c) 2004-2005 The Regents of the University of California.
# All rights reserved.
# Copyright (c) 2007 Los Alamos National Security, LLC. All rights
# reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# Specific to this module
PARAM_CONFIG_FILES="Makefile"
#
# Set the config priority so that this
# component will build for all environs -except-
# those special ones that do not support it
PARAM_CONFIG_PRIORITY=10

Просмотреть файл

@ -1,931 +0,0 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007 Sun Microsystems, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include "orte/constants.h"
#include "orte/types.h"
#include <string.h>
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif /* HAVE_SYS_TIME_H */
#include "opal/threads/condition.h"
#include "opal/util/output.h"
#include "opal/util/bit_ops.h"
#include "opal/class/opal_hash_table.h"
#include "orte/util/proc_info.h"
#include "opal/dss/dss.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/odls/odls_types.h"
#include "orte/mca/rml/rml.h"
#include "orte/runtime/orte_globals.h"
#include "orte/util/name_fns.h"
#include "orte/orted/orted.h"
#include "orte/runtime/orte_wait.h"
#include "orte/mca/grpcomm/base/base.h"
#include "grpcomm_exp.h"
/* Local functions */
static int xcast_binomial_tree(orte_jobid_t job,
opal_buffer_t *buffer,
orte_rml_tag_t tag);
static int xcast_linear(orte_jobid_t job,
opal_buffer_t *buffer,
orte_rml_tag_t tag);
static int xcast_direct(orte_jobid_t job,
opal_buffer_t *buffer,
orte_rml_tag_t tag);
/* Static API's */
static int init(void);
static void finalize(void);
static int xcast(orte_jobid_t job,
opal_buffer_t *buffer,
orte_rml_tag_t tag);
static int next_recips(opal_list_t *names, orte_grpcomm_mode_t mode);
static int new_barrier(void);
/* Module def */
orte_grpcomm_base_module_t orte_grpcomm_exp_module = {
init,
finalize,
xcast,
orte_grpcomm_base_allgather,
orte_grpcomm_base_allgather_list,
new_barrier,
next_recips,
orte_grpcomm_base_set_proc_attr,
orte_grpcomm_base_get_proc_attr,
orte_grpcomm_base_modex,
orte_grpcomm_base_purge_proc_attrs
};
/**
* Initialize the module
*/
static int init(void)
{
int rc;
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_modex_init())) {
ORTE_ERROR_LOG(rc);
}
return rc;
}
/**
* Finalize the module
*/
static void finalize(void)
{
orte_grpcomm_base_modex_finalize();
}
/**
* A "broadcast-like" function to a job's processes.
* @param jobid The job whose processes are to receive the message
* @param buffer The data to broadcast
*/
static int xcast(orte_jobid_t job,
opal_buffer_t *buffer,
orte_rml_tag_t tag)
{
int rc = ORTE_SUCCESS;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:xcast sent to job %s tag %ld",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(job), (long)tag));
/* if there is no message to send, then just return ok */
if (NULL == buffer) {
return ORTE_SUCCESS;
}
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:xcast: num_procs %ld linear xover: %ld binomial xover: %ld",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(long)orte_process_info.num_procs,
(long)orte_grpcomm_exp.xcast_linear_xover,
(long)orte_grpcomm_exp.xcast_binomial_xover));
if (orte_process_info.num_procs < 2 || orte_abnormal_term_ordered) {
/* if there is only one proc in the system, then we must
* use the direct mode - there is no other option. Note that
* since the HNP is the one that typically does xcast sends,
* only one daemon means that the HNP is sending to
* itself. This is required as an HNP starts
* itself up
*
* NOTE: although we allow users to alter crossover points
* for selecting specific xcast modes, this required
* use-case behavior MUST always be retained or else
* singletons and HNP startup will fail!
*
* We also insist that the direct xcast mode be used when
* an orted has failed as we cannot rely on alternative
* methods to reach all orteds and/or procs
*/
rc = xcast_direct(job, buffer, tag);
goto DONE;
}
/* now use the crossover points to select the proper transmission
* mode. We have built-in default crossover points for this
* decision tree, but the user is free to alter them as
* they wish via MCA params
*/
if (orte_process_info.num_procs < orte_grpcomm_exp.xcast_linear_xover) {
rc = xcast_direct(job, buffer, tag);
} else if (orte_process_info.num_procs < orte_grpcomm_exp.xcast_binomial_xover) {
rc = xcast_linear(job, buffer, tag);
} else {
rc = xcast_binomial_tree(job, buffer, tag);
}
DONE:
return rc;
}
static int xcast_binomial_tree(orte_jobid_t job,
opal_buffer_t *buffer,
orte_rml_tag_t tag)
{
orte_daemon_cmd_flag_t command;
orte_grpcomm_mode_t mode;
int rc;
opal_buffer_t *buf;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:entering xcast_binomial",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* binomial xcast can only go through the daemons as app procs are
* not allowed to relay messages.
* first, need to pack the msg and be sure to include routing info so it
* can properly be sent through the daemons
*/
buf = OBJ_NEW(opal_buffer_t);
/* tell the daemon to process and relay */
command = ORTE_DAEMON_PROCESS_AND_RELAY_CMD;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* tell the daemon the routing algorithm this xmission is using */
mode = ORTE_GRPCOMM_BINOMIAL;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &mode, 1, ORTE_GRPCOMM_MODE))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* if this isn't intended for the daemon command tag, then we better
* tell the daemon to deliver it to the procs, and what job is supposed
* to get it - this occurs when a caller just wants to send something
* to all the procs in a job. In that use-case, the caller doesn't know
* anything about inserting daemon commands or what routing algo might
* be used, so we have to help them out a little. Functions that are
* sending commands to the daemons themselves are smart enough to know
* what they need to do.
*/
if (ORTE_RML_TAG_DAEMON != tag) {
command = ORTE_DAEMON_MESSAGE_LOCAL_PROCS;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &job, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_RML_TAG))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
}
/* copy the payload into the new buffer - this is non-destructive, so our
* caller is still responsible for releasing any memory in the buffer they
* gave to us
*/
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(buf, buffer))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s grpcomm:xcast_binomial: buffer size %ld",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(long)buf->bytes_used));
/* all we need to do is send this to the HNP - the relay logic
* will ensure everyone else gets it!
*/
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s grpcomm:xcast_binomial: sending %s => %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(ORTE_PROC_MY_HNP)));
/* if I am the HNP, just set things up so the cmd processor gets called.
* We don't want to message ourselves as this can create circular logic
* in the RML. Instead, this macro will set a zero-time event which will
* cause the buffer to be processed by the cmd processor - probably will
* fire right away, but that's okay
* The macro makes a copy of the buffer, so it's okay to release it here
*/
if (orte_process_info.hnp) {
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, buf, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor);
} else {
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_DAEMON, 0))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
rc = ORTE_SUCCESS;
}
CLEANUP:
OBJ_RELEASE(buf);
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:xcast_binomial: completed",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return rc;
}
static int xcast_linear(orte_jobid_t job,
opal_buffer_t *buffer,
orte_rml_tag_t tag)
{
int rc;
opal_buffer_t *buf;
orte_daemon_cmd_flag_t command;
orte_vpid_t i, range;
orte_process_name_t dummy;
orte_grpcomm_mode_t mode;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:entering xcast_linear",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* since we have to pack some additional info into the buffer to be
* sent to the daemons, we create a new buffer into which we will
* put the intermediate payload - i.e., the info that goes to the
* daemon. This buffer will contain all the info needed by the
* daemon, plus the payload intended for the processes themselves
*/
buf = OBJ_NEW(opal_buffer_t);
/* if we are an application proc, then send this to our HNP so
* we don't try to talk to every daemon directly ourselves. This
* is necessary since we don't know how many daemons there are!
*/
if (!orte_process_info.hnp && !orte_process_info.daemon) {
/* we are an application proc */
/* tell the HNP to relay */
command = ORTE_DAEMON_PROCESS_AND_RELAY_CMD;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* tell the daemon the routing algorithm this xmission is using */
mode = ORTE_GRPCOMM_LINEAR;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &mode, 1, ORTE_GRPCOMM_MODE))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
}
/* if this isn't intended for the daemon command tag, then we better
* tell the daemon to deliver it to the procs, and what job is supposed
* to get it - this occurs when a caller just wants to send something
* to all the procs in a job. In that use-case, the caller doesn't know
* anything about inserting daemon commands or what routing algo might
* be used, so we have to help them out a little. Functions that are
* sending commands to the daemons themselves are smart enough to know
* what they need to do.
*/
if (ORTE_RML_TAG_DAEMON != tag) {
command = ORTE_DAEMON_MESSAGE_LOCAL_PROCS;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &job, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_RML_TAG))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
}
/* copy the payload into the new buffer - this is non-destructive, so our
* caller is still responsible for releasing any memory in the buffer they
* gave to us
*/
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(buf, buffer))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s grpcomm:xcast_linear: buffer size %ld",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(long)buf->bytes_used));
/* if we are not a daemon or the HNP, then just send this to the HNP */
if (!orte_process_info.hnp && !orte_process_info.daemon) {
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_DAEMON, 0))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
rc = ORTE_SUCCESS;
goto CLEANUP;
}
/* if we are a daemon or the HNP, get the number of daemons out there */
range = orte_process_info.num_procs;
/* send the message to each daemon as fast as we can */
dummy.jobid = ORTE_PROC_MY_HNP->jobid;
for (i=0; i < range; i++) {
dummy.vpid = i;
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s grpcomm:xcast_linear: %s => %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&dummy)));
/* if the target is the HNP and I am the HNP, then just setup to call the cmd processor */
if (0 == i && orte_process_info.hnp) {
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, buf, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor);
} else {
if (0 > (rc = orte_rml.send_buffer(&dummy, buf, ORTE_RML_TAG_DAEMON, 0))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
}
}
rc = ORTE_SUCCESS;
CLEANUP:
/* release the buffer */
OBJ_RELEASE(buf);
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:xcast_linear: completed",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return rc;
}
static int relay_via_hnp(orte_jobid_t job,
opal_buffer_t *buffer,
orte_rml_tag_t tag) {
opal_buffer_t *buf;
orte_daemon_cmd_flag_t command;
orte_grpcomm_mode_t mode;
int rc;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm: relaying buffer to HNP",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* since we have to pack some additional info into the buffer
* for this case, we create a new buffer into to contain all the
* info needed plus the payload
*/
buf = OBJ_NEW(opal_buffer_t);
/* start by telling the HNP to relay */
command = ORTE_DAEMON_PROCESS_AND_RELAY_CMD;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* default to the LINEAR mode since this is equivalent to
* DIRECT for daemons
*/
mode = ORTE_GRPCOMM_LINEAR;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &mode, 1, ORTE_GRPCOMM_MODE))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* if the target isn't the daemon tag, then we have to add the proper
* command so the daemon's know what to do
*/
if (ORTE_RML_TAG_DAEMON != tag) {
command = ORTE_DAEMON_MESSAGE_LOCAL_PROCS;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &job, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_RML_TAG))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
}
/* copy the payload into the new buffer - this is non-destructive, so our
* caller is still responsible for releasing any memory in the buffer they
* gave to us
*/
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(buf, buffer))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, buf, ORTE_RML_TAG_DAEMON, 0))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
rc = ORTE_SUCCESS;
CLEANUP:
OBJ_RELEASE(buf);
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm: buffer relayed to HNP",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return rc;
}
static int xcast_direct(orte_jobid_t job,
opal_buffer_t *buffer,
orte_rml_tag_t tag)
{
int rc;
orte_process_name_t peer;
orte_vpid_t i, num_targets=0;
opal_buffer_t *buf=NULL, *bfr=buffer;
orte_daemon_cmd_flag_t command;
orte_rml_tag_t target=tag;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm: entering xcast_direct",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* if I am applicaton proc */
if (!orte_process_info.hnp &&
!orte_process_info.daemon &&
!orte_process_info.tool) {
/* if this is going to some job other
* than my own, then we have to send it via the HNP as I have
* no way of knowing how many procs are in the other job.
*/
if (ORTE_PROC_MY_NAME->jobid != job) {
if (ORTE_SUCCESS != (rc = relay_via_hnp(job, buffer, tag))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
}
/* if it is my jobid, then we can just send this ourselves -
* set the target tag
*/
target = tag;
/* set number of procs to the #procs in our job */
num_targets = orte_process_info.num_procs;
/* point to the right buffer */
bfr = buffer;
/* go to send it */
goto SEND;
}
/* if I am a daemon */
if (orte_process_info.daemon) {
/* if this is going to another job, then I have to relay
* it through the HNP as I have no idea how many procs
* are in that job
*/
if (ORTE_PROC_MY_NAME->jobid != job) {
if (ORTE_SUCCESS != (rc = relay_via_hnp(job, buffer, tag))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
}
/* if this is going to the daemon job to
* someplace other than the daemon cmd processor, then I need to add
* a command to the buffer so the recipient daemons know what to do
*/
if (ORTE_RML_TAG_DAEMON != tag) {
/* setup a buffer to handle the additional info */
buf = OBJ_NEW(opal_buffer_t);
/* add the proper command so the daemon's know what to do */
command = ORTE_DAEMON_MESSAGE_LOCAL_PROCS;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &job, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_RML_TAG))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* copy the payload into the new buffer - this is non-destructive, so our
* caller is still responsible for releasing any memory in the buffer they
* gave to us
*/
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(buf, buffer))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* point to correct buffer to be sent */
bfr = buf;
/* send this to the daemon tag so it gets processed correctly */
target = ORTE_RML_TAG_DAEMON;
/* set the number of targets to be the number of daemons */
num_targets = orte_process_info.num_procs;
/* send it */
goto SEND;
}
}
/* if I am the HNP */
if (orte_process_info.hnp) {
orte_job_t *jdata;
/* if this is going to the daemon job */
if (ORTE_PROC_MY_NAME->jobid == job) {
/* if this is going someplace other than the daemon cmd
* processor, then I need to add a command to the buffer
* so the recipient daemons know what to do
*/
if (ORTE_RML_TAG_DAEMON != tag) {
/* since we have to pack some additional info into the buffer
* for this case, we create a new buffer to contain all the
* info needed plus the payload
*/
buf = OBJ_NEW(opal_buffer_t);
/* if the target isn't the daemon tag, then we have to add the proper
* command so the daemon's know what to do
*/
command = ORTE_DAEMON_MESSAGE_LOCAL_PROCS;
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &job, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_RML_TAG))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* copy the payload into the new buffer - this is non-destructive, so our
* caller is still responsible for releasing any memory in the buffer they
* gave to us
*/
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(buf, buffer))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* point to correct buffer to be sent */
bfr = buf;
/* send this to the daemon tag so it gets processed correctly */
target = ORTE_RML_TAG_DAEMON;
/* set the number of targets to be the number of daemons */
num_targets = orte_process_info.num_procs;
/* send it */
goto SEND;
} else {
/* if already going to the daemon tag, then just point to
* the right places and send it
*/
bfr = buffer;
target = tag;
num_targets = orte_process_info.num_procs;
goto SEND;
}
}
/* if this is going to any other job,
* then I need to know the number of procs in that job so I can
* send it
*/
if (NULL == (jdata = orte_get_job_data_object(job))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
rc = ORTE_ERR_NOT_FOUND;
goto CLEANUP;
}
/* set the number of targets */
num_targets = jdata->num_procs;
/* set the tag */
target = tag;
/* point to correct buffer to be sent */
bfr = buffer;
/* send it */
goto SEND;
}
SEND:
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s xcast_direct: buffer size %ld",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(long)buffer->bytes_used));
peer.jobid = job;
for(i=0; i<num_targets; i++) {
peer.vpid = i;
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s xcast_direct: %s => %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer)));
/* if I am the HNP, just set things up so the cmd processor gets called.
* We don't want to message ourselves as this can create circular logic
* in the RML. Instead, this macro will set a zero-time event which will
* cause the buffer to be processed by the cmd processor - probably will
* fire right away, but that's okay
* The macro makes a copy of the buffer, so it's okay to release it later
*/
if (peer.jobid == ORTE_PROC_MY_NAME->jobid &&
peer.vpid == ORTE_PROC_MY_NAME->vpid &&
orte_process_info.hnp) {
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, bfr, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor);
} else {
if (0 > (rc = orte_rml.send_buffer(&peer, bfr, target, 0))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
rc = ORTE_SUCCESS;
}
}
rc = ORTE_SUCCESS;
CLEANUP:
/* release buf if used */
if (NULL != buf) {
OBJ_RELEASE(buf);
}
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm: xcast_direct completed",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return rc;
}
static int chain_recips(opal_list_t *names)
{
orte_namelist_t *target;
/* chain just sends to the next vpid up the line */
if (ORTE_PROC_MY_NAME->vpid < orte_process_info.num_procs-1) {
/* I am not at the end of the chain */
if (NULL == (target = OBJ_NEW(orte_namelist_t))) {
return ORTE_ERR_OUT_OF_RESOURCE;
}
target->name.jobid = ORTE_PROC_MY_NAME->jobid;
target->name.vpid = ORTE_PROC_MY_NAME->vpid + 1;
opal_list_append(names, &target->item);
}
return ORTE_SUCCESS;
}
static int binomial_recips(opal_list_t *names)
{
int i, bitmap, peer, size, rank, hibit, mask;
orte_namelist_t *target;
/* compute the bitmap */
bitmap = opal_cube_dim((int)orte_process_info.num_procs);
rank = (int)ORTE_PROC_MY_NAME->vpid;
size = (int)orte_process_info.num_procs;
hibit = opal_hibit(rank, bitmap);
--bitmap;
for (i = hibit + 1, mask = 1 << i; i <= bitmap; ++i, mask <<= 1) {
peer = rank | mask;
if (peer < size) {
if (NULL == (target = OBJ_NEW(orte_namelist_t))) {
return ORTE_ERR_OUT_OF_RESOURCE;
}
target->name.jobid = ORTE_PROC_MY_NAME->jobid;
target->name.vpid = (orte_vpid_t)peer;
opal_list_append(names, &target->item);
}
}
return ORTE_SUCCESS;
}
static int linear_recips(opal_list_t *names)
{
orte_namelist_t *target;
orte_vpid_t i;
/* if we are not the HNP, we just return - only
* the HNP sends in this mode
*/
if (!orte_process_info.hnp) {
return ORTE_SUCCESS;
}
/* if we are the HNP, then just add the names of
* all daemons to the list
*/
for (i=1; i < orte_process_info.num_procs; i++) {
if (NULL == (target = OBJ_NEW(orte_namelist_t))) {
return ORTE_ERR_OUT_OF_RESOURCE;
}
target->name.jobid = ORTE_PROC_MY_NAME->jobid;
target->name.vpid = i;
opal_list_append(names, &target->item);
}
return ORTE_SUCCESS;
}
static int next_recips(opal_list_t *names, orte_grpcomm_mode_t mode)
{
int rc;
/* check the mode to select the proper algo */
switch (mode) {
case ORTE_GRPCOMM_CHAIN:
rc = chain_recips(names);
break;
case ORTE_GRPCOMM_BINOMIAL:
rc = binomial_recips(names);
break;
case ORTE_GRPCOMM_LINEAR:
rc = linear_recips(names);
break;
default:
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
rc = ORTE_ERR_NOT_FOUND;
break;
}
return rc;
}
/*** TEST AREA FOR NEW COLLECTIVES ***/
#define DEGREE 2
static orte_std_cntr_t barrier_num_recvd;
static bool barrier_failed;
static void barrier_server_recv(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata)
{
int rc;
/* bump counter */
++barrier_num_recvd;
/* reissue the recv */
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_BARRIER_SERVER,
ORTE_RML_NON_PERSISTENT, barrier_server_recv, NULL);
if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) {
ORTE_ERROR_LOG(rc);
barrier_failed = true;
}
}
static int new_barrier(void)
{
orte_vpid_t first_child, num_daemons;
orte_std_cntr_t num_children;
opal_buffer_t buf;
orte_daemon_cmd_flag_t command=ORTE_DAEMON_COLL_CMD;
orte_rml_tag_t target_tag=ORTE_RML_TAG_BARRIER_SERVER;
orte_process_name_t parent;
int rc;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:exp: barrier entered",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* if I am not the HNP or daemon, then just send to my
* local daemon
*/
if (!orte_process_info.hnp &&
!orte_process_info.daemon) {
OBJ_CONSTRUCT(&buf, opal_buffer_t);
/* tell the daemon to collect the data */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &command, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
return rc;
}
/* tell the daemon where it is eventually to be delivered */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &target_tag, 1, ORTE_RML_TAG))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
return rc;
}
/* send to local daemon */
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_DAEMON, &buf, ORTE_RML_TAG_DAEMON, 0))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
return rc;
}
OBJ_DESTRUCT(&buf);
return ORTE_SUCCESS;
}
num_daemons = orte_process_info.num_procs;
first_child = (ORTE_PROC_MY_NAME->vpid * DEGREE) + 1;
if (first_child < num_daemons) {
/* How many children do I have? */
if (first_child + DEGREE > num_daemons) {
num_children = num_daemons - first_child;
} else {
num_children = DEGREE;
}
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"Rank %d has %d children, first child %d\n",
ORTE_PROC_MY_NAME->vpid,
num_children, first_child));
/* Use non-blocking receives so that they can progress
* simultaneously
*/
barrier_num_recvd = 0;
barrier_failed = false;
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_BARRIER_SERVER,
ORTE_RML_NON_PERSISTENT, barrier_server_recv, NULL);
if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) {
ORTE_ERROR_LOG(rc);
return rc;
}
ORTE_PROGRESSED_WAIT(false, barrier_num_recvd, num_children);
/* cancel the lingering recv */
if (ORTE_SUCCESS != (rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_BARRIER_SERVER))) {
ORTE_ERROR_LOG(rc);
return rc;
}
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s got all data from children",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
}
/* If you have a parent, send */
if (0 != ORTE_PROC_MY_NAME->vpid) {
/* setup the buffer */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
parent.jobid = ORTE_PROC_MY_NAME->jobid;
parent.vpid = (ORTE_PROC_MY_NAME->vpid - 1) / DEGREE;
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
"%s sending to parent %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&parent)));
if (0 > orte_rml.send_buffer(&parent, &buf, ORTE_RML_TAG_BARRIER_SERVER, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
OBJ_DESTRUCT(&buf);
return ORTE_ERR_COMM_FAILURE;
}
OBJ_DESTRUCT(&buf);
}
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
"%s grpcomm:exp: barrier completed",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return ORTE_SUCCESS;
}

Просмотреть файл

@ -1,64 +0,0 @@
/* -*- C -*-
*
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*
*/
#ifndef GRPCOMM_EXP_H
#define GRPCOMM_RCP_H
#include "orte_config.h"
#include "orte/types.h"
#include "opal/threads/mutex.h"
#include "opal/threads/condition.h"
#include "opal/class/opal_object.h"
#include "orte/mca/grpcomm/grpcomm.h"
BEGIN_C_DECLS
/*
* globals
*/
/*
* globals needed within component
*/
typedef struct {
orte_vpid_t xcast_linear_xover;
orte_vpid_t xcast_binomial_xover;
} orte_grpcomm_exp_globals_t;
extern orte_grpcomm_exp_globals_t orte_grpcomm_exp;
/*
* Component open / close
*/
int orte_grpcomm_exp_open(void);
int orte_grpcomm_exp_close(void);
orte_grpcomm_base_module_t* orte_grpcomm_exp_init(int *priority);
/*
* Grpcomm interfaces
*/
ORTE_MODULE_DECLSPEC extern orte_grpcomm_base_component_t mca_grpcomm_exp_component;
extern orte_grpcomm_base_module_t orte_grpcomm_exp_module;
END_C_DECLS
#endif

Просмотреть файл

@ -1,126 +0,0 @@
/* -*- C -*-
*
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/** @file:
*
* The Open MPI Name Server
*
* The Open MPI Name Server provides unique name ranges for processes
* within the universe. Each universe will have one name server
* running within the seed daemon. This is done to prevent the
* inadvertent duplication of names.
*/
/*
* includes
*/
#include "orte_config.h"
#include "orte/constants.h"
#include "opal/threads/mutex.h"
#include "opal/class/opal_list.h"
#include "opal/util/output.h"
#include "opal/mca/mca.h"
#include "opal/mca/base/mca_base_param.h"
#include "orte/util/proc_info.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/rml/rml.h"
#include "grpcomm_exp.h"
/* set the default xovers */
#define XCAST_LINEAR_XOVER_DEFAULT 2
#define XCAST_BINOMIAL_XOVER_DEFAULT 16
/*
* Struct of function pointers that need to be initialized
*/
orte_grpcomm_base_component_t mca_grpcomm_exp_component = {
{
ORTE_GRPCOMM_BASE_VERSION_2_0_0,
"exp", /* MCA module name */
ORTE_MAJOR_VERSION, /* MCA module major version */
ORTE_MINOR_VERSION, /* MCA module minor version */
ORTE_RELEASE_VERSION, /* MCA module release version */
orte_grpcomm_exp_open, /* module open */
orte_grpcomm_exp_close /* module close */
},
{
/* The component is checkpoint ready */
MCA_BASE_METADATA_PARAM_CHECKPOINT
},
orte_grpcomm_exp_init /* component init */
};
/*
* instantiate globals needed within exp component
*/
orte_grpcomm_exp_globals_t orte_grpcomm_exp;
/* Open the component */
int orte_grpcomm_exp_open(void)
{
char *mode;
mca_base_component_t *c = &mca_grpcomm_exp_component.grpcomm_version;
int tmp;
mca_base_param_reg_int(c, "xcast_linear_xover",
"Number of daemons where use of linear xcast mode is to begin",
false, false, XCAST_LINEAR_XOVER_DEFAULT, &tmp);
orte_grpcomm_exp.xcast_linear_xover = tmp;
mca_base_param_reg_int(c, "xcast_binomial_xover",
"Number of daemons where use of binomial xcast mode is to begin",
false, false, XCAST_BINOMIAL_XOVER_DEFAULT, &tmp);
orte_grpcomm_exp.xcast_binomial_xover = tmp;
mca_base_param_reg_string(c, "xcast_mode",
"Select xcast mode (\"linear\" | \"binomial\" | \"direct\")",
false, false, "none", &mode);
if (0 == strcmp(mode, "binomial")) {
orte_grpcomm_exp.xcast_binomial_xover = 0;
orte_grpcomm_exp.xcast_linear_xover = 0;
} else if (0 == strcmp(mode, "linear")) {
orte_grpcomm_exp.xcast_linear_xover = 0;
orte_grpcomm_exp.xcast_binomial_xover = INT_MAX;
} else if (0 == strcmp(mode, "direct")) {
orte_grpcomm_exp.xcast_binomial_xover = INT_MAX;
orte_grpcomm_exp.xcast_linear_xover = INT_MAX;
} else if (0 != strcmp(mode, "none")) {
opal_output(0, "grpcomm_exp_xcast_mode: unknown option %s - using defaults", mode);
}
return ORTE_SUCCESS;
}
int orte_grpcomm_exp_close(void)
{
return ORTE_SUCCESS;
}
orte_grpcomm_base_module_t* orte_grpcomm_exp_init(int *priority)
{
/* only taken when specified */
*priority = 0;
return &orte_grpcomm_exp_module;
}

Просмотреть файл

@ -39,8 +39,10 @@
#include "opal/mca/mca.h"
#include "opal/class/opal_list.h"
#include "opal/class/opal_value_array.h"
#include "opal/dss/dss_types.h"
#include "orte/mca/rmaps/rmaps_types.h"
#include "orte/mca/rml/rml_types.h"
#include "orte/mca/odls/odls_types.h"
@ -72,8 +74,21 @@ typedef int (*orte_grpcomm_base_module_allgather_list_fn_t)(opal_list_t *names,
/* barrier function */
typedef int (*orte_grpcomm_base_module_barrier_fn_t)(void);
/* daemon collective operations */
typedef int (*orte_grpcomm_base_module_daemon_collective_fn_t)(orte_jobid_t jobid,
orte_std_cntr_t num_local_contributors,
orte_grpcomm_coll_t type,
opal_buffer_t *data,
orte_rmaps_dp_t flag,
opal_value_array_t *participants);
/* update the xcast trees - called after a change to the number of daemons
* in the system
*/
typedef int (*orte_grpcomm_base_module_update_trees_fn_t)(void);
/* for collectives, return next recipients in the chain */
typedef int (*orte_gprcomm_base_module_next_recipients_fn_t)(opal_list_t *list, orte_grpcomm_mode_t mode);
typedef opal_list_t* (*orte_gprcomm_base_module_next_recipients_fn_t)(orte_grpcomm_mode_t mode);
/** DATA EXCHANGE FUNCTIONS - SEE ompi/runtime/ompi_module_exchange.h FOR A DESCRIPTION
* OF HOW THIS ALL WORKS
@ -81,12 +96,12 @@ typedef int (*orte_gprcomm_base_module_next_recipients_fn_t)(opal_list_t *list,
/* send an attribute buffer */
typedef int (*orte_grpcomm_base_module_modex_set_proc_attr_fn_t)(const char* attr_name,
const void *buffer, size_t size);
const void *buffer, size_t size);
/* get an attribute buffer */
typedef int (*orte_grpcomm_base_module_modex_get_proc_attr_fn_t)(const orte_process_name_t name,
const char* attr_name,
void **buffer, size_t *size);
const char* attr_name,
void **buffer, size_t *size);
/* perform a modex operation */
typedef int (*orte_grpcomm_base_module_modex_fn_t)(opal_list_t *procs);
@ -106,6 +121,8 @@ struct orte_grpcomm_base_module_2_0_0_t {
orte_grpcomm_base_module_allgather_fn_t allgather;
orte_grpcomm_base_module_allgather_list_fn_t allgather_list;
orte_grpcomm_base_module_barrier_fn_t barrier;
orte_grpcomm_base_module_daemon_collective_fn_t daemon_collective;
orte_grpcomm_base_module_update_trees_fn_t update_trees;
orte_gprcomm_base_module_next_recipients_fn_t next_recipients;
/* modex functions */
orte_grpcomm_base_module_modex_set_proc_attr_fn_t set_proc_attr;

Просмотреть файл

@ -54,6 +54,14 @@ typedef uint8_t orte_grpcomm_mode_t;
/* linear - HNP sends direct to all daemons */
#define ORTE_GRPCOMM_LINEAR (orte_grpcomm_mode_t) 3
/*
* Define collective types
*/
typedef uint8_t orte_grpcomm_coll_t;
#define ORTE_GRPCOMM_COLL_T OPAL_UINT8
#define ORTE_GRPCOMM_BARRIER 0x01
#define ORTE_GRPCOMM_ALLGATHER 0x02
END_C_DECLS

Просмотреть файл

@ -38,6 +38,7 @@ int orte_odls_base_close(void)
OBJ_DESTRUCT(&orte_odls_globals.mutex);
OBJ_DESTRUCT(&orte_odls_globals.cond);
OBJ_DESTRUCT(&orte_odls_globals.children);
OBJ_DESTRUCT(&orte_odls_globals.jobs);
/* if no components are available, then punt */
if (!orte_odls_base.components_available) {

Просмотреть файл

@ -48,6 +48,7 @@
#include "orte/mca/ess/base/base.h"
#include "orte/mca/plm/base/base.h"
#include "orte/mca/routed/base/base.h"
#include "orte/mca/grpcomm/grpcomm.h"
#include "orte/util/context_fns.h"
#include "orte/util/name_fns.h"
@ -129,6 +130,12 @@ int orte_odls_base_default_get_add_procs_data(opal_buffer_t *data,
*/
map = jdata->map;
/* pack the flag indicating daemon participation in this launch */
if (ORTE_SUCCESS != (rc = opal_dss.pack(data, &map->daemon_participation, 1, ORTE_RMAPS_DP_T))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* pack the number of nodes participating in this launch */
if (ORTE_SUCCESS != (rc = opal_dss.pack(data, &map->num_nodes, 1, ORTE_STD_CNTR))) {
ORTE_ERROR_LOG(rc);
@ -230,7 +237,8 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
orte_process_name_t proc, daemon;
char *slot_str;
bool node_oversubscribed;
orte_odls_job_t *jobdat;
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:constructing child list",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
@ -249,13 +257,17 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
*node_included = false;
*oversubscribed = false;
*override_oversubscribed = false;
/* unpack the jobid we are to launch */
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, job, &cnt, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* setup jobdat object for this job */
jobdat = OBJ_NEW(orte_odls_job_t);
jobdat->jobid = *job;
opal_list_append(&orte_odls_globals.jobs, &jobdat->super);
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:construct_child_list unpacking data to launch job %s",
@ -303,7 +315,14 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
return rc;
}
/* UNPACK THE NODE-SPECIFIC DATA */
/* UNPACK THE JOB MAP DATA */
/* unpack the flag indicating daemon participation in this launch */
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &jobdat->dp, &cnt, ORTE_RMAPS_DP_T))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* unpack the number of nodes participating in this launch */
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &num_nodes, &cnt, ORTE_STD_CNTR))) {
@ -323,6 +342,14 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
ORTE_ERROR_LOG(rc);
return rc;
}
/* if daemon participation is sparse, add this daemon to the
* list of those participating
*/
if (ORTE_RMAPS_DAEMON_SUBSET == jobdat->dp) {
opal_value_array_append_item(&jobdat->daemons, &daemon.vpid);
}
/* unpack the number of procs on this node */
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &num_procs, &cnt, ORTE_VPID))) {
@ -380,7 +407,6 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
child->slot_list = strdup(slot_str);
free(slot_str);
}
child->num_nodes = num_nodes; /* save #nodes in launch */
/* protect operation on the global list of children */
OPAL_THREAD_LOCK(&orte_odls_globals.mutex);
opal_list_append(&orte_odls_globals.children, &child->super);
@ -705,6 +731,8 @@ int orte_odls_base_default_launch_local(orte_jobid_t job,
bool launch_failed=true;
opal_buffer_t alert;
orte_std_cntr_t proc_rank;
orte_std_cntr_t num_daemons;
orte_odls_job_t *jobdat;
/* protect operations involving the global list of children */
OPAL_THREAD_LOCK(&orte_odls_globals.mutex);
@ -799,19 +827,30 @@ int orte_odls_base_default_launch_local(orte_jobid_t job,
/* setup to report the proc state to the HNP */
OBJ_CONSTRUCT(&alert, opal_buffer_t);
/* find a child for this job */
for (item = opal_list_get_first(&orte_odls_globals.children);
item != opal_list_get_end(&orte_odls_globals.children);
/* find the jobdat for this job */
jobdat = NULL;
for (item = opal_list_get_first(&orte_odls_globals.jobs);
item != opal_list_get_end(&orte_odls_globals.jobs);
item = opal_list_get_next(item)) {
child = (orte_odls_child_t*)item;
jobdat = (orte_odls_job_t*)item;
/* is this child part of the specified job? */
if (child->name->jobid == job) {
/* is this the specified job? */
if (jobdat->jobid == job) {
break;
}
}
if (NULL == child) {
if (NULL == jobdat) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
rc = ORTE_ERR_NOT_FOUND;
goto unlock;
}
if (ORTE_RMAPS_ALL_DAEMONS == jobdat->dp) {
num_daemons = orte_process_info.num_procs;
} else if (ORTE_RMAPS_ALL_EXCEPT_HNP == jobdat->dp) {
num_daemons = orte_process_info.num_procs-1;
} else {
ORTE_ERROR_LOG(ORTE_ERR_NOT_IMPLEMENTED);
rc = ORTE_ERR_NOT_IMPLEMENTED;
goto unlock;
}
@ -821,7 +860,7 @@ int orte_odls_base_default_launch_local(orte_jobid_t job,
num_local_procs,
vpid_range,
total_slots_alloc,
child->num_nodes,
num_daemons,
oversubscribed,
&apps[i]->env))) {
@ -1900,6 +1939,7 @@ CLEANUP:
return rc;
}
static orte_std_cntr_t num_local_contributors;
static bool all_children_participated(orte_jobid_t job)
{
@ -1925,6 +1965,7 @@ static bool all_children_participated(orte_jobid_t job)
/* if we get here, then everyone in the job has participated - cleanout
* their flags so they can do this again!
*/
num_local_contributors = 0;
for (item = opal_list_get_first(&orte_odls_globals.children);
item != opal_list_get_end(&orte_odls_globals.children);
item = opal_list_get_next(item)) {
@ -1934,6 +1975,7 @@ static bool all_children_participated(orte_jobid_t job)
if (OPAL_EQUAL == opal_dss.compare(&child->name->jobid, &job, ORTE_JOBID)) {
/* clear flag */
child->coll_recvd = false;
++num_local_contributors;
}
}
return true;
@ -1941,18 +1983,18 @@ static bool all_children_participated(orte_jobid_t job)
}
static opal_buffer_t *collection_bucket=NULL;
static orte_rml_tag_t collective_target_tag;
static orte_grpcomm_coll_t collective_type;
int orte_odls_base_default_collect_data(orte_process_name_t *proc,
opal_buffer_t *buf)
{
opal_list_item_t *item;
orte_odls_child_t *child;
int rc;
int rc= ORTE_SUCCESS;
bool found=false;
orte_process_name_t collector;
orte_std_cntr_t n;
orte_odls_job_t *jobdat;
/* protect operations involving the global list of children */
OPAL_THREAD_LOCK(&orte_odls_globals.mutex);
@ -1989,20 +2031,20 @@ int orte_odls_base_default_collect_data(orte_process_name_t *proc,
*/
child->alive = true;
}
/* unpack the target tag for this collective */
/* unpack the collective type */
n = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &collective_target_tag, &n, ORTE_RML_TAG))) {
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &collective_type, &n, ORTE_GRPCOMM_COLL_T))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* if the collection bucket isn't initialized, do so now */
if (NULL == collection_bucket) {
collection_bucket = OBJ_NEW(opal_buffer_t);
}
/* store the provided data */
/* collect the provided data */
opal_dss.copy_payload(collection_bucket, buf);
/* flag this proc as having participated */
@ -2010,28 +2052,49 @@ int orte_odls_base_default_collect_data(orte_process_name_t *proc,
/* now check to see if everyone in this job has participated */
if (all_children_participated(proc->jobid)) {
/* once everyone participates, send the collection
* bucket to the rank=0 proc of this job
*/
collector.jobid = proc->jobid;
collector.vpid = 0;
/* once everyone participates, do the specified collective */
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls: sending collection bucket to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&collector)));
"%s odls: executing collective",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* go ahead and send it */
if (0 > (rc = orte_rml.send_buffer(&collector, collection_bucket, collective_target_tag, 0))) {
ORTE_ERROR_LOG(rc);
/* find the jobdat for this job */
jobdat = NULL;
for (item = opal_list_get_first(&orte_odls_globals.jobs);
item != opal_list_get_end(&orte_odls_globals.jobs);
item = opal_list_get_next(item)) {
jobdat = (orte_odls_job_t*)item;
/* is this the specified job? */
if (jobdat->jobid == proc->jobid) {
break;
}
}
if (NULL == jobdat) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
rc = ORTE_ERR_NOT_FOUND;
OBJ_RELEASE(collection_bucket);
goto CLEANUP;
}
if (ORTE_SUCCESS != (rc = orte_grpcomm.daemon_collective(proc->jobid, num_local_contributors,
collective_type, collection_bucket,
jobdat->dp, &jobdat->daemons))) {
ORTE_ERROR_LOG(rc);
}
/* release the collection bucket for reuse */
OBJ_RELEASE(collection_bucket);
OPAL_OUTPUT_VERBOSE((1, orte_odls_globals.output,
"%s odls: collective completed",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
}
CLEANUP:
opal_condition_signal(&orte_odls_globals.cond);
OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex);
return ORTE_SUCCESS;
return rc;
}

Просмотреть файл

@ -26,8 +26,9 @@
#include "opal/util/output.h"
#include "opal/util/trace.h"
#include "opal/util/argv.h"
#include "opal/class/opal_value_array.h"
#include "opal/dss/dss.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/plm/plm_types.h"
#include "orte/util/name_fns.h"
@ -69,7 +70,6 @@ static void orte_odls_child_constructor(orte_odls_child_t *ptr)
ptr->cpu_set = 0xffffffff;
ptr->rml_uri = NULL;
ptr->slot_list = NULL;
}
static void orte_odls_child_destructor(orte_odls_child_t *ptr)
{
@ -82,6 +82,23 @@ OBJ_CLASS_INSTANCE(orte_odls_child_t,
orte_odls_child_constructor,
orte_odls_child_destructor);
/* instance the job list object */
static void orte_odls_job_constructor(orte_odls_job_t *ptr)
{
ptr->jobid = ORTE_JOBID_INVALID;
ptr->dp = 0;
OBJ_CONSTRUCT(&ptr->daemons, opal_value_array_t);
opal_value_array_init(&ptr->daemons, sizeof(orte_vpid_t));
}
static void orte_odls_job_destructor(orte_odls_job_t *ptr)
{
OBJ_DESTRUCT(&ptr->daemons);
}
OBJ_CLASS_INSTANCE(orte_odls_job_t,
opal_list_item_t,
orte_odls_job_constructor,
orte_odls_job_destructor);
/*
* Framework global variables
*/
@ -106,7 +123,8 @@ int orte_odls_base_open(void)
OBJ_CONSTRUCT(&orte_odls_globals.mutex, opal_mutex_t);
OBJ_CONSTRUCT(&orte_odls_globals.cond, opal_condition_t);
OBJ_CONSTRUCT(&orte_odls_globals.children, opal_list_t);
OBJ_CONSTRUCT(&orte_odls_globals.jobs, opal_list_t);
/* Open up all available components */
if (ORTE_SUCCESS !=

Просмотреть файл

@ -50,14 +50,13 @@ BEGIN_C_DECLS
* our children. This can subsequently be used to order termination
* or pass signals without looking the info up again.
*/
typedef struct orte_odls_child_t {
typedef struct {
opal_list_item_t super; /* required to place this on a list */
orte_process_name_t *name; /* the OpenRTE name of the proc */
orte_vpid_t local_rank; /* local rank of the proc on this node */
pid_t pid; /* local pid of the proc */
orte_std_cntr_t app_idx; /* index of the app_context for this proc */
bool alive; /* is this proc alive? */
orte_std_cntr_t num_nodes; /* #nodes involved in launching this child */
bool coll_recvd; /* collective operation recvd */
orte_proc_state_t state; /* the state of the process */
orte_exit_code_t exit_code; /* process exit code */
@ -67,7 +66,16 @@ typedef struct orte_odls_child_t {
} orte_odls_child_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_odls_child_t);
typedef struct orte_odls_globals_t {
typedef struct {
opal_list_item_t super; /* required to place this on a list */
orte_jobid_t jobid; /* jobid for this job */
orte_rmaps_dp_t dp; /* daemon participation for this job */
opal_value_array_t daemons; /* vpids of participating daemons */
} orte_odls_job_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_odls_job_t);
typedef struct {
/** Verbose/debug output stream */
int output;
/** Time to allow process to forcibly die */
@ -78,6 +86,8 @@ typedef struct orte_odls_globals_t {
opal_condition_t cond;
/* list of children for this orted */
opal_list_t children;
/* list of jobs for this orted */
opal_list_t jobs;
} orte_odls_globals_t;
ORTE_DECLSPEC extern orte_odls_globals_t orte_odls_globals;

Просмотреть файл

@ -112,6 +112,7 @@ int orte_plm_base_launch_apps(orte_jobid_t job)
int rc;
orte_process_name_t name = {ORTE_JOBID_INVALID, 0};
OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
"%s plm:base:launch_apps for job %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
@ -128,7 +129,7 @@ int orte_plm_base_launch_apps(orte_jobid_t job)
return rc;
}
/* let the local launcher provide its required data */
/* get the local launcher's required data */
if (ORTE_SUCCESS != (rc = orte_odls.get_add_procs_data(buffer, job))) {
ORTE_ERROR_LOG(rc);
return rc;
@ -330,6 +331,10 @@ int orte_plm_base_daemon_callback(orte_std_cntr_t num_daemons)
/* all done launching - update the num_procs in my local structure */
orte_process_info.num_procs = jdatorted->num_procs;
/* update the grpcomm xcast tree(s) */
if (ORTE_SUCCESS != (rc = orte_grpcomm.update_trees())) {
ORTE_ERROR_LOG(rc);
}
/* get wireup info for daemons per the selected routing module */
wireup = OBJ_NEW(opal_buffer_t);

Просмотреть файл

@ -66,7 +66,6 @@ int orte_rmaps_base_map_job(orte_job_t *jdata)
}
/* load it with the system defaults */
map->policy = orte_rmaps_base.policy;
map->no_use_local = orte_rmaps_base.no_use_local;
map->pernode = orte_rmaps_base.pernode;
map->npernode = orte_rmaps_base.npernode;
map->oversubscribe = orte_rmaps_base.oversubscribe;

Просмотреть файл

@ -110,7 +110,9 @@ int orte_rmaps_base_open(void)
mca_base_param_reg_int_name("rmaps", "base_no_schedule_local",
"If false, allow scheduling MPI applications on the same node as mpirun (default). If true, do not schedule any MPI applications on the same node as mpirun",
false, false, (int)false, &value);
orte_rmaps_base.no_use_local = OPAL_INT_TO_BOOL(value);
if (value) {
orte_rmaps_base.policy |= ORTE_RMAPS_NO_USE_LOCAL;
}
/* Should we oversubscribe or not? */
/** default condition that allows oversubscription */

Просмотреть файл

@ -44,7 +44,7 @@
* Query the registry for all nodes allocated to a specified app_context
*/
int orte_rmaps_base_get_target_nodes(opal_list_t *allocated_nodes, orte_std_cntr_t *total_num_slots,
orte_app_context_t *app, bool nolocal)
orte_app_context_t *app, uint8_t policy)
{
opal_list_item_t *item, *next;
orte_node_t *node, **nodes;
@ -130,7 +130,7 @@ int orte_rmaps_base_get_target_nodes(opal_list_t *allocated_nodes, orte_std_cntr
/* If the "no local" option was set, then remove the local node
from the list */
if (nolocal) {
if (policy & ORTE_RMAPS_NO_USE_LOCAL) {
for (item = opal_list_get_first(allocated_nodes);
item != opal_list_get_end(allocated_nodes);
item = opal_list_get_next(item) ) {
@ -191,35 +191,6 @@ int orte_rmaps_base_get_target_nodes(opal_list_t *allocated_nodes, orte_std_cntr
}
/*
* Query the registry for all nodes allocated to a specified job
*/
#if 0
static int compare(opal_list_item_t **a, opal_list_item_t **b)
{
orte_ras_proc_t *aa = *((orte_ras_proc_t **) a);
orte_ras_proc_t *bb = *((orte_ras_proc_t **) b);
return (aa->rank - bb->rank);
}
#endif
int orte_rmaps_base_get_target_procs(opal_list_t *procs)
{
#if 0
int rc;
/* get the allocation for this job */
if(ORTE_SUCCESS != (rc = orte_ras.proc_query(procs))) {
ORTE_ERROR_LOG(rc);
return rc;
}
opal_list_sort(procs, compare);
#endif
return ORTE_SUCCESS;
}
int orte_rmaps_base_add_proc_to_map(orte_job_map_t *map, orte_node_t *node,
bool oversubscribed, orte_proc_t *proc)
{
@ -269,6 +240,10 @@ PROCESS:
OBJ_RETAIN(proc);
++node->num_procs;
/* if this is the HNP, flag that the HNP has local procs */
if (node == orte_hnpnode) {
map->hnp_has_local_procs = true;
}
return ORTE_SUCCESS;
}
@ -362,102 +337,6 @@ int orte_rmaps_base_claim_slot(orte_job_t *jdata,
}
#if 0
static int orte_find_unallocated_proc_in_map(orte_ras_proc_t *proc, orte_job_map_t *map, orte_proc_t **mproc)
{
orte_mapped_node_t *mnode;
opal_list_item_t *item, *item2;
int i;
for (item = opal_list_get_first(&map->nodes);
item != opal_list_get_end(&map->nodes);
item = opal_list_get_next(item)) {
mnode = (orte_mapped_node_t*)item;
if (strcmp(proc->node_name, mnode->nodename)) {
continue;
}
for (item2 = opal_list_get_first(&mnode->procs),i=1;
item2 != opal_list_get_end(&mnode->procs);
item2 = opal_list_get_next(item2),i++) {
*mproc = (orte_mapped_proc_t*)item2;
if (NULL == (*mproc)->slot_list) {
return ORTE_SUCCESS;
}
}
}
return ORTE_ERROR;
}
#endif
int orte_rmaps_base_rearrange_map(orte_app_context_t *app, orte_job_map_t *map, opal_list_t *procs)
{
#if 0
opal_list_item_t *proc_item, *map_node_item, *map_proc_item;
orte_mapped_node_t *mnode;
bool *used_ranks; /* an array for string used ranks */
orte_std_cntr_t used_rank_index;
orte_std_cntr_t assigned_procs = 0;
orte_ras_proc_t *proc;
orte_mapped_proc_t *mproc;
int rc;
used_ranks = (bool *)calloc(map->vpid_range, sizeof(bool));
for (proc_item = opal_list_get_first(procs);
proc_item != opal_list_get_end(procs) && assigned_procs < app->num_procs;
proc_item = opal_list_get_next(proc_item)) {
proc = (orte_ras_proc_t *)proc_item;
if (proc->rank != ORTE_VPID_MAX) {
/* Check if this proc belong to this map */
if (proc->rank >= map->vpid_start && proc->rank < (map->vpid_start + map->vpid_range)) {
if (ORTE_SUCCESS != (rc = orte_find_unallocated_proc_in_map(proc, map, &mproc))){
free (used_ranks);
ORTE_ERROR_LOG(rc);
return rc;
}
mproc->slot_list = strdup(proc->cpu_list);
mproc->rank = proc->rank;
mproc->name.vpid = proc->rank;
mproc->maped_rank = true;
used_rank_index = proc->rank - map->vpid_start;
used_ranks[used_rank_index] = true;
assigned_procs ++;
}
}else if (NULL != proc->cpu_list) {
if (ORTE_SUCCESS != (rc = orte_find_unallocated_proc_in_map(proc, map, &mproc))){
continue; /* since there is not a specifiv rank continue searching */
}
mproc->slot_list = strdup(proc->cpu_list);
assigned_procs ++;
}
}
if(assigned_procs > 0) {
used_rank_index = 0;
for (map_node_item = opal_list_get_first(&map->nodes);
map_node_item != opal_list_get_end(&map->nodes);
map_node_item = opal_list_get_next(map_node_item)) {
mnode = (orte_mapped_node_t*)map_node_item;
for (map_proc_item = opal_list_get_first(&mnode->procs);
map_proc_item != opal_list_get_end(&mnode->procs);
map_proc_item = opal_list_get_next(map_proc_item)) {
mproc = (orte_mapped_proc_t*)map_proc_item;
if (mproc->maped_rank) {
continue;
}
while (used_ranks[used_rank_index]){
used_rank_index++;
}
mproc->rank = map->vpid_start + used_rank_index;
mproc->name.vpid = mproc->rank;
used_rank_index++;
}
}
}
free (used_ranks);
#endif
return ORTE_SUCCESS;
}
int orte_rmaps_base_compute_usage(orte_job_t *jdata)
{
orte_std_cntr_t i, j;
@ -516,6 +395,7 @@ int orte_rmaps_base_define_daemons(orte_job_map_t *map)
orte_proc_t *proc;
orte_job_t *daemons;
orte_std_cntr_t i;
orte_vpid_t numdaemons;
int rc;
OPAL_OUTPUT_VERBOSE((5, orte_rmaps_base.rmaps_output,
@ -528,6 +408,7 @@ int orte_rmaps_base_define_daemons(orte_job_map_t *map)
ORTE_ERROR_LOG(ORTE_ERR_FATAL);
return ORTE_ERR_FATAL;
}
numdaemons=0;
/* go through the nodes in the map, checking each one's daemon name
*/
@ -562,6 +443,8 @@ int orte_rmaps_base_define_daemons(orte_job_map_t *map)
return rc;
}
++daemons->num_procs;
/* count number of daemons being used */
++numdaemons;
/* point the node to the daemon */
node->daemon = proc;
OBJ_RETAIN(proc); /* maintain accounting */
@ -572,36 +455,29 @@ int orte_rmaps_base_define_daemons(orte_job_map_t *map)
map->daemon_vpid_start = proc->name.vpid;
}
} else {
/* this daemon was previously defined - see if it has launched. The daemons
* are stored in vpid order, so just look it up
*/
if (daemons->procs->size < (orte_std_cntr_t)node->daemon->name.vpid ||
daemons->num_procs < node->daemon->name.vpid) {
/* well that is bad */
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
return ORTE_ERR_BAD_PARAM;
}
proc = (orte_proc_t*)daemons->procs->addr[node->daemon->name.vpid];
if (NULL == proc) {
/* well that is bad too */
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
return ORTE_ERR_BAD_PARAM;
}
if (NULL != proc->rml_uri) {
node->daemon_launched = true;
OPAL_OUTPUT_VERBOSE((5, orte_rmaps_base.rmaps_output,
"%s rmaps:base:define_daemons existing daemon %s already launched",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proc->name)));
} else {
node->daemon_launched = false;
OPAL_OUTPUT_VERBOSE((5, orte_rmaps_base.rmaps_output,
"%s rmaps:base:define_daemons existing daemon %s has not been launched",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proc->name)));
}
/* this daemon was previously defined - flag it */
node->daemon_launched = true;
OPAL_OUTPUT_VERBOSE((5, orte_rmaps_base.rmaps_output,
"%s rmaps:base:define_daemons existing daemon %s already launched",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&node->daemon->name)));
/* count number of daemons being used */
++numdaemons;
}
}
/* check how many daemons we are using and set flag accordingly - this
* is required so that daemon-based collectives can correctly operate
*/
if (numdaemons == daemons->num_procs) {
/* everyone is being used */
map->daemon_participation = ORTE_RMAPS_ALL_DAEMONS;
} else if (numdaemons == daemons->num_procs-1 &&
!map->hnp_has_local_procs) {
map->daemon_participation = ORTE_RMAPS_ALL_EXCEPT_HNP;
} else {
map->daemon_participation = ORTE_RMAPS_DAEMON_SUBSET;
}
return ORTE_SUCCESS;
}

Просмотреть файл

@ -62,7 +62,7 @@ int orte_rmaps_base_add_proc_to_map(orte_job_map_t *map, orte_node_t *node,
ORTE_DECLSPEC int orte_rmaps_base_get_target_nodes(opal_list_t* node_list,
orte_std_cntr_t *total_num_slots,
orte_app_context_t *app,
bool no_use_local);
uint8_t policy);
ORTE_DECLSPEC int orte_rmaps_base_get_target_procs(opal_list_t *procs);
ORTE_DECLSPEC int orte_rmaps_base_update_node_usage(opal_list_t *nodes);

Просмотреть файл

@ -393,7 +393,7 @@ static int orte_rmaps_rf_map(orte_job_t *jdata)
*/
OBJ_CONSTRUCT(&node_list, opal_list_t);
if(ORTE_SUCCESS != (rc = orte_rmaps_base_get_target_nodes(&node_list, &num_slots, app,
map->no_use_local))) {
map->policy))) {
ORTE_ERROR_LOG(rc);
goto error;
}

Просмотреть файл

@ -31,10 +31,27 @@
BEGIN_C_DECLS
#define ORTE_RMAPS_NOPOL 0x00
#define ORTE_RMAPS_BYNODE 0x01
#define ORTE_RMAPS_BYSLOT 0x02
#define ORTE_RMAPS_BYUSER 0x04
/*
* Define flags indicating the policy used to perform the map
*/
#define ORTE_RMAPS_NOPOL 0x00
#define ORTE_RMAPS_BYNODE 0x01
#define ORTE_RMAPS_BYSLOT 0x02
#define ORTE_RMAPS_BYUSER 0x04
#define ORTE_RMAPS_NO_USE_LOCAL 0x08
/*
* Define a flag that indicates the level of daemon participation
* in a launch
*/
typedef uint8_t orte_rmaps_dp_t;
#define ORTE_RMAPS_DP_T OPAL_UINT8
#define ORTE_RMAPS_ALL_DAEMONS 0x01
#define ORTE_RMAPS_ALL_EXCEPT_HNP 0x02
#define ORTE_RMAPS_DAEMON_SUBSET 0x04
/*
* Structure that represents the mapping of a job to an
@ -44,11 +61,12 @@ struct orte_job_map_t {
opal_object_t super;
/* save the mapping configuration */
uint8_t policy;
bool no_use_local;
bool hnp_has_local_procs;
bool pernode;
orte_std_cntr_t npernode;
bool oversubscribe;
bool display_map;
orte_rmaps_dp_t daemon_participation;
/* *** */
/* number of new daemons required to be launched
* to support this job map

Просмотреть файл

@ -347,7 +347,7 @@ static int orte_rmaps_rr_map(orte_job_t *jdata)
*/
OBJ_CONSTRUCT(&node_list, opal_list_t);
if(ORTE_SUCCESS != (rc = orte_rmaps_base_get_target_nodes(&node_list, &num_slots, app,
map->no_use_local))) {
map->policy))) {
ORTE_ERROR_LOG(rc);
goto error;
}
@ -423,11 +423,11 @@ static int orte_rmaps_rr_map(orte_job_t *jdata)
/** set the num_procs to equal the number of slots on these mapped nodes - if
user has specified "-bynode", then set it to the number of nodes
*/
if (map->policy == ORTE_RMAPS_BYNODE) {
if (map->policy & ORTE_RMAPS_BYNODE) {
app->num_procs = num_nodes;
} else if (map->policy == ORTE_RMAPS_BYSLOT) {
} else if (map->policy & ORTE_RMAPS_BYSLOT) {
app->num_procs = num_slots;
} else if (map->policy == ORTE_RMAPS_BYUSER) {
} else if (map->policy & ORTE_RMAPS_BYUSER) {
/* we can't handle this - it should have been set when we got
* the map info. If it wasn't, then we can only error out
*/

Просмотреть файл

@ -89,7 +89,7 @@ static int orte_rmaps_seq_map(orte_job_t *jdata)
*/
OBJ_CONSTRUCT(&node_list, opal_list_t);
if(ORTE_SUCCESS != (rc = orte_rmaps_base_get_target_nodes(&node_list, &num_slots, app,
map->no_use_local))) {
map->policy))) {
ORTE_ERROR_LOG(rc);
goto error;
}

Просмотреть файл

@ -66,36 +66,36 @@ BEGIN_C_DECLS
#define ORTE_RML_TAG_RML_ROUTE 13
#define ORTE_RML_TAG_ALLGATHER_SERVER 14
#define ORTE_RML_TAG_ALLGATHER_CLIENT 15
#define ORTE_RML_TAG_BARRIER_SERVER 16
#define ORTE_RML_TAG_BARRIER_CLIENT 17
#define ORTE_RML_TAG_ALLGATHER 14
#define ORTE_RML_TAG_BARRIER 15
#define ORTE_RML_TAG_INIT_ROUTES 18
#define ORTE_RML_TAG_UPDATE_ROUTES 19
#define ORTE_RML_TAG_SYNC 20
#define ORTE_RML_TAG_INIT_ROUTES 16
#define ORTE_RML_TAG_UPDATE_ROUTES 17
#define ORTE_RML_TAG_SYNC 18
/* For FileM Base */
#define ORTE_RML_TAG_FILEM_BASE 21
#define ORTE_RML_TAG_FILEM_BASE_RESP 22
#define ORTE_RML_TAG_FILEM_BASE 19
#define ORTE_RML_TAG_FILEM_BASE_RESP 20
/* For FileM RSH Component */
#define ORTE_RML_TAG_FILEM_RSH 23
#define ORTE_RML_TAG_FILEM_RSH 21
/* For SnapC Framework */
#define ORTE_RML_TAG_SNAPC 24
#define ORTE_RML_TAG_SNAPC_FULL 25
#define ORTE_RML_TAG_SNAPC 22
#define ORTE_RML_TAG_SNAPC_FULL 23
/* For tools */
#define ORTE_RML_TAG_TOOL 26
#define ORTE_RML_TAG_TOOL 24
/* support data store/lookup */
#define ORTE_RML_TAG_DATA_SERVER 27
#define ORTE_RML_TAG_DATA_CLIENT 28
#define ORTE_RML_TAG_DATA_SERVER 25
#define ORTE_RML_TAG_DATA_CLIENT 26
/* timing related */
#define ORTE_RML_TAG_BARRIER_TIMER 29
#define ORTE_RML_TAG_ALLGATHER_TIMER 30
#define ORTE_RML_TAG_COLLECTIVE_TIMER 27
/* daemon collectives */
#define ORTE_RML_TAG_DAEMON_COLLECTIVE 28
#define ORTE_RML_TAG_MAX 100

Просмотреть файл

@ -100,7 +100,7 @@ static void send_relay(int fd, short event, void *data)
opal_buffer_t *buffer = mev->buffer;
orte_rml_tag_t tag = mev->tag;
orte_grpcomm_mode_t relay_mode;
opal_list_t recips;
opal_list_t *recips;
opal_list_item_t *item;
int ret;
@ -108,9 +108,6 @@ static void send_relay(int fd, short event, void *data)
"%s orte:daemon:send_relay",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* setup a list of next recipients */
OBJ_CONSTRUCT(&recips, opal_list_t);
/* we pass the relay_mode in the mev "tag" field. This is a bit
* of a hack as the two sizes may not exactly match. However, since
* the rml_tag is an int32, it is doubtful we will ever see a
@ -118,37 +115,41 @@ static void send_relay(int fd, short event, void *data)
*/
relay_mode = (orte_grpcomm_mode_t)tag;
/* if the mode is linear and we are the HNP, don't ask for
* next recipients as this will generate a potentially very
* long list! Instead, just look over the known daemons
*/
if (ORTE_GRPCOMM_LINEAR == relay_mode && orte_process_info.hnp) {
if (ORTE_GRPCOMM_LINEAR == relay_mode) {
orte_process_name_t dummy;
orte_vpid_t i;
/* send the message to each daemon as fast as we can - but
* not to us!
/* if we are NOT the HNP, do nothing */
if (!orte_process_info.hnp) {
goto CLEANUP;
}
/* if the mode is linear and we are the HNP, don't ask for
* next recipients as this will generate a potentially very
* long list! Instead, just send the message to each daemon
* as fast as we can - but not to us!
*/
dummy.jobid = ORTE_PROC_MY_HNP->jobid;
for (i=1; i < orte_process_info.num_procs; i++) {
dummy.vpid = i;
if (0 > (ret = orte_rml.send_buffer(&dummy, buffer, ORTE_RML_TAG_DAEMON, 0))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
if (0 > (ret = orte_rml.send_buffer(&dummy, buffer, ORTE_RML_TAG_DAEMON, 0))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
}
goto CLEANUP;
}
/* ask the active grpcomm module for the next recipients */
if (ORTE_SUCCESS != (ret = orte_grpcomm.next_recipients(&recips, relay_mode))) {
ORTE_ERROR_LOG(ret);
if (NULL == (recips = orte_grpcomm.next_recipients(relay_mode))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
goto CLEANUP;
}
/* send the message - if we are at the end of the chain, then there
* will be nothing on the list, so remove_first will return NULL
/* send the message - do not deconstruct the list! it doesn't belong
* to us
*/
while (NULL != (item = opal_list_remove_first(&recips))) {
for (item = opal_list_get_first(recips);
item != opal_list_get_end(recips);
item = opal_list_get_next(item)) {
orte_namelist_t *target = (orte_namelist_t*)item;
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
@ -160,12 +161,10 @@ static void send_relay(int fd, short event, void *data)
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
OBJ_RELEASE(item);
}
CLEANUP:
/* cleanup */
OBJ_DESTRUCT(&recips);
OBJ_RELEASE(mev);
}

Просмотреть файл

@ -271,7 +271,7 @@ int orte_dt_copy_map(orte_job_map_t **dest, orte_job_map_t *src, opal_data_type_
/* copy data into it */
(*dest)->policy = src->policy;
(*dest)->no_use_local = src->no_use_local;
(*dest)->hnp_has_local_procs = src->hnp_has_local_procs;
(*dest)->pernode = src->pernode;
(*dest)->npernode = src->npernode;
(*dest)->oversubscribe = src->oversubscribe;

Просмотреть файл

@ -739,8 +739,8 @@ int orte_dt_pack_map(opal_buffer_t *buffer, const void *src,
return rc;
}
/* pack the no_use_local flag */
if (ORTE_SUCCESS != (rc = opal_dss.pack_buffer(buffer, &(maps[i]->no_use_local), 1, OPAL_BOOL))) {
/* pack the hnp_has_local_procs flag */
if (ORTE_SUCCESS != (rc = opal_dss.pack_buffer(buffer, &(maps[i]->hnp_has_local_procs), 1, OPAL_BOOL))) {
ORTE_ERROR_LOG(rc);
return rc;
}

Просмотреть файл

@ -453,9 +453,9 @@ int orte_dt_print_map(char **output, char *prefix, orte_job_map_t *src, opal_dat
}
asprintf(&pfx, "%s\t", pfx2);
asprintf(&tmp, "\n%sMap generated by mapping policy: %x\n%sNo-use-local: %s\tPernode: %s\tNpernode: %ld\tOversubscribe allowed: %s\tDisplay: %s",
asprintf(&tmp, "\n%sMap generated by mapping policy: %x\n%sHNP has local procs: %s\tPernode: %s\tNpernode: %ld\tOversubscribe allowed: %s\tDisplay: %s",
pfx2, src->policy,
pfx, (src->no_use_local) ? "TRUE" : "FALSE",
pfx, (src->hnp_has_local_procs) ? "TRUE" : "FALSE",
(src->pernode) ? "TRUE" : "FALSE", (long)src->npernode,
(src->oversubscribe) ? "TRUE" : "FALSE",
(src->display_map) ? "TRUE" : "FALSE");

Просмотреть файл

@ -811,10 +811,10 @@ int orte_dt_unpack_map(opal_buffer_t *buffer, void *dest,
return rc;
}
/* unpack the no_use_local flag */
/* unpack the hnp_has_local_procs flag */
n = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack_buffer(buffer,
&(maps[i]->no_use_local), &n, OPAL_BOOL))) {
&(maps[i]->hnp_has_local_procs), &n, OPAL_BOOL))) {
ORTE_ERROR_LOG(rc);
return rc;
}

Просмотреть файл

@ -50,12 +50,15 @@ bool orte_debug_daemons_file_flag = false;
bool orted_spin_flag = false;
bool orte_static_ports = false;
bool orte_keep_fqdn_hostnames = false;
int orte_debug_output = -1;
char **orte_launch_environ;
char **orted_cmd_line=NULL;
int orte_exit, orteds_exit;
int orte_exit_status = 0;
bool orte_abnormal_term_ordered = false;
orte_node_t *orte_hnpnode = NULL;
int orte_timeout_usec_per_proc;
float orte_max_timeout;
char *orte_default_hostfile;

Просмотреть файл

@ -295,6 +295,7 @@ ORTE_DECLSPEC extern char **orted_cmd_line;
ORTE_DECLSPEC extern int orte_exit, orteds_exit;
ORTE_DECLSPEC extern int orte_exit_status;
ORTE_DECLSPEC extern bool orte_abnormal_term_ordered;
ORTE_DECLSPEC extern orte_node_t *orte_hnpnode;
ORTE_DECLSPEC extern int orte_timeout_usec_per_proc;
ORTE_DECLSPEC extern float orte_max_timeout;

Просмотреть файл

@ -280,7 +280,7 @@ OBJ_CLASS_INSTANCE(orte_proc_t,
static void orte_job_map_construct(orte_job_map_t* map)
{
map->policy = ORTE_RMAPS_BYSLOT; /* default to byslot mapping as per orterun options */
map->no_use_local = false;
map->hnp_has_local_procs = false;
map->pernode = false;
map->npernode = 0;
map->oversubscribe = true; /* default to allowing oversubscribe */