1
1

Merge pull request #1335 from rhc54/topic/gcom

Cleanup grpcomm race conditions
Этот коммит содержится в:
rhc54 2016-02-04 05:47:51 -08:00
родитель 6eac6a8b00 68912d04a8
Коммит f38ad4adf3
8 изменённых файлов: 115 добавлений и 58 удалений

Просмотреть файл

@ -12,7 +12,7 @@
* All rights reserved.
* Copyright (c) 2011-2016 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2014 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2015 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
@ -122,6 +122,7 @@ static void ccon(orte_grpcomm_coll_t *p)
OBJ_CONSTRUCT(&p->distance_mask_recv, opal_bitmap_t);
p->dmns = NULL;
p->ndmns = 0;
p->nexpected = 0;
p->nreported = 0;
p->cbfunc = NULL;
p->cbdata = NULL;

Просмотреть файл

@ -157,7 +157,7 @@ static void allgather_stub(int fd, short args, void *cbdata)
ret = opal_hash_table_set_value_ptr(&orte_grpcomm_base.sig_table, (void *)cd->sig->signature, cd->sig->sz * sizeof(orte_process_name_t), (void *)&cd->sig->seq_num);
if (OPAL_SUCCESS != ret) {
OPAL_OUTPUT((orte_grpcomm_base_framework.framework_output,
"%s rpcomm:base:allgather can't not add new signature to hash table",
"%s rpcomm:base:allgather cannot add new signature to hash table",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(cd);
@ -208,6 +208,9 @@ orte_grpcomm_coll_t* orte_grpcomm_base_get_tracker(orte_grpcomm_signature_t *sig
{
orte_grpcomm_coll_t *coll;
int rc;
orte_namelist_t *nm;
opal_list_t children;
size_t n;
/* search the existing tracker list to see if this already exists */
OPAL_LIST_FOREACH(coll, &orte_grpcomm_base.ongoing, orte_grpcomm_coll_t) {
@ -254,6 +257,30 @@ orte_grpcomm_coll_t* orte_grpcomm_base_get_tracker(orte_grpcomm_signature_t *sig
ORTE_ERROR_LOG(rc);
return NULL;
}
/* cycle thru the array of daemons and compare them to our
* children in the routing tree, counting the ones that match
* so we know how many daemons we should receive contributions from */
OBJ_CONSTRUCT(&children, opal_list_t);
orte_routed.get_routing_list(&children);
while (NULL != (nm = (orte_namelist_t*)opal_list_remove_first(&children))) {
for (n=0; n < coll->ndmns; n++) {
if (nm->name.vpid == coll->dmns[n]) {
coll->nexpected++;
break;
}
}
OBJ_RELEASE(nm);
}
OPAL_LIST_DESTRUCT(&children);
/* see if I am in the array of participants - note that I may
* be in the rollup tree even though I'm not participating
* in the collective itself */
for (n=0; n < coll->ndmns; n++) {
if (coll->dmns[n] == ORTE_PROC_MY_NAME->vpid) {
coll->nexpected++;
break;
}
}
return coll;
}
@ -292,6 +319,9 @@ static int create_dmns(orte_grpcomm_signature_t *sig,
/* all daemons hosting this jobid are participating */
if (NULL == (jdata = orte_get_job_data_object(sig->signature[0].jobid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
*ndmns = 0;
*dmns = NULL;
return ORTE_ERR_NOT_FOUND;
}
if (NULL == jdata->map) {
@ -326,7 +356,10 @@ static int create_dmns(orte_grpcomm_signature_t *sig,
/* should never happen */
ORTE_ERROR_LOG(ORTE_ERROR);
free(dns);
return ORTE_ERROR;
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
*ndmns = 0;
*dmns = NULL;
return ORTE_ERR_NOT_FOUND;
}
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:base:create_dmns adding daemon %s to array",
@ -343,6 +376,9 @@ static int create_dmns(orte_grpcomm_signature_t *sig,
if (NULL == (jdata = orte_get_job_data_object(sig->signature[n].jobid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
OPAL_LIST_DESTRUCT(&ds);
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
*ndmns = 0;
*dmns = NULL;
return ORTE_ERR_NOT_FOUND;
}
opal_output_verbose(5, orte_grpcomm_base_framework.framework_output,
@ -352,12 +388,17 @@ static int create_dmns(orte_grpcomm_signature_t *sig,
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, sig->signature[n].vpid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
OPAL_LIST_DESTRUCT(&ds);
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
*ndmns = 0;
*dmns = NULL;
return ORTE_ERR_NOT_FOUND;
}
if (NULL == proc->node || NULL == proc->node->daemon) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
OPAL_LIST_DESTRUCT(&ds);
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
*ndmns = 0;
*dmns = NULL;
return ORTE_ERR_NOT_FOUND;
}
vpid = proc->node->daemon->name.vpid;
@ -377,7 +418,10 @@ static int create_dmns(orte_grpcomm_signature_t *sig,
if (0 == opal_list_get_size(&ds)) {
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
OPAL_LIST_DESTRUCT(&ds);
return ORTE_ERR_BAD_PARAM;
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
*ndmns = 0;
*dmns = NULL;
return ORTE_ERR_NOT_FOUND;
}
dns = (orte_vpid_t*)malloc(opal_list_get_size(&ds) * sizeof(orte_vpid_t));
nds = 0;

0
orte/mca/grpcomm/brks/.opal_ignore Обычный файл
Просмотреть файл

Просмотреть файл

@ -5,7 +5,7 @@
* Copyright (c) 2011 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011-2013 Los Alamos National Security, LLC. All
* rights reserved.
* Copyright (c) 2014-2015 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2014 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
@ -124,7 +124,7 @@ static int xcast(orte_vpid_t *vpids,
static int allgather(orte_grpcomm_coll_t *coll,
opal_buffer_t *buf)
{
int rc, ret;
int rc;
opal_buffer_t *relay;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
@ -143,35 +143,16 @@ static int allgather(orte_grpcomm_coll_t *coll,
return rc;
}
/* if we are the HNP and nobody else is participating,
* then just execute the xcast */
if (ORTE_PROC_IS_HNP && 1 == coll->ndmns) {
/* pack the status - success since the allgather completed. This
* would be an error if we timeout instead */
ret = ORTE_SUCCESS;
if (OPAL_SUCCESS != (rc = opal_dss.pack(relay, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(relay);
return rc;
}
/* pass along the payload */
opal_dss.copy_payload(relay, buf);
orte_grpcomm.xcast(coll->sig, ORTE_RML_TAG_COLL_RELEASE, relay);
OBJ_RELEASE(relay);
return ORTE_SUCCESS;
}
/* pass along the payload */
opal_dss.copy_payload(relay, buf);
/* otherwise, we need to send this to the HNP for
* processing */
/* send this to ourselves for processing */
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct:allgather sending to HNP",
"%s grpcomm:direct:allgather sending to ourself",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* send the info to the HNP for tracking */
rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, relay,
/* send the info to ourselves for tracking */
rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_NAME, relay,
ORTE_RML_TAG_ALLGATHER_DIRECT,
orte_rml_send_callback, NULL);
return rc;
@ -212,35 +193,60 @@ static void allgather_recv(int status, orte_process_name_t* sender,
opal_dss.copy_payload(&coll->bucket, buffer);
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct allgather recv ndmns %d nrep %d",
"%s grpcomm:direct allgather recv nexpected %d nrep %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(int)coll->ndmns, (int)coll->nreported));
(int)coll->nexpected, (int)coll->nreported));
/* if all participating daemons have reported */
if (coll->ndmns == coll->nreported) {
reply = OBJ_NEW(opal_buffer_t);
/* pack the signature */
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &sig, 1, ORTE_SIGNATURE))) {
ORTE_ERROR_LOG(rc);
/* see if everyone has reported */
if (coll->nreported == coll->nexpected) {
if (ORTE_PROC_IS_HNP) {
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct allgather HNP reports complete",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* the allgather is complete - send the xcast */
reply = OBJ_NEW(opal_buffer_t);
/* pack the signature */
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &sig, 1, ORTE_SIGNATURE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(sig);
return;
}
/* pack the status - success since the allgather completed. This
* would be an error if we timeout instead */
ret = ORTE_SUCCESS;
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(sig);
return;
}
/* transfer the collected bucket */
opal_dss.copy_payload(reply, &coll->bucket);
/* send the release via xcast */
(void)orte_grpcomm.xcast(sig, ORTE_RML_TAG_COLL_RELEASE, reply);
OBJ_RELEASE(reply);
OBJ_RELEASE(sig);
return;
} else {
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct allgather rollup complete - sending to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(ORTE_PROC_MY_PARENT)));
/* relay the bucket upward */
reply = OBJ_NEW(opal_buffer_t);
/* pack the signature */
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &sig, 1, ORTE_SIGNATURE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(sig);
return;
}
/* transfer the collected bucket */
opal_dss.copy_payload(reply, &coll->bucket);
/* send the info to our parent */
rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_PARENT, reply,
ORTE_RML_TAG_ALLGATHER_DIRECT,
orte_rml_send_callback, NULL);
}
/* pack the status - success since the allgather completed. This
* would be an error if we timeout instead */
ret = ORTE_SUCCESS;
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(sig);
return;
}
/* transfer the collected bucket */
opal_dss.copy_payload(reply, &coll->bucket);
/* send the release via xcast */
(void)orte_grpcomm.xcast(sig, ORTE_RML_TAG_COLL_RELEASE, reply);
OBJ_RELEASE(reply);
}
OBJ_RELEASE(sig);
}

Просмотреть файл

@ -1,7 +1,7 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2011 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011-2015 Los Alamos National Security, LLC. All rights
* Copyright (c) 2011-2016 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2014 Intel, Inc. All rights reserved.
* $COPYRIGHT$
@ -55,7 +55,7 @@ static int direct_register(void)
/* make the priority adjustable so users can select
* direct for use by apps without affecting daemons
*/
my_priority = 1;
my_priority = 85;
(void) mca_base_component_var_register(c, "priority",
"Priority of the grpcomm direct component",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,

Просмотреть файл

@ -77,6 +77,8 @@ typedef struct {
size_t ndmns;
/** my index in the dmns array */
unsigned long my_rank;
/* number of buckets expected */
size_t nexpected;
/* number reported in */
size_t nreported;
/* distance masks for receive */

0
orte/mca/grpcomm/rcd/.opal_ignore Обычный файл
Просмотреть файл

Просмотреть файл

@ -14,7 +14,7 @@
* Copyright (c) 2011-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2011-2013 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2013-2015 Intel, Inc. All rights reserved.
* Copyright (c) 2013-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2014 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
@ -249,6 +249,7 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
orte_app_context_t *app;
bool found;
orte_node_t *node;
bool newmap = false;
OPAL_OUTPUT_VERBOSE((5, orte_odls_base_framework.framework_output,
"%s odls:constructing child list",
@ -398,6 +399,7 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
/* ensure the map object is present */
if (NULL == jdata->map) {
jdata->map = OBJ_NEW(orte_job_map_t);
newmap = true;
}
/* if we have a file map, then we need to load it */
@ -455,7 +457,9 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
if (!found) {
OBJ_RETAIN(dmn->node);
opal_pointer_array_add(jdata->map->nodes, dmn->node);
jdata->map->num_nodes++;
if (newmap) {
jdata->map->num_nodes++;
}
}
/* see if it belongs to us */