Fix collectives for jobs running across partial allocations
This commit was SVN r26267.
Parent: 5d14fa7546
Commit: 4d16790836
@@ -69,7 +69,7 @@ typedef enum {
 } orte_grpcomm_internal_stage_t;
 
 /* structure for tracking collective operations */
-struct orte_grpcomm_collective_t {
+typedef struct {
     opal_list_item_t super;
     orte_grpcomm_coll_id_t id;
     /* flag that user can poll on to know when collective
@@ -113,8 +113,7 @@ struct orte_grpcomm_collective_t {
      */
     orte_grpcomm_collective_cbfunc_t next_cb;
     void *next_cbdata;
-};
-typedef struct orte_grpcomm_collective_t orte_grpcomm_collective_t;
+} orte_grpcomm_collective_t;
 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_grpcomm_collective_t);
 
 END_C_DECLS
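Note (not part of the commit): the two hunks above fold the named struct and its separate typedef into a single anonymous typedef, while the type remains an OPAL list item managed through OBJ_CLASS_DECLARATION. A minimal sketch of the usual object lifecycle under the new definition follows; the helper name track_collective and the include paths are assumptions, while orte_grpcomm_base.active_colls and the fields used are taken from the hunks in this commit.

#include "orte/mca/grpcomm/grpcomm.h"
#include "orte/mca/grpcomm/base/base.h"

static void track_collective(orte_grpcomm_coll_id_t id)
{
    orte_grpcomm_collective_t *coll;

    coll = OBJ_NEW(orte_grpcomm_collective_t);  /* runs the class constructor */
    coll->id = id;                              /* collective id assigned by the HNP */
    opal_list_append(&orte_grpcomm_base.active_colls, &coll->super);

    /* ... later, once the collective completes ... */
    opal_list_remove_item(&orte_grpcomm_base.active_colls, &coll->super);
    OBJ_RELEASE(coll);
}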
@@ -308,6 +308,20 @@ int orte_odls_base_default_get_add_procs_data(opal_buffer_t *data,
     /* release the data since it has now been copied into our buffer */
     free(bo.bytes);
 
+    /* pack the collective ids */
+    if (ORTE_SUCCESS != (rc = opal_dss.pack(data, &jdata->peer_modex, 1, ORTE_GRPCOMM_COLL_ID_T))) {
+        ORTE_ERROR_LOG(rc);
+        return rc;
+    }
+    if (ORTE_SUCCESS != (rc = opal_dss.pack(data, &jdata->peer_init_barrier, 1, ORTE_GRPCOMM_COLL_ID_T))) {
+        ORTE_ERROR_LOG(rc);
+        return rc;
+    }
+    if (ORTE_SUCCESS != (rc = opal_dss.pack(data, &jdata->peer_fini_barrier, 1, ORTE_GRPCOMM_COLL_ID_T))) {
+        ORTE_ERROR_LOG(rc);
+        return rc;
+    }
+
     /* pack the procs for this job */
     for (j=0; j < jdata->procs->size; j++) {
         if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, j))) {
@@ -622,6 +636,23 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
         goto REPORT_ERROR;
     }
 
+    /* unpack the collective ids */
+    cnt=1;
+    if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &jdata->peer_modex, &cnt, ORTE_GRPCOMM_COLL_ID_T))) {
+        ORTE_ERROR_LOG(rc);
+        return rc;
+    }
+    cnt=1;
+    if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &jdata->peer_init_barrier, &cnt, ORTE_GRPCOMM_COLL_ID_T))) {
+        ORTE_ERROR_LOG(rc);
+        return rc;
+    }
+    cnt=1;
+    if (ORTE_SUCCESS != (rc = opal_dss.unpack(data, &jdata->peer_fini_barrier, &cnt, ORTE_GRPCOMM_COLL_ID_T))) {
+        ORTE_ERROR_LOG(rc);
+        return rc;
+    }
+
     /* unpack the procs */
     for (j=0; j < jdata->num_procs; j++) {
         cnt=1;
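Note (not part of the commit): the two ODLS hunks above are the two halves of one DSS exchange. The HNP packs each id with a count of 1, and each daemon resets cnt to 1 before every unpack. A minimal round-trip sketch follows; the function name coll_id_roundtrip, the local buffer, and the value 42 are hypothetical, while the pack/unpack calls and ORTE_GRPCOMM_COLL_ID_T mirror the hunks above.

#include "opal/dss/dss.h"
#include "orte/mca/grpcomm/grpcomm.h"

static int coll_id_roundtrip(void)
{
    opal_buffer_t *buf = OBJ_NEW(opal_buffer_t);
    orte_grpcomm_coll_id_t sent = 42, recvd;
    int32_t cnt = 1;   /* in: how many values to unpack; out: how many were unpacked */
    int rc;

    if (ORTE_SUCCESS == (rc = opal_dss.pack(buf, &sent, 1, ORTE_GRPCOMM_COLL_ID_T))) {
        rc = opal_dss.unpack(buf, &recvd, &cnt, ORTE_GRPCOMM_COLL_ID_T);
    }
    OBJ_RELEASE(buf);   /* recvd now holds the same id that was packed */
    return rc;
}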
@@ -646,6 +677,35 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
     }
 
 COMPLETE:
+    /* if we don't have any local procs, create
+     * the collectives so the job doesn't stall
+     */
+    if (0 == jdata->num_local_procs) {
+        orte_grpcomm_collective_t *coll;
+        orte_namelist_t *nm;
+        coll = OBJ_NEW(orte_grpcomm_collective_t);
+        coll->id = jdata->peer_modex;
+        nm = OBJ_NEW(orte_namelist_t);
+        nm->name.jobid = jdata->jobid;
+        nm->name.vpid = ORTE_VPID_WILDCARD;
+        opal_list_append(&coll->participants, &nm->super);
+        opal_list_append(&orte_grpcomm_base.active_colls, &coll->super);
+        coll = OBJ_NEW(orte_grpcomm_collective_t);
+        coll->id = jdata->peer_init_barrier;
+        nm = OBJ_NEW(orte_namelist_t);
+        nm->name.jobid = jdata->jobid;
+        nm->name.vpid = ORTE_VPID_WILDCARD;
+        opal_list_append(&coll->participants, &nm->super);
+        opal_list_append(&orte_grpcomm_base.active_colls, &coll->super);
+        coll = OBJ_NEW(orte_grpcomm_collective_t);
+        coll->id = jdata->peer_fini_barrier;
+        nm = OBJ_NEW(orte_namelist_t);
+        nm->name.jobid = jdata->jobid;
+        nm->name.vpid = ORTE_VPID_WILDCARD;
+        opal_list_append(&coll->participants, &nm->super);
+        opal_list_append(&orte_grpcomm_base.active_colls, &coll->super);
+    }
+
     /* progress any pending collectives */
     orte_grpcomm_base_progress_collectives();
 
@@ -144,7 +144,6 @@ void orte_plm_base_setup_job(int fd, short args, void *cbdata)
     int i;
     orte_app_context_t *app;
     orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
-    orte_grpcomm_coll_id_t modex, bar1, bar2;
     char *modx_par, *modx_val;
     char *bar1_par, *bar1_val;
     char *bar2_par, *bar2_val;
@@ -183,15 +182,15 @@ void orte_plm_base_setup_job(int fd, short args, void *cbdata)
     }
 
     /* get collective ids for the std MPI operations */
-    modex = orte_grpcomm_base_get_coll_id();
+    caddy->jdata->peer_modex = orte_grpcomm_base_get_coll_id();
     modx_par = mca_base_param_environ_variable("orte", NULL, "peer_modex_id");
-    asprintf(&modx_val, "%d", modex);
-    bar1 = orte_grpcomm_base_get_coll_id();
+    asprintf(&modx_val, "%d", caddy->jdata->peer_modex);
+    caddy->jdata->peer_init_barrier = orte_grpcomm_base_get_coll_id();
     bar1_par = mca_base_param_environ_variable("orte", NULL, "peer_init_barrier_id");
-    asprintf(&bar1_val, "%d", bar1);
-    bar2 = orte_grpcomm_base_get_coll_id();
+    asprintf(&bar1_val, "%d", caddy->jdata->peer_init_barrier);
+    caddy->jdata->peer_fini_barrier = orte_grpcomm_base_get_coll_id();
     bar2_par = mca_base_param_environ_variable("orte", NULL, "peer_fini_barrier_id");
-    asprintf(&bar2_val, "%d", bar2);
+    asprintf(&bar2_val, "%d", caddy->jdata->peer_fini_barrier);
 
     /* if app recovery is not defined, set apps to defaults */
     for (i=0; i < caddy->jdata->apps->size; i++) {
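Note (not part of the commit): the hunk above now records the three ids on the job object itself while still building the MCA parameter name/value strings, so launched processes can learn the same ids through their environment. The injection step is outside this diff; the sketch below shows the conventional pattern, where export_coll_id, the use of opal_setenv(), and the env target are assumptions, and only mca_base_param_environ_variable() and the "%d" formatting come from the hunk above.

#include <stdio.h>
#include <stdlib.h>
#include "opal/util/opal_environ.h"
#include "opal/mca/base/mca_base_param.h"

static void export_coll_id(char ***env, int id)
{
    char *param, *value;

    /* yields a variable name of the form OMPI_MCA_orte_peer_modex_id */
    param = mca_base_param_environ_variable("orte", NULL, "peer_modex_id");
    asprintf(&value, "%d", id);
    opal_setenv(param, value, true, env);   /* overwrite any existing value */
    free(param);
    free(value);
}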
@@ -359,6 +359,10 @@ typedef struct {
      * (wildcard), or none (invalid)
      */
     orte_vpid_t stdin_target;
+    /* collective ids */
+    orte_grpcomm_coll_id_t peer_modex;
+    orte_grpcomm_coll_id_t peer_init_barrier;
+    orte_grpcomm_coll_id_t peer_fini_barrier;
     /* total slots allocated to this job */
     orte_std_cntr_t total_slots_alloc;
     /* number of procs in this job */