diff --git a/orte/mca/grpcomm/bad/grpcomm_bad_module.c b/orte/mca/grpcomm/bad/grpcomm_bad_module.c index c8370f83b8..bf6b4146b2 100644 --- a/orte/mca/grpcomm/bad/grpcomm_bad_module.c +++ b/orte/mca/grpcomm/bad/grpcomm_bad_module.c @@ -54,6 +54,7 @@ static int xcast(orte_jobid_t job, orte_rml_tag_t tag); static int bad_allgather(orte_grpcomm_collective_t *coll); static int bad_barrier(orte_grpcomm_collective_t *coll); +static int bad_modex(orte_grpcomm_collective_t *modex); /* Module def */ orte_grpcomm_base_module_t orte_grpcomm_bad_module = { @@ -62,7 +63,7 @@ orte_grpcomm_base_module_t orte_grpcomm_bad_module = { xcast, bad_allgather, bad_barrier, - orte_grpcomm_base_modex + bad_modex }; /** @@ -133,29 +134,14 @@ CLEANUP: return rc; } - -static int bad_barrier(orte_grpcomm_collective_t *coll) +static void process_barrier(int fd, short args, void *cbdata) { + orte_grpcomm_caddy_t *caddy = (orte_grpcomm_caddy_t*)cbdata; + orte_grpcomm_collective_t *coll = caddy->op; int rc; opal_buffer_t *buf; orte_namelist_t *nm; - OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output, - "%s grpcomm:bad entering barrier", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - /* if I am alone, just execute the callback */ - if (1 == orte_process_info.num_procs) { - coll->active = false; - if (NULL != coll->cbfunc) { - coll->cbfunc(NULL, coll->cbdata); - } - return ORTE_SUCCESS; - } - - /* mark the collective as active */ - coll->active = true; - /* setup the collective */ opal_list_append(&orte_grpcomm_base.active_colls, &coll->super); @@ -183,41 +169,47 @@ static int bad_barrier(orte_grpcomm_collective_t *coll) ORTE_ERROR_LOG(rc); OBJ_RELEASE(buf); opal_list_remove_item(&orte_grpcomm_base.active_colls, &coll->super); - return rc; + return; } OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base.output, "%s grpcomm:bad barrier underway", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - - return rc; } -static int bad_allgather(orte_grpcomm_collective_t *gather) +static int bad_barrier(orte_grpcomm_collective_t *coll) 
{ - int rc; - opal_buffer_t *buf; - orte_namelist_t *nm; - opal_list_item_t *item; - OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output, - "%s grpcomm:bad entering allgather", + "%s grpcomm:bad entering barrier", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - /* if I am alone and nobody else is participating, then - * nothing really to do - */ - if (1 == orte_process_info.num_procs && - 0 == opal_list_get_size(&gather->participants)) { - gather->active = false; - if (NULL != gather->cbfunc) { - gather->cbfunc(&gather->buffer, gather->cbdata); + /* if I am alone, just execute the callback */ + if (1 == orte_process_info.num_procs) { + coll->active = false; + if (NULL != coll->cbfunc) { + coll->cbfunc(NULL, coll->cbdata); } return ORTE_SUCCESS; } /* mark the collective as active */ - gather->active = true; + coll->active = true; + + /* push it into the event library for processing as + * we will be accessing global lists + */ + ORTE_GRPCOMM_ACTIVATE(coll, process_barrier); + return ORTE_SUCCESS; +} + +static void process_allgather(int fd, short args, void *cbdata) +{ + orte_grpcomm_caddy_t *caddy = (orte_grpcomm_caddy_t*)cbdata; + orte_grpcomm_collective_t *gather = caddy->op; + int rc; + opal_buffer_t *buf; + orte_namelist_t *nm; + opal_list_item_t *item; /* if this is an original request, then record the collective */ if (NULL == gather->next_cb) { @@ -250,7 +242,7 @@ static int bad_allgather(orte_grpcomm_collective_t *gather) ORTE_ERROR_LOG(rc); OBJ_RELEASE(buf); opal_list_remove_item(&orte_grpcomm_base.active_colls, &gather->super); - return rc; + return; } } else { /* send directly to each participant - note that this will @@ -274,15 +266,54 @@ static int bad_allgather(orte_grpcomm_collective_t *gather) ORTE_ERROR_LOG(rc); OBJ_RELEASE(buf); opal_list_remove_item(&orte_grpcomm_base.active_colls, &gather->super); - return rc; + return; } } - return ORTE_SUCCESS; + return; } OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output, "%s grpcomm:bad allgather underway", 
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); +} + +static int bad_allgather(orte_grpcomm_collective_t *gather) +{ + OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base.output, + "%s grpcomm:bad entering allgather", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + /* if I am alone and nobody else is participating, then + * nothing really to do + */ + if (1 == orte_process_info.num_procs && + 0 == opal_list_get_size(&gather->participants)) { + gather->active = false; + if (NULL != gather->cbfunc) { + gather->cbfunc(&gather->buffer, gather->cbdata); + } + return ORTE_SUCCESS; + } + + /* mark the collective as active */ + gather->active = true; + + /* push it into the event library for processing as + * we will be accessing global lists + */ + ORTE_GRPCOMM_ACTIVATE(gather, process_allgather); + return ORTE_SUCCESS; +} + +static int bad_modex(orte_grpcomm_collective_t *modex) +{ + /* mark the collective as active */ + modex->active = true; + + /* we need to get this into the event library + * to avoid race conditions with modex data arriving + * from other sources via the RML + */ + ORTE_GRPCOMM_ACTIVATE(modex, orte_grpcomm_base_modex); return ORTE_SUCCESS; } diff --git a/orte/mca/grpcomm/base/base.h b/orte/mca/grpcomm/base/base.h index 917ee1f6cb..815314c937 100644 --- a/orte/mca/grpcomm/base/base.h +++ b/orte/mca/grpcomm/base/base.h @@ -66,6 +66,28 @@ typedef struct { #endif } orte_grpcomm_base_t; +typedef struct { + opal_object_t super; + opal_event_t ev; + orte_grpcomm_collective_t *op; +} orte_grpcomm_caddy_t; +OBJ_CLASS_DECLARATION(orte_grpcomm_caddy_t); + +#define ORTE_GRPCOMM_ACTIVATE(o, cb) \ + do { \ + orte_grpcomm_caddy_t *caddy; \ + OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base.output, \ + "%s ACTIVATING GRPCOMM OP %d at %s:%d", \ + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \ + (o)->id, __FILE__, __LINE__)); \ + caddy = OBJ_NEW(orte_grpcomm_caddy_t); \ + caddy->op = (o); \ + opal_event_set(orte_event_base, &caddy->ev, -1, \ + OPAL_EV_WRITE, (cb), caddy); \ + 
opal_event_set_priority(&caddy->ev, ORTE_MSG_PRI); \ + opal_event_active(&caddy->ev, OPAL_EV_WRITE, 1); \ + } while(0) + ORTE_DECLSPEC extern orte_grpcomm_base_t orte_grpcomm_base; ORTE_DECLSPEC orte_grpcomm_collective_t* orte_grpcomm_base_setup_collective(orte_grpcomm_coll_id_t id); @@ -82,7 +104,7 @@ ORTE_DECLSPEC void orte_grpcomm_base_rollup_recv(int status, orte_process_name_t /* modex support */ ORTE_DECLSPEC void orte_grpcomm_base_store_peer_modex(opal_buffer_t *rbuf, void *cbdata); ORTE_DECLSPEC void orte_grpcomm_base_store_modex(opal_buffer_t *rbuf, void *cbdata); -ORTE_DECLSPEC int orte_grpcomm_base_modex(orte_grpcomm_collective_t *modex); +ORTE_DECLSPEC void orte_grpcomm_base_modex(int fd, short args, void *cbdata); ORTE_DECLSPEC int orte_grpcomm_base_pack_modex_entries(opal_buffer_t *buf); ORTE_DECLSPEC int orte_grpcomm_base_update_modex_entries(orte_process_name_t *proc_name, opal_buffer_t *rbuf); diff --git a/orte/mca/grpcomm/base/grpcomm_base_modex.c b/orte/mca/grpcomm/base/grpcomm_base_modex.c index 58fb56a3f5..b562fb5350 100644 --- a/orte/mca/grpcomm/base/grpcomm_base_modex.c +++ b/orte/mca/grpcomm/base/grpcomm_base_modex.c @@ -62,8 +62,10 @@ orte_grpcomm_coll_id_t orte_grpcomm_base_get_coll_id(void) /*************** MODEX SECTION **************/ -int orte_grpcomm_base_modex(orte_grpcomm_collective_t *modex) +void orte_grpcomm_base_modex(int fd, short args, void *cbdata) { + orte_grpcomm_caddy_t *caddy = (orte_grpcomm_caddy_t*)cbdata; + orte_grpcomm_collective_t *modex = caddy->op; int rc; orte_namelist_t *nm; opal_list_item_t *item; @@ -76,7 +78,6 @@ int orte_grpcomm_base_modex(orte_grpcomm_collective_t *modex) if (0 == opal_list_get_size(&modex->participants)) { /* record the collective */ - modex->active = true; modex->next_cbdata = modex; opal_list_append(&orte_grpcomm_base.active_colls, &modex->super); @@ -138,7 +139,6 @@ int orte_grpcomm_base_modex(orte_grpcomm_collective_t *modex) /* now add the modex to the global list of active 
collectives */ modex->next_cb = orte_grpcomm_base_store_peer_modex; modex->next_cbdata = modex; - modex->active = true; opal_list_append(&orte_grpcomm_base.active_colls, &modex->super); /* this is not amongst our peers, but rather between a select @@ -215,10 +215,10 @@ int orte_grpcomm_base_modex(orte_grpcomm_collective_t *modex) "%s grpcomm:base:modex: modex posted", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - return ORTE_SUCCESS; + return; cleanup: - return rc; + return; } void orte_grpcomm_base_store_peer_modex(opal_buffer_t *rbuf, void *cbdata) diff --git a/orte/mca/grpcomm/base/grpcomm_base_open.c b/orte/mca/grpcomm/base/grpcomm_base_open.c index e1f4de594c..f445784450 100644 --- a/orte/mca/grpcomm/base/grpcomm_base_open.c +++ b/orte/mca/grpcomm/base/grpcomm_base_open.c @@ -9,7 +9,7 @@ * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. - * Copyright (c) 2011 Los Alamos National Security, LLC. + * Copyright (c) 2011-2012 Los Alamos National Security, LLC. * All rights reserved. * $COPYRIGHT$ * @@ -138,3 +138,7 @@ OBJ_CLASS_INSTANCE(orte_grpcomm_collective_t, opal_list_item_t, collective_constructor, collective_destructor); + +OBJ_CLASS_INSTANCE(orte_grpcomm_caddy_t, + opal_object_t, + NULL, NULL);