1
1

Fix the grpcomm operations at scale. Restore the direct component as the default and have it execute a rollup collective. This may in fact be faster than the alternatives, and something appears to be broken at scale, particularly when using brks. Turn off the rcd and brks components, as they do not currently work at scale; they can be restored at some future point once someone has debugged them.

Adjust to Jeff's quibbles

Fixes open-mpi/mpi#1215
Этот коммит содержится в:
Ralph Castain 2016-01-31 20:29:55 -08:00
родитель 6eac6a8b00
Коммит 68912d04a8
8 изменённых файлов: 115 добавлений и 58 удалений

Просмотреть файл

@ -12,7 +12,7 @@
* All rights reserved.
* Copyright (c) 2011-2016 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2014 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2015 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
@ -122,6 +122,7 @@ static void ccon(orte_grpcomm_coll_t *p)
OBJ_CONSTRUCT(&p->distance_mask_recv, opal_bitmap_t);
p->dmns = NULL;
p->ndmns = 0;
p->nexpected = 0;
p->nreported = 0;
p->cbfunc = NULL;
p->cbdata = NULL;

Просмотреть файл

@ -157,7 +157,7 @@ static void allgather_stub(int fd, short args, void *cbdata)
ret = opal_hash_table_set_value_ptr(&orte_grpcomm_base.sig_table, (void *)cd->sig->signature, cd->sig->sz * sizeof(orte_process_name_t), (void *)&cd->sig->seq_num);
if (OPAL_SUCCESS != ret) {
OPAL_OUTPUT((orte_grpcomm_base_framework.framework_output,
"%s rpcomm:base:allgather can't not add new signature to hash table",
"%s rpcomm:base:allgather cannot add new signature to hash table",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(cd);
@ -208,6 +208,9 @@ orte_grpcomm_coll_t* orte_grpcomm_base_get_tracker(orte_grpcomm_signature_t *sig
{
orte_grpcomm_coll_t *coll;
int rc;
orte_namelist_t *nm;
opal_list_t children;
size_t n;
/* search the existing tracker list to see if this already exists */
OPAL_LIST_FOREACH(coll, &orte_grpcomm_base.ongoing, orte_grpcomm_coll_t) {
@ -254,6 +257,30 @@ orte_grpcomm_coll_t* orte_grpcomm_base_get_tracker(orte_grpcomm_signature_t *sig
ORTE_ERROR_LOG(rc);
return NULL;
}
/* cycle thru the array of daemons and compare them to our
* children in the routing tree, counting the ones that match
* so we know how many daemons we should receive contributions from */
OBJ_CONSTRUCT(&children, opal_list_t);
orte_routed.get_routing_list(&children);
while (NULL != (nm = (orte_namelist_t*)opal_list_remove_first(&children))) {
for (n=0; n < coll->ndmns; n++) {
if (nm->name.vpid == coll->dmns[n]) {
coll->nexpected++;
break;
}
}
OBJ_RELEASE(nm);
}
OPAL_LIST_DESTRUCT(&children);
/* see if I am in the array of participants - note that I may
* be in the rollup tree even though I'm not participating
* in the collective itself */
for (n=0; n < coll->ndmns; n++) {
if (coll->dmns[n] == ORTE_PROC_MY_NAME->vpid) {
coll->nexpected++;
break;
}
}
return coll;
}
@ -292,6 +319,9 @@ static int create_dmns(orte_grpcomm_signature_t *sig,
/* all daemons hosting this jobid are participating */
if (NULL == (jdata = orte_get_job_data_object(sig->signature[0].jobid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
*ndmns = 0;
*dmns = NULL;
return ORTE_ERR_NOT_FOUND;
}
if (NULL == jdata->map) {
@ -326,7 +356,10 @@ static int create_dmns(orte_grpcomm_signature_t *sig,
/* should never happen */
ORTE_ERROR_LOG(ORTE_ERROR);
free(dns);
return ORTE_ERROR;
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
*ndmns = 0;
*dmns = NULL;
return ORTE_ERR_NOT_FOUND;
}
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:base:create_dmns adding daemon %s to array",
@ -343,6 +376,9 @@ static int create_dmns(orte_grpcomm_signature_t *sig,
if (NULL == (jdata = orte_get_job_data_object(sig->signature[n].jobid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
OPAL_LIST_DESTRUCT(&ds);
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
*ndmns = 0;
*dmns = NULL;
return ORTE_ERR_NOT_FOUND;
}
opal_output_verbose(5, orte_grpcomm_base_framework.framework_output,
@ -352,12 +388,17 @@ static int create_dmns(orte_grpcomm_signature_t *sig,
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, sig->signature[n].vpid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
OPAL_LIST_DESTRUCT(&ds);
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
*ndmns = 0;
*dmns = NULL;
return ORTE_ERR_NOT_FOUND;
}
if (NULL == proc->node || NULL == proc->node->daemon) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
OPAL_LIST_DESTRUCT(&ds);
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
*ndmns = 0;
*dmns = NULL;
return ORTE_ERR_NOT_FOUND;
}
vpid = proc->node->daemon->name.vpid;
@ -377,7 +418,10 @@ static int create_dmns(orte_grpcomm_signature_t *sig,
if (0 == opal_list_get_size(&ds)) {
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
OPAL_LIST_DESTRUCT(&ds);
return ORTE_ERR_BAD_PARAM;
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
*ndmns = 0;
*dmns = NULL;
return ORTE_ERR_NOT_FOUND;
}
dns = (orte_vpid_t*)malloc(opal_list_get_size(&ds) * sizeof(orte_vpid_t));
nds = 0;

0
orte/mca/grpcomm/brks/.opal_ignore Обычный файл
Просмотреть файл

Просмотреть файл

@ -5,7 +5,7 @@
* Copyright (c) 2011 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011-2013 Los Alamos National Security, LLC. All
* rights reserved.
* Copyright (c) 2014-2015 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2014 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
@ -124,7 +124,7 @@ static int xcast(orte_vpid_t *vpids,
static int allgather(orte_grpcomm_coll_t *coll,
opal_buffer_t *buf)
{
int rc, ret;
int rc;
opal_buffer_t *relay;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
@ -143,35 +143,16 @@ static int allgather(orte_grpcomm_coll_t *coll,
return rc;
}
/* if we are the HNP and nobody else is participating,
* then just execute the xcast */
if (ORTE_PROC_IS_HNP && 1 == coll->ndmns) {
/* pack the status - success since the allgather completed. This
* would be an error if we timeout instead */
ret = ORTE_SUCCESS;
if (OPAL_SUCCESS != (rc = opal_dss.pack(relay, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(relay);
return rc;
}
/* pass along the payload */
opal_dss.copy_payload(relay, buf);
orte_grpcomm.xcast(coll->sig, ORTE_RML_TAG_COLL_RELEASE, relay);
OBJ_RELEASE(relay);
return ORTE_SUCCESS;
}
/* pass along the payload */
opal_dss.copy_payload(relay, buf);
/* otherwise, we need to send this to the HNP for
* processing */
/* send this to ourselves for processing */
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct:allgather sending to HNP",
"%s grpcomm:direct:allgather sending to ourself",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* send the info to the HNP for tracking */
rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, relay,
/* send the info to ourselves for tracking */
rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_NAME, relay,
ORTE_RML_TAG_ALLGATHER_DIRECT,
orte_rml_send_callback, NULL);
return rc;
@ -212,35 +193,60 @@ static void allgather_recv(int status, orte_process_name_t* sender,
opal_dss.copy_payload(&coll->bucket, buffer);
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct allgather recv ndmns %d nrep %d",
"%s grpcomm:direct allgather recv nexpected %d nrep %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(int)coll->ndmns, (int)coll->nreported));
(int)coll->nexpected, (int)coll->nreported));
/* if all participating daemons have reported */
if (coll->ndmns == coll->nreported) {
reply = OBJ_NEW(opal_buffer_t);
/* pack the signature */
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &sig, 1, ORTE_SIGNATURE))) {
ORTE_ERROR_LOG(rc);
/* see if everyone has reported */
if (coll->nreported == coll->nexpected) {
if (ORTE_PROC_IS_HNP) {
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct allgather HNP reports complete",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* the allgather is complete - send the xcast */
reply = OBJ_NEW(opal_buffer_t);
/* pack the signature */
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &sig, 1, ORTE_SIGNATURE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(sig);
return;
}
/* pack the status - success since the allgather completed. This
* would be an error if we timeout instead */
ret = ORTE_SUCCESS;
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(sig);
return;
}
/* transfer the collected bucket */
opal_dss.copy_payload(reply, &coll->bucket);
/* send the release via xcast */
(void)orte_grpcomm.xcast(sig, ORTE_RML_TAG_COLL_RELEASE, reply);
OBJ_RELEASE(reply);
OBJ_RELEASE(sig);
return;
} else {
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct allgather rollup complete - sending to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(ORTE_PROC_MY_PARENT)));
/* relay the bucket upward */
reply = OBJ_NEW(opal_buffer_t);
/* pack the signature */
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &sig, 1, ORTE_SIGNATURE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(sig);
return;
}
/* transfer the collected bucket */
opal_dss.copy_payload(reply, &coll->bucket);
/* send the info to our parent */
rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_PARENT, reply,
ORTE_RML_TAG_ALLGATHER_DIRECT,
orte_rml_send_callback, NULL);
}
/* pack the status - success since the allgather completed. This
* would be an error if we timeout instead */
ret = ORTE_SUCCESS;
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(sig);
return;
}
/* transfer the collected bucket */
opal_dss.copy_payload(reply, &coll->bucket);
/* send the release via xcast */
(void)orte_grpcomm.xcast(sig, ORTE_RML_TAG_COLL_RELEASE, reply);
OBJ_RELEASE(reply);
}
OBJ_RELEASE(sig);
}

Просмотреть файл

@ -1,7 +1,7 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2011 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011-2015 Los Alamos National Security, LLC. All rights
* Copyright (c) 2011-2016 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2014 Intel, Inc. All rights reserved.
* $COPYRIGHT$
@ -55,7 +55,7 @@ static int direct_register(void)
/* make the priority adjustable so users can select
* direct for use by apps without affecting daemons
*/
my_priority = 1;
my_priority = 85;
(void) mca_base_component_var_register(c, "priority",
"Priority of the grpcomm direct component",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,

Просмотреть файл

@ -77,6 +77,8 @@ typedef struct {
size_t ndmns;
/** my index in the dmns array */
unsigned long my_rank;
/* number of buckets expected */
size_t nexpected;
/* number reported in */
size_t nreported;
/* distance masks for receive */

0
orte/mca/grpcomm/rcd/.opal_ignore Обычный файл
Просмотреть файл

Просмотреть файл

@ -14,7 +14,7 @@
* Copyright (c) 2011-2013 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2011-2013 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2013-2015 Intel, Inc. All rights reserved.
* Copyright (c) 2013-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2014 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
@ -249,6 +249,7 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
orte_app_context_t *app;
bool found;
orte_node_t *node;
bool newmap = false;
OPAL_OUTPUT_VERBOSE((5, orte_odls_base_framework.framework_output,
"%s odls:constructing child list",
@ -398,6 +399,7 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
/* ensure the map object is present */
if (NULL == jdata->map) {
jdata->map = OBJ_NEW(orte_job_map_t);
newmap = true;
}
/* if we have a file map, then we need to load it */
@ -455,7 +457,9 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
if (!found) {
OBJ_RETAIN(dmn->node);
opal_pointer_array_add(jdata->map->nodes, dmn->node);
jdata->map->num_nodes++;
if (newmap) {
jdata->map->num_nodes++;
}
}
/* see if it belongs to us */