1
1
The intercomm "merge" function can create a linkage between procs that was not reflected anywhere in a modex, and so at least some of the procs in the resulting communicator don't know how to talk to some of the new communicator's peers.

For example, consider the case where:

1. parent job A comm_spawns a process (job B) - these processes exchange modex and can communicate

2. parent job A now comm_spawns another process (job C) - again, these can communicate, but the proc in C knows nothing of B

3. do an intercomm merge across the communicators created by the two comm_spawns. This puts B and C into the same communicator, but they know nothing about how to talk to each other as they were not involved in any exchange of contact info. Hence, collectives on that communicator now fail. 

This fix adds an API to the ompi/dpm framework that (a) exchanges the modex info across the procs in the merge to ensure all procs know how to communicate, and (b) calls add_procs to give the btl's a chance to select transports to any new procs.

cmr:v1.7.3:reviewer=jsquyres

This commit was SVN r29166.

The following Trac tickets were found above:
  Ticket 2904 --> https://svn.open-mpi.org/trac/ompi/ticket/2904
Этот коммит содержится в:
Ralph Castain 2013-09-15 15:00:40 +00:00
родитель df7654e8cf
Коммит 497c7e6abb
10 изменённых файлов: 273 добавлений и 6 удалений

Просмотреть файл

@ -66,6 +66,7 @@ OMPI_DECLSPEC int ompi_dpm_base_disconnect_waitall (int count, ompi_dpm_base_dis
int ompi_dpm_base_null_connect_accept (ompi_communicator_t *comm, int root,
char *port_string, bool send_first,
ompi_communicator_t **newcomm);
int ompi_dpm_base_null_merge(ompi_communicator_t *comm, int root);
int ompi_dpm_base_null_disconnect(ompi_communicator_t *comm);
int ompi_dpm_base_null_spawn(int count, char **array_of_commands,
char ***array_of_argv,

Просмотреть файл

@ -32,6 +32,7 @@
OMPI_DECLSPEC ompi_dpm_base_module_t ompi_dpm = {
NULL,
ompi_dpm_base_null_connect_accept,
ompi_dpm_base_null_merge,
ompi_dpm_base_null_disconnect,
ompi_dpm_base_null_spawn,
ompi_dpm_base_null_dyn_init,

Просмотреть файл

@ -44,6 +44,11 @@ int ompi_dpm_base_null_connect_accept (ompi_communicator_t *comm, int root,
return OMPI_ERR_NOT_SUPPORTED;
}
/*
 * Default "null" implementation of the dpm merge API, used when no
 * DPM component is active.  Merging requires runtime support to
 * exchange modex info across the merged procs, so without an active
 * module the operation cannot be performed.
 */
int ompi_dpm_base_null_merge(ompi_communicator_t *comm, int root)
{
return OMPI_ERR_NOT_SUPPORTED;
}
int ompi_dpm_base_null_disconnect(ompi_communicator_t *comm)
{
return OMPI_SUCCESS;

Просмотреть файл

@ -55,6 +55,16 @@ typedef int (*ompi_dpm_base_module_connect_accept_fn_t)(ompi_communicator_t *com
char *port, bool send_first,
ompi_communicator_t **newcomm);
/*
* Merge communicator - ensure that all processes in a communicator
* know how to talk to each other. This is used to support Intercomm_merge
* as multiple merges can occur between jobs that have not exchanged
* modex information. We assume that the root process knows how to
* communicate via OOB with all processes in the communicator - i.e., that
* the root process is common across the communicators involved in the merge
*/
typedef int (*ompi_dpm_base_module_merge_fn_t)(ompi_communicator_t *comm, int root);
/* define a callback function for use by non-blocking persistent connect/accept operations */
typedef void (*ompi_dpm_base_paccept_connect_callback_fn_t)(ompi_communicator_t *newcomm,
ompi_proc_t *remote_proc,
@ -177,6 +187,8 @@ struct ompi_dpm_base_module_1_0_0_t {
ompi_dpm_base_module_init_fn_t init;
/* connect/accept */
ompi_dpm_base_module_connect_accept_fn_t connect_accept;
/* merge */
ompi_dpm_base_module_merge_fn_t merge;
/* disconnect */
ompi_dpm_base_module_disconnect_fn_t disconnect;
/* spawn processes */

Просмотреть файл

@ -43,6 +43,7 @@
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/grpcomm/base/base.h"
#include "orte/mca/plm/plm.h"
#include "orte/mca/oob/oob.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/rml/rml_types.h"
#include "orte/mca/rmaps/rmaps.h"
@ -76,6 +77,7 @@ static int init(void);
static int connect_accept (ompi_communicator_t *comm, int root,
char *port_string, bool send_first,
ompi_communicator_t **newcomm);
static int merge(ompi_communicator_t *comm, int root);
static int disconnect(ompi_communicator_t *comm);
static int spawn(int count, char **array_of_commands,
char ***array_of_argv,
@ -104,6 +106,7 @@ static void dpm_pclose(char *port);
ompi_dpm_base_module_t ompi_dpm_orte_module = {
init,
connect_accept,
merge,
disconnect,
spawn,
dyn_init,
@ -226,11 +229,11 @@ static int connect_accept(ompi_communicator_t *comm, int root,
ORTE_RML_TAG_COLL_ID_REQ,
orte_rml_send_callback, NULL);
/* wait for the id */
xfer.active = true; // must set before the recv is defined in case the msg is waiting for us
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_COLL_ID,
ORTE_RML_NON_PERSISTENT,
orte_rml_recv_callback, &xfer);
/* wait for response */
xfer.active = true;
OMPI_WAIT_FOR_COMPLETION(xfer.active);
i=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer.data, &id, &i, ORTE_GRPCOMM_COLL_ID_T))) {
@ -251,11 +254,11 @@ static int connect_accept(ompi_communicator_t *comm, int root,
rc = orte_rml.send_buffer_nb(&port, nbuf, tag, orte_rml_send_callback, NULL);
} else {
/* wait to recv the collective id */
xfer.active = true; // must set before the recv is defined in case the msg is waiting for us
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, tag,
ORTE_RML_NON_PERSISTENT,
orte_rml_recv_callback, &xfer);
/* wait for response */
xfer.active = true;
OMPI_WAIT_FOR_COMPLETION(xfer.active);
i=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer.data, &id, &i, ORTE_GRPCOMM_COLL_ID_T))) {
@ -518,11 +521,11 @@ static int connect_accept(ompi_communicator_t *comm, int root,
}
/* perform it */
modex.active = true;
if (OMPI_SUCCESS != (rc = orte_grpcomm.modex(&modex))) {
ORTE_ERROR_LOG(rc);
goto exit;
}
modex.active = true;
OMPI_WAIT_FOR_COMPLETION(modex.active);
OBJ_DESTRUCT(&modex);
@ -656,6 +659,197 @@ static int connect_accept(ompi_communicator_t *comm, int root,
return rc;
}
/*
 * Ensure that all procs in a merged communicator know how to talk to
 * each other.  Rank 0 on the "root" side is the only proc guaranteed
 * to hold modex info for every peer (it was involved in all the prior
 * connect/accepts), so it collects that info from the local database
 * and sends it to every other proc over the RML.  Receivers store the
 * entries, derive locality, and then call the PML's add_procs so the
 * BTLs get a chance to select transports to the new peers.
 *
 * @param comm  communicator produced by the intercomm merge
 * @param root  non-zero if we are on the root side of the merge
 * @return OMPI_SUCCESS or an ORTE/OMPI error code
 */
static int merge(ompi_communicator_t *comm, int root)
{
    int rc=OMPI_SUCCESS, rank, size, i;
    opal_buffer_t *buffer=NULL;
    orte_rml_recv_cb_t xfer;
    ompi_group_t *group=comm->c_local_group;
    ompi_proc_t *proc, **newprocs;
    opal_list_t data;
    opal_value_t *kv;
    int32_t num_entries, cnt, n;
    opal_hwloc_locality_t locality;
    orte_vpid_t daemon;

    size = ompi_comm_size ( comm );
    rank = ompi_comm_rank ( comm );

    OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_framework.framework_output,
                         "%s dpm:orte:merge rank %d with root %d of size %d",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         rank, root, size));

    /* if we are rank 0 of the root communicator, then
     * we are the only process guaranteed to know the
     * modex info for everyone in this merging communicator
     */
    if (root && 0 == rank) {
        /* prepare and send the modex data */
        buffer = OBJ_NEW(opal_buffer_t);
        /* collect all the modex info from the processes in the comm */
        for (i=0 ; i<size ; i++) {
            if (NULL == (proc = ompi_group_peer_lookup(group,i))) {
                ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
                /* BUGFIX: release the buffer on this error path -
                 * previously it was leaked */
                OBJ_RELEASE(buffer);
                return ORTE_ERR_NOT_FOUND;
            }
            OBJ_CONSTRUCT(&data, opal_list_t);
            if (ORTE_SUCCESS != (rc = opal_db.fetch_multiple((opal_identifier_t*)&proc->proc_name, NULL, &data))) {
                ORTE_ERROR_LOG(rc);
                OBJ_RELEASE(buffer);
                OPAL_LIST_DESTRUCT(&data);
                return rc;
            }
            /* pack the number of entries for this proc */
            num_entries = opal_list_get_size(&data);
            if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &num_entries, 1, OPAL_INT32))) {
                ORTE_ERROR_LOG(rc);
                OBJ_RELEASE(buffer);
                OPAL_LIST_DESTRUCT(&data);
                return rc;
            }
            /* if there are entries, pack them */
            while (NULL != (kv = (opal_value_t*)opal_list_remove_first(&data))) {
                /* BUGFIX: capture the pack return code in rc - it was
                 * previously discarded, so ORTE_ERROR_LOG logged a stale
                 * value and the failure was silently swallowed; also
                 * release kv and clean up instead of continuing with a
                 * half-packed buffer */
                if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &kv, 1, OPAL_VALUE))) {
                    ORTE_ERROR_LOG(rc);
                    OBJ_RELEASE(kv);
                    OBJ_RELEASE(buffer);
                    OPAL_LIST_DESTRUCT(&data);
                    return rc;
                }
                OBJ_RELEASE(kv);
            }
            OPAL_LIST_DESTRUCT(&data);
        }
        /* send it to each of the other processes */
        for (i=0 ; i<size ; i++) {
            if (rank == i) {
                /* don't bother sending to myself */
                continue;
            }
            if (NULL == (proc = ompi_group_peer_lookup(group,i))) {
                ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
                rc = ORTE_ERR_NOT_FOUND;
                break;
            }
            OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_framework.framework_output,
                                 "%s dpm:orte:merge sending modex data to %s",
                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                 ORTE_NAME_PRINT(&proc->proc_name)));
            /* maintain accounting - each non-blocking send holds a ref */
            OBJ_RETAIN(buffer);
            orte_rml.send_buffer_nb(&proc->proc_name, buffer,
                                    ORTE_RML_TAG_MERGE,
                                    orte_rml_send_callback, NULL);
        }
        /* maintain accounting - have to ensure we do the
         * release of the buffer object while in an OOB event
         * to avoid threaded race condition
         */
        ORTE_OOB_RELEASE(buffer);
    } else {
        /* wait for data to arrive */
        OBJ_CONSTRUCT(&xfer, orte_rml_recv_cb_t);
        /* must set active before posting the recv in case the
         * message is already waiting for us */
        xfer.active = true;
        orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_MERGE,
                                ORTE_RML_NON_PERSISTENT,
                                orte_rml_recv_callback, &xfer);
        OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_framework.framework_output,
                             "%s dpm:orte:merge waiting to recv modex data",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
        OMPI_WAIT_FOR_COMPLETION(xfer.active);
        /* load it into our local database - we know the entries
         * were packed in comm rank order
         *
         * Because we may have added modex info for procs, we have to
         * give the BTL's a chance to determine how to connect to
         * remote peers, so track the procs
         */
        newprocs = (ompi_proc_t**)calloc(size, sizeof(ompi_proc_t*));
        for (i=0 ; i<size ; i++) {
            if (NULL == (proc = ompi_group_peer_lookup(group,i))) {
                ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
                OBJ_DESTRUCT(&xfer);
                free(newprocs);
                return ORTE_ERR_NOT_FOUND;
            }
            /* track this proc */
            newprocs[i] = proc;
            /* unpack the number of entries for this proc */
            cnt=1;
            if (ORTE_SUCCESS != (rc = opal_dss.unpack(&xfer.data, &num_entries, &cnt, OPAL_INT32))) {
                ORTE_ERROR_LOG(rc);
                OBJ_DESTRUCT(&xfer);
                free(newprocs);
                return rc;
            }
            OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_framework.framework_output,
                                 "%s dpm:orte:merge unpack data for %s",
                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                 ORTE_NAME_PRINT(&proc->proc_name)));
            /* unpack and store the entries */
            daemon = ORTE_VPID_INVALID;
            for (n=0; n < num_entries; n++) {
                cnt = 1;
                if (ORTE_SUCCESS != (rc = opal_dss.unpack(&xfer.data, &kv, &cnt, OPAL_VALUE))) {
                    ORTE_ERROR_LOG(rc);
                    OBJ_DESTRUCT(&xfer);
                    free(newprocs);
                    return rc;
                }
                /* if this is me, dump the data - we already have it in the db */
                if (ORTE_PROC_MY_NAME->jobid == proc->proc_name.jobid &&
                    ORTE_PROC_MY_NAME->vpid == proc->proc_name.vpid) {
                    OBJ_RELEASE(kv);
                } else if (0 == strcmp(kv->key, ORTE_DB_LOCALITY)) {
                    /* ignore locality entries - we compute our own below */
                    OBJ_RELEASE(kv);
                } else {
                    /* store it in the database */
                    if (ORTE_SUCCESS != (rc = opal_db.store_pointer((opal_identifier_t*)&proc->proc_name, OPAL_DB_INTERNAL, kv))) {
                        ORTE_ERROR_LOG(rc);
                        OBJ_DESTRUCT(&xfer);
                        free(newprocs);
                        return rc;
                    }
                    /* if it was the daemon vpid for this proc, save it */
                    if (0 == strcmp(kv->key, ORTE_DB_DAEMON_VPID)) {
                        daemon = kv->data.uint32;
                    }
                    /* do not release the kv - the db holds that pointer */
                }
            }
            /* set the locality - if the jobid is different, then
             * we declare the proc to be non-local to ensure shared
             * memory isn't used
             */
            if (ORTE_PROC_MY_NAME->jobid != proc->proc_name.jobid) {
                locality = OPAL_PROC_NON_LOCAL;
            } else if (daemon == ORTE_PROC_MY_DAEMON->vpid) {
                /* same daemon => same node */
                locality = OPAL_PROC_ON_NODE;
            } else {
                locality = OPAL_PROC_NON_LOCAL;
            }
            if (ORTE_SUCCESS != (rc = opal_db.store((opal_identifier_t*)&proc->proc_name, OPAL_DB_INTERNAL,
                                                    ORTE_DB_LOCALITY, &locality, OPAL_HWLOC_LOCALITY_T))) {
                ORTE_ERROR_LOG(rc);
                OBJ_DESTRUCT(&xfer);
                free(newprocs);
                return rc;
            }
        }
        OBJ_DESTRUCT(&xfer);
        /* now trigger the PML so the BTLs can select transports
         * to any procs we just learned about */
        if (OMPI_SUCCESS != (rc = MCA_PML_CALL(add_procs(newprocs, size)))) {
            ORTE_ERROR_LOG(rc);
        }
        free(newprocs);
    }

    OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_framework.framework_output,
                         "%s dpm:orte:merge complete",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
    return rc;
}
static int disconnect(ompi_communicator_t *comm)
{
ompi_dpm_base_disconnect_obj *dobj;

Просмотреть файл

@ -11,6 +11,7 @@
* All rights reserved.
* Copyright (c) 2006-2012 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2006-2009 University of Houston. All rights reserved.
* Copyright (c) 2013 Intel, Inc. All rights reserved
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -27,6 +28,7 @@
#include "ompi/communicator/communicator.h"
#include "ompi/proc/proc.h"
#include "ompi/memchecker.h"
#include "ompi/mca/dpm/dpm.h"
#if OPAL_HAVE_WEAK_SYMBOLS && OMPI_PROFILING_DEFINES
#pragma weak MPI_Intercomm_merge = PMPI_Intercomm_merge
@ -117,6 +119,14 @@ int MPI_Intercomm_merge(MPI_Comm intercomm, int high,
OBJ_RELEASE(new_group_pointer);
new_group_pointer = MPI_GROUP_NULL;
/* ensure all processes in the merged communicator know how
* to communicate to each other
*/
rc = ompi_dpm.merge(newcomp, first);
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
/* Determine context id. It is identical to f_2_c_handle */
rc = ompi_comm_nextcid ( newcomp, /* new comm */
intercomm, /* old comm */

Просмотреть файл

@ -95,6 +95,12 @@ MCA_BASE_FRAMEWORK_DECLARE(orte, oob, "Out-of-Band Messaging Subsystem",
mca_oob_base_static_components, 0);
/*
 * Event callback used by ORTE_OOB_RELEASE: releases an object from
 * within an OOB-progress event to avoid threaded race conditions with
 * in-flight sends that still reference it.
 */
void orte_oob_base_object_release(int fd, short args, void *cbdata)
{
    orte_oob_caddy_t *cd = (orte_oob_caddy_t*)cbdata;
    OBJ_RELEASE(cd->object);
    /* BUGFIX: the caddy itself was OBJ_NEW'd by ORTE_OOB_RELEASE and
     * never freed, leaking one caddy per call - release it now that
     * its event has fired */
    OBJ_RELEASE(cd);
}
OBJ_CLASS_INSTANCE(mca_oob_base_component_t,
opal_list_item_t,
NULL, NULL);
@ -113,3 +119,6 @@ OBJ_CLASS_INSTANCE(orte_oob_base_peer_t,
opal_object_t,
pr_cons, pr_des);
OBJ_CLASS_INSTANCE(orte_oob_caddy_t,
opal_object_t,
NULL, NULL);

Просмотреть файл

@ -77,6 +77,28 @@ typedef struct {
MCA_BASE_VERSION_2_0_0, \
"oob", 2, 0, 0
/* caddy used to safely release an object being used by OOB */
typedef struct {
    opal_object_t super;
    opal_event_t ev;    /* one-shot event that triggers the release */
    void *object;       /* object to be released in event context */
} orte_oob_caddy_t;
OBJ_CLASS_DECLARATION(orte_oob_caddy_t);

/* Schedule release of (p) from within an ORTE event so the object's
 * destructor runs in event-library context, avoiding races with
 * in-flight OOB operations that may still reference it.
 *
 * NOTE: no semicolon after while(0) - the trailing semicolon in the
 * original definition made the macro expand to two statements, which
 * breaks usage as the body of an if/else (CERT PRE11-C).  Callers
 * supply the semicolon: ORTE_OOB_RELEASE(p);
 */
#define ORTE_OOB_RELEASE(p)                                     \
    do {                                                        \
        orte_oob_caddy_t *cd;                                   \
        cd = OBJ_NEW(orte_oob_caddy_t);                         \
        cd->object = (p);                                       \
        opal_event_set(orte_event_base, &cd->ev, -1,            \
                       OPAL_EV_WRITE,                           \
                       orte_oob_base_object_release, cd);       \
        opal_event_set_priority(&cd->ev, ORTE_MSG_PRI);         \
        opal_event_active(&cd->ev, OPAL_EV_WRITE, 1);           \
    } while(0)

ORTE_DECLSPEC void orte_oob_base_object_release(int fd, short args, void *cbdata);
END_C_DECLS
#endif

Просмотреть файл

@ -142,6 +142,9 @@ BEGIN_C_DECLS
/* sensor data */
#define ORTE_RML_TAG_SENSOR_DATA 47
/* merge support */
#define ORTE_RML_TAG_MERGE 48
#define ORTE_RML_TAG_MAX 100

Просмотреть файл

@ -65,7 +65,8 @@ main(int argc, char *argv[])
}
/* All done */
fprintf(stderr, "%s: finalizing\n", whoami);
fflush(stderr);
MPI_Finalize();
return 0;
}
@ -115,9 +116,11 @@ do_parent(char *argv[], int rank, int count)
err = MPI_Intercomm_merge(ab_c_inter, 0, &abc_intra);
printf( "%s: intercomm_merge(%d) (%d) [rank %d]\n", whoami, 0, err, rank );
sleep(20);
err = MPI_Barrier(abc_intra);
printf( "%s: barrier (%d)\n", whoami, err );
fflush(stdout);
fflush(stderr);
}
@ -148,10 +151,17 @@ do_target(char* argv[], MPI_Comm parent)
printf( "%s: intercomm_merge(%d) (%d) [rank %d]\n", whoami, first, err, rank );
err = MPI_Barrier(merge1);
printf( "%s: barrier (%d)\n", whoami, err );
fflush(stdout);
fflush(stderr);
MPI_Comm_free(&merge1);
printf( "%s: merge1 free\n", whoami );
fflush(stdout);
MPI_Comm_free(&inter);
printf( "%s: inter free\n", whoami );
fflush(stdout);
MPI_Comm_free(&intra);
printf( "%s: intra free\n", whoami );
fflush(stdout);
MPI_Comm_disconnect(&parent);
}