Separate the gathering of collective data by jobid
This commit was SVN r18357.
Этот коммит содержится в:
родитель
ba5615a18f
Коммит
8e846bf7f2
@ -60,6 +60,7 @@ typedef uint8_t orte_grpcomm_mode_t;
|
||||
typedef uint8_t orte_grpcomm_coll_t;
|
||||
#define ORTE_GRPCOMM_COLL_T OPAL_UINT8
|
||||
|
||||
#define ORTE_GRPCOMM_COLL_NONE 0x00
|
||||
#define ORTE_GRPCOMM_BARRIER 0x01
|
||||
#define ORTE_GRPCOMM_ALLGATHER 0x02
|
||||
|
||||
|
@ -1938,9 +1938,6 @@ static bool all_children_participated(orte_jobid_t job)
|
||||
|
||||
}
|
||||
|
||||
static opal_buffer_t *collection_bucket=NULL;
|
||||
static orte_grpcomm_coll_t collective_type;
|
||||
|
||||
int orte_odls_base_default_collect_data(orte_process_name_t *proc,
|
||||
opal_buffer_t *buf)
|
||||
{
|
||||
@ -1998,20 +1995,38 @@ int orte_odls_base_default_collect_data(orte_process_name_t *proc,
|
||||
opal_list_append(&orte_odls_globals.jobs, &jobdat->super);
|
||||
}
|
||||
|
||||
/* find the jobdat for this job */
|
||||
jobdat = NULL;
|
||||
for (item = opal_list_get_first(&orte_odls_globals.jobs);
|
||||
item != opal_list_get_end(&orte_odls_globals.jobs);
|
||||
item = opal_list_get_next(item)) {
|
||||
jobdat = (orte_odls_job_t*)item;
|
||||
|
||||
/* is this the specified job? */
|
||||
if (jobdat->jobid == proc->jobid) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (NULL == jobdat) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
rc = ORTE_ERR_NOT_FOUND;
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
/* unpack the collective type */
|
||||
n = 1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &collective_type, &n, ORTE_GRPCOMM_COLL_T))) {
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, &jobdat->collective_type, &n, ORTE_GRPCOMM_COLL_T))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
/* if the collection bucket isn't initialized, do so now */
|
||||
if (NULL == collection_bucket) {
|
||||
collection_bucket = OBJ_NEW(opal_buffer_t);
|
||||
if (NULL == jobdat->collection_bucket) {
|
||||
jobdat->collection_bucket = OBJ_NEW(opal_buffer_t);
|
||||
}
|
||||
|
||||
/* collect the provided data */
|
||||
opal_dss.copy_payload(collection_bucket, buf);
|
||||
opal_dss.copy_payload(jobdat->collection_bucket, buf);
|
||||
|
||||
/* flag this proc as having participated */
|
||||
child->coll_recvd = true;
|
||||
@ -2024,33 +2039,14 @@ int orte_odls_base_default_collect_data(orte_process_name_t *proc,
|
||||
"%s odls: executing collective",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* find the jobdat for this job */
|
||||
jobdat = NULL;
|
||||
for (item = opal_list_get_first(&orte_odls_globals.jobs);
|
||||
item != opal_list_get_end(&orte_odls_globals.jobs);
|
||||
item = opal_list_get_next(item)) {
|
||||
jobdat = (orte_odls_job_t*)item;
|
||||
|
||||
/* is this the specified job? */
|
||||
if (jobdat->jobid == proc->jobid) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (NULL == jobdat) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
rc = ORTE_ERR_NOT_FOUND;
|
||||
OBJ_RELEASE(collection_bucket);
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (rc = orte_grpcomm.daemon_collective(proc->jobid, num_local_contributors,
|
||||
collective_type, collection_bucket,
|
||||
jobdat->collective_type, jobdat->collection_bucket,
|
||||
jobdat->hnp_has_local_procs))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
|
||||
/* release the collection bucket for reuse */
|
||||
OBJ_RELEASE(collection_bucket);
|
||||
OBJ_RELEASE(jobdat->collection_bucket);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_odls_globals.output,
|
||||
"%s odls: collective completed",
|
||||
|
@ -92,6 +92,8 @@ static void orte_odls_job_constructor(orte_odls_job_t *ptr)
|
||||
ptr->hnp_has_local_procs = false;
|
||||
ptr->procmap = NULL;
|
||||
ptr->pmap = NULL;
|
||||
ptr->collection_bucket = NULL;
|
||||
ptr->collective_type = ORTE_GRPCOMM_COLL_NONE;
|
||||
}
|
||||
static void orte_odls_job_destructor(orte_odls_job_t *ptr)
|
||||
{
|
||||
@ -114,6 +116,10 @@ static void orte_odls_job_destructor(orte_odls_job_t *ptr)
|
||||
free(ptr->pmap->bytes);
|
||||
free(ptr->pmap);
|
||||
}
|
||||
|
||||
if (NULL != ptr->collection_bucket) {
|
||||
OBJ_RELEASE(ptr->collection_bucket);
|
||||
}
|
||||
}
|
||||
OBJ_CLASS_INSTANCE(orte_odls_job_t,
|
||||
opal_list_item_t,
|
||||
|
@ -31,8 +31,9 @@
|
||||
#include "opal/class/opal_pointer_array.h"
|
||||
#include "opal/threads/mutex.h"
|
||||
#include "opal/threads/condition.h"
|
||||
|
||||
#include "opal/dss/dss_types.h"
|
||||
|
||||
#include "orte/mca/grpcomm/grpcomm_types.h"
|
||||
#include "orte/mca/plm/plm_types.h"
|
||||
#include "orte/mca/rmaps/rmaps_types.h"
|
||||
#include "orte/mca/rml/rml_types.h"
|
||||
@ -70,16 +71,18 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_odls_child_t);
|
||||
* List object to locally store job related info
|
||||
*/
|
||||
typedef struct orte_odls_job_t {
|
||||
opal_list_item_t super; /* required to place this on a list */
|
||||
orte_jobid_t jobid; /* jobid for this data */
|
||||
orte_app_context_t **apps; /* app_contexts for this job */
|
||||
orte_std_cntr_t num_apps; /* number of app_contexts */
|
||||
orte_std_cntr_t total_slots_alloc;
|
||||
orte_vpid_t num_procs;
|
||||
uint8_t num_local_procs;
|
||||
bool hnp_has_local_procs;
|
||||
orte_pmap_t *procmap; /* map of procs/node, local ranks */
|
||||
opal_byte_object_t *pmap; /* byte object version of procmap */
|
||||
opal_list_item_t super; /* required to place this on a list */
|
||||
orte_jobid_t jobid; /* jobid for this data */
|
||||
orte_app_context_t **apps; /* app_contexts for this job */
|
||||
orte_std_cntr_t num_apps; /* number of app_contexts */
|
||||
orte_std_cntr_t total_slots_alloc;
|
||||
orte_vpid_t num_procs;
|
||||
uint8_t num_local_procs;
|
||||
bool hnp_has_local_procs;
|
||||
orte_pmap_t *procmap; /* map of procs/node, local ranks */
|
||||
opal_byte_object_t *pmap; /* byte object version of procmap */
|
||||
opal_buffer_t *collection_bucket;
|
||||
orte_grpcomm_coll_t collective_type;
|
||||
} orte_odls_job_t;
|
||||
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_odls_job_t);
|
||||
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user