From 68912d04a876bc29644f30f9a916e001a2fba92f Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Sun, 31 Jan 2016 20:29:55 -0800 Subject: [PATCH] Fix the grpcomm operations at scale. Restore the direct component to be the default, and to execute a rollup collective. This may in fact be faster than the alternatives, and something appears broken at scale when using brks in particular. Turn off the rcd and brks components as they don't work at scale right now - they can be restored at some future point when someone can debug them. Adjust to Jeff's quibbles Fixes open-mpi/mpi#1215 --- orte/mca/grpcomm/base/grpcomm_base_frame.c | 3 +- orte/mca/grpcomm/base/grpcomm_base_stubs.c | 50 ++++++++- orte/mca/grpcomm/brks/.opal_ignore | 0 orte/mca/grpcomm/direct/grpcomm_direct.c | 106 +++++++++--------- .../grpcomm/direct/grpcomm_direct_component.c | 4 +- orte/mca/grpcomm/grpcomm.h | 2 + orte/mca/grpcomm/rcd/.opal_ignore | 0 orte/mca/odls/base/odls_base_default_fns.c | 8 +- 8 files changed, 115 insertions(+), 58 deletions(-) create mode 100644 orte/mca/grpcomm/brks/.opal_ignore create mode 100644 orte/mca/grpcomm/rcd/.opal_ignore diff --git a/orte/mca/grpcomm/base/grpcomm_base_frame.c b/orte/mca/grpcomm/base/grpcomm_base_frame.c index c6362c7162..242e4410f0 100644 --- a/orte/mca/grpcomm/base/grpcomm_base_frame.c +++ b/orte/mca/grpcomm/base/grpcomm_base_frame.c @@ -12,7 +12,7 @@ * All rights reserved. * Copyright (c) 2011-2016 Los Alamos National Security, LLC. All rights * reserved. - * Copyright (c) 2014 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2016 Intel, Inc. All rights reserved. * Copyright (c) 2015 Research Organization for Information Science * and Technology (RIST). All rights reserved. * $COPYRIGHT$ @@ -122,6 +122,7 @@ static void ccon(orte_grpcomm_coll_t *p) OBJ_CONSTRUCT(&p->distance_mask_recv, opal_bitmap_t); p->dmns = NULL; p->ndmns = 0; + p->nexpected = 0; p->nreported = 0; p->cbfunc = NULL; p->cbdata = NULL; diff --git a/orte/mca/grpcomm/base/grpcomm_base_stubs.c b/orte/mca/grpcomm/base/grpcomm_base_stubs.c index 9e2b657287..621b645da2 100644 --- a/orte/mca/grpcomm/base/grpcomm_base_stubs.c +++ b/orte/mca/grpcomm/base/grpcomm_base_stubs.c @@ -157,7 +157,7 @@ static void allgather_stub(int fd, short args, void *cbdata) ret = opal_hash_table_set_value_ptr(&orte_grpcomm_base.sig_table, (void *)cd->sig->signature, cd->sig->sz * sizeof(orte_process_name_t), (void *)&cd->sig->seq_num); if (OPAL_SUCCESS != ret) { OPAL_OUTPUT((orte_grpcomm_base_framework.framework_output, - "%s rpcomm:base:allgather can't not add new signature to hash table", + "%s rpcomm:base:allgather cannot add new signature to hash table", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); ORTE_ERROR_LOG(ret); OBJ_RELEASE(cd); @@ -208,6 +208,9 @@ orte_grpcomm_coll_t* orte_grpcomm_base_get_tracker(orte_grpcomm_signature_t *sig { orte_grpcomm_coll_t *coll; int rc; + orte_namelist_t *nm; + opal_list_t children; + size_t n; /* search the existing tracker list to see if this already exists */ OPAL_LIST_FOREACH(coll, &orte_grpcomm_base.ongoing, orte_grpcomm_coll_t) { @@ -254,6 +257,30 @@ orte_grpcomm_coll_t* orte_grpcomm_base_get_tracker(orte_grpcomm_signature_t *sig ORTE_ERROR_LOG(rc); return NULL; } + /* cycle thru the array of daemons and compare them to our + * children in the routing tree, counting the ones that match + * so we know how many daemons we should receive contributions from */ + OBJ_CONSTRUCT(&children, opal_list_t); + orte_routed.get_routing_list(&children); + while (NULL != (nm = (orte_namelist_t*)opal_list_remove_first(&children))) { + for (n=0; n < coll->ndmns; n++) { + if (nm->name.vpid == coll->dmns[n]) { + coll->nexpected++; + break; + } + } + OBJ_RELEASE(nm); + } + OPAL_LIST_DESTRUCT(&children); + /* see if I am in the array of participants - note that I may + * be in the rollup tree even though I'm not participating + * in the collective itself */ + for (n=0; n < coll->ndmns; n++) { + if (coll->dmns[n] == ORTE_PROC_MY_NAME->vpid) { + coll->nexpected++; + break; + } + } return coll; } @@ -292,6 +319,9 @@ static int create_dmns(orte_grpcomm_signature_t *sig, /* all daemons hosting this jobid are participating */ if (NULL == (jdata = orte_get_job_data_object(sig->signature[0].jobid))) { ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); + ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND); + *ndmns = 0; + *dmns = NULL; return ORTE_ERR_NOT_FOUND; } if (NULL == jdata->map) { @@ -326,7 +356,10 @@ static int create_dmns(orte_grpcomm_signature_t *sig, /* should never happen */ ORTE_ERROR_LOG(ORTE_ERROR); free(dns); - return ORTE_ERROR; + ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND); + *ndmns = 0; + *dmns = NULL; + return ORTE_ERR_NOT_FOUND; } OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output, "%s grpcomm:base:create_dmns adding daemon %s to array", @@ -343,6 +376,9 @@ static int create_dmns(orte_grpcomm_signature_t *sig, if (NULL == (jdata = orte_get_job_data_object(sig->signature[n].jobid))) { ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); OPAL_LIST_DESTRUCT(&ds); + ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND); + *ndmns = 0; + *dmns = NULL; return ORTE_ERR_NOT_FOUND; } opal_output_verbose(5, orte_grpcomm_base_framework.framework_output, @@ -352,12 +388,17 @@ static int create_dmns(orte_grpcomm_signature_t *sig, if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, sig->signature[n].vpid))) { ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); OPAL_LIST_DESTRUCT(&ds); + ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND); + *ndmns = 0; + *dmns = NULL; return ORTE_ERR_NOT_FOUND; } if (NULL == proc->node || NULL == proc->node->daemon) { ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); OPAL_LIST_DESTRUCT(&ds); ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND); + *ndmns = 0; + *dmns = NULL; return ORTE_ERR_NOT_FOUND; } vpid = proc->node->daemon->name.vpid; @@ -377,7 +418,10 @@ static int create_dmns(orte_grpcomm_signature_t *sig, if (0 == opal_list_get_size(&ds)) { ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); OPAL_LIST_DESTRUCT(&ds); - return ORTE_ERR_BAD_PARAM; + ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND); + *ndmns = 0; + *dmns = NULL; + return ORTE_ERR_NOT_FOUND; } dns = (orte_vpid_t*)malloc(opal_list_get_size(&ds) * sizeof(orte_vpid_t)); nds = 0; diff --git a/orte/mca/grpcomm/brks/.opal_ignore b/orte/mca/grpcomm/brks/.opal_ignore new file mode 100644 index 0000000000..e69de29bb2 diff --git a/orte/mca/grpcomm/direct/grpcomm_direct.c b/orte/mca/grpcomm/direct/grpcomm_direct.c index efc4671230..4fc737865c 100644 --- a/orte/mca/grpcomm/direct/grpcomm_direct.c +++ b/orte/mca/grpcomm/direct/grpcomm_direct.c @@ -5,7 +5,7 @@ * Copyright (c) 2011 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2011-2013 Los Alamos National Security, LLC. All * rights reserved. - * Copyright (c) 2014-2015 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2016 Intel, Inc. All rights reserved. * Copyright (c) 2014 Research Organization for Information Science * and Technology (RIST). All rights reserved. * $COPYRIGHT$ @@ -124,7 +124,7 @@ static int xcast(orte_vpid_t *vpids, static int allgather(orte_grpcomm_coll_t *coll, opal_buffer_t *buf) { - int rc, ret; + int rc; opal_buffer_t *relay; OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output, @@ -143,35 +143,16 @@ static int allgather(orte_grpcomm_coll_t *coll, return rc; } - /* if we are the HNP and nobody else is participating, - * then just execute the xcast */ - if (ORTE_PROC_IS_HNP && 1 == coll->ndmns) { - /* pack the status - success since the allgather completed. This - * would be an error if we timeout instead */ - ret = ORTE_SUCCESS; - if (OPAL_SUCCESS != (rc = opal_dss.pack(relay, &ret, 1, OPAL_INT))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(relay); - return rc; - } - /* pass along the payload */ - opal_dss.copy_payload(relay, buf); - orte_grpcomm.xcast(coll->sig, ORTE_RML_TAG_COLL_RELEASE, relay); - OBJ_RELEASE(relay); - return ORTE_SUCCESS; - } - /* pass along the payload */ opal_dss.copy_payload(relay, buf); - /* otherwise, we need to send this to the HNP for - * processing */ + /* send this to ourselves for processing */ OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output, - "%s grpcomm:direct:allgather sending to HNP", + "%s grpcomm:direct:allgather sending to ourself", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - /* send the info to the HNP for tracking */ - rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, relay, + /* send the info to ourselves for tracking */ + rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_NAME, relay, ORTE_RML_TAG_ALLGATHER_DIRECT, orte_rml_send_callback, NULL); return rc; @@ -212,35 +193,60 @@ static void allgather_recv(int status, orte_process_name_t* sender, opal_dss.copy_payload(&coll->bucket, buffer); OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output, - "%s grpcomm:direct allgather recv ndmns %d nrep %d", + "%s grpcomm:direct allgather recv nexpected %d nrep %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - (int)coll->ndmns, (int)coll->nreported)); + (int)coll->nexpected, (int)coll->nreported)); - /* if all participating daemons have reported */ - if (coll->ndmns == coll->nreported) { - reply = OBJ_NEW(opal_buffer_t); - /* pack the signature */ - if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &sig, 1, ORTE_SIGNATURE))) { - ORTE_ERROR_LOG(rc); + /* see if everyone has reported */ + if (coll->nreported == coll->nexpected) { + if (ORTE_PROC_IS_HNP) { + OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output, + "%s grpcomm:direct allgather HNP reports complete", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + /* the allgather is complete - send the xcast */ + reply = OBJ_NEW(opal_buffer_t); + /* pack the signature */ + if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &sig, 1, ORTE_SIGNATURE))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_RELEASE(sig); + return; + } + /* pack the status - success since the allgather completed. This + * would be an error if we timeout instead */ + ret = ORTE_SUCCESS; + if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_RELEASE(sig); + return; + } + /* transfer the collected bucket */ + opal_dss.copy_payload(reply, &coll->bucket); + /* send the release via xcast */ + (void)orte_grpcomm.xcast(sig, ORTE_RML_TAG_COLL_RELEASE, reply); OBJ_RELEASE(reply); - OBJ_RELEASE(sig); - return; + } else { + OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output, + "%s grpcomm:direct allgather rollup complete - sending to %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(ORTE_PROC_MY_PARENT))); + /* relay the bucket upward */ + reply = OBJ_NEW(opal_buffer_t); + /* pack the signature */ + if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &sig, 1, ORTE_SIGNATURE))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_RELEASE(sig); + return; + } + /* transfer the collected bucket */ + opal_dss.copy_payload(reply, &coll->bucket); + /* send the info to our parent */ + rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_PARENT, reply, + ORTE_RML_TAG_ALLGATHER_DIRECT, + orte_rml_send_callback, NULL); } - /* pack the status - success since the allgather completed. This - * would be an error if we timeout instead */ - ret = ORTE_SUCCESS; - if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_RELEASE(sig); - return; - } - /* transfer the collected bucket */ - opal_dss.copy_payload(reply, &coll->bucket); - - /* send the release via xcast */ - (void)orte_grpcomm.xcast(sig, ORTE_RML_TAG_COLL_RELEASE, reply); - OBJ_RELEASE(reply); } OBJ_RELEASE(sig); } diff --git a/orte/mca/grpcomm/direct/grpcomm_direct_component.c b/orte/mca/grpcomm/direct/grpcomm_direct_component.c index ac4b6e693f..3c6cad000d 100644 --- a/orte/mca/grpcomm/direct/grpcomm_direct_component.c +++ b/orte/mca/grpcomm/direct/grpcomm_direct_component.c @@ -1,7 +1,7 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* * Copyright (c) 2011 Cisco Systems, Inc. All rights reserved. - * Copyright (c) 2011-2015 Los Alamos National Security, LLC. All rights + * Copyright (c) 2011-2016 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2014 Intel, Inc. All rights reserved. * $COPYRIGHT$ @@ -55,7 +55,7 @@ static int direct_register(void) /* make the priority adjustable so users can select * direct for use by apps without affecting daemons */ - my_priority = 1; + my_priority = 85; (void) mca_base_component_var_register(c, "priority", "Priority of the grpcomm direct component", MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, diff --git a/orte/mca/grpcomm/grpcomm.h b/orte/mca/grpcomm/grpcomm.h index f3be029c0e..00ddccacc4 100644 --- a/orte/mca/grpcomm/grpcomm.h +++ b/orte/mca/grpcomm/grpcomm.h @@ -77,6 +77,8 @@ typedef struct { size_t ndmns; /** my index in the dmns array */ unsigned long my_rank; + /* number of buckets expected */ + size_t nexpected; /* number reported in */ size_t nreported; /* distance masks for receive */ diff --git a/orte/mca/grpcomm/rcd/.opal_ignore b/orte/mca/grpcomm/rcd/.opal_ignore new file mode 100644 index 0000000000..e69de29bb2 diff --git a/orte/mca/odls/base/odls_base_default_fns.c b/orte/mca/odls/base/odls_base_default_fns.c index bed85583ff..fc1c758d4b 100644 --- a/orte/mca/odls/base/odls_base_default_fns.c +++ b/orte/mca/odls/base/odls_base_default_fns.c @@ -14,7 +14,7 @@ * Copyright (c) 2011-2013 Los Alamos National Security, LLC. * All rights reserved. * Copyright (c) 2011-2013 Cisco Systems, Inc. All rights reserved. - * Copyright (c) 2013-2015 Intel, Inc. All rights reserved. + * Copyright (c) 2013-2016 Intel, Inc. All rights reserved. * Copyright (c) 2014 Research Organization for Information Science * and Technology (RIST). All rights reserved. * $COPYRIGHT$ @@ -249,6 +249,7 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data, orte_app_context_t *app; bool found; orte_node_t *node; + bool newmap = false; OPAL_OUTPUT_VERBOSE((5, orte_odls_base_framework.framework_output, "%s odls:constructing child list", @@ -398,6 +399,7 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data, /* ensure the map object is present */ if (NULL == jdata->map) { jdata->map = OBJ_NEW(orte_job_map_t); + newmap = true; } /* if we have a file map, then we need to load it */ @@ -455,7 +457,9 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data, if (!found) { OBJ_RETAIN(dmn->node); opal_pointer_array_add(jdata->map->nodes, dmn->node); - jdata->map->num_nodes++; + if (newmap) { + jdata->map->num_nodes++; + } } /* see if it belongs to us */