
Replace the PML barrier with an RTE barrier for now until we can come up with a better solution for connectionless BTLs.

Refs trac:4643

This commit was SVN r31915.

The following Trac tickets were found above:
  Ticket 4643 --> https://svn.open-mpi.org/trac/ompi/ticket/4643
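For readers skimming the diff below: the core of the change is that connect_accept now reserves a second collective id up front and records it against the communicator, so that disconnect can later run an RTE-level barrier on that pre-agreed id instead of a PML-level barrier. A minimal self-contained sketch of that tracking pattern follows; every name in it (tracker_t, rte_barrier, and so on) is a hypothetical stand-in, not the real ORTE API.

/* Simplified model of the disconnect-id tracking introduced in this
 * commit. All names are hypothetical; the real code lives in
 * ompi/mca/dpm/orte. */
#include <stdio.h>
#include <stdlib.h>

typedef struct tracker {
    struct tracker *next;
    unsigned cid;          /* communicator context id */
    int disconnect_id;     /* collective id reserved at connect/accept */
} tracker_t;

static tracker_t *dynamics = NULL;   /* models the new "dynamics" list */

/* connect_accept: reserve two ids, remember the second for disconnect */
static void track_connect(unsigned cid, int disconnect_id) {
    tracker_t *t = malloc(sizeof(*t));
    t->cid = cid;
    t->disconnect_id = disconnect_id;
    t->next = dynamics;
    dynamics = t;
}

/* stand-in for ompi_rte_barrier() on a given collective id */
static void rte_barrier(int coll_id) {
    printf("barrier on collective id %d\n", coll_id);
}

/* disconnect: find the pre-reserved id and run the RTE barrier */
static int do_disconnect(unsigned cid) {
    for (tracker_t **p = &dynamics; *p != NULL; p = &(*p)->next) {
        if ((*p)->cid == cid) {
            tracker_t *t = *p;
            rte_barrier(t->disconnect_id);
            *p = t->next;           /* release the tracker */
            free(t);
            return 0;
        }
    }
    return -1;   /* no tracker: mirrors the "we are hosed" branch */
}

int main(void) {
    track_connect(42, 101);  /* connect_accept reserved id 101 for cid 42 */
    return do_disconnect(42);
}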
This commit is contained in:
Ralph Castain 2014-06-01 16:08:56 +00:00
parent cc219218a7
commit cf2c7381d0
3 changed files with 296 additions and 79 deletions

View file

@@ -69,7 +69,7 @@
/* Local static variables */
static opal_mutex_t ompi_dpm_port_mutex;
static orte_rml_tag_t next_tag;
static opal_list_t orte_dpm_acceptors, orte_dpm_connectors;
static opal_list_t orte_dpm_acceptors, orte_dpm_connectors, dynamics;
static uint32_t next_preq=0;
/* API functions */
@@ -120,6 +120,22 @@ ompi_dpm_base_module_t ompi_dpm_orte_module = {
dpm_pclose
};
typedef struct {
opal_list_item_t super;
opal_event_t ev;
bool event_active;
uint32_t id;
uint32_t cid;
orte_grpcomm_coll_id_t disconnectid;
orte_rml_tag_t tag;
ompi_dpm_base_paccept_connect_callback_fn_t cbfunc;
void *cbdata;
} orte_dpm_prequest_t;
OBJ_CLASS_INSTANCE(orte_dpm_prequest_t,
opal_list_item_t,
NULL, NULL);
static void connect_complete(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata);
@@ -133,6 +149,7 @@ static int init(void)
next_tag = OMPI_RML_TAG_DYNAMIC;
OBJ_CONSTRUCT(&orte_dpm_acceptors, opal_list_t);
OBJ_CONSTRUCT(&orte_dpm_connectors, opal_list_t);
OBJ_CONSTRUCT(&dynamics, opal_list_t);
/* post a receive for pconnect request responses */
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
@@ -161,14 +178,15 @@ static int connect_accept(ompi_communicator_t *comm, int root,
orte_rml_tag_t tag=ORTE_RML_TAG_INVALID;
opal_buffer_t *nbuf=NULL, *nrbuf=NULL;
ompi_proc_t **proc_list=NULL, **new_proc_list;
int i,j, new_proc_len;
int32_t i,j, new_proc_len;
ompi_group_t *new_group_pointer;
orte_grpcomm_coll_id_t id;
orte_grpcomm_coll_id_t id[2];
orte_grpcomm_collective_t modex;
orte_namelist_t *nm;
orte_rml_recv_cb_t xfer;
orte_process_name_t carport;
orte_dpm_prequest_t *preq;
OPAL_OUTPUT_VERBOSE((1, ompi_dpm_base_framework.framework_output,
"%s dpm:orte:connect_accept with port %s %s",
@@ -221,44 +239,58 @@ static int connect_accept(ompi_communicator_t *comm, int root,
if (NULL == nbuf) {
return OMPI_ERROR;
}
/* send the request - doesn't have to include any data */
/* tell the HNP how many id's we need - we need one for
* executing the connect_accept, and another when we
* disconnect
*/
i = 2;
if (OPAL_SUCCESS != (rc = opal_dss.pack(nbuf, &i, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(nbuf);
return OMPI_ERROR;
}
/* send the request */
rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, nbuf,
ORTE_RML_TAG_COLL_ID_REQ,
orte_rml_send_callback, NULL);
/* wait for the id */
/* wait for the id's */
xfer.active = true;
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_COLL_ID,
ORTE_RML_NON_PERSISTENT,
orte_rml_recv_callback, &xfer);
/* wait for response */
OMPI_WAIT_FOR_COMPLETION(xfer.active);
i=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer.data, &id, &i, ORTE_GRPCOMM_COLL_ID_T))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&xfer);
return OMPI_ERROR;
}
OBJ_DESTRUCT(&xfer);
/* send it to my peer on the other side */
/* create a buffer to send to the other side */
nbuf = OBJ_NEW(opal_buffer_t);
if (NULL == nbuf) {
return OMPI_ERROR;
}
if (ORTE_SUCCESS != (rc = opal_dss.pack(nbuf, &id, 1, ORTE_GRPCOMM_COLL_ID_T))) {
/* unpack the id's */
i=2;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer.data, id, &i, ORTE_GRPCOMM_COLL_ID_T))) {
ORTE_ERROR_LOG(rc);
goto exit;
OBJ_DESTRUCT(&xfer);
return OMPI_ERROR;
}
/* send them to my peer on the other side */
if (ORTE_SUCCESS != (rc = opal_dss.pack(nbuf, id, 2, ORTE_GRPCOMM_COLL_ID_T))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&xfer);
return OMPI_ERROR;
}
OBJ_DESTRUCT(&xfer); // done with the received data
rc = orte_rml.send_buffer_nb(&port, nbuf, tag, orte_rml_send_callback, NULL);
} else {
/* wait to recv the collective id */
/* wait to recv the collective id's */
xfer.active = true;
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, tag,
ORTE_RML_NON_PERSISTENT,
orte_rml_recv_callback, &xfer);
/* wait for response */
OMPI_WAIT_FOR_COMPLETION(xfer.active);
i=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer.data, &id, &i, ORTE_GRPCOMM_COLL_ID_T))) {
/* unpack the id's */
i=2;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer.data, id, &i, ORTE_GRPCOMM_COLL_ID_T))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&xfer);
return OMPI_ERROR;
@@ -266,6 +298,10 @@ static int connect_accept(ompi_communicator_t *comm, int root,
OBJ_DESTRUCT(&xfer);
}
OPAL_OUTPUT_VERBOSE((1, ompi_dpm_base_framework.framework_output,
"%s dpm:orte:connect_accept working with new collective ids %u %u",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), id[0], id[1]));
/* Generate the message buffer containing the number of processes and the list of
participating processes */
nbuf = OBJ_NEW(opal_buffer_t);
@@ -273,12 +309,11 @@ static int connect_accept(ompi_communicator_t *comm, int root,
return OMPI_ERROR;
}
/* pass the collective id so we can all use it */
if (ORTE_SUCCESS != (rc = opal_dss.pack(nbuf, &id, 1, ORTE_GRPCOMM_COLL_ID_T))) {
/* pass the collective id's so we can all use them */
if (ORTE_SUCCESS != (rc = opal_dss.pack(nbuf, id, 2, ORTE_GRPCOMM_COLL_ID_T))) {
ORTE_ERROR_LOG(rc);
goto exit;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(nbuf, &size, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
goto exit;
@@ -419,13 +454,12 @@ static int connect_accept(ompi_communicator_t *comm, int root,
goto exit;
}
/* unload the collective id */
num_vals = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(nrbuf, &id, &num_vals, ORTE_GRPCOMM_COLL_ID_T))) {
/* unload collective id's */
num_vals = 2;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(nrbuf, id, &num_vals, ORTE_GRPCOMM_COLL_ID_T))) {
ORTE_ERROR_LOG(rc);
goto exit;
}
num_vals = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(nrbuf, &rsize, &num_vals, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
@@ -506,7 +540,7 @@ static int connect_accept(ompi_communicator_t *comm, int root,
/* setup the modex */
OBJ_CONSTRUCT(&modex, orte_grpcomm_collective_t);
modex.id = id;
modex.id = id[0];
modex.active = true;
/* copy across the list of participants */
@@ -617,6 +651,12 @@ static int connect_accept(ompi_communicator_t *comm, int root,
goto exit;
}
/* track this communicator's disconnect collective id */
preq = OBJ_NEW(orte_dpm_prequest_t);
preq->cid = newcomp->c_contextid;
preq->disconnectid = id[1];
opal_list_append(&dynamics, &preq->super);
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_framework.framework_output,
"%s dpm:orte:connect_accept activate comm",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
@@ -660,20 +700,106 @@ static int connect_accept(ompi_communicator_t *comm, int root,
return rc;
}
static int construct_peers(ompi_group_t *group, opal_list_t *peers)
{
int i;
orte_namelist_t *nm;
ompi_proc_t *proct;
if (OMPI_GROUP_IS_DENSE(group)) {
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_framework.framework_output,
"%s dpm:orte:disconnect group is dense",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
for (i=0; i < group->grp_proc_count; i++) {
if (NULL == (proct = group->grp_proc_pointers[i])) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
}
/* add to the list of peers */
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_framework.framework_output,
"%s dpm:orte:disconnect adding participant %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proct->proc_name)));
nm = OBJ_NEW(orte_namelist_t);
nm->name.jobid = proct->proc_name.jobid;
nm->name.vpid = proct->proc_name.vpid;
opal_list_append(peers, &nm->super);
}
} else {
for (i=0; i < group->grp_proc_count; i++) {
/* lookup this proc_t to get the process name */
if (NULL == (proct = ompi_group_peer_lookup(group, i))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
}
/* add to the list of peers */
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_framework.framework_output,
"%s dpm:orte:disconnect adding participant %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proct->proc_name)));
nm = OBJ_NEW(orte_namelist_t);
nm->name.jobid = proct->proc_name.jobid;
nm->name.vpid = proct->proc_name.vpid;
opal_list_append(peers, &nm->super);
}
}
return ORTE_SUCCESS;
}
static int disconnect(ompi_communicator_t *comm)
{
int ret;
ompi_rte_collective_t *coll;
orte_dpm_prequest_t *req, *preq;
ompi_group_t *group;
/* JMS Temporarily disable PML-based barrier and use RTE-based
barrier instead. This is related to
https://svn.open-mpi.org/trac/ompi/ticket/4643. */
#if 0
int ret;
ompi_rte_collective_t *coll;
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_framework.framework_output,
"%s dpm:orte:disconnect comm_cid %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), comm->c_contextid));
/* find this communicator's conn-accept request */
req = NULL;
OPAL_LIST_FOREACH(preq, &dynamics, orte_dpm_prequest_t) {
if (preq->cid == comm->c_contextid) {
req = preq;
break;
}
}
if (NULL == req) {
/* we are hosed */
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_framework.framework_output,
"%s dpm:orte:disconnect collective tracker for comm_cid %d NOT FOUND",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), comm->c_contextid));
return OMPI_ERROR;
}
/* setup the collective */
coll = OBJ_NEW(ompi_rte_collective_t);
/* Ralph points out that we need a unique barrier ID for this
operation (similar to what we do in connect/accept). So #if0
out this code for now; Ralph will be working on this
shortly. */
coll->id = ompi_process_info.peer_fini_barrier;
coll->id = req->disconnectid;
/* the daemons will have no knowledge of this collective, so
* it must be done across the peers in the communicator.
* RHC: assuming for now that this must flow across all
* local and remote group members */
group = comm->c_local_group;
if (ORTE_SUCCESS != (ret = construct_peers(group, &coll->participants))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(coll);
return ret;
}
/* do the same for the remote group */
group = comm->c_remote_group;
if (ORTE_SUCCESS != (ret = construct_peers(group, &coll->participants))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(coll);
return ret;
}
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_framework.framework_output,
"%s dpm:orte:disconnect calling barrier on comm_cid %d using id %d with %d participants",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), comm->c_contextid, (int)coll->id,
(int)opal_list_get_size(&coll->participants)));
coll->active = true;
if (OMPI_SUCCESS != (ret = ompi_rte_barrier(coll))) {
OMPI_ERROR_LOG(ret);
@@ -684,13 +810,15 @@ static int disconnect(ompi_communicator_t *comm)
OMPI_WAIT_FOR_COMPLETION(coll->active);
OBJ_RELEASE(coll);
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_framework.framework_output,
"%s dpm:orte:disconnect barrier complete for comm_cid %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), comm->c_contextid));
/* release this tracker */
opal_list_remove_item(&dynamics, &req->super);
OBJ_RELEASE(req);
return OMPI_SUCCESS;
#else
ompi_dpm_base_disconnect_obj *dobj;
dobj = ompi_dpm_base_disconnect_init (comm);
return ompi_dpm_base_disconnect_waitall(1, &dobj);
#endif
}
static int spawn(int count, const char *array_of_commands[],
@@ -780,7 +908,6 @@ static int spawn(int count, const char *array_of_commands[],
}
/* record the number of procs to be generated */
app->num_procs = array_of_maxprocs[i];
jdata->num_procs += app->num_procs;
/* copy over the argv array */
counter = 1;
@@ -1323,19 +1450,6 @@ static int finalize(void)
return OMPI_SUCCESS;
}
typedef struct {
opal_list_item_t super;
opal_event_t ev;
bool event_active;
uint32_t id;
orte_rml_tag_t tag;
ompi_dpm_base_paccept_connect_callback_fn_t cbfunc;
void *cbdata;
} orte_dpm_prequest_t;
OBJ_CLASS_INSTANCE(orte_dpm_prequest_t,
opal_list_item_t,
NULL, NULL);
static void timeout_cb(int fd, short args, void *cbdata)
{
orte_dpm_prequest_t *req = (orte_dpm_prequest_t*)cbdata;

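The construct_peers() helper added above walks a group two ways: a dense group exposes its proc pointers directly, while a sparse one goes through ompi_group_peer_lookup(). A hedged, self-contained sketch of that shape — types and names here are simplified stand-ins, not the real OMPI structures:

/* Sketch of the dense-vs-sparse group walk in construct_peers().
 * group_t and peer_lookup are hypothetical stand-ins. */
#include <stdio.h>
#include <stddef.h>

typedef struct { int jobid; int vpid; } name_t;

typedef struct {
    int dense;        /* models OMPI_GROUP_IS_DENSE() */
    int count;
    name_t *procs;    /* direct array, valid when dense */
} group_t;

/* models ompi_group_peer_lookup() for the sparse case */
static name_t *peer_lookup(group_t *g, int i) {
    return &g->procs[i];   /* trivially backed by the same array here */
}

static int collect_peers(group_t *g) {
    for (int i = 0; i < g->count; i++) {
        name_t *p = g->dense ? &g->procs[i] : peer_lookup(g, i);
        if (NULL == p) {
            return -1;     /* mirrors the ORTE_ERR_NOT_FOUND branch */
        }
        printf("participant %d.%d\n", p->jobid, p->vpid);
    }
    return 0;
}

int main(void) {
    name_t procs[] = { {1, 0}, {1, 1} };
    group_t g = { .dense = 1, .count = 2, .procs = procs };
    return collect_peers(&g);
}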
View file

@@ -137,10 +137,12 @@ CLEANUP:
static void process_barrier(int fd, short args, void *cbdata)
{
orte_grpcomm_caddy_t *caddy = (orte_grpcomm_caddy_t*)cbdata;
orte_grpcomm_collective_t *coll = caddy->op;
orte_grpcomm_collective_t *coll = caddy->op, *cptr;
opal_list_item_t *item;
int rc;
opal_buffer_t *buf;
orte_namelist_t *nm;
bool found;
OBJ_RELEASE(caddy);
@@ -161,9 +163,6 @@ static void process_barrier(int fd, short args, void *cbdata)
return;
}
/* setup the collective */
opal_list_append(&orte_grpcomm_base.active_colls, &coll->super);
if (0 == opal_list_get_size(&coll->participants)) {
/* add a wildcard name to the participants so the daemon knows
* that everyone in my job must participate
@@ -172,27 +171,114 @@ static void process_barrier(int fd, short args, void *cbdata)
nm->name.jobid = ORTE_PROC_MY_NAME->jobid;
nm->name.vpid = ORTE_VPID_WILDCARD;
opal_list_append(&coll->participants, &nm->super);
}
/* pack the collective - no data should be involved, but we need
* to ensure we get the header info correct so it can be
* unpacked without error
*/
buf = OBJ_NEW(opal_buffer_t);
orte_grpcomm_base_pack_collective(buf, ORTE_PROC_MY_NAME->jobid,
coll, ORTE_GRPCOMM_INTERNAL_STG_APP);
/* setup the collective */
opal_list_append(&orte_grpcomm_base.active_colls, &coll->super);
/* send the buffer to my daemon */
if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, buf, ORTE_RML_TAG_COLLECTIVE,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
opal_list_remove_item(&orte_grpcomm_base.active_colls, &coll->super);
/* pack the collective - no data should be involved, but we need
* to ensure we get the header info correct so it can be
* unpacked without error
*/
buf = OBJ_NEW(opal_buffer_t);
orte_grpcomm_base_pack_collective(buf, ORTE_PROC_MY_NAME->jobid,
coll, ORTE_GRPCOMM_INTERNAL_STG_APP);
/* send the buffer to my daemon */
if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, buf, ORTE_RML_TAG_COLLECTIVE,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
opal_list_remove_item(&orte_grpcomm_base.active_colls, &coll->super);
coll->active = false;
return;
}
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:bad barrier with daemons underway",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return;
}
/* if the participants were specified, then we must do a direct
* barrier across them since the daemons won't know anything about
* the collective and/or who is participating. We have to start by
* seeing if the collective is already present - a race condition
* exists where other participants may have already sent us their
* contribution. This would place the collective on the global
* array, but leave it marked as "inactive" until we call
* modex with the list of participants */
found = false;
OPAL_LIST_FOREACH(cptr, &orte_grpcomm_base.active_colls, orte_grpcomm_collective_t) {
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s CHECKING COLL id %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
cptr->id));
if (coll->id == cptr->id) {
found = true;
/* remove the old entry - we will replace it
* with the barrier one
*/
opal_list_remove_item(&orte_grpcomm_base.active_colls, &cptr->super);
break;
}
}
if (found) {
/* since it already exists, the list of
* targets contains the list of procs
* that have already sent us their info. Cycle
* thru the targets and move those entries to
* the barrier object
*/
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:bad collective %d already exists - removing prior copy",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(int)coll->id));
while (NULL != (item = opal_list_remove_first(&cptr->targets))) {
opal_list_append(&coll->targets, item);
}
/* cleanup */
OBJ_RELEASE(cptr);
}
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:bad adding collective %d with %d participants to global list",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(int)coll->id, (int)opal_list_get_size(&coll->participants)));
/* now add the barrier to the global list of active collectives */
opal_list_append(&orte_grpcomm_base.active_colls, &coll->super);
/* pack the collective id */
if (ORTE_SUCCESS != (rc = opal_dss.pack(&coll->buffer, &coll->id, 1, ORTE_GRPCOMM_COLL_ID_T))) {
ORTE_ERROR_LOG(rc);
coll->active = false;
return;
}
/* send directly to each participant - note that this will
* include ourselves, which is fine as it will aid in
* determining the collective is complete
*/
OPAL_LIST_FOREACH(nm, &coll->participants, orte_namelist_t) {
buf = OBJ_NEW(opal_buffer_t);
opal_dss.copy_payload(buf, &coll->buffer);
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:bad sending collective %d to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(int)coll->id,
ORTE_NAME_PRINT(&nm->name)));
if (0 > (rc = orte_rml.send_buffer_nb(&nm->name, buf,
ORTE_RML_TAG_COLLECTIVE,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
opal_list_remove_item(&orte_grpcomm_base.active_colls, &coll->super);
coll->active = false;
return;
}
}
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:bad barrier underway",
"%s grpcomm:bad: barrier posted",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
}

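The interesting part of the grpcomm change above is the race handling: another participant's contribution can create a tracker for the same collective id before we post our barrier, so the old entry's recorded targets must be carried over into the new barrier object before it is published on the active list. A simplified stand-alone model of that merge, with hypothetical types in place of the real opal_list machinery:

/* Sketch of the tracker merge in process_barrier(): a peer's early
 * contribution may already sit on the active list under our id; its
 * targets are moved onto the barrier object before posting it. */
#include <stdio.h>
#include <stdlib.h>

typedef struct coll {
    struct coll *next;
    int id;
    int ntargets;     /* models the "targets" list: contributions seen */
} coll_t;

static coll_t *active = NULL;   /* models orte_grpcomm_base.active_colls */

static void post_barrier(coll_t *barrier) {
    /* look for an earlier tracker created by incoming contributions */
    for (coll_t **p = &active; *p != NULL; p = &(*p)->next) {
        if ((*p)->id == barrier->id) {
            coll_t *old = *p;
            *p = old->next;                      /* drop the stale entry  */
            barrier->ntargets += old->ntargets;  /* move its targets over */
            free(old);
            break;
        }
    }
    /* now publish the barrier itself */
    barrier->next = active;
    active = barrier;
    printf("barrier %d posted with %d early contributions\n",
           barrier->id, barrier->ntargets);
}

int main(void) {
    coll_t *early = calloc(1, sizeof(*early));  /* race: a peer got in first */
    early->id = 7; early->ntargets = 2;
    early->next = active; active = early;

    coll_t *barrier = calloc(1, sizeof(*barrier));
    barrier->id = 7;
    post_barrier(barrier);   /* prints: barrier 7 posted with 2 ... */
    return 0;
}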
View file

@@ -136,21 +136,38 @@ static void coll_id_req(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
orte_grpcomm_coll_id_t id;
orte_grpcomm_coll_id_t *id;
opal_buffer_t *relay;
int rc;
int32_t num, n;
/* unpack the number of id's being requested */
n=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &num, &n, OPAL_INT32))) {
if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
ORTE_ERROR_LOG(rc);
return;
}
/* assume one id was requested */
num = 1;
}
id = (orte_grpcomm_coll_id_t*)malloc(num * sizeof(orte_grpcomm_coll_id_t));
for (n=0; n < num; n++) {
id[n] = orte_grpcomm_base_get_coll_id();
}
id = orte_grpcomm_base_get_coll_id();
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:base:receive proc %s requested coll id - returned id %d",
"%s grpcomm:base:receive proc %s requested %d coll id's",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender), id));
ORTE_NAME_PRINT(sender), num));
relay = OBJ_NEW(opal_buffer_t);
if (ORTE_SUCCESS != (rc = opal_dss.pack(relay, &id, 1, ORTE_GRPCOMM_COLL_ID_T))) {
if (ORTE_SUCCESS != (rc = opal_dss.pack(relay, id, num, ORTE_GRPCOMM_COLL_ID_T))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(relay);
free(id);
return;
}
free(id);
if (0 > (rc = orte_rml.send_buffer_nb(sender, relay, ORTE_RML_TAG_COLL_ID,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
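Finally, coll_id_req() above now reads a requested count from the buffer, falls back to a single id when a legacy request carries no payload (the read-past-end case), and returns the ids as an array. A rough stand-alone model of that server-side logic, with the DSS/RML plumbing elided and stand-in names throughout:

/* Sketch of the multi-id handshake added to coll_id_req(). Buffer and
 * RML plumbing is elided; serve_ids stands in for the unpack/allocate
 * sequence, next_coll_id for orte_grpcomm_base_get_coll_id(). */
#include <stdio.h>
#include <stdlib.h>

static int next_coll_id = 100;

/* honor a request for 'num_requested' ids; an empty (legacy) request,
 * signalled here by have_count == 0, defaults to one id */
static int *serve_ids(int have_count, int num_requested, int *num_out) {
    int num = have_count ? num_requested : 1;
    int *ids = malloc(num * sizeof(int));
    for (int n = 0; n < num; n++) {
        ids[n] = next_coll_id++;
    }
    *num_out = num;
    return ids;
}

int main(void) {
    int num;
    /* new-style request: two ids, one for connect/accept, one for
     * the eventual disconnect */
    int *ids = serve_ids(1, 2, &num);
    printf("granted %d ids: %d %d\n", num, ids[0], ids[1]);
    free(ids);
    return 0;
}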