
Revert r32222, r32210, and r32203 as they created a problem when daemon collectives did not involve app procs on every node. Instead, modify ompi/mca/rte/orte/rte_orte.h to add a new function that lets apps request new daemon collective ids for use in barrier and modex operations. This will only appear in ORTE-based installations; at the moment it is only being used by a couple of researchers.

Update the orte/test/mpi/coll_test.c test to show the revised example.
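For reference, a minimal usage sketch of the new call from an application's point of view, mirroring the revised coll_test.c diff further down (error handling abbreviated; assumes an ORTE-based build where ompi_rte_collective_t, ompi_rte_barrier, and OMPI_LAZY_WAIT_FOR_COMPLETION are available):

    /* ask the HNP for a fresh daemon collective id scoped to this communicator,
     * then run an RTE-level barrier with it */
    coll = OBJ_NEW(ompi_rte_collective_t);
    coll->id = ompi_rte_get_collective_id(MPI_COMM_WORLD);  /* new API added by this commit */
    coll->active = true;
    if (OMPI_SUCCESS != (ret = ompi_rte_barrier(coll))) {
        OMPI_ERROR_LOG(ret);
        return ret;
    }
    OMPI_LAZY_WAIT_FOR_COMPLETION(coll->active);  /* wait for the barrier to complete */
    OBJ_RELEASE(coll);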

This commit was SVN r32234.

The following SVN revision numbers were found above:
  r32203 --> open-mpi/ompi@a523dba41d
  r32210 --> open-mpi/ompi@2ce11ed5c4
  r32222 --> open-mpi/ompi@d55f16db50
This commit is contained in:
Ralph Castain 2014-07-15 03:48:00 +00:00
parent 5f6b9be221
commit 6c5e592785
27 changed files with 475 additions and 222 deletions


@ -299,9 +299,8 @@ static int connect_accept(ompi_communicator_t *comm, int root,
}
OPAL_OUTPUT_VERBOSE((1, ompi_dpm_base_framework.framework_output,
"%s dpm:orte:connect_accept working with new collective ids %lu %lu",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(unsigned long)id[0], (unsigned long)id[1]));
"%s dpm:orte:connect_accept working with new collective ids %u %u",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), id[0], id[1]));
/* Generate the message buffer containing the number of processes and the list of
participating processes */


@ -20,15 +20,11 @@
#include "ompi_config.h"
#include "ompi/constants.h"
#include "ompi/info/info.h"
struct ompi_proc_t;
#include "opal/threads/threads.h"
#include "orte/types.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/grpcomm/grpcomm.h"
#include "orte/mca/grpcomm/base/base.h"
#include "orte/mca/rml/base/rml_contact.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/routed/routed.h"
@ -37,6 +33,10 @@ struct ompi_proc_t;
#include "orte/util/name_fns.h"
#include "orte/util/proc_info.h"
#include "ompi/info/info.h"
struct ompi_proc_t;
struct ompi_communicator_t;
BEGIN_C_DECLS
/* Process name objects and operations */
@ -69,7 +69,7 @@ typedef orte_ns_cmp_bitmask_t ompi_rte_cmp_bitmask_t;
typedef orte_grpcomm_coll_id_t ompi_rte_collective_id_t;
OMPI_DECLSPEC int ompi_rte_modex(ompi_rte_collective_t *coll);
#define ompi_rte_barrier(a) orte_grpcomm.barrier(a)
#define ompi_rte_get_collective_id(a) orte_grpcomm_base_get_coll_id(a)
OMPI_DECLSPEC orte_grpcomm_coll_id_t ompi_rte_get_collective_id(const struct ompi_communicator_t *comm);
/* Process info struct and values */
typedef orte_node_rank_t ompi_node_rank_t;


@ -44,6 +44,7 @@
#include "ompi/debuggers/debuggers.h"
#include "ompi/proc/proc.h"
#include "ompi/runtime/params.h"
#include "ompi/communicator/communicator.h"
extern ompi_rte_orte_component_t mca_rte_orte_component;
static void recv_callback(int status, orte_process_name_t* sender,
@ -406,3 +407,67 @@ static void recv_callback(int status, orte_process_name_t* sender,
/* release */
opal_mutex_unlock(&mca_rte_orte_component.lock);
}
/* everybody involved in the collective has to call this function. However,
* only the "root" process (i.e., rank=0 in this communicator) will send
* the collective id request to the HNP. The HNP will then xcast the
* assigned value to all daemons so that every daemon knows about it. This
* will ensure that daemons properly handle the request. The daemons will
* relay the received ID to their local procs */
orte_grpcomm_coll_id_t ompi_rte_get_collective_id(const struct ompi_communicator_t *comm)
{
opal_buffer_t *nbuf;
int32_t i, rc;
orte_rml_recv_cb_t xfer;
orte_grpcomm_coll_id_t id;
uint8_t flag=1;
/* everybody waits for the id */
OBJ_CONSTRUCT(&xfer, orte_rml_recv_cb_t);
xfer.active = true;
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_FULL_COLL_ID,
ORTE_RML_NON_PERSISTENT,
orte_rml_recv_callback, &xfer);
/* the lowest member of the communicator requests the communicator
* id from mpirun */
if (0 == ompi_comm_rank((ompi_communicator_t*)comm)) {
nbuf = OBJ_NEW(opal_buffer_t);
if (NULL == nbuf) {
return OMPI_ERROR;
}
/* tell the HNP we want one id */
i = 1;
if (OPAL_SUCCESS != (rc = opal_dss.pack(nbuf, &i, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(nbuf);
return OMPI_ERROR;
}
/* tell the HNP this is to be a global value */
if (OPAL_SUCCESS != (rc = opal_dss.pack(nbuf, &flag, 1, OPAL_UINT8))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(nbuf);
return OMPI_ERROR;
}
/* send the request */
rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, nbuf,
ORTE_RML_TAG_COLL_ID_REQ,
orte_rml_send_callback, NULL);
}
/* wait for response */
OMPI_WAIT_FOR_COMPLETION(xfer.active);
/* extract the id */
i=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer.data, &id, &i, ORTE_GRPCOMM_COLL_ID_T))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&xfer);
return OMPI_ERROR;
}
OBJ_DESTRUCT(&xfer); // done with the received data
return id;
}


@ -228,7 +228,7 @@ int ompi_mpi_finalize(void)
https://svn.open-mpi.org/trac/ompi/ticket/4669#comment:4 for
more details). */
coll = OBJ_NEW(ompi_rte_collective_t);
coll->id = ompi_rte_get_collective_id(OMPI_PROC_MY_NAME);
coll->id = ompi_process_info.peer_fini_barrier;
coll->active = true;
if (OMPI_SUCCESS != (ret = ompi_rte_barrier(coll))) {
OMPI_ERROR_LOG(ret);


@ -628,7 +628,7 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
* as it will not return until the exchange is complete
*/
coll = OBJ_NEW(ompi_rte_collective_t);
coll->id = ompi_rte_get_collective_id(OMPI_PROC_MY_NAME);
coll->id = ompi_process_info.peer_modex;
coll->active = true;
if (OMPI_SUCCESS != (ret = ompi_rte_modex(coll))) {
error = "rte_modex failed";
@ -817,7 +817,7 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
/* wait for everyone to reach this point */
coll = OBJ_NEW(ompi_rte_collective_t);
coll->id = ompi_rte_get_collective_id(OMPI_PROC_MY_NAME);
coll->id = ompi_process_info.peer_init_barrier;
coll->active = true;
if (OMPI_SUCCESS != (ret = ompi_rte_barrier(coll))) {
error = "rte_barrier failed";

orte/mca/ess/env/ess_env_module.c

@ -174,7 +174,7 @@ static int rte_init(void)
if (ORTE_PROC_IS_NON_MPI && !orte_do_not_barrier) {
orte_grpcomm_collective_t coll;
OBJ_CONSTRUCT(&coll, orte_grpcomm_collective_t);
coll.id = orte_grpcomm_base_get_coll_id(ORTE_PROC_MY_NAME);
coll.id = orte_process_info.peer_modex;
coll.active = true;
if (ORTE_SUCCESS != (ret = orte_grpcomm.modex(&coll))) {
ORTE_ERROR_LOG(ret);


@ -49,7 +49,6 @@
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/grpcomm/grpcomm.h"
#include "orte/mca/grpcomm/base/base.h"
#include "orte/mca/rml/rml.h"
#include "orte/util/proc_info.h"
#include "orte/util/show_help.h"
@ -357,7 +356,7 @@ static int rte_init(void)
if (ORTE_PROC_IS_NON_MPI && !orte_do_not_barrier) {
orte_grpcomm_collective_t coll;
OBJ_CONSTRUCT(&coll, orte_grpcomm_collective_t);
coll.id = orte_grpcomm_base_get_coll_id(ORTE_PROC_MY_NAME);
coll.id = orte_process_info.peer_modex;
coll.active = true;
if (ORTE_SUCCESS != (ret = orte_grpcomm.modex(&coll))) {
ORTE_ERROR_LOG(ret);


@ -194,6 +194,11 @@ static int rte_init(void)
return rc;
}
/* set the collective ids */
orte_process_info.peer_modex = 0;
orte_process_info.peer_init_barrier = 1;
orte_process_info.peer_fini_barrier = 2;
/* to the best of our knowledge, we are alone */
orte_process_info.my_node_rank = 0;
orte_process_info.my_local_rank = 0;


@ -210,9 +210,9 @@ static void process_barrier(int fd, short args, void *cbdata)
found = false;
OPAL_LIST_FOREACH(cptr, &orte_grpcomm_base.active_colls, orte_grpcomm_collective_t) {
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s CHECKING COLL id %lu",
"%s CHECKING COLL id %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(unsigned long)cptr->id));
cptr->id));
if (coll->id == cptr->id) {
found = true;
@ -231,9 +231,9 @@ static void process_barrier(int fd, short args, void *cbdata)
* the barrier object
*/
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:bad collective %lu already exists - removing prior copy",
"%s grpcomm:bad collective %d already exists - removing prior copy",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(unsigned long)coll->id));
(int)coll->id));
while (NULL != (item = opal_list_remove_first(&cptr->targets))) {
opal_list_append(&coll->targets, item);
}
@ -241,9 +241,9 @@ static void process_barrier(int fd, short args, void *cbdata)
OBJ_RELEASE(cptr);
}
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:bad adding collective %lu with %d participants to global list",
"%s grpcomm:bad adding collective %d with %d participants to global list",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(unsigned long)coll->id, (int)opal_list_get_size(&coll->participants)));
(int)coll->id, (int)opal_list_get_size(&coll->participants)));
/* now add the barrier to the global list of active collectives */
opal_list_append(&orte_grpcomm_base.active_colls, &coll->super);
@ -262,9 +262,9 @@ static void process_barrier(int fd, short args, void *cbdata)
buf = OBJ_NEW(opal_buffer_t);
opal_dss.copy_payload(buf, &coll->buffer);
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:bad sending collective %lu to %s",
"%s grpcomm:bad sending collective %d to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(unsigned long)coll->id,
(int)coll->id,
ORTE_NAME_PRINT(&nm->name)));
if (0 > (rc = orte_rml.send_buffer_nb(&nm->name, buf,
ORTE_RML_TAG_COLLECTIVE,
@ -302,6 +302,7 @@ static void process_allgather(int fd, short args, void *cbdata)
int rc;
opal_buffer_t *buf;
orte_namelist_t *nm;
opal_list_item_t *item;
OBJ_RELEASE(caddy);
@ -343,9 +344,9 @@ static void process_allgather(int fd, short args, void *cbdata)
gather, ORTE_GRPCOMM_INTERNAL_STG_APP);
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:bad sending collective %lu to our daemon",
"%s grpcomm:bad sending collective %d to our daemon",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(unsigned long)gather->id));
(int)gather->id));
/* send to our daemon */
if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, buf,
ORTE_RML_TAG_COLLECTIVE,
@ -360,13 +361,16 @@ static void process_allgather(int fd, short args, void *cbdata)
* include ourselves, which is fine as it will aid in
* determining the collective is complete
*/
OPAL_LIST_FOREACH(nm, &gather->participants, orte_namelist_t) {
for (item = opal_list_get_first(&gather->participants);
item != opal_list_get_end(&gather->participants);
item = opal_list_get_next(item)) {
nm = (orte_namelist_t*)item;
buf = OBJ_NEW(opal_buffer_t);
opal_dss.copy_payload(buf, &gather->buffer);
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:bad sending collective %lu to %s",
"%s grpcomm:bad sending collective %d to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(unsigned long)gather->id,
(int)gather->id,
ORTE_NAME_PRINT(&nm->name)));
if (0 > (rc = orte_rml.send_buffer_nb(&nm->name, buf,
ORTE_RML_TAG_COLLECTIVE,


@ -83,9 +83,9 @@ OBJ_CLASS_DECLARATION(orte_grpcomm_caddy_t);
do { \
orte_grpcomm_caddy_t *caddy; \
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output, \
"%s ACTIVATING GRCPCOMM OP %lu at %s:%d", \
"%s ACTIVATING GRCPCOMM OP %d at %s:%d", \
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \
(unsigned long)(o)->id, __FILE__, __LINE__)); \
(o)->id, __FILE__, __LINE__)); \
caddy = OBJ_NEW(orte_grpcomm_caddy_t); \
caddy->op = (o); \
opal_event_set(orte_event_base, &caddy->ev, -1, \
@ -96,9 +96,9 @@ OBJ_CLASS_DECLARATION(orte_grpcomm_caddy_t);
ORTE_DECLSPEC extern orte_grpcomm_base_t orte_grpcomm_base;
ORTE_DECLSPEC orte_grpcomm_collective_t* orte_grpcomm_base_setup_collective(orte_jobid_t jobid, orte_grpcomm_coll_id_t id);
ORTE_DECLSPEC orte_grpcomm_collective_t* orte_grpcomm_base_setup_collective(orte_grpcomm_coll_id_t id);
ORTE_DECLSPEC void orte_grpcomm_base_progress_collectives(void);
ORTE_DECLSPEC orte_grpcomm_coll_id_t orte_grpcomm_base_get_coll_id(orte_process_name_t *nm);
ORTE_DECLSPEC orte_grpcomm_coll_id_t orte_grpcomm_base_get_coll_id(void);
ORTE_DECLSPEC void orte_grpcomm_base_pack_collective(opal_buffer_t *relay,
orte_jobid_t jobid,
orte_grpcomm_collective_t *coll,


@ -93,13 +93,16 @@ MCA_BASE_FRAMEWORK_DECLARE(orte, grpcomm, NULL, NULL, orte_grpcomm_base_open, or
mca_grpcomm_base_static_components, 0);
orte_grpcomm_collective_t* orte_grpcomm_base_setup_collective(orte_jobid_t jobid, orte_grpcomm_coll_id_t id)
orte_grpcomm_collective_t* orte_grpcomm_base_setup_collective(orte_grpcomm_coll_id_t id)
{
opal_list_item_t *item;
orte_grpcomm_collective_t *cptr, *coll;
orte_namelist_t *nm;
coll = NULL;
OPAL_LIST_FOREACH(cptr, &orte_grpcomm_base.active_colls, orte_grpcomm_collective_t) {
for (item = opal_list_get_first(&orte_grpcomm_base.active_colls);
item != opal_list_get_end(&orte_grpcomm_base.active_colls);
item = opal_list_get_next(item)) {
cptr = (orte_grpcomm_collective_t*)item;
if (id == cptr->id) {
coll = cptr;
break;
@ -109,11 +112,6 @@ orte_grpcomm_collective_t* orte_grpcomm_base_setup_collective(orte_jobid_t jobid
coll = OBJ_NEW(orte_grpcomm_collective_t);
coll->id = id;
opal_list_append(&orte_grpcomm_base.active_colls, &coll->super);
/* need to add the vpid name to the participants */
nm = OBJ_NEW(orte_namelist_t);
nm->name.jobid = jobid;
nm->name.vpid = ORTE_VPID_WILDCARD;
opal_list_append(&coll->participants, &nm->super);
}
return coll;
@ -122,7 +120,7 @@ orte_grpcomm_collective_t* orte_grpcomm_base_setup_collective(orte_jobid_t jobid
/* local objects */
static void collective_constructor(orte_grpcomm_collective_t *ptr)
{
ptr->id = ORTE_GRPCOMM_COLL_ID_INVALID;
ptr->id = -1;
ptr->active = false;
ptr->num_local_recvd = 0;
OBJ_CONSTRUCT(&ptr->local_bucket, opal_buffer_t);


@ -38,66 +38,27 @@
#include "opal/mca/dstore/dstore.h"
#include "opal/mca/hwloc/base/base.h"
#include "orte/util/proc_info.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/ess/ess.h"
#include "orte/mca/rml/rml.h"
#include "orte/util/attr.h"
#include "orte/runtime/orte_globals.h"
#include "orte/util/name_fns.h"
#include "orte/util/nidmap.h"
#include "orte/util/proc_info.h"
#include "orte/orted/orted.h"
#include "orte/runtime/orte_globals.h"
#include "orte/runtime/orte_wait.h"
#include "orte/mca/grpcomm/base/base.h"
#include "orte/mca/grpcomm/grpcomm.h"
orte_grpcomm_coll_id_t orte_grpcomm_base_get_coll_id(orte_process_name_t *nm)
orte_grpcomm_coll_id_t orte_grpcomm_base_get_coll_id(void)
{
orte_grpcomm_coll_id_t id;
opal_list_t myvals;
opal_value_t *kv;
uint64_t n;
OBJ_CONSTRUCT(&myvals, opal_list_t);
if (ORTE_SUCCESS != opal_dstore.fetch(opal_dstore_internal,
(opal_identifier_t*)nm,
ORTE_DB_COLL_ID_CNTR, &myvals)) {
/* start the counter */
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(ORTE_DB_COLL_ID_CNTR);
kv->type = OPAL_UINT32;
kv->data.uint32 = 0;
opal_list_append(&myvals, &kv->super);
}
kv = (opal_value_t*)opal_list_get_first(&myvals);
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
"%s CURRENT COLL ID COUNTER %u FOR PROC %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
kv->data.uint32, ORTE_NAME_PRINT(nm)));
/* construct the next collective id for this job */
id = kv->data.uint32;
n = (uint64_t)nm->jobid << 32;
id |= (n & 0xffffffff00000000);
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
"%s ASSIGNED COLL ID %lu",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(unsigned long)id));
/* assign the next collective id */
id = orte_grpcomm_base.coll_id;
/* rotate to the next value */
kv->data.uint32++;
if (UINT32_MAX == kv->data.uint32) {
/* need to rotate around */
kv->data.uint32 = 0;
}
if (ORTE_SUCCESS != opal_dstore.store(opal_dstore_internal,
(opal_identifier_t*)nm, kv)) {
OPAL_LIST_DESTRUCT(&myvals);
return ORTE_GRPCOMM_COLL_ID_INVALID;
}
OPAL_LIST_DESTRUCT(&myvals);
orte_grpcomm_base.coll_id++;
return id;
}
@ -174,9 +135,9 @@ void orte_grpcomm_base_modex(int fd, short args, void *cbdata)
item = opal_list_get_next(item)) {
cptr = (orte_grpcomm_collective_t*)item;
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s CHECKING COLL id %lu",
"%s CHECKING COLL id %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(unsigned long)cptr->id));
cptr->id));
if (modex->id == cptr->id) {
found = true;


@ -140,6 +140,8 @@ static void coll_id_req(int status, orte_process_name_t* sender,
opal_buffer_t *relay;
int rc;
int32_t num, n;
uint8_t flag;
orte_daemon_cmd_flag_t cmd;
/* unpack the number of id's being requested */
n=1;
@ -151,32 +153,71 @@ static void coll_id_req(int status, orte_process_name_t* sender,
/* assume one id was requested */
num = 1;
}
if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
/* read the flag */
n=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &flag, &n, OPAL_UINT8))) {
if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
ORTE_ERROR_LOG(rc);
return;
}
/* assume this was not a global request */
flag = 0;
}
}
/* coll id requests are for multi-job collectives, so we
* assign a collective id from the DAEMON job as not all
* procs in the participating jobs are required to participate */
id = (orte_grpcomm_coll_id_t*)malloc(num * sizeof(orte_grpcomm_coll_id_t));
for (n=0; n < num; n++) {
id[n] = orte_grpcomm_base_get_coll_id(ORTE_PROC_MY_NAME);
id[n] = orte_grpcomm_base_get_coll_id();
}
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:base:receive proc %s requested %d coll id's",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender), num));
relay = OBJ_NEW(opal_buffer_t);
if (ORTE_SUCCESS != (rc = opal_dss.pack(relay, id, num, ORTE_GRPCOMM_COLL_ID_T))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(relay);
if (0 == flag) {
relay = OBJ_NEW(opal_buffer_t);
if (ORTE_SUCCESS != (rc = opal_dss.pack(relay, id, num, ORTE_GRPCOMM_COLL_ID_T))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(relay);
free(id);
return;
}
free(id);
return;
}
free(id);
if (0 > (rc = orte_rml.send_buffer_nb(sender, relay, ORTE_RML_TAG_COLL_ID,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(relay);
return;
if (0 > (rc = orte_rml.send_buffer_nb(sender, relay, ORTE_RML_TAG_COLL_ID,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(relay);
return;
}
} else {
relay = OBJ_NEW(opal_buffer_t);
/* pack the coll_id cmd */
cmd = ORTE_DAEMON_NEW_COLL_ID;
if (ORTE_SUCCESS != (rc = opal_dss.pack(relay, &cmd, 1, ORTE_DAEMON_CMD))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(relay);
free(id);
return;
}
/* pack the jobid of the requestor */
if (ORTE_SUCCESS != (rc = opal_dss.pack(relay, &sender->jobid, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(relay);
free(id);
return;
}
/* pack the id */
if (ORTE_SUCCESS != (rc = opal_dss.pack(relay, id, num, ORTE_GRPCOMM_COLL_ID_T))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(relay);
free(id);
return;
}
free(id);
/* xcast it to the daemons as they need to process it
* prior to passing it down to their procs */
orte_grpcomm.xcast(ORTE_PROC_MY_NAME->jobid, relay, ORTE_RML_TAG_DAEMON);
}
}
@ -201,8 +242,8 @@ static void app_recv(int status, orte_process_name_t* sender,
}
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:base:receive processing collective return for id %lu recvd from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (unsigned long)id, ORTE_NAME_PRINT(sender)));
"%s grpcomm:base:receive processing collective return for id %d recvd from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), id, ORTE_NAME_PRINT(sender)));
/* if the sender is my daemon, then this collective is
* a global one and is complete
@ -215,9 +256,9 @@ static void app_recv(int status, orte_process_name_t* sender,
item = opal_list_get_next(item)) {
coll = (orte_grpcomm_collective_t*)item;
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s CHECKING COLL id %lu",
"%s CHECKING COLL id %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(unsigned long)coll->id));
coll->id));
if (id == coll->id) {
/* see if the collective needs another step */
@ -264,9 +305,9 @@ static void app_recv(int status, orte_process_name_t* sender,
item = opal_list_get_next(item)) {
cptr = (orte_grpcomm_collective_t*)item;
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s CHECKING COLL id %lu",
"%s CHECKING COLL id %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(unsigned long)cptr->id));
cptr->id));
if (id == cptr->id) {
/* aha - we do have it */
@ -457,23 +498,21 @@ static void daemon_local_recv(int status, orte_process_name_t* sender,
}
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s WORKING COLLECTIVE %lu",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(unsigned long)id));
"%s WORKING COLLECTIVE %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), id));
/* setup the collective for this id - if it's already present,
* then this will just return the existing structure
*/
coll = orte_grpcomm_base_setup_collective(sender->jobid, id);
coll = orte_grpcomm_base_setup_collective(id);
/* record this proc's participation and its data */
coll->num_local_recvd++;
opal_dss.copy_payload(&coll->local_bucket, buffer);
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s PROGRESSING COLLECTIVE %lu",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(unsigned long)id));
"%s PROGRESSING COLLECTIVE %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), id));
orte_grpcomm_base_progress_collectives();
}
@ -520,15 +559,15 @@ void orte_grpcomm_base_progress_collectives(void)
while (item != opal_list_get_end(&orte_grpcomm_base.active_colls)) {
coll = (orte_grpcomm_collective_t*)item;
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s PROGRESSING COLL id %lu",
"%s PROGRESSING COLL id %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(unsigned long)coll->id));
coll->id));
/* if this collective is already locally complete, then ignore it */
if (coll->locally_complete) {
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s COLL %lu IS LOCALLY COMPLETE",
"%s COLL %d IS LOCALLY COMPLETE",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(unsigned long)coll->id));
coll->id));
goto next_coll;
}
/* get the jobid of the participants in this collective */
@ -542,9 +581,9 @@ void orte_grpcomm_base_progress_collectives(void)
* this collective
*/
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s COLL %lu JOBID %s NOT FOUND",
"%s COLL %d JOBID %s NOT FOUND",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(unsigned long)coll->id, ORTE_JOBID_PRINT(nm->name.jobid)));
coll->id, ORTE_JOBID_PRINT(nm->name.jobid)));
goto next_coll;
}
/* all local procs from this job are required to participate */
@ -556,9 +595,8 @@ void orte_grpcomm_base_progress_collectives(void)
/* see if all reqd participants are done */
if (jdata->num_local_procs == coll->num_local_recvd) {
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s COLLECTIVE %lu LOCALLY COMPLETE - SENDING TO GLOBAL COLLECTIVE",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(unsigned long)coll->id));
"%s COLLECTIVE %d LOCALLY COMPLETE - SENDING TO GLOBAL COLLECTIVE",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), coll->id));
/* mark it as locally complete */
coll->locally_complete = true;
/* pack the collective */
@ -608,14 +646,13 @@ static void daemon_coll_recv(int status, orte_process_name_t* sender,
}
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:base:daemon_coll: WORKING COLLECTIVE %lu",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(unsigned long)id));
"%s grpcomm:base:daemon_coll: WORKING COLLECTIVE %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), id));
/* setup the collective for this id - if it's already present,
* then this will just return the existing structure
*/
coll = orte_grpcomm_base_setup_collective(sender->jobid, id);
coll = orte_grpcomm_base_setup_collective(id);
/* record that we received a bucket */
coll->num_peer_buckets++;
@ -738,11 +775,31 @@ static void daemon_coll_recv(int status, orte_process_name_t* sender,
OBJ_RELEASE(nm);
}
if (jdata->num_procs != coll->num_global_recvd) {
/* determine how many contributors we need to recv - we know
* that all job objects were found, so we can skip that test
* while counting
*/
np = 0;
for (item = opal_list_get_first(&coll->participants);
item != opal_list_get_end(&coll->participants);
item = opal_list_get_next(item)) {
nm = (orte_namelist_t*)item;
/* get the job object for this participant */
jdata = orte_get_job_data_object(nm->name.jobid);
if (ORTE_VPID_WILDCARD == nm->name.vpid) {
/* all procs from this job are required to participate */
np += jdata->num_procs;
} else {
np++;
}
}
/* are we done? */
if (np != coll->num_global_recvd) {
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:base:daemon_coll: MISSING CONTRIBUTORS: nprocs %s num_global_recvd %s",
"%s grpcomm:base:daemon_coll: MISSING CONTRIBUTORS: np %s ngr %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_VPID_PRINT(jdata->num_procs),
ORTE_VPID_PRINT(np),
ORTE_VPID_PRINT(coll->num_global_recvd)));
return;
}
@ -750,11 +807,28 @@ static void daemon_coll_recv(int status, orte_process_name_t* sender,
/* since we discovered that the collective is complete, we
* need to send it to all the participants
*/
relay = OBJ_NEW(opal_buffer_t);
opal_dss.pack(relay, &coll->id, 1, ORTE_GRPCOMM_COLL_ID_T);
opal_dss.copy_payload(relay, &coll->buffer);
orte_grpcomm.xcast(jdata->jobid, relay, ORTE_RML_TAG_COLLECTIVE);
OBJ_RELEASE(relay);
for (item = opal_list_get_first(&coll->participants);
item != opal_list_get_end(&coll->participants);
item = opal_list_get_next(item)) {
nm = (orte_namelist_t*)item;
relay = OBJ_NEW(opal_buffer_t);
opal_dss.pack(relay, &coll->id, 1, ORTE_GRPCOMM_COLL_ID_T);
opal_dss.copy_payload(relay, &coll->buffer);
/* if the vpid is wildcard, then this goes to
* all daemons for relay
*/
if (ORTE_VPID_WILDCARD == nm->name.vpid) {
orte_grpcomm.xcast(nm->name.jobid, relay, ORTE_RML_TAG_COLLECTIVE);
OBJ_RELEASE(relay);
} else {
/* send it to this proc */
if (0 > orte_rml.send_buffer_nb(&nm->name, relay, ORTE_RML_TAG_COLLECTIVE,
orte_rml_send_callback, NULL)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
OBJ_RELEASE(relay);
}
}
}
/* remove this collective */
opal_list_remove_item(&orte_grpcomm_base.active_colls, &coll->super);


@ -11,7 +11,6 @@
* All rights reserved.
* Copyright (c) 2011-2012 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2014 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -50,39 +49,9 @@ BEGIN_C_DECLS
*/
typedef void (*orte_grpcomm_collective_cbfunc_t)(opal_buffer_t *data, void *cbdata);
/* define a collective_id_t. In order to allow scalable
* generation of collective id's, they are formed as:
*
* top 32-bits are the jobid of the procs involved in
* the collective. For collectives across multiple jobs
* (e.g., in a connect_accept), the daemon jobid will
* be used as the id will be issued by mpirun. This
* won't cause problems because daemons don't use the
* collective_id
*
* bottom 32-bits are a rolling counter that recycles
* when the max is hit. The daemon will cleanup each
* collective upon completion, so this means a job can
* never have more than 2**32 collectives going on at
* a time. If someone needs more than that - they've got
* a problem.
*
* Note that this means (for now) that RTE-level collectives
* cannot be done by individual threads - they must be
* done at the overall process level. This is required as
* there is no guaranteed ordering for the collective id's,
* and all the participants must agree on the id of the
* collective they are executing. So if thread A on one
* process asks for a collective id before thread B does,
* but B asks before A on another process, the collectives will
* be mixed and not result in the expected behavior. We may
* find a way to relax this requirement in the future by
* adding a thread context id to the jobid field (maybe taking the
* lower 16-bits of that field).
*/
typedef uint64_t orte_grpcomm_coll_id_t;
#define ORTE_GRPCOMM_COLL_ID_T OPAL_UINT64
#define ORTE_GRPCOMM_COLL_ID_INVALID UINT64_MAX
typedef int32_t orte_grpcomm_coll_id_t;
#define ORTE_GRPCOMM_COLL_ID_T OPAL_INT32
#define ORTE_GRPCOMM_COLL_ID_REQ -1
typedef int8_t orte_grpcomm_coll_t;
#define ORTE_GRPCOMM_XCAST 1


@ -192,8 +192,11 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
int8_t flag;
int32_t n;
orte_proc_t *pptr, *dmn;
orte_grpcomm_collective_t *coll;
orte_namelist_t *nm;
opal_buffer_t *bptr;
orte_app_context_t *app;
orte_grpcomm_coll_id_t gid, *gidptr;
OPAL_OUTPUT_VERBOSE((5, orte_odls_base_framework.framework_output,
"%s odls:constructing child list",
@ -378,22 +381,49 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
}
COMPLETE:
#if OPAL_ENABLE_FT_CR == 1
{
orte_grpcomm_coll_id_t gid, *gidptr;
orte_grpcomm_collective_t *coll;
/* create the collectives so the job doesn't stall */
gidptr = &gid;
if (orte_get_attribute(&jdata->attributes, ORTE_JOB_SNAPC_INIT_BAR,
(void**)&gidptr, ORTE_GRPCOMM_COLL_ID_T)) {
coll = orte_grpcomm_base_setup_collective(jdata->jobid, *gidptr);
}
if (orte_get_attribute(&jdata->attributes, ORTE_JOB_SNAPC_FINI_BAR,
(void**)&gidptr, ORTE_GRPCOMM_COLL_ID_T)) {
coll = orte_grpcomm_base_setup_collective(jdata->jobid, *gidptr);
}
/* create the collectives so the job doesn't stall */
gidptr = &gid;
if (orte_get_attribute(&jdata->attributes, ORTE_JOB_PEER_MODX_ID,
(void**)&gidptr, ORTE_GRPCOMM_COLL_ID_T)) {
coll = orte_grpcomm_base_setup_collective(*gidptr);
nm = OBJ_NEW(orte_namelist_t);
nm->name.jobid = jdata->jobid;
nm->name.vpid = ORTE_VPID_WILDCARD;
opal_list_append(&coll->participants, &nm->super);
}
if (orte_get_attribute(&jdata->attributes, ORTE_JOB_INIT_BAR_ID,
(void**)&gidptr, ORTE_GRPCOMM_COLL_ID_T)) {
coll = orte_grpcomm_base_setup_collective(*gidptr);
nm = OBJ_NEW(orte_namelist_t);
nm->name.jobid = jdata->jobid;
nm->name.vpid = ORTE_VPID_WILDCARD;
opal_list_append(&coll->participants, &nm->super);
}
if (orte_get_attribute(&jdata->attributes, ORTE_JOB_FINI_BAR_ID,
(void**)&gidptr, ORTE_GRPCOMM_COLL_ID_T)) {
coll = orte_grpcomm_base_setup_collective(*gidptr);
nm = OBJ_NEW(orte_namelist_t);
nm->name.jobid = jdata->jobid;
nm->name.vpid = ORTE_VPID_WILDCARD;
opal_list_append(&coll->participants, &nm->super);
}
if (orte_get_attribute(&jdata->attributes, ORTE_JOB_SNAPC_INIT_BAR,
(void**)&gidptr, ORTE_GRPCOMM_COLL_ID_T)) {
coll = orte_grpcomm_base_setup_collective(*gidptr);
nm = OBJ_NEW(orte_namelist_t);
nm->name.jobid = jdata->jobid;
nm->name.vpid = ORTE_VPID_WILDCARD;
opal_list_append(&coll->participants, &nm->super);
}
if (orte_get_attribute(&jdata->attributes, ORTE_JOB_SNAPC_FINI_BAR,
(void**)&gidptr, ORTE_GRPCOMM_COLL_ID_T)) {
coll = orte_grpcomm_base_setup_collective(*gidptr);
nm = OBJ_NEW(orte_namelist_t);
nm->name.jobid = jdata->jobid;
nm->name.vpid = ORTE_VPID_WILDCARD;
opal_list_append(&coll->participants, &nm->super);
}
#endif
/* progress any pending collectives */
orte_grpcomm_base_progress_collectives();
@ -434,10 +464,34 @@ static int odls_base_default_setup_fork(orte_job_t *jdata,
/* add any collective id info to the app's environ */
gidptr = &gid;
if (orte_get_attribute(&jdata->attributes, ORTE_JOB_PEER_MODX_ID,
(void**)&gidptr, ORTE_GRPCOMM_COLL_ID_T)) {
(void) mca_base_var_env_name ("orte_peer_modex_id", &param);
asprintf(&param2, "%d", *gidptr);
opal_setenv(param, param2, true, environ_copy);
free(param);
free(param2);
}
if (orte_get_attribute(&jdata->attributes, ORTE_JOB_INIT_BAR_ID,
(void**)&gidptr, ORTE_GRPCOMM_COLL_ID_T)) {
(void) mca_base_var_env_name ("orte_peer_init_barrier_id", &param);
asprintf(&param2, "%d", *gidptr);
opal_setenv(param, param2, true, environ_copy);
free(param);
free(param2);
}
if (orte_get_attribute(&jdata->attributes, ORTE_JOB_FINI_BAR_ID,
(void**)&gidptr, ORTE_GRPCOMM_COLL_ID_T)) {
(void) mca_base_var_env_name ("orte_peer_fini_barrier_id", &param);
asprintf(&param2, "%d", *gidptr);
opal_setenv(param, param2, true, environ_copy);
free(param);
free(param2);
}
if (orte_get_attribute(&jdata->attributes, ORTE_JOB_SNAPC_INIT_BAR,
(void**)&gidptr, ORTE_GRPCOMM_COLL_ID_T)) {
(void) mca_base_var_env_name ("orte_snapc_init_barrier_id", &param);
asprintf(&param2, "%lu", (unsigned long)gid);
asprintf(&param2, "%d", *gidptr);
opal_setenv(param, param2, true, environ_copy);
free(param);
free(param2);
@ -445,7 +499,7 @@ static int odls_base_default_setup_fork(orte_job_t *jdata,
if (orte_get_attribute(&jdata->attributes, ORTE_JOB_SNAPC_FINI_BAR,
(void**)&gidptr, ORTE_GRPCOMM_COLL_ID_T)) {
(void) mca_base_var_env_name ("orte_snapc_fini_barrier_id", &param);
asprintf(&param2, "%lu", (unsigned long)gid);
asprintf(&param2, "%d", *gidptr);
opal_setenv(param, param2, true, environ_copy);
free(param);
free(param2);


@ -88,6 +88,9 @@ typedef uint8_t orte_daemon_cmd_flag_t;
/* process called "errmgr.abort_procs" */
#define ORTE_DAEMON_ABORT_PROCS_CALLED (orte_daemon_cmd_flag_t) 28
/* new daemon collective id */
#define ORTE_DAEMON_NEW_COLL_ID (orte_daemon_cmd_flag_t) 29
/*
* Struct written up the pipe from the child to the parent.


@ -236,6 +236,7 @@ void orte_plm_base_setup_job(int fd, short args, void *cbdata)
int i;
orte_app_context_t *app;
orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
orte_grpcomm_coll_id_t id;
OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
"%s plm:base:setup_job",
@ -270,16 +271,20 @@ void orte_plm_base_setup_job(int fd, short args, void *cbdata)
ORTE_FLAG_SET(caddy->jdata, ORTE_JOB_FLAG_RECOVERABLE);
}
/* get collective ids for the std MPI operations */
id = orte_grpcomm_base_get_coll_id();
orte_set_attribute(&caddy->jdata->attributes, ORTE_JOB_PEER_MODX_ID, ORTE_ATTR_GLOBAL, &id, ORTE_GRPCOMM_COLL_ID_T);
id = orte_grpcomm_base_get_coll_id();
orte_set_attribute(&caddy->jdata->attributes, ORTE_JOB_INIT_BAR_ID, ORTE_ATTR_GLOBAL, &id, ORTE_GRPCOMM_COLL_ID_T);
id = orte_grpcomm_base_get_coll_id();
orte_set_attribute(&caddy->jdata->attributes, ORTE_JOB_FINI_BAR_ID, ORTE_ATTR_GLOBAL, &id, ORTE_GRPCOMM_COLL_ID_T);
#if OPAL_ENABLE_FT_CR == 1
{
orte_grpcomm_coll_id_t id;
/* Adrian - I'm not sure if these need to be coordinated by the HNP, or
* can be generated by each proc */
id = orte_grpcomm_base_get_coll_id(ORTE_PROC_MY_NAME);
orte_set_attribute(&caddy->jdata->attributes, ORTE_JOB_SNAPC_INIT_BAR, ORTE_ATTR_GLOBAL, &id, ORTE_GRPCOMM_COLL_ID_T);
id = orte_grpcomm_base_get_coll_id(ORTE_PROC_MY_NAME);
orte_set_attribute(&caddy->jdata->attributes, ORTE_JOB_SNAPC_FINI_BAR, ORTE_ATTR_GLOBAL, &id, ORTE_GRPCOMM_COLL_ID_T);
}
id = orte_grpcomm_base_get_coll_id();
orte_set_attribute(&caddy->jdata->attributes, ORTE_JOB_SNAPC_INIT_BAR, ORTE_ATTR_GLOBAL, &id, ORTE_GRPCOMM_COLL_ID_T);
id = orte_grpcomm_base_get_coll_id();
orte_set_attribute(&caddy->jdata->attributes, ORTE_JOB_SNAPC_FINI_BAR, ORTE_ATTR_GLOBAL, &id, ORTE_GRPCOMM_COLL_ID_T);
#endif
/* if app recovery is not defined, set apps to defaults */


@ -149,6 +149,9 @@ BEGIN_C_DECLS
/* notifier support */
#define ORTE_RML_TAG_NOTIFIER_HNP 50
/* global collective ID request */
#define ORTE_RML_TAG_FULL_COLL_ID 51
#define ORTE_RML_TAG_MAX 100


@ -59,7 +59,7 @@
#include "orte/util/nidmap.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/grpcomm/grpcomm.h"
#include "orte/mca/grpcomm/base/base.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/rml/rml_types.h"
#include "orte/mca/odls/odls.h"
@ -112,6 +112,9 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
orte_std_cntr_t num_procs, num_new_procs = 0, p;
orte_proc_t *cur_proc = NULL, *prev_proc = NULL;
bool found = false;
orte_grpcomm_collective_t *coll;
orte_namelist_t *nm;
orte_grpcomm_coll_id_t id;
/* unpack the command */
n = 1;
@ -238,7 +241,46 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_ERROR_NAME(ret)));
}
break;
case ORTE_DAEMON_NEW_COLL_ID:
if (orte_debug_daemons_flag) {
opal_output(0, "%s orted_cmd: received new_coll_id",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
}
/* unpack the jobid of the involved party */
n = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &job, &n, ORTE_JOBID))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
/* unpack the new collective id */
n = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &id, &n, ORTE_GRPCOMM_COLL_ID_T))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
/* setup a new collective for it - will return the
* existing coll if already present */
coll = orte_grpcomm_base_setup_collective(id);
nm = OBJ_NEW(orte_namelist_t);
nm->name.jobid = job;
nm->name.vpid = ORTE_VPID_WILDCARD;
opal_list_prepend(&coll->participants, &nm->super);
/* pass it down to any local procs from that jobid */
relay_msg = OBJ_NEW(opal_buffer_t);
if (ORTE_SUCCESS != (ret = opal_dss.pack(relay_msg, &id, 1, ORTE_GRPCOMM_COLL_ID_T))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(relay_msg);
goto CLEANUP;
}
if (ORTE_SUCCESS != (ret = orte_odls.deliver_message(job, relay_msg, ORTE_RML_TAG_FULL_COLL_ID))) {
ORTE_ERROR_LOG(ret);
}
OBJ_RELEASE(relay_msg);
/* progress any pending collectives */
orte_grpcomm_base_progress_collectives();
break;
case ORTE_DAEMON_ABORT_PROCS_CALLED:
if (orte_debug_daemons_flag) {
opal_output(0, "%s orted_cmd: received abort_procs report",


@ -529,6 +529,9 @@ int orte_daemon(int argc, char *argv[])
orte_app_context_t *app;
char *tmp, *nptr, *sysinfo;
int32_t ljob;
orte_grpcomm_collective_t *coll;
orte_namelist_t *nm;
orte_grpcomm_coll_id_t id;
/* setup the singleton's job */
jdata = OBJ_NEW(orte_job_t);
@ -582,19 +585,47 @@ int orte_daemon(int argc, char *argv[])
proc->app_idx = 0;
ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_LOCAL);
/* account for the collectives */
#if OPAL_ENABLE_FT_CR == 1
{
orte_grpcomm_coll_id_t id;
orte_grpcomm_collective_t *coll;
id = orte_grpcomm_base_get_coll_id(ORTE_PROC_MY_NAME);
orte_set_attribute(&jdata->attributes, ORTE_JOB_SNAPC_INIT_BAR, ORTE_ATTR_GLOBAL, &id, ORTE_GRPCOMM_COLL_ID_T);
coll = orte_grpcomm_base_setup_collective(jdata->jobid, id);
/* account for the collectives in its modex/barriers */
id = orte_grpcomm_base_get_coll_id();
orte_set_attribute(&jdata->attributes, ORTE_JOB_PEER_MODX_ID, ORTE_ATTR_GLOBAL, &id, ORTE_GRPCOMM_COLL_ID_T);
coll = orte_grpcomm_base_setup_collective(id);
nm = OBJ_NEW(orte_namelist_t);
nm->name.jobid = jdata->jobid;
nm->name.vpid = ORTE_VPID_WILDCARD;
opal_list_append(&coll->participants, &nm->super);
id = orte_grpcomm_base_get_coll_id(ORTE_PROC_MY_NAME);
orte_set_attribute(&jdata->attributes, ORTE_JOB_SNAPC_FINI_BAR, ORTE_ATTR_GLOBAL, &id, ORTE_GRPCOMM_COLL_ID_T);
coll = orte_grpcomm_base_setup_collective(jdata->jobid, id);
}
id = orte_grpcomm_base_get_coll_id();
orte_set_attribute(&jdata->attributes, ORTE_JOB_INIT_BAR_ID, ORTE_ATTR_GLOBAL, &id, ORTE_GRPCOMM_COLL_ID_T);
coll = orte_grpcomm_base_setup_collective(id);
nm = OBJ_NEW(orte_namelist_t);
nm->name.jobid = jdata->jobid;
nm->name.vpid = ORTE_VPID_WILDCARD;
opal_list_append(&coll->participants, &nm->super);
id = orte_grpcomm_base_get_coll_id();
orte_set_attribute(&jdata->attributes, ORTE_JOB_FINI_BAR_ID, ORTE_ATTR_GLOBAL, &id, ORTE_GRPCOMM_COLL_ID_T);
coll = orte_grpcomm_base_setup_collective(id);
nm = OBJ_NEW(orte_namelist_t);
nm->name.jobid = jdata->jobid;
nm->name.vpid = ORTE_VPID_WILDCARD;
opal_list_append(&coll->participants, &nm->super);
#if OPAL_ENABLE_FT_CR == 1
id = orte_grpcomm_base_get_coll_id();
orte_set_attribute(&jdata->attributes, ORTE_JOB_SNAPC_INIT_BAR, ORTE_ATTR_GLOBAL, &id, ORTE_GRPCOMM_COLL_ID_T);
coll = orte_grpcomm_base_setup_collective(id);
nm = OBJ_NEW(orte_namelist_t);
nm->name.jobid = jdata->jobid;
nm->name.vpid = ORTE_VPID_WILDCARD;
opal_list_append(&coll->participants, &nm->super);
id = orte_grpcomm_base_get_coll_id();
orte_set_attribute(&jdata->attributes, ORTE_JOB_SNAPC_FINI_BAR, ORTE_ATTR_GLOBAL, &id, ORTE_GRPCOMM_COLL_ID_T);
coll = orte_grpcomm_base_setup_collective(id);
nm = OBJ_NEW(orte_namelist_t);
nm->name.jobid = jdata->jobid;
nm->name.vpid = ORTE_VPID_WILDCARD;
opal_list_append(&coll->participants, &nm->super);
#endif
/* create a string that contains our uri + sysinfo */


@ -139,7 +139,6 @@ ORTE_DECLSPEC extern int orte_exit_status;
#define ORTE_DB_RMLURI "orte.rmluri"
#define ORTE_DB_HOSTID "orte.hostid"
#define ORTE_DB_GLOBAL_RANK "orte.global.rank"
#define ORTE_DB_COLL_ID_CNTR "orte.coll.id.cntr"
/* State Machine lists */
ORTE_DECLSPEC extern opal_list_t orte_job_states;


@ -90,7 +90,7 @@ ORTE_DECLSPEC void orte_wait_cb_cancel(orte_proc_t *proc);
"%s waiting on progress thread at %s:%d", \
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \
__FILE__, __LINE__); \
while ((flg)) { \
while ((flg)) { \
/* provide a short quiet period so we \
* don't hammer the cpu while waiting \
*/ \


@ -38,29 +38,29 @@ int main(int argc, char* argv[])
for (i=0; i < COLL_TEST_MAX; i++) {
fprintf(stderr, "%d executing barrier %d\n", rank, i);
coll = OBJ_NEW(ompi_rte_collective_t);
coll->id = ompi_rte_get_collective_id(OMPI_PROC_MY_NAME);
coll->id = ompi_rte_get_collective_id(MPI_COMM_WORLD);
coll->active = true;
if (OMPI_SUCCESS != (ret = ompi_rte_barrier(coll))) {
OMPI_ERROR_LOG(ret);
return ret;
}
OMPI_LAZY_WAIT_FOR_COMPLETION(coll->active);
OBJ_RELEASE(coll);
}
for (i=0; i < COLL_TEST_MAX; i++) {
fprintf(stderr, "%d executing modex %d\n", rank, i);
coll = OBJ_NEW(ompi_rte_collective_t);
coll->id = ompi_rte_get_collective_id(OMPI_PROC_MY_NAME);
coll->id = ompi_rte_get_collective_id(MPI_COMM_WORLD);
coll->active = true;
if (OMPI_SUCCESS != (ret = ompi_rte_modex(coll))) {
OMPI_ERROR_LOG(ret);
return ret;
}
OMPI_LAZY_WAIT_FOR_COMPLETION(coll->active);
OBJ_RELEASE(coll);
}
/* wait for barrier to complete */
OMPI_LAZY_WAIT_FOR_COMPLETION(coll->active);
OBJ_RELEASE(coll);
MPI_Finalize();
return 0;
}


@ -195,8 +195,12 @@ const char *orte_attr_key_to_str(orte_attribute_key_t key)
return "JOB-FAIL-NOTIFIED";
case ORTE_JOB_TERM_NOTIFIED:
return "JOB-TERM-NOTIFIED";
case ORTE_JOB_COLL_ID_CNTR:
return "JOB-COLL-ID-CNTR";
case ORTE_JOB_PEER_MODX_ID:
return "JOB-PEER-MODX-ID";
case ORTE_JOB_INIT_BAR_ID:
return "JOB-INIT-BAR-ID";
case ORTE_JOB_FINI_BAR_ID:
return "JOB-FINI-BAR-ID";
case ORTE_PROC_NOBARRIER:
return "PROC-NOBARRIER";


@ -115,7 +115,9 @@ typedef uint16_t orte_job_flags_t;
#define ORTE_JOB_GOVERNOR (ORTE_JOB_START_KEY + 27) // string - governor used for nodes in job
#define ORTE_JOB_FAIL_NOTIFIED (ORTE_JOB_START_KEY + 28) // bool - abnormal term of proc within job has been reported
#define ORTE_JOB_TERM_NOTIFIED (ORTE_JOB_START_KEY + 29) // bool - normal term of job has been reported
#define ORTE_JOB_COLL_ID_CNTR (ORTE_JOB_START_KEY + 30) // uint32_t - counter for current collective id
#define ORTE_JOB_PEER_MODX_ID (ORTE_JOB_START_KEY + 30) // orte_grpcomm_coll_id_t - collective id
#define ORTE_JOB_INIT_BAR_ID (ORTE_JOB_START_KEY + 31) // orte_grpcomm_coll_id_t - collective id
#define ORTE_JOB_FINI_BAR_ID (ORTE_JOB_START_KEY + 32) // orte_grpcomm_coll_id_t - collective id
#define ORTE_JOB_MAX_KEY 300


@ -80,6 +80,9 @@ ORTE_DECLSPEC orte_proc_info_t orte_process_info = {
.cpuset = NULL,
#endif
.app_rank = -1,
.peer_modex = -1,
.peer_init_barrier = -1,
.peer_fini_barrier = -1,
.my_hostid = ORTE_VPID_INVALID,
#if OPAL_ENABLE_FT_CR == 1
.snapc_init_barrier = -1,
@ -89,6 +92,9 @@ ORTE_DECLSPEC orte_proc_info_t orte_process_info = {
static bool init=false;
static int orte_ess_node_rank;
static int orte_peer_modex_id;
static int orte_peer_init_barrier_id;
static int orte_peer_fini_barrier_id;
#if OPAL_ENABLE_FT_CR == 1
static int orte_snapc_init_barrier_id;
static int orte_snapc_fini_barrier_id;
@ -262,6 +268,33 @@ int orte_proc_info(void)
orte_process_info.sync_buf = OBJ_NEW(opal_buffer_t);
/* get the collective id info */
orte_peer_modex_id = -1;
(void) mca_base_var_register ("orte", "orte", NULL, "peer_modex_id", "Peer modex collective id",
MCA_BASE_VAR_TYPE_INT, NULL, 0,
MCA_BASE_VAR_FLAG_INTERNAL,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_CONSTANT,
&orte_peer_modex_id);
orte_process_info.peer_modex = (orte_grpcomm_coll_id_t) orte_peer_modex_id;
orte_peer_init_barrier_id = -1;
(void) mca_base_var_register ("orte", "orte", NULL, "peer_init_barrier_id", "Peer init barrier collective id",
MCA_BASE_VAR_TYPE_INT, NULL, 0,
MCA_BASE_VAR_FLAG_INTERNAL,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_CONSTANT,
&orte_peer_init_barrier_id);
orte_process_info.peer_init_barrier = (orte_grpcomm_coll_id_t) orte_peer_init_barrier_id;
orte_peer_fini_barrier_id = -1;
(void) mca_base_var_register ("orte", "orte", NULL, "peer_fini_barrier_id", "Peer finalize barrier collective id",
MCA_BASE_VAR_TYPE_INT, NULL, 0,
MCA_BASE_VAR_FLAG_INTERNAL,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_CONSTANT,
&orte_peer_fini_barrier_id);
orte_process_info.peer_fini_barrier = (orte_grpcomm_coll_id_t) orte_peer_fini_barrier_id;
#if OPAL_ENABLE_FT_CR == 1
orte_snapc_init_barrier_id = -1;
(void) mca_base_var_register ("orte", "orte", NULL, "snapc_init_barrier_id", "SNAPC init barrier collective id",


@ -129,6 +129,9 @@ struct orte_proc_info_t {
char *cpuset; /**< String-representation of bitmap where we are bound */
#endif
int app_rank; /**< rank within my app_context */
orte_grpcomm_coll_id_t peer_modex; /**< modex collective id */
orte_grpcomm_coll_id_t peer_init_barrier; /**< barrier id during init */
orte_grpcomm_coll_id_t peer_fini_barrier; /**< barrier id during finalize */
orte_vpid_t my_hostid; /** identifies the local host for a coprocessor */
#if OPAL_ENABLE_FT_CR == 1
orte_grpcomm_coll_id_t snapc_init_barrier; /**< barrier id during init */