a523dba41d
We have been getting several requests for new collectives that need to be inserted in various places of the MPI layer, all in support of either checkpoint/restart or various research efforts. Until now, this would require that the collective id's be generated at launch, which required modifications to ORTE and other places. We chose not to make collectives reusable as the race conditions associated with resetting collective counters are daunting. This commit extends the collective system to allow self-generation of collective id's that the daemons need to support, thereby allowing developers to request any number of collectives for their work. There is one restriction: RTE collectives must occur at the process level - i.e., we don't currently have a way of tagging the collective to a specific thread. From the comment in the code: * In order to allow scalable * generation of collective id's, they are formed as: * * top 32-bits are the jobid of the procs involved in * the collective. For collectives across multiple jobs * (e.g., in a connect_accept), the daemon jobid will * be used as the id will be issued by mpirun. This * won't cause problems because daemons don't use the * collective_id * * bottom 32-bits are a rolling counter that recycles * when the max is hit. The daemon will cleanup each * collective upon completion, so this means a job can * never have more than 2**32 collectives going on at * a time. If someone needs more than that - they've got * a problem. * * Note that this means (for now) that RTE-level collectives * cannot be done by individual threads - they must be * done at the overall process level. This is required as * there is no guaranteed ordering for the collective id's, * and all the participants must agree on the id of the * collective they are executing.
So if thread A on one * process asks for a collective id before thread B does, * but B asks before A on another process, the collectives will * be mixed and not result in the expected behavior. We may * find a way to relax this requirement in the future by * adding a thread context id to the jobid field (maybe taking the * lower 16-bits of that field). This commit includes a test program (orte/test/mpi/coll_test.c) that cycles 100 times across barrier and modex collectives. This commit was SVN r32203.
390 строки
15 KiB
C
390 строки
15 KiB
C
/* -*- Mode: C; c-basic-offset:4 ; -*- */
|
|
/*
|
|
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
|
|
* University Research and Technology
|
|
* Corporation. All rights reserved.
|
|
* Copyright (c) 2004-2009 The University of Tennessee and The University
|
|
* of Tennessee Research Foundation. All rights
|
|
* reserved.
|
|
* Copyright (c) 2004-2009 High Performance Computing Center Stuttgart,
|
|
* University of Stuttgart. All rights reserved.
|
|
* Copyright (c) 2004-2005 The Regents of the University of California.
|
|
* All rights reserved.
|
|
* Copyright (c) 2007 Sun Microsystems, Inc. All rights reserved.
|
|
* Copyright (c) 2011 Cisco Systems, Inc. All rights reserved.
|
|
* Copyright (c) 2011-2013 Los Alamos National Security, LLC.
|
|
* All rights reserved.
|
|
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved.
|
|
* $COPYRIGHT$
|
|
*
|
|
* Additional copyrights may follow
|
|
*
|
|
* $HEADER$
|
|
*/
|
|
|
|
#include "orte_config.h"
|
|
#include "orte/constants.h"
|
|
#include "orte/types.h"
|
|
|
|
#include <string.h>
|
|
#ifdef HAVE_SYS_TIME_H
|
|
#include <sys/time.h>
|
|
#endif /* HAVE_SYS_TIME_H */
|
|
|
|
#include "opal/threads/condition.h"
|
|
#include "opal/util/output.h"
|
|
#include "opal/class/opal_hash_table.h"
|
|
#include "opal/dss/dss.h"
|
|
#include "opal/mca/dstore/dstore.h"
|
|
#include "opal/mca/hwloc/base/base.h"
|
|
|
|
#include "orte/mca/errmgr/errmgr.h"
|
|
#include "orte/mca/ess/ess.h"
|
|
#include "orte/mca/rml/rml.h"
|
|
#include "orte/util/attr.h"
|
|
#include "orte/util/name_fns.h"
|
|
#include "orte/util/nidmap.h"
|
|
#include "orte/util/proc_info.h"
|
|
#include "orte/orted/orted.h"
|
|
#include "orte/runtime/orte_globals.h"
|
|
#include "orte/runtime/orte_wait.h"
|
|
|
|
#include "orte/mca/grpcomm/base/base.h"
|
|
#include "orte/mca/grpcomm/grpcomm.h"
|
|
|
|
/*
 * Hand out the next collective id for the job that the given process
 * belongs to.
 *
 * The id is composed of the jobid of "nm" in the upper 32 bits and a
 * per-process rolling counter in the lower 32 bits. The counter is kept
 * in the internal datastore under ORTE_DB_COLL_ID_CNTR; it is created
 * (starting at zero) the first time the fetch does not succeed, and is
 * advanced and written back on every call.
 *
 * Returns the new collective id, or ORTE_GRPCOMM_COLL_ID_INVALID if the
 * updated counter could not be stored.
 */
orte_grpcomm_coll_id_t orte_grpcomm_base_get_coll_id(orte_process_name_t *nm)
{
    opal_list_t vals;
    opal_value_t *cntr;
    orte_grpcomm_coll_id_t coll_id;
    uint64_t job_bits;
    int rc;

    OBJ_CONSTRUCT(&vals, opal_list_t);

    rc = opal_dstore.fetch(opal_dstore_internal,
                           (opal_identifier_t*)nm,
                           ORTE_DB_COLL_ID_CNTR, &vals);
    if (ORTE_SUCCESS != rc) {
        /* nothing recorded yet for this proc - begin counting at zero */
        cntr = OBJ_NEW(opal_value_t);
        cntr->key = strdup(ORTE_DB_COLL_ID_CNTR);
        cntr->type = OPAL_UINT32;
        cntr->data.uint32 = 0;
        opal_list_append(&vals, &cntr->super);
    }

    /* either the fetched counter or the one we just created */
    cntr = (opal_value_t*)opal_list_get_first(&vals);
    OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
                         "%s CURRENT COLL ID COUNTER %u FOR PROC %s",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         cntr->data.uint32, ORTE_NAME_PRINT(nm)));

    /* jobid goes in the top 32 bits, the counter in the bottom 32 bits */
    job_bits = (uint64_t)nm->jobid << 32;
    coll_id = (job_bits & 0xffffffff00000000) | cntr->data.uint32;

    OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
                         "%s ASSIGNED COLL ID %lu",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         (unsigned long)coll_id));

    /* advance the counter, wrapping back to zero once the max is reached */
    if (UINT32_MAX == ++cntr->data.uint32) {
        cntr->data.uint32 = 0;
    }

    /* save the new counter value; the list is torn down on both outcomes */
    rc = opal_dstore.store(opal_dstore_internal,
                           (opal_identifier_t*)nm, cntr);
    OPAL_LIST_DESTRUCT(&vals);

    return (ORTE_SUCCESS == rc) ? coll_id : ORTE_GRPCOMM_COLL_ID_INVALID;
}
|
|
|
|
|
|
/*************** MODEX SECTION **************/
|
|
void orte_grpcomm_base_modex(int fd, short args, void *cbdata)
|
|
{
|
|
orte_grpcomm_caddy_t *caddy = (orte_grpcomm_caddy_t*)cbdata;
|
|
orte_grpcomm_collective_t *modex = caddy->op;
|
|
int rc;
|
|
orte_namelist_t *nm;
|
|
opal_list_item_t *item;
|
|
bool found;
|
|
orte_grpcomm_collective_t *cptr;
|
|
|
|
OBJ_RELEASE(caddy);
|
|
|
|
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:base:modex: performing modex",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
|
|
/* if we are a singleton and routing isn't enabled,
|
|
* then we have nobody with which to communicate, so
|
|
* we can just declare success
|
|
*/
|
|
if ((orte_process_info.proc_type & ORTE_PROC_SINGLETON) &&
|
|
!orte_routing_is_enabled) {
|
|
if (NULL != modex->cbfunc) {
|
|
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output,
|
|
"%s CALLING MODEX RELEASE",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
modex->cbfunc(NULL, modex->cbdata);
|
|
}
|
|
/* flag the collective as complete */
|
|
modex->active = false;
|
|
return;
|
|
}
|
|
|
|
if (0 == opal_list_get_size(&modex->participants)) {
|
|
/* record the collective */
|
|
modex->next_cbdata = modex;
|
|
opal_list_append(&orte_grpcomm_base.active_colls, &modex->super);
|
|
|
|
/* put our process name in the buffer so it can be unpacked later */
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&modex->buffer, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
goto cleanup;
|
|
}
|
|
|
|
/* this is between our peers, so only collect info marked for them */
|
|
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_pack_modex_entries(&modex->buffer, opal_dstore_peer))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
goto cleanup;
|
|
}
|
|
|
|
/* add a wildcard name to the participants so the daemon knows
|
|
* the jobid that is involved in this collective
|
|
*/
|
|
nm = OBJ_NEW(orte_namelist_t);
|
|
nm->name.jobid = ORTE_PROC_MY_NAME->jobid;
|
|
nm->name.vpid = ORTE_VPID_WILDCARD;
|
|
opal_list_append(&modex->participants, &nm->super);
|
|
modex->next_cb = orte_grpcomm_base_store_modex;
|
|
} else {
|
|
/* see if the collective is already present - a race condition
|
|
* exists where other participants may have already sent us their
|
|
* contribution. This would place the collective on the global
|
|
* array, but leave it marked as "inactive" until we call
|
|
* modex with the list of participants
|
|
*/
|
|
found = false;
|
|
for (item = opal_list_get_first(&orte_grpcomm_base.active_colls);
|
|
item != opal_list_get_end(&orte_grpcomm_base.active_colls);
|
|
item = opal_list_get_next(item)) {
|
|
cptr = (orte_grpcomm_collective_t*)item;
|
|
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
|
|
"%s CHECKING COLL id %lu",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
(unsigned long)cptr->id));
|
|
|
|
if (modex->id == cptr->id) {
|
|
found = true;
|
|
/* remove the old entry - we will replace it
|
|
* with the modex one
|
|
*/
|
|
opal_list_remove_item(&orte_grpcomm_base.active_colls, item);
|
|
break;
|
|
}
|
|
}
|
|
if (found) {
|
|
/* since it already exists, the list of
|
|
* targets contains the list of procs
|
|
* that have already sent us their info. Cycle
|
|
* thru the targets and move those entries to
|
|
* the modex object
|
|
*/
|
|
while (NULL != (item = opal_list_remove_first(&cptr->targets))) {
|
|
opal_list_append(&modex->targets, item);
|
|
}
|
|
/* copy the previously-saved data across */
|
|
opal_dss.copy_payload(&modex->local_bucket, &cptr->local_bucket);
|
|
/* cleanup */
|
|
OBJ_RELEASE(cptr);
|
|
}
|
|
/* now add the modex to the global list of active collectives */
|
|
modex->next_cb = orte_grpcomm_base_store_modex;
|
|
modex->next_cbdata = modex;
|
|
opal_list_append(&orte_grpcomm_base.active_colls, &modex->super);
|
|
|
|
/* pack the collective id */
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&modex->buffer, &modex->id, 1, ORTE_GRPCOMM_COLL_ID_T))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
goto cleanup;
|
|
}
|
|
|
|
/* pack our name */
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&modex->buffer, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
goto cleanup;
|
|
}
|
|
|
|
/* this is not amongst our peers, but rather between a select
|
|
* group of processes - e.g., during a connect/accept operation.
|
|
* Thus, we need to include the non-peer info as well as our peers
|
|
* since we can't tell what the other participants may already have
|
|
*/
|
|
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_pack_modex_entries(&modex->buffer, opal_dstore_peer))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
goto cleanup;
|
|
}
|
|
/* pack our name to keep things straight during unpack */
|
|
if (ORTE_SUCCESS != (rc = opal_dss.pack(&modex->buffer, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
goto cleanup;
|
|
}
|
|
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_pack_modex_entries(&modex->buffer, opal_dstore_nonpeer))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
goto cleanup;
|
|
}
|
|
}
|
|
|
|
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:base:full:modex: executing allgather",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
|
|
/* execute the allgather */
|
|
if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather(modex))) {
|
|
ORTE_ERROR_LOG(rc);
|
|
goto cleanup;
|
|
}
|
|
|
|
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output,
|
|
"%s grpcomm:base:modex: modex posted",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
|
|
|
return;
|
|
|
|
cleanup:
|
|
return;
|
|
}
|
|
|
|
/*
 * Collective callback that stores the modex data delivered in "rbuf".
 *
 * The buffer is read as a sequence of records, each made up of a process
 * name, an int32 count, and that many opal_value_t entries. Entries that
 * belong to other processes are placed in the "nonpeer" datastore;
 * entries carrying our own name are dropped. Reading stops at the first
 * process name that cannot be unpacked (normally the end of the buffer).
 *
 * "cbdata" is the orte_grpcomm_collective_t of the modex. When it is
 * non-NULL the collective is removed from the list of active
 * collectives, its cbfunc (if any) is invoked, and it is flagged as no
 * longer active. This completion step runs on error paths as well.
 */
void orte_grpcomm_base_store_modex(opal_buffer_t *rbuf, void *cbdata)
{
    int rc, j, cnt;
    int32_t num_recvd_entries;
    orte_process_name_t pname;
    orte_grpcomm_collective_t *modex = (orte_grpcomm_collective_t*)cbdata;

    OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output,
                         "%s STORING MODEX DATA",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));

    /* unpack the process name */
    cnt = 1;
    while (ORTE_SUCCESS == (rc = opal_dss.unpack(rbuf, &pname, &cnt, ORTE_NAME))) {
        /* unpack the number of entries for this proc */
        cnt = 1;
        if (ORTE_SUCCESS != (rc = opal_dss.unpack(rbuf, &num_recvd_entries, &cnt, OPAL_INT32))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }

        OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
                             "%s grpcomm:base:update_modex_entries: adding %d entries for proc %s",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_recvd_entries,
                             ORTE_NAME_PRINT(&pname)));

        /*
         * Extract the attribute names and values
         */
        for (j = 0; j < num_recvd_entries; j++) {
            opal_value_t *kv;
            cnt = 1;
            if (ORTE_SUCCESS != (rc = opal_dss.unpack(rbuf, &kv, &cnt, OPAL_VALUE))) {
                ORTE_ERROR_LOG(rc);
                goto cleanup;
            }
            OPAL_OUTPUT_VERBOSE((10, orte_grpcomm_base_framework.framework_output,
                                 "%s STORING MODEX DATA FROM %s FOR %s",
                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                 ORTE_NAME_PRINT(&pname), kv->key));
            /* if this is me, dump the data - we already have it in the db */
            if (ORTE_PROC_MY_NAME->jobid != pname.jobid ||
                ORTE_PROC_MY_NAME->vpid != pname.vpid) {
                /* store it in the database - we store this as "nonpeer"
                 * since the "peer" datastore is for storing our own data - i.e.,
                 * it contains data we will share with our peers
                 */
                if (ORTE_SUCCESS != (rc = opal_dstore.store(opal_dstore_nonpeer,
                                                            (opal_identifier_t*)&pname, kv))) {
                    ORTE_ERROR_LOG(rc);
                    /* the unpacked value is ours to release - without
                     * this it would be leaked on the error path
                     */
                    OBJ_RELEASE(kv);
                    goto cleanup;
                }
            }
            OBJ_RELEASE(kv);
        }
        OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
                             "%s store:peer:modex: completed modex entry for proc %s",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             ORTE_NAME_PRINT(&pname)));
    }

 cleanup:
    if (NULL == cbdata) {
        return;
    }
    /* cleanup the list, but don't release the
     * collective object as it was passed into us
     */
    opal_list_remove_item(&orte_grpcomm_base.active_colls, &modex->super);
    /* notify that the modex is complete */
    if (NULL != modex->cbfunc) {
        OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output,
                             "%s CALLING MODEX RELEASE",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
        modex->cbfunc(NULL, modex->cbdata);
    } else {
        OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output,
                             "%s store:peer:modex NO MODEX RELEASE CBFUNC",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
    }
    /* flag the collective as complete */
    modex->active = false;
}
|
|
|
|
/*
 * Pack our own modex entries from one datastore into a buffer.
 *
 * Everything stored for ORTE_PROC_MY_NAME in the datastore identified by
 * "handle" is fetched (the fetch is issued with a NULL key). The number
 * of entries is packed first as an OPAL_INT32, followed by each entry as
 * an OPAL_VALUE.
 *
 * @param buf     buffer to pack into
 * @param handle  datastore handle to fetch from
 * @return ORTE_SUCCESS, or the error code of the first fetch/pack call
 *         that failed (the error is also logged). The buffer may have
 *         been partially filled when an error is returned.
 */
int orte_grpcomm_base_pack_modex_entries(opal_buffer_t *buf, int handle)
{
    int rc;
    int32_t num_entries;
    opal_value_t *kv;
    opal_list_t data;

    /* fetch any global or local data */
    OBJ_CONSTRUCT(&data, opal_list_t);
    if (ORTE_SUCCESS != (rc = opal_dstore.fetch(handle,
                                                (opal_identifier_t*)ORTE_PROC_MY_NAME,
                                                NULL, &data))) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }
    num_entries = opal_list_get_size(&data);

    OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
                         "%s grpcomm:base:pack_modex: reporting %d entries",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), num_entries));

    /* put the number of entries into the buffer */
    if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &num_entries, 1, OPAL_INT32))) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    /* if there are entries, store them */
    while (NULL != (kv = (opal_value_t*)opal_list_remove_first(&data))) {
        OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
                             "%s grpcomm:base:pack_modex: packing entry for %s",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), kv->key));
        rc = opal_dss.pack(buf, &kv, 1, OPAL_VALUE);
        /* the entry has already been taken off the list, so the list
         * destructor will not see it - release it here whether or not
         * the pack succeeded
         */
        OBJ_RELEASE(kv);
        if (ORTE_SUCCESS != rc) {
            ORTE_ERROR_LOG(rc);
            break;
        }
    }

 cleanup:
    /* releases whatever entries are still on the list */
    OPAL_LIST_DESTRUCT(&data);

    return rc;
}
|