Fix comm_spawn when ORTE progress thread is enabled by ensuring that all operations on the global list of active collectives are done in events to avoid conflicts.
This commit was SVN r27658.
Этот коммит содержится в:
родитель
3e1b13b13a
Коммит
c26ed7dcdd
@ -54,6 +54,7 @@ static int xcast(orte_jobid_t job,
|
||||
orte_rml_tag_t tag);
|
||||
static int bad_allgather(orte_grpcomm_collective_t *coll);
|
||||
static int bad_barrier(orte_grpcomm_collective_t *coll);
|
||||
static int bad_modex(orte_grpcomm_collective_t *modex);
|
||||
|
||||
/* Module def */
|
||||
orte_grpcomm_base_module_t orte_grpcomm_bad_module = {
|
||||
@ -62,7 +63,7 @@ orte_grpcomm_base_module_t orte_grpcomm_bad_module = {
|
||||
xcast,
|
||||
bad_allgather,
|
||||
bad_barrier,
|
||||
orte_grpcomm_base_modex
|
||||
bad_modex
|
||||
};
|
||||
|
||||
/**
|
||||
@ -133,29 +134,14 @@ CLEANUP:
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
||||
static int bad_barrier(orte_grpcomm_collective_t *coll)
|
||||
static void process_barrier(int fd, short args, void *cbdata)
|
||||
{
|
||||
orte_grpcomm_caddy_t *caddy = (orte_grpcomm_caddy_t*)cbdata;
|
||||
orte_grpcomm_collective_t *coll = caddy->op;
|
||||
int rc;
|
||||
opal_buffer_t *buf;
|
||||
orte_namelist_t *nm;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
|
||||
"%s grpcomm:bad entering barrier",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* if I am alone, just execute the callback */
|
||||
if (1 == orte_process_info.num_procs) {
|
||||
coll->active = false;
|
||||
if (NULL != coll->cbfunc) {
|
||||
coll->cbfunc(NULL, coll->cbdata);
|
||||
}
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/* mark the collective as active */
|
||||
coll->active = true;
|
||||
|
||||
/* setup the collective */
|
||||
opal_list_append(&orte_grpcomm_base.active_colls, &coll->super);
|
||||
|
||||
@ -183,41 +169,47 @@ static int bad_barrier(orte_grpcomm_collective_t *coll)
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_RELEASE(buf);
|
||||
opal_list_remove_item(&orte_grpcomm_base.active_colls, &coll->super);
|
||||
return rc;
|
||||
return;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base.output,
|
||||
"%s grpcomm:bad barrier underway",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
static int bad_allgather(orte_grpcomm_collective_t *gather)
|
||||
static int bad_barrier(orte_grpcomm_collective_t *coll)
|
||||
{
|
||||
int rc;
|
||||
opal_buffer_t *buf;
|
||||
orte_namelist_t *nm;
|
||||
opal_list_item_t *item;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
|
||||
"%s grpcomm:bad entering allgather",
|
||||
"%s grpcomm:bad entering barrier",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* if I am alone and nobody else is participating, then
|
||||
* nothing really to do
|
||||
*/
|
||||
if (1 == orte_process_info.num_procs &&
|
||||
0 == opal_list_get_size(&gather->participants)) {
|
||||
gather->active = false;
|
||||
if (NULL != gather->cbfunc) {
|
||||
gather->cbfunc(&gather->buffer, gather->cbdata);
|
||||
/* if I am alone, just execute the callback */
|
||||
if (1 == orte_process_info.num_procs) {
|
||||
coll->active = false;
|
||||
if (NULL != coll->cbfunc) {
|
||||
coll->cbfunc(NULL, coll->cbdata);
|
||||
}
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/* mark the collective as active */
|
||||
gather->active = true;
|
||||
coll->active = true;
|
||||
|
||||
/* push it into the event library for processing as
|
||||
* we will be accessing global lists
|
||||
*/
|
||||
ORTE_GRPCOMM_ACTIVATE(coll, process_barrier);
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static void process_allgather(int fd, short args, void *cbdata)
|
||||
{
|
||||
orte_grpcomm_caddy_t *caddy = (orte_grpcomm_caddy_t*)cbdata;
|
||||
orte_grpcomm_collective_t *gather = caddy->op;
|
||||
int rc;
|
||||
opal_buffer_t *buf;
|
||||
orte_namelist_t *nm;
|
||||
opal_list_item_t *item;
|
||||
|
||||
/* if this is an original request, then record the collective */
|
||||
if (NULL == gather->next_cb) {
|
||||
@ -250,7 +242,7 @@ static int bad_allgather(orte_grpcomm_collective_t *gather)
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_RELEASE(buf);
|
||||
opal_list_remove_item(&orte_grpcomm_base.active_colls, &gather->super);
|
||||
return rc;
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
/* send directly to each participant - note that this will
|
||||
@ -274,15 +266,54 @@ static int bad_allgather(orte_grpcomm_collective_t *gather)
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_RELEASE(buf);
|
||||
opal_list_remove_item(&orte_grpcomm_base.active_colls, &gather->super);
|
||||
return rc;
|
||||
return;
|
||||
}
|
||||
}
|
||||
return ORTE_SUCCESS;
|
||||
return;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
|
||||
"%s grpcomm:bad allgather underway",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
}
|
||||
|
||||
/**
 * Entry point for an allgather collective.
 *
 * When more than this one process exists, or the collective lists any
 * participants, the collective is flagged active and handed to the
 * event library, where process_allgather() does the actual work; the
 * global list of active collectives is thereby only touched from
 * within an event.
 *
 * Otherwise there is nobody to exchange data with: the collective is
 * flagged inactive and the caller's callback (if one was supplied) is
 * invoked right away with the collective's own buffer.
 *
 * @param gather  the collective describing this allgather
 * @return        ORTE_SUCCESS on every path visible here
 */
static int bad_allgather(orte_grpcomm_collective_t *gather)
{
    OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output,
                         "%s grpcomm:bad entering allgather",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));

    /* somebody else is involved - defer the real work to an event */
    if (1 != orte_process_info.num_procs ||
        0 != opal_list_get_size(&gather->participants)) {
        /* flag the collective as in progress before it is queued */
        gather->active = true;
        ORTE_GRPCOMM_ACTIVATE(gather, process_allgather);
        return ORTE_SUCCESS;
    }

    /* alone with no participants: complete immediately */
    gather->active = false;
    if (NULL != gather->cbfunc) {
        gather->cbfunc(&gather->buffer, gather->cbdata);
    }
    return ORTE_SUCCESS;
}
|
||||
|
||||
/**
 * Entry point for a modex collective.
 *
 * The collective is flagged active and then queued on the event
 * library with orte_grpcomm_base_modex() as its handler, so that the
 * modex is processed inside an event rather than directly in the
 * caller's context (this avoids racing with modex data that arrives
 * from other sources via the RML).
 *
 * @param modex  the collective describing this modex
 * @return       ORTE_SUCCESS (no failure path is visible here)
 */
static int bad_modex(orte_grpcomm_collective_t *modex)
{
    /* flag the collective as in progress before it is queued */
    modex->active = true;

    /* hand the operation to the event library for processing */
    ORTE_GRPCOMM_ACTIVATE(modex, orte_grpcomm_base_modex);

    return ORTE_SUCCESS;
}
|
||||
|
@ -66,6 +66,28 @@ typedef struct {
|
||||
#endif
|
||||
} orte_grpcomm_base_t;
|
||||
|
||||
typedef struct {
|
||||
opal_object_t super;
|
||||
opal_event_t ev;
|
||||
orte_grpcomm_collective_t *op;
|
||||
} orte_grpcomm_caddy_t;
|
||||
OBJ_CLASS_DECLARATION(orte_grpcomm_caddy_t);
|
||||
|
||||
/*
 * Queue a collective operation for processing inside the event library.
 *
 *   o  - pointer to the orte_grpcomm_collective_t to be processed
 *   cb - event callback, invoked as cb(fd, flags, cbdata) where cbdata
 *        is a freshly created orte_grpcomm_caddy_t whose "op" field is o
 *
 * The macro creates the caddy, binds an event (fd -1, OPAL_EV_WRITE) to
 * orte_event_base at ORTE_MSG_PRI priority and marks it active at once.
 * NOTE(review): nothing here releases the caddy; presumably the callback
 * is expected to do so - confirm against the callbacks.
 *
 * The expansion deliberately does NOT end in a semicolon, so the macro
 * behaves like a single statement and "if (x) ORTE_GRPCOMM_ACTIVATE(..);
 * else ..." compiles. The internal variable carries a long, unusual name
 * so that an argument such as "caddy->op" refers to the caller's
 * variable instead of being captured by the macro's own local.
 */
#define ORTE_GRPCOMM_ACTIVATE(o, cb)                                        \
    do {                                                                    \
        orte_grpcomm_caddy_t *grpcomm_activate_cd_;                         \
        OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output,                   \
                             "%s ACTIVATING GRCPCOMM OP %d at %s:%d",       \
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),            \
                             (o)->id, __FILE__, __LINE__));                 \
        grpcomm_activate_cd_ = OBJ_NEW(orte_grpcomm_caddy_t);               \
        grpcomm_activate_cd_->op = (o);                                     \
        opal_event_set(orte_event_base, &grpcomm_activate_cd_->ev, -1,      \
                       OPAL_EV_WRITE, (cb), grpcomm_activate_cd_);          \
        opal_event_set_priority(&grpcomm_activate_cd_->ev, ORTE_MSG_PRI);   \
        opal_event_active(&grpcomm_activate_cd_->ev, OPAL_EV_WRITE, 1);     \
    } while (0)
|
||||
|
||||
ORTE_DECLSPEC extern orte_grpcomm_base_t orte_grpcomm_base;
|
||||
|
||||
ORTE_DECLSPEC orte_grpcomm_collective_t* orte_grpcomm_base_setup_collective(orte_grpcomm_coll_id_t id);
|
||||
@ -82,7 +104,7 @@ ORTE_DECLSPEC void orte_grpcomm_base_rollup_recv(int status, orte_process_name_t
|
||||
/* modex support */
|
||||
ORTE_DECLSPEC void orte_grpcomm_base_store_peer_modex(opal_buffer_t *rbuf, void *cbdata);
|
||||
ORTE_DECLSPEC void orte_grpcomm_base_store_modex(opal_buffer_t *rbuf, void *cbdata);
|
||||
ORTE_DECLSPEC int orte_grpcomm_base_modex(orte_grpcomm_collective_t *modex);
|
||||
ORTE_DECLSPEC void orte_grpcomm_base_modex(int fd, short args, void *cbdata);
|
||||
ORTE_DECLSPEC int orte_grpcomm_base_pack_modex_entries(opal_buffer_t *buf);
|
||||
ORTE_DECLSPEC int orte_grpcomm_base_update_modex_entries(orte_process_name_t *proc_name,
|
||||
opal_buffer_t *rbuf);
|
||||
|
@ -62,8 +62,10 @@ orte_grpcomm_coll_id_t orte_grpcomm_base_get_coll_id(void)
|
||||
|
||||
|
||||
/*************** MODEX SECTION **************/
|
||||
int orte_grpcomm_base_modex(orte_grpcomm_collective_t *modex)
|
||||
void orte_grpcomm_base_modex(int fd, short args, void *cbdata)
|
||||
{
|
||||
orte_grpcomm_caddy_t *caddy = (orte_grpcomm_caddy_t*)cbdata;
|
||||
orte_grpcomm_collective_t *modex = caddy->op;
|
||||
int rc;
|
||||
orte_namelist_t *nm;
|
||||
opal_list_item_t *item;
|
||||
@ -76,7 +78,6 @@ int orte_grpcomm_base_modex(orte_grpcomm_collective_t *modex)
|
||||
|
||||
if (0 == opal_list_get_size(&modex->participants)) {
|
||||
/* record the collective */
|
||||
modex->active = true;
|
||||
modex->next_cbdata = modex;
|
||||
opal_list_append(&orte_grpcomm_base.active_colls, &modex->super);
|
||||
|
||||
@ -138,7 +139,6 @@ int orte_grpcomm_base_modex(orte_grpcomm_collective_t *modex)
|
||||
/* now add the modex to the global list of active collectives */
|
||||
modex->next_cb = orte_grpcomm_base_store_peer_modex;
|
||||
modex->next_cbdata = modex;
|
||||
modex->active = true;
|
||||
opal_list_append(&orte_grpcomm_base.active_colls, &modex->super);
|
||||
|
||||
/* this is not amongst our peers, but rather between a select
|
||||
@ -215,10 +215,10 @@ int orte_grpcomm_base_modex(orte_grpcomm_collective_t *modex)
|
||||
"%s grpcomm:base:modex: modex posted",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
return;
|
||||
|
||||
cleanup:
|
||||
return rc;
|
||||
return;
|
||||
}
|
||||
|
||||
void orte_grpcomm_base_store_peer_modex(opal_buffer_t *rbuf, void *cbdata)
|
||||
|
@ -9,7 +9,7 @@
|
||||
* University of Stuttgart. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2011 Los Alamos National Security, LLC.
|
||||
* Copyright (c) 2011-2012 Los Alamos National Security, LLC.
|
||||
* All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
@ -138,3 +138,7 @@ OBJ_CLASS_INSTANCE(orte_grpcomm_collective_t,
|
||||
opal_list_item_t,
|
||||
collective_constructor,
|
||||
collective_destructor);
|
||||
|
||||
/* class instance for the event caddy: derives from opal_object_t and
 * registers neither a constructor nor a destructor */
OBJ_CLASS_INSTANCE(orte_grpcomm_caddy_t, opal_object_t, NULL, NULL);
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user