1
1

Revert r29166 in favor of a better solution from George

This commit was SVN r29199.

The following SVN revision numbers were found above:
  r29166 --> open-mpi/ompi@497c7e6abb
Этот коммит содержится в:
Ralph Castain 2013-09-18 01:41:26 +00:00
родитель 55273f1c98
Коммит 99611ac1d2
9 изменённых файлов: 3 добавлений и 260 удалений

Просмотреть файл

@ -66,7 +66,6 @@ OMPI_DECLSPEC int ompi_dpm_base_disconnect_waitall (int count, ompi_dpm_base_dis
int ompi_dpm_base_null_connect_accept (ompi_communicator_t *comm, int root, int ompi_dpm_base_null_connect_accept (ompi_communicator_t *comm, int root,
char *port_string, bool send_first, char *port_string, bool send_first,
ompi_communicator_t **newcomm); ompi_communicator_t **newcomm);
int ompi_dpm_base_null_merge(ompi_communicator_t *comm, int root);
int ompi_dpm_base_null_disconnect(ompi_communicator_t *comm); int ompi_dpm_base_null_disconnect(ompi_communicator_t *comm);
int ompi_dpm_base_null_spawn(int count, char **array_of_commands, int ompi_dpm_base_null_spawn(int count, char **array_of_commands,
char ***array_of_argv, char ***array_of_argv,

Просмотреть файл

@ -32,7 +32,6 @@
OMPI_DECLSPEC ompi_dpm_base_module_t ompi_dpm = { OMPI_DECLSPEC ompi_dpm_base_module_t ompi_dpm = {
NULL, NULL,
ompi_dpm_base_null_connect_accept, ompi_dpm_base_null_connect_accept,
ompi_dpm_base_null_merge,
ompi_dpm_base_null_disconnect, ompi_dpm_base_null_disconnect,
ompi_dpm_base_null_spawn, ompi_dpm_base_null_spawn,
ompi_dpm_base_null_dyn_init, ompi_dpm_base_null_dyn_init,

Просмотреть файл

@ -44,11 +44,6 @@ int ompi_dpm_base_null_connect_accept (ompi_communicator_t *comm, int root,
return OMPI_ERR_NOT_SUPPORTED; return OMPI_ERR_NOT_SUPPORTED;
} }
int ompi_dpm_base_null_merge(ompi_communicator_t *comm, int root)
{
return OMPI_ERR_NOT_SUPPORTED;
}
int ompi_dpm_base_null_disconnect(ompi_communicator_t *comm) int ompi_dpm_base_null_disconnect(ompi_communicator_t *comm)
{ {
return OMPI_SUCCESS; return OMPI_SUCCESS;

Просмотреть файл

@ -55,16 +55,6 @@ typedef int (*ompi_dpm_base_module_connect_accept_fn_t)(ompi_communicator_t *com
char *port, bool send_first, char *port, bool send_first,
ompi_communicator_t **newcomm); ompi_communicator_t **newcomm);
/*
* Merge communicator - ensure that all processes in a communicator
* know how to talk to each other. This is used to support Intercomm_merge
* as multiple merges can occur between jobs that have not exchanged
* modex information. We assume that the root process knows how to
* communicate via OOB with all processes in the communicator - i.e., that
* the root process is common across the communicators involved in the merge
*/
typedef int (*ompi_dpm_base_module_merge_fn_t)(ompi_communicator_t *comm, int root);
/* define a callback function for use by non-blocking persistent connect/accept operations */ /* define a callback function for use by non-blocking persistent connect/accept operations */
typedef void (*ompi_dpm_base_paccept_connect_callback_fn_t)(ompi_communicator_t *newcomm, typedef void (*ompi_dpm_base_paccept_connect_callback_fn_t)(ompi_communicator_t *newcomm,
ompi_proc_t *remote_proc, ompi_proc_t *remote_proc,
@ -187,8 +177,6 @@ struct ompi_dpm_base_module_1_0_0_t {
ompi_dpm_base_module_init_fn_t init; ompi_dpm_base_module_init_fn_t init;
/* connect/accept */ /* connect/accept */
ompi_dpm_base_module_connect_accept_fn_t connect_accept; ompi_dpm_base_module_connect_accept_fn_t connect_accept;
/* merge */
ompi_dpm_base_module_merge_fn_t merge;
/* disconnect */ /* disconnect */
ompi_dpm_base_module_disconnect_fn_t disconnect; ompi_dpm_base_module_disconnect_fn_t disconnect;
/* spawn processes */ /* spawn processes */

Просмотреть файл

@ -43,7 +43,6 @@
#include "orte/mca/errmgr/errmgr.h" #include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/grpcomm/base/base.h" #include "orte/mca/grpcomm/base/base.h"
#include "orte/mca/plm/plm.h" #include "orte/mca/plm/plm.h"
#include "orte/mca/oob/oob.h"
#include "orte/mca/rml/rml.h" #include "orte/mca/rml/rml.h"
#include "orte/mca/rml/rml_types.h" #include "orte/mca/rml/rml_types.h"
#include "orte/mca/rmaps/rmaps.h" #include "orte/mca/rmaps/rmaps.h"
@ -77,7 +76,6 @@ static int init(void);
static int connect_accept (ompi_communicator_t *comm, int root, static int connect_accept (ompi_communicator_t *comm, int root,
char *port_string, bool send_first, char *port_string, bool send_first,
ompi_communicator_t **newcomm); ompi_communicator_t **newcomm);
static int merge(ompi_communicator_t *comm, int root);
static int disconnect(ompi_communicator_t *comm); static int disconnect(ompi_communicator_t *comm);
static int spawn(int count, char **array_of_commands, static int spawn(int count, char **array_of_commands,
char ***array_of_argv, char ***array_of_argv,
@ -106,7 +104,6 @@ static void dpm_pclose(char *port);
ompi_dpm_base_module_t ompi_dpm_orte_module = { ompi_dpm_base_module_t ompi_dpm_orte_module = {
init, init,
connect_accept, connect_accept,
merge,
disconnect, disconnect,
spawn, spawn,
dyn_init, dyn_init,
@ -229,11 +226,11 @@ static int connect_accept(ompi_communicator_t *comm, int root,
ORTE_RML_TAG_COLL_ID_REQ, ORTE_RML_TAG_COLL_ID_REQ,
orte_rml_send_callback, NULL); orte_rml_send_callback, NULL);
/* wait for the id */ /* wait for the id */
xfer.active = true; // must set before the recv is defined in case the msg is waiting for us
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_COLL_ID, orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_COLL_ID,
ORTE_RML_NON_PERSISTENT, ORTE_RML_NON_PERSISTENT,
orte_rml_recv_callback, &xfer); orte_rml_recv_callback, &xfer);
/* wait for response */ /* wait for response */
xfer.active = true;
OMPI_WAIT_FOR_COMPLETION(xfer.active); OMPI_WAIT_FOR_COMPLETION(xfer.active);
i=1; i=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer.data, &id, &i, ORTE_GRPCOMM_COLL_ID_T))) { if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer.data, &id, &i, ORTE_GRPCOMM_COLL_ID_T))) {
@ -254,11 +251,11 @@ static int connect_accept(ompi_communicator_t *comm, int root,
rc = orte_rml.send_buffer_nb(&port, nbuf, tag, orte_rml_send_callback, NULL); rc = orte_rml.send_buffer_nb(&port, nbuf, tag, orte_rml_send_callback, NULL);
} else { } else {
/* wait to recv the collective id */ /* wait to recv the collective id */
xfer.active = true; // must set before the recv is defined in case the msg is waiting for us
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, tag, orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, tag,
ORTE_RML_NON_PERSISTENT, ORTE_RML_NON_PERSISTENT,
orte_rml_recv_callback, &xfer); orte_rml_recv_callback, &xfer);
/* wait for response */ /* wait for response */
xfer.active = true;
OMPI_WAIT_FOR_COMPLETION(xfer.active); OMPI_WAIT_FOR_COMPLETION(xfer.active);
i=1; i=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer.data, &id, &i, ORTE_GRPCOMM_COLL_ID_T))) { if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer.data, &id, &i, ORTE_GRPCOMM_COLL_ID_T))) {
@ -521,11 +518,11 @@ static int connect_accept(ompi_communicator_t *comm, int root,
} }
/* perform it */ /* perform it */
modex.active = true;
if (OMPI_SUCCESS != (rc = orte_grpcomm.modex(&modex))) { if (OMPI_SUCCESS != (rc = orte_grpcomm.modex(&modex))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto exit; goto exit;
} }
modex.active = true;
OMPI_WAIT_FOR_COMPLETION(modex.active); OMPI_WAIT_FOR_COMPLETION(modex.active);
OBJ_DESTRUCT(&modex); OBJ_DESTRUCT(&modex);
@ -659,197 +656,6 @@ static int connect_accept(ompi_communicator_t *comm, int root,
return rc; return rc;
} }
static int merge(ompi_communicator_t *comm, int root)
{
int rc=OMPI_SUCCESS, rank, size, i;
opal_buffer_t *buffer=NULL;
orte_rml_recv_cb_t xfer;
ompi_group_t *group=comm->c_local_group;
ompi_proc_t *proc, **newprocs;
opal_list_t data;
opal_value_t *kv;
int32_t num_entries, cnt, n;
opal_hwloc_locality_t locality;
orte_vpid_t daemon;
size = ompi_comm_size ( comm );
rank = ompi_comm_rank ( comm );
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_framework.framework_output,
"%s dpm:orte:merge rank %d with root %d of size %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
rank, root, size));
/* if we are rank 0 of the root communicator, then
* we are the only process guaranteed to know the
* modex info for everyone in this merging communicator
*/
if (root && 0 == rank) {
/* prepare and send the modex data */
buffer = OBJ_NEW(opal_buffer_t);
/* collect all the modex info from the processes in the comm */
for (i=0 ; i<size ; i++) {
if (NULL == (proc = ompi_group_peer_lookup(group,i))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
}
OBJ_CONSTRUCT(&data, opal_list_t);
if (ORTE_SUCCESS != (rc = opal_db.fetch_multiple((opal_identifier_t*)&proc->proc_name, NULL, &data))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buffer);
OPAL_LIST_DESTRUCT(&data);
return rc;
}
/* pack the number of entries for this proc */
num_entries = opal_list_get_size(&data);
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &num_entries, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buffer);
OPAL_LIST_DESTRUCT(&data);
return rc;
}
/* if there are entries, pack them */
while (NULL != (kv = (opal_value_t*)opal_list_remove_first(&data))) {
if (ORTE_SUCCESS != (opal_dss.pack(buffer, &kv, 1, OPAL_VALUE))) {
ORTE_ERROR_LOG(rc);
break;
}
OBJ_RELEASE(kv);
}
OPAL_LIST_DESTRUCT(&data);
}
/* send it to each of the other processes */
for (i=0 ; i<size ; i++) {
if (rank == i) {
/* don't bother sending to myself */
continue;
}
if (NULL == (proc = ompi_group_peer_lookup(group,i))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
rc = ORTE_ERR_NOT_FOUND;
break;
}
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_framework.framework_output,
"%s dpm:orte:merge sending modex data to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proc->proc_name)));
/* maintain accounting */
OBJ_RETAIN(buffer);
orte_rml.send_buffer_nb(&proc->proc_name, buffer,
ORTE_RML_TAG_MERGE,
orte_rml_send_callback, NULL);
}
/* maintain accounting - have to ensure we do the
* release of the buffer object while in an OOB event
* to avoid threaded race condition
*/
ORTE_OOB_RELEASE(buffer);
} else {
/* wait for data to arrive */
OBJ_CONSTRUCT(&xfer, orte_rml_recv_cb_t);
xfer.active = true;
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_MERGE,
ORTE_RML_NON_PERSISTENT,
orte_rml_recv_callback, &xfer);
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_framework.framework_output,
"%s dpm:orte:merge waiting to recv modex data",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
OMPI_WAIT_FOR_COMPLETION(xfer.active);
/* load it into our local database - we know the entries
* were packed in comm rank order
*
* Because we may have added modex info for procs, we have to
* give the BTL's a chance to determine how to connect to
* remote peers, so track the procs
*/
newprocs = (ompi_proc_t**)calloc(size, sizeof(ompi_proc_t*));
for (i=0 ; i<size ; i++) {
if (NULL == (proc = ompi_group_peer_lookup(group,i))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
OBJ_DESTRUCT(&xfer);
free(newprocs);
return ORTE_ERR_NOT_FOUND;
}
/* track this proc */
newprocs[i] = proc;
/* unpack the number of entries for this proc */
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&xfer.data, &num_entries, &cnt, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&xfer);
free(newprocs);
return rc;
}
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_framework.framework_output,
"%s dpm:orte:merge unpack data for %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proc->proc_name)));
/* unpack and store the entries */
daemon = ORTE_VPID_INVALID;
for (n=0; n < num_entries; n++) {
cnt = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&xfer.data, &kv, &cnt, OPAL_VALUE))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&xfer);
free(newprocs);
return rc;
}
/* if this is me, dump the data - we already have it in the db */
if (ORTE_PROC_MY_NAME->jobid == proc->proc_name.jobid &&
ORTE_PROC_MY_NAME->vpid == proc->proc_name.vpid) {
OBJ_RELEASE(kv);
} else if (0 == strcmp(kv->key, ORTE_DB_LOCALITY)) {
/* ignore locality entries */
OBJ_RELEASE(kv);
} else {
/* store it in the database */
if (ORTE_SUCCESS != (rc = opal_db.store_pointer((opal_identifier_t*)&proc->proc_name, OPAL_DB_INTERNAL, kv))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&xfer);
free(newprocs);
return rc;
}
/* if it was the daemon vpid for this proc, save it */
if (0 == strcmp(kv->key, ORTE_DB_DAEMON_VPID)) {
daemon = kv->data.uint32;
}
/* do not release the kv - the db holds that pointer */
}
}
/* set the locality - if the jobid is different, then
* we declare the proc to be non-local to ensure shared
* memory isn't used
*/
if (ORTE_PROC_MY_NAME->jobid != proc->proc_name.jobid) {
locality = OPAL_PROC_NON_LOCAL;
} else if (daemon == ORTE_PROC_MY_DAEMON->vpid) {
locality = OPAL_PROC_ON_NODE;
} else {
locality = OPAL_PROC_NON_LOCAL;
}
if (ORTE_SUCCESS != (rc = opal_db.store((opal_identifier_t*)&proc->proc_name, OPAL_DB_INTERNAL,
ORTE_DB_LOCALITY, &locality, OPAL_HWLOC_LOCALITY_T))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&xfer);
free(newprocs);
return rc;
}
}
OBJ_DESTRUCT(&xfer);
/* now trigger the PML */
if (OMPI_SUCCESS != (rc = MCA_PML_CALL(add_procs(newprocs, size)))) {
ORTE_ERROR_LOG(rc);
}
free(newprocs);
}
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_framework.framework_output,
"%s dpm:orte:merge complete",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return rc;
}
static int disconnect(ompi_communicator_t *comm) static int disconnect(ompi_communicator_t *comm)
{ {
ompi_dpm_base_disconnect_obj *dobj; ompi_dpm_base_disconnect_obj *dobj;

Просмотреть файл

@ -11,7 +11,6 @@
* All rights reserved. * All rights reserved.
* Copyright (c) 2006-2012 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2006-2012 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2006-2009 University of Houston. All rights reserved. * Copyright (c) 2006-2009 University of Houston. All rights reserved.
* Copyright (c) 2013 Intel, Inc. All rights reserved
* $COPYRIGHT$ * $COPYRIGHT$
* *
* Additional copyrights may follow * Additional copyrights may follow
@ -28,7 +27,6 @@
#include "ompi/communicator/communicator.h" #include "ompi/communicator/communicator.h"
#include "ompi/proc/proc.h" #include "ompi/proc/proc.h"
#include "ompi/memchecker.h" #include "ompi/memchecker.h"
#include "ompi/mca/dpm/dpm.h"
#if OPAL_HAVE_WEAK_SYMBOLS && OMPI_PROFILING_DEFINES #if OPAL_HAVE_WEAK_SYMBOLS && OMPI_PROFILING_DEFINES
#pragma weak MPI_Intercomm_merge = PMPI_Intercomm_merge #pragma weak MPI_Intercomm_merge = PMPI_Intercomm_merge
@ -119,14 +117,6 @@ int MPI_Intercomm_merge(MPI_Comm intercomm, int high,
OBJ_RELEASE(new_group_pointer); OBJ_RELEASE(new_group_pointer);
new_group_pointer = MPI_GROUP_NULL; new_group_pointer = MPI_GROUP_NULL;
/* ensure all processes in the merged communicator know how
* to communicate to each other
*/
rc = ompi_dpm.merge(newcomp, first);
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
/* Determine context id. It is identical to f_2_c_handle */ /* Determine context id. It is identical to f_2_c_handle */
rc = ompi_comm_nextcid ( newcomp, /* new comm */ rc = ompi_comm_nextcid ( newcomp, /* new comm */
intercomm, /* old comm */ intercomm, /* old comm */

Просмотреть файл

@ -95,12 +95,6 @@ MCA_BASE_FRAMEWORK_DECLARE(orte, oob, "Out-of-Band Messaging Subsystem",
mca_oob_base_static_components, 0); mca_oob_base_static_components, 0);
void orte_oob_base_object_release(int fd, short args, void *cbdata)
{
orte_oob_caddy_t *cd = (orte_oob_caddy_t*)cbdata;
OBJ_RELEASE(cd->object);
}
OBJ_CLASS_INSTANCE(mca_oob_base_component_t, OBJ_CLASS_INSTANCE(mca_oob_base_component_t,
opal_list_item_t, opal_list_item_t,
NULL, NULL); NULL, NULL);
@ -119,6 +113,3 @@ OBJ_CLASS_INSTANCE(orte_oob_base_peer_t,
opal_object_t, opal_object_t,
pr_cons, pr_des); pr_cons, pr_des);
OBJ_CLASS_INSTANCE(orte_oob_caddy_t,
opal_object_t,
NULL, NULL);

Просмотреть файл

@ -77,28 +77,6 @@ typedef struct {
MCA_BASE_VERSION_2_0_0, \ MCA_BASE_VERSION_2_0_0, \
"oob", 2, 0, 0 "oob", 2, 0, 0
/* macro to safely release an object being used by OOB */
typedef struct {
opal_object_t super;
opal_event_t ev;
void *object;
} orte_oob_caddy_t;
OBJ_CLASS_DECLARATION(orte_oob_caddy_t);
#define ORTE_OOB_RELEASE(p) \
do { \
orte_oob_caddy_t *cd; \
cd = OBJ_NEW(orte_oob_caddy_t); \
cd->object = (p); \
opal_event_set(orte_event_base, &cd->ev, -1, \
OPAL_EV_WRITE, \
orte_oob_base_object_release, cd); \
opal_event_set_priority(&cd->ev, ORTE_MSG_PRI); \
opal_event_active(&cd->ev, OPAL_EV_WRITE, 1); \
} while(0);
ORTE_DECLSPEC void orte_oob_base_object_release(int fd, short args, void *cbdata);
END_C_DECLS END_C_DECLS
#endif #endif

Просмотреть файл

@ -142,9 +142,6 @@ BEGIN_C_DECLS
/* sensor data */ /* sensor data */
#define ORTE_RML_TAG_SENSOR_DATA 47 #define ORTE_RML_TAG_SENSOR_DATA 47
/* merge support */
#define ORTE_RML_TAG_MERGE 48
#define ORTE_RML_TAG_MAX 100 #define ORTE_RML_TAG_MAX 100