Add a new grpcomm module that mimics the old 1.2 behavior - it -always- does a modex because it always includes the architecture. We call it "blind-and-dumb" because it never checks whether this is actually required - hence the moniker "bad". :-)
Update the ESS API so we can update the stored arches should the modex include that info. Update ompi/proc to check/set the arch for remote procs, and add that function call to mpi_init right after the modex is done. Set things up so other grpcomm modules can decide whether or not to add the arch to the modex, and can detect whether any other entries have been made - if not, the modex can simply fall through. Begin adding logic to the "basic" module to handle the different arch situations. For now, default to the "bad" module so we work in all situations, even though we may be sending around more info than we really require.

This fixes ticket #1340.

This commit was SVN r18673.
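For reviewers skimming the diff, here is a condensed, non-authoritative sketch of the arch-exchange path this commit wires up: the "bad" modex always packs the local architecture next to the process name, allgathers the contributions, then pushes each peer's arch back into the ESS through the new update_arch entry point (ompi_proc_set_arch later reads those values to build convertors). Error handling, timing code, and the regular modex-entry payload are stripped out here; the real implementation is in grpcomm_bad_module.c further down. The function name modex_arch_sketch is made up for illustration.

```c
/* Condensed sketch (not the committed code) of the arch exchange added
 * here - see grpcomm_bad_module.c in this diff for the full version. */
#include "orte_config.h"
#include "orte/constants.h"
#include "opal/dss/dss.h"
#include "orte/mca/ess/ess.h"
#include "orte/mca/grpcomm/grpcomm.h"
#include "orte/runtime/orte_globals.h"
#include "orte/util/proc_info.h"

static int modex_arch_sketch(void)
{
    opal_buffer_t buf, rbuf;
    orte_process_name_t proc_name;
    orte_std_cntr_t i, num_procs, cnt;
    int32_t arch;

    OBJ_CONSTRUCT(&buf, opal_buffer_t);
    OBJ_CONSTRUCT(&rbuf, opal_buffer_t);

    /* "bad" always contributes: our name plus our architecture */
    opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME);
    opal_dss.pack(&buf, &orte_process_info.arch, 1, OPAL_UINT32);

    /* exchange contributions with all peers */
    orte_grpcomm.allgather(&buf, &rbuf);

    /* unpack each peer's entry and store its arch via the new ESS API */
    cnt = 1;
    opal_dss.unpack(&rbuf, &num_procs, &cnt, ORTE_STD_CNTR);
    for (i = 0; i < num_procs; i++) {
        cnt = 1;
        opal_dss.unpack(&rbuf, &proc_name, &cnt, ORTE_NAME);
        cnt = 1;
        opal_dss.unpack(&rbuf, &arch, &cnt, OPAL_UINT32);
        orte_ess.update_arch(&proc_name, arch);
    }

    OBJ_DESTRUCT(&buf);
    OBJ_DESTRUCT(&rbuf);
    return ORTE_SUCCESS;
}
```

Module selection follows the usual MCA mechanism (e.g. something like `mpirun -mca grpcomm bad` or `-mca grpcomm basic` to force a choice); by default the priorities set in this commit - 10 for "bad", 1 for "basic" - make "bad" the one selected.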
This commit is contained in:
parent da6aa57efb
commit 955d117f5e
@ -537,7 +537,7 @@ AC_ARG_WITH([rte],
|
||||
if test "$with_rte_support" = "no"; then
|
||||
AC_MSG_RESULT([no])
|
||||
orte_without_full_support=1
|
||||
list_of_frameworks="errmgr,ess-singleton,ess-hnp,ess-tool,ess-env,filem,grpcomm-basic,iof,odls,oob,plm,ras,rmaps,rml,routed,snapc,btl-sm,coll-sm,common-sm,mpool-sm,dpm-orte,pubsub-orte"
|
||||
list_of_frameworks="errmgr,ess-singleton,ess-hnp,ess-tool,ess-env,filem,grpcomm-basic,grpcomm-bad,iof,odls,oob,plm,ras,rmaps,rml,routed,snapc,btl-sm,coll-sm,common-sm,mpool-sm,dpm-orte,pubsub-orte"
|
||||
if test -z $enable_mca_no_build ; then
|
||||
enable_mca_no_build="$list_of_frameworks"
|
||||
else
|
||||
|
@ -120,6 +120,26 @@ int ompi_proc_init(void)
|
||||
proc->proc_flags |= OMPI_PROC_FLAG_LOCAL;
|
||||
}
|
||||
proc->proc_hostname = orte_ess.proc_get_hostname(&proc->proc_name);
|
||||
}
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int ompi_proc_set_arch(void)
|
||||
{
|
||||
ompi_proc_t *proc = NULL;
|
||||
opal_list_item_t *item = NULL;
|
||||
|
||||
OPAL_THREAD_LOCK(&ompi_proc_lock);
|
||||
|
||||
for( item = opal_list_get_first(&ompi_proc_list);
|
||||
item != opal_list_get_end(&ompi_proc_list);
|
||||
item = opal_list_get_next(item)) {
|
||||
proc = (ompi_proc_t*)item;
|
||||
|
||||
if (proc->proc_name.vpid != ORTE_PROC_MY_NAME->vpid) {
|
||||
proc->proc_arch = orte_ess.proc_get_arch(&proc->proc_name);
|
||||
/* if arch is different than mine, create a new convertor for this proc */
|
||||
if (proc->proc_arch != orte_process_info.arch) {
|
||||
@ -132,15 +152,17 @@ int ompi_proc_init(void)
|
||||
true, orte_process_info.nodename,
|
||||
proc->proc_hostname == NULL ? "<hostname unavailable>" :
|
||||
proc->proc_hostname);
|
||||
OPAL_THREAD_UNLOCK(&ompi_proc_lock);
|
||||
return OMPI_ERR_NOT_SUPPORTED;
|
||||
#endif
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
OPAL_THREAD_UNLOCK(&ompi_proc_lock);
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int ompi_proc_finalize (void)
|
||||
{
|
||||
ompi_proc_t *proc, *nextproc, *endproc;
|
||||
|
@ -119,6 +119,7 @@ OMPI_DECLSPEC extern ompi_proc_t* ompi_proc_local_proc;
|
||||
*/
|
||||
OMPI_DECLSPEC int ompi_proc_init(void);
|
||||
|
||||
OMPI_DECLSPEC int ompi_proc_set_arch(void);
|
||||
|
||||
/**
|
||||
* Finalize the OMPI Process subsystem
|
||||
|
@ -533,6 +533,14 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
|
||||
gettimeofday(&ompistart, NULL);
|
||||
}
|
||||
|
||||
/* identify the architectures of remote procs and setup
|
||||
* their datatype convertors, if required
|
||||
*/
|
||||
if (OMPI_SUCCESS != (ret = ompi_proc_set_arch())) {
|
||||
error = "ompi_proc_set_arch failed";
|
||||
goto error;
|
||||
}
|
||||
|
||||
/* Figure out the final MPI thread levels. If we were not
|
||||
compiled for support for MPI threads, then don't allow
|
||||
MPI_THREAD_MULTIPLE. */
|
||||
|
@ -44,6 +44,7 @@ static char* proc_get_hostname(orte_process_name_t *proc);
|
||||
static uint32_t proc_get_arch(orte_process_name_t *proc);
|
||||
static uint8_t proc_get_local_rank(orte_process_name_t *proc);
|
||||
static uint8_t proc_get_node_rank(orte_process_name_t *proc);
|
||||
static int update_arch(orte_process_name_t *proc, uint32_t arch);
|
||||
|
||||
orte_ess_base_module_t orte_ess_alps_module = {
|
||||
rte_init,
|
||||
@ -54,6 +55,7 @@ orte_ess_base_module_t orte_ess_alps_module = {
|
||||
proc_get_arch,
|
||||
proc_get_local_rank,
|
||||
proc_get_node_rank,
|
||||
update_arch,
|
||||
NULL /* ft_event */
|
||||
};
|
||||
|
||||
@ -219,6 +221,25 @@ static uint32_t proc_get_arch(orte_process_name_t *proc)
|
||||
return nids[node]->arch;
|
||||
}
|
||||
|
||||
static int update_arch(orte_process_name_t *proc, uint32_t arch)
|
||||
{
|
||||
|
||||
int32_t node;
|
||||
orte_nid_t **nids;
|
||||
|
||||
node = pmap[proc->vpid].node;
|
||||
nids = (orte_nid_t**)nidmap.addr;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_ess_base_output,
|
||||
"%s ess:alps: updating proc %s to arch %0x",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(proc),
|
||||
arch));
|
||||
|
||||
nids[node]->arch = arch;
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static uint8_t proc_get_local_rank(orte_process_name_t *proc)
|
||||
{
|
||||
|
||||
|
@ -42,6 +42,7 @@ static char* proc_get_hostname(orte_process_name_t *proc);
|
||||
static uint32_t proc_get_arch(orte_process_name_t *proc);
|
||||
static uint8_t proc_get_local_rank(orte_process_name_t *proc);
|
||||
static uint8_t proc_get_node_rank(orte_process_name_t *proc);
|
||||
static int update_arch(orte_process_name_t *proc, uint32_t arch);
|
||||
|
||||
orte_ess_base_module_t orte_ess_cnos_module = {
|
||||
rte_init,
|
||||
@ -52,6 +53,7 @@ orte_ess_base_module_t orte_ess_cnos_module = {
|
||||
proc_get_arch,
|
||||
proc_get_local_rank,
|
||||
proc_get_node_rank,
|
||||
update_arch,
|
||||
NULL /* ft_event */
|
||||
};
|
||||
|
||||
@ -139,6 +141,11 @@ static uint32_t proc_get_arch(orte_process_name_t *proc)
|
||||
return orte_process_info.arch;
|
||||
}
|
||||
|
||||
static int update_arch(orte_process_name_t *proc, uint32_t arch)
|
||||
{
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static uint8_t proc_get_local_rank(orte_process_name_t *proc)
|
||||
{
|
||||
/* RHC: someone more familiar with CNOS needs to
|
||||
|
21 orte/mca/ess/env/ess_env_module.c (vendored)
@ -85,6 +85,7 @@ static char* proc_get_hostname(orte_process_name_t *proc);
|
||||
static uint32_t proc_get_arch(orte_process_name_t *proc);
|
||||
static uint8_t proc_get_local_rank(orte_process_name_t *proc);
|
||||
static uint8_t proc_get_node_rank(orte_process_name_t *proc);
|
||||
static int update_arch(orte_process_name_t *proc, uint32_t arch);
|
||||
|
||||
#if OPAL_ENABLE_FT == 1
|
||||
static int rte_ft_event(int state);
|
||||
@ -100,6 +101,7 @@ orte_ess_base_module_t orte_ess_env_module = {
|
||||
proc_get_arch,
|
||||
proc_get_local_rank,
|
||||
proc_get_node_rank,
|
||||
update_arch,
|
||||
#if OPAL_ENABLE_FT == 1
|
||||
rte_ft_event
|
||||
#else
|
||||
@ -272,6 +274,25 @@ static uint32_t proc_get_arch(orte_process_name_t *proc)
|
||||
return nids[node]->arch;
|
||||
}
|
||||
|
||||
static int update_arch(orte_process_name_t *proc, uint32_t arch)
|
||||
{
|
||||
|
||||
int32_t node;
|
||||
orte_nid_t **nids;
|
||||
|
||||
node = pmap[proc->vpid].node;
|
||||
nids = (orte_nid_t**)nidmap.addr;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_ess_base_output,
|
||||
"%s ess:env: updating proc %s to arch %0x",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(proc),
|
||||
arch));
|
||||
|
||||
nids[node]->arch = arch;
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static uint8_t proc_get_local_rank(orte_process_name_t *proc)
|
||||
{
|
||||
|
||||
|
@ -105,6 +105,11 @@ typedef uint8_t (*orte_ess_base_module_proc_get_local_rank_fn_t)(orte_process_na
|
||||
*/
|
||||
typedef uint8_t (*orte_ess_base_module_proc_get_node_rank_fn_t)(orte_process_name_t *proc);
|
||||
|
||||
/**
|
||||
* Update the arch of a remote process
|
||||
*/
|
||||
typedef int (*orte_ess_base_module_update_arch_fn_t)(orte_process_name_t *proc, uint32_t arch);
|
||||
|
||||
|
||||
/**
|
||||
* Handle fault tolerance updates
|
||||
@ -128,6 +133,7 @@ struct orte_ess_base_module_1_0_0_t {
|
||||
orte_ess_base_module_proc_get_arch_fn_t proc_get_arch;
|
||||
orte_ess_base_module_proc_get_local_rank_fn_t get_local_rank;
|
||||
orte_ess_base_module_proc_get_node_rank_fn_t get_node_rank;
|
||||
orte_ess_base_module_update_arch_fn_t update_arch;
|
||||
orte_ess_base_module_ft_event_fn_t ft_event;
|
||||
};
|
||||
typedef struct orte_ess_base_module_1_0_0_t orte_ess_base_module_1_0_0_t;
|
||||
|
@ -82,6 +82,7 @@ orte_ess_base_module_t orte_ess_hnp_module = {
|
||||
NULL, /* don't need a proc_get_arch fn */
|
||||
NULL, /* don't need a proc_get_local_rank fn */
|
||||
NULL, /* don't need a proc_get_node_rank fn */
|
||||
NULL, /* don't need to update_arch */
|
||||
NULL /* ft_event */
|
||||
};
|
||||
|
||||
|
@ -53,6 +53,7 @@ static char* proc_get_hostname(orte_process_name_t *proc);
|
||||
static uint32_t proc_get_arch(orte_process_name_t *proc);
|
||||
static uint8_t proc_get_local_rank(orte_process_name_t *proc);
|
||||
static uint8_t proc_get_node_rank(orte_process_name_t *proc);
|
||||
static int update_arch(orte_process_name_t *proc, uint32_t arch);
|
||||
|
||||
orte_ess_base_module_t orte_ess_lsf_module = {
|
||||
rte_init,
|
||||
@ -63,6 +64,7 @@ orte_ess_base_module_t orte_ess_lsf_module = {
|
||||
proc_get_arch,
|
||||
proc_get_local_rank,
|
||||
proc_get_node_rank,
|
||||
update_arch,
|
||||
NULL /* ft_event */
|
||||
};
|
||||
|
||||
@ -229,6 +231,25 @@ static uint32_t proc_get_arch(orte_process_name_t *proc)
|
||||
return nids[node]->arch;
|
||||
}
|
||||
|
||||
static int update_arch(orte_process_name_t *proc, uint32_t arch)
|
||||
{
|
||||
|
||||
int32_t node;
|
||||
orte_nid_t **nids;
|
||||
|
||||
node = pmap[proc->vpid].node;
|
||||
nids = (orte_nid_t**)nidmap.addr;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_ess_base_output,
|
||||
"%s ess:lsf: updating proc %s to arch %0x",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(proc),
|
||||
arch));
|
||||
|
||||
nids[node]->arch = arch;
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static uint8_t proc_get_local_rank(orte_process_name_t *proc)
|
||||
{
|
||||
|
||||
|
@ -43,6 +43,7 @@ static char* proc_get_hostname(orte_process_name_t *proc);
|
||||
static uint32_t proc_get_arch(orte_process_name_t *proc);
|
||||
static uint8_t proc_get_local_rank(orte_process_name_t *proc);
|
||||
static uint8_t proc_get_node_rank(orte_process_name_t *proc);
|
||||
static int update_arch(orte_process_name_t *proc, uint32_t arch);
|
||||
|
||||
orte_ess_base_module_t orte_ess_portals_utcp_module = {
|
||||
rte_init,
|
||||
@ -53,6 +54,7 @@ orte_ess_base_module_t orte_ess_portals_utcp_module = {
|
||||
proc_get_arch,
|
||||
proc_get_local_rank,
|
||||
proc_get_node_rank,
|
||||
update_arch,
|
||||
NULL /* ft_event */
|
||||
};
|
||||
|
||||
@ -154,6 +156,11 @@ static uint32_t proc_get_arch(orte_process_name_t *proc)
|
||||
return orte_process_info.arch;
|
||||
}
|
||||
|
||||
static int update_arch(orte_process_name_t *proc, uint32_t arch)
|
||||
{
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static uint8_t proc_get_local_rank(orte_process_name_t *proc)
|
||||
{
|
||||
/* RHC: someone more familiar with CNOS needs to
|
||||
|
@ -71,6 +71,7 @@ static char* proc_get_hostname(orte_process_name_t *proc);
|
||||
static uint32_t proc_get_arch(orte_process_name_t *proc);
|
||||
static uint8_t proc_get_local_rank(orte_process_name_t *proc);
|
||||
static uint8_t proc_get_node_rank(orte_process_name_t *proc);
|
||||
static int update_arch(orte_process_name_t *proc, uint32_t arch);
|
||||
|
||||
orte_ess_base_module_t orte_ess_singleton_module = {
|
||||
rte_init,
|
||||
@ -81,6 +82,7 @@ orte_ess_base_module_t orte_ess_singleton_module = {
|
||||
proc_get_arch,
|
||||
proc_get_local_rank,
|
||||
proc_get_node_rank,
|
||||
update_arch,
|
||||
NULL /* ft_event */
|
||||
};
|
||||
|
||||
@ -439,6 +441,25 @@ static uint32_t proc_get_arch(orte_process_name_t *proc)
|
||||
return nids[node]->arch;
|
||||
}
|
||||
|
||||
static int update_arch(orte_process_name_t *proc, uint32_t arch)
|
||||
{
|
||||
|
||||
int32_t node;
|
||||
orte_nid_t **nids;
|
||||
|
||||
node = pmap[proc->vpid].node;
|
||||
nids = (orte_nid_t**)nidmap.addr;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_ess_base_output,
|
||||
"%s ess:singleton: updating proc %s to arch %0x",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(proc),
|
||||
arch));
|
||||
|
||||
nids[node]->arch = arch;
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static uint8_t proc_get_local_rank(orte_process_name_t *proc)
|
||||
{
|
||||
return pmap[proc->vpid].local_rank;
|
||||
|
@ -54,6 +54,7 @@ static char* proc_get_hostname(orte_process_name_t *proc);
|
||||
static uint32_t proc_get_arch(orte_process_name_t *proc);
|
||||
static uint8_t proc_get_local_rank(orte_process_name_t *proc);
|
||||
static uint8_t proc_get_node_rank(orte_process_name_t *proc);
|
||||
static int update_arch(orte_process_name_t *proc, uint32_t arch);
|
||||
|
||||
|
||||
orte_ess_base_module_t orte_ess_slurm_module = {
|
||||
@ -65,6 +66,7 @@ orte_ess_base_module_t orte_ess_slurm_module = {
|
||||
proc_get_arch,
|
||||
proc_get_local_rank,
|
||||
proc_get_node_rank,
|
||||
update_arch,
|
||||
NULL /* ft_event */
|
||||
};
|
||||
|
||||
@ -230,6 +232,25 @@ static uint32_t proc_get_arch(orte_process_name_t *proc)
|
||||
return nids[node]->arch;
|
||||
}
|
||||
|
||||
static int update_arch(orte_process_name_t *proc, uint32_t arch)
|
||||
{
|
||||
|
||||
int32_t node;
|
||||
orte_nid_t **nids;
|
||||
|
||||
node = pmap[proc->vpid].node;
|
||||
nids = (orte_nid_t**)nidmap.addr;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_ess_base_output,
|
||||
"%s ess:slurm: updating proc %s to arch %0x",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(proc),
|
||||
arch));
|
||||
|
||||
nids[node]->arch = arch;
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static uint8_t proc_get_local_rank(orte_process_name_t *proc)
|
||||
{
|
||||
|
||||
|
@ -55,6 +55,7 @@ orte_ess_base_module_t orte_ess_tool_module = {
|
||||
NULL, /* don't need a proc_get_arch fn */
|
||||
NULL, /* don't need a proc_get_local_rank fn */
|
||||
NULL, /* don't need a proc_get_node_rank fn */
|
||||
NULL, /* don't need to update_arch */
|
||||
NULL /* ft_event */
|
||||
};
|
||||
|
||||
|
43 orte/mca/grpcomm/bad/Makefile.am (new file)
@ -0,0 +1,43 @@
|
||||
#
|
||||
# Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
|
||||
# University Research and Technology
|
||||
# Corporation. All rights reserved.
|
||||
# Copyright (c) 2004-2005 The University of Tennessee and The University
|
||||
# of Tennessee Research Foundation. All rights
|
||||
# reserved.
|
||||
# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
|
||||
# University of Stuttgart. All rights reserved.
|
||||
# Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
# All rights reserved.
|
||||
# $COPYRIGHT$
|
||||
#
|
||||
# Additional copyrights may follow
|
||||
#
|
||||
# $HEADER$
|
||||
#
|
||||
|
||||
sources = \
|
||||
grpcomm_bad.h \
|
||||
grpcomm_bad_module.c \
|
||||
grpcomm_bad_component.c
|
||||
|
||||
# Make the output library in this directory, and name it either
|
||||
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
|
||||
# (for static builds).
|
||||
|
||||
if OMPI_BUILD_grpcomm_bad_DSO
|
||||
component_noinst =
|
||||
component_install = mca_grpcomm_bad.la
|
||||
else
|
||||
component_noinst = libmca_grpcomm_bad.la
|
||||
component_install =
|
||||
endif
|
||||
|
||||
mcacomponentdir = $(pkglibdir)
|
||||
mcacomponent_LTLIBRARIES = $(component_install)
|
||||
mca_grpcomm_bad_la_SOURCES = $(sources)
|
||||
mca_grpcomm_bad_la_LDFLAGS = -module -avoid-version
|
||||
|
||||
noinst_LTLIBRARIES = $(component_noinst)
|
||||
libmca_grpcomm_bad_la_SOURCES =$(sources)
|
||||
libmca_grpcomm_bad_la_LDFLAGS = -module -avoid-version
|
13 orte/mca/grpcomm/bad/configure.m4 (new file)
@ -0,0 +1,13 @@
|
||||
# -*- shell-script -*-
|
||||
#
|
||||
# Copyright (c) 2007 Sandia National Laboratories. All rights reserved.
|
||||
# $COPYRIGHT$
|
||||
#
|
||||
# Additional copyrights may follow
|
||||
#
|
||||
# $HEADER$
|
||||
#
|
||||
|
||||
# MCA_grpcomm_bad_CONFIG([action-if-found], [action-if-not-found])
|
||||
# -----------------------------------------------------------
|
||||
AC_DEFUN([MCA_grpcomm_bad_CONFIG], [$1])
|
30 orte/mca/grpcomm/bad/configure.params (new file)
@ -0,0 +1,30 @@
|
||||
# -*- shell-script -*-
|
||||
#
|
||||
# Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
|
||||
# University Research and Technology
|
||||
# Corporation. All rights reserved.
|
||||
# Copyright (c) 2004-2005 The University of Tennessee and The University
|
||||
# of Tennessee Research Foundation. All rights
|
||||
# reserved.
|
||||
# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
|
||||
# University of Stuttgart. All rights reserved.
|
||||
# Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
# All rights reserved.
|
||||
# Copyright (c) 2007 Los Alamos National Security, LLC. All rights
|
||||
# reserved.
|
||||
# $COPYRIGHT$
|
||||
#
|
||||
# Additional copyrights may follow
|
||||
#
|
||||
# $HEADER$
|
||||
#
|
||||
|
||||
# Specific to this module
|
||||
|
||||
PARAM_CONFIG_FILES="Makefile"
|
||||
#
|
||||
# Set the config priority so that this
|
||||
# component will build for all environs -except-
|
||||
# those special ones that do not support it
|
||||
|
||||
PARAM_CONFIG_PRIORITY=10
|
51 orte/mca/grpcomm/bad/grpcomm_bad.h (new file)
@ -0,0 +1,51 @@
|
||||
/* -*- C -*-
|
||||
*
|
||||
* Copyright (c) 2004-2008 The Trustees of Indiana University and Indiana
|
||||
* University Research and Technology
|
||||
* Corporation. All rights reserved.
|
||||
* Copyright (c) 2004-2006 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
|
||||
* University of Stuttgart. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*
|
||||
*/
|
||||
#ifndef GRPCOMM_BAD_H
|
||||
#define GRPCOMM_BAD_H
|
||||
|
||||
#include "orte_config.h"
|
||||
#include "orte/types.h"
|
||||
|
||||
#include "opal/threads/mutex.h"
|
||||
#include "opal/threads/condition.h"
|
||||
#include "opal/class/opal_object.h"
|
||||
|
||||
#include "orte/mca/grpcomm/grpcomm.h"
|
||||
|
||||
BEGIN_C_DECLS
|
||||
|
||||
/*
|
||||
* Component open / close
|
||||
*/
|
||||
int orte_grpcomm_bad_open(void);
|
||||
int orte_grpcomm_bad_close(void);
|
||||
int orte_grpcomm_bad_component_query(mca_base_module_t **module, int *priority);
|
||||
|
||||
|
||||
/*
|
||||
* Grpcomm interfaces
|
||||
*/
|
||||
|
||||
ORTE_MODULE_DECLSPEC extern orte_grpcomm_base_component_t mca_grpcomm_bad_component;
|
||||
extern orte_grpcomm_base_module_t orte_grpcomm_bad_module;
|
||||
|
||||
END_C_DECLS
|
||||
|
||||
#endif
|
87 orte/mca/grpcomm/bad/grpcomm_bad_component.c (new file)
@ -0,0 +1,87 @@
|
||||
/* -*- C -*-
|
||||
*
|
||||
* Copyright (c) 2004-2008 The Trustees of Indiana University and Indiana
|
||||
* University Research and Technology
|
||||
* Corporation. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
|
||||
* University of Stuttgart. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
/** @file:
|
||||
*
|
||||
* The Open MPI Name Server
|
||||
*
|
||||
* The Open MPI Name Server provides unique name ranges for processes
|
||||
* within the universe. Each universe will have one name server
|
||||
* running within the seed daemon. This is done to prevent the
|
||||
* inadvertent duplication of names.
|
||||
*/
|
||||
|
||||
/*
|
||||
* includes
|
||||
*/
|
||||
#include "orte_config.h"
|
||||
#include "orte/constants.h"
|
||||
|
||||
#include "opal/threads/mutex.h"
|
||||
#include "opal/class/opal_list.h"
|
||||
#include "orte/util/show_help.h"
|
||||
|
||||
#include "opal/mca/mca.h"
|
||||
#include "opal/mca/base/mca_base_param.h"
|
||||
|
||||
#include "orte/util/proc_info.h"
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "orte/mca/rml/rml.h"
|
||||
|
||||
#include "grpcomm_bad.h"
|
||||
|
||||
/*
|
||||
* Struct of function pointers that need to be initialized
|
||||
*/
|
||||
orte_grpcomm_base_component_t mca_grpcomm_bad_component = {
|
||||
{
|
||||
ORTE_GRPCOMM_BASE_VERSION_2_0_0,
|
||||
|
||||
"bad", /* MCA module name */
|
||||
ORTE_MAJOR_VERSION, /* MCA module major version */
|
||||
ORTE_MINOR_VERSION, /* MCA module minor version */
|
||||
ORTE_RELEASE_VERSION, /* MCA module release version */
|
||||
orte_grpcomm_bad_open, /* module open */
|
||||
orte_grpcomm_bad_close, /* module close */
|
||||
orte_grpcomm_bad_component_query /* module query */
|
||||
},
|
||||
{
|
||||
/* The component is checkpoint ready */
|
||||
MCA_BASE_METADATA_PARAM_CHECKPOINT
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
/* Open the component */
|
||||
int orte_grpcomm_bad_open(void)
|
||||
{
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
int orte_grpcomm_bad_close(void)
|
||||
{
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
int orte_grpcomm_bad_component_query(mca_base_module_t **module, int *priority)
|
||||
{
|
||||
/* we are a default, so set a low priority so we can be overridden */
|
||||
*priority = 10;
|
||||
*module = (mca_base_module_t *)&orte_grpcomm_bad_module;
|
||||
return ORTE_SUCCESS;
|
||||
}
|
569 orte/mca/grpcomm/bad/grpcomm_bad_module.c (new file)
@ -0,0 +1,569 @@
|
||||
/*
|
||||
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
|
||||
* University Research and Technology
|
||||
* Corporation. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
|
||||
* University of Stuttgart. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* Copyright (c) 2007 Sun Microsystems, Inc. All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#include "orte_config.h"
|
||||
#include "orte/constants.h"
|
||||
#include "orte/types.h"
|
||||
|
||||
#include <string.h>
|
||||
#ifdef HAVE_SYS_TIME_H
|
||||
#include <sys/time.h>
|
||||
#endif /* HAVE_SYS_TIME_H */
|
||||
|
||||
#include "opal/threads/condition.h"
|
||||
#include "orte/util/show_help.h"
|
||||
#include "opal/util/bit_ops.h"
|
||||
|
||||
#include "opal/class/opal_hash_table.h"
|
||||
#include "orte/util/proc_info.h"
|
||||
#include "opal/dss/dss.h"
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "orte/mca/odls/odls_types.h"
|
||||
#include "orte/mca/ess/ess.h"
|
||||
#include "orte/mca/rml/rml.h"
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
#include "orte/util/name_fns.h"
|
||||
#include "orte/orted/orted.h"
|
||||
#include "orte/runtime/orte_wait.h"
|
||||
|
||||
#include "orte/mca/grpcomm/base/base.h"
|
||||
#include "grpcomm_bad.h"
|
||||
|
||||
|
||||
/* Static API's */
|
||||
static int init(void);
|
||||
static void finalize(void);
|
||||
static int xcast(orte_jobid_t job,
|
||||
opal_buffer_t *buffer,
|
||||
orte_rml_tag_t tag);
|
||||
static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf);
|
||||
static int barrier(void);
|
||||
static int modex(opal_list_t *procs);
|
||||
|
||||
/* Module def */
|
||||
orte_grpcomm_base_module_t orte_grpcomm_bad_module = {
|
||||
init,
|
||||
finalize,
|
||||
xcast,
|
||||
allgather,
|
||||
orte_grpcomm_base_allgather_list,
|
||||
barrier,
|
||||
orte_grpcomm_base_set_proc_attr,
|
||||
orte_grpcomm_base_get_proc_attr,
|
||||
modex,
|
||||
orte_grpcomm_base_purge_proc_attrs
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
* Initialize the module
|
||||
*/
|
||||
static int init(void)
|
||||
{
|
||||
int rc;
|
||||
|
||||
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_modex_init())) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
/**
|
||||
* Finalize the module
|
||||
*/
|
||||
static void finalize(void)
|
||||
{
|
||||
orte_grpcomm_base_modex_finalize();
|
||||
}
|
||||
|
||||
/**
|
||||
* A "broadcast-like" function to a job's processes.
|
||||
* @param jobid The job whose processes are to receive the message
|
||||
* @param buffer The data to broadcast
|
||||
*/
|
||||
|
||||
static int xcast(orte_jobid_t job,
|
||||
opal_buffer_t *buffer,
|
||||
orte_rml_tag_t tag)
|
||||
{
|
||||
int rc = ORTE_SUCCESS;
|
||||
opal_buffer_t buf;
|
||||
orte_daemon_cmd_flag_t command;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
||||
"%s grpcomm:bad:xcast sent to job %s tag %ld",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_JOBID_PRINT(job), (long)tag));
|
||||
|
||||
/* if there is no message to send, then just return ok */
|
||||
if (NULL == buffer) {
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/* setup a buffer to handle the xcast command */
|
||||
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
||||
/* all we need to do is send this to the HNP - the relay logic
|
||||
* will ensure everyone else gets it! So tell the HNP to
|
||||
* process and relay it. The HNP will use the routed.get_routing_tree
|
||||
* to find out who it should relay the message to.
|
||||
*/
|
||||
command = ORTE_DAEMON_PROCESS_AND_RELAY_CMD;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &command, 1, ORTE_DAEMON_CMD))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
/* pack the target jobid and tag for use in relay */
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &job, 1, ORTE_JOBID))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &tag, 1, ORTE_RML_TAG))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
/* if this isn't intended for the daemon command tag, then we better
|
||||
* tell the daemon to deliver it to the procs, and what job is supposed
|
||||
* to get it - this occurs when a caller just wants to send something
|
||||
* to all the procs in a job. In that use-case, the caller doesn't know
|
||||
* anything about inserting daemon commands or what routing algo might
|
||||
* be used, so we have to help them out a little. Functions that are
|
||||
* sending commands to the daemons themselves are smart enough to know
|
||||
* what they need to do.
|
||||
*/
|
||||
if (ORTE_RML_TAG_DAEMON != tag) {
|
||||
command = ORTE_DAEMON_MESSAGE_LOCAL_PROCS;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &command, 1, ORTE_DAEMON_CMD))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &job, 1, ORTE_JOBID))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &tag, 1, ORTE_RML_TAG))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
}
|
||||
|
||||
/* copy the payload into the new buffer - this is non-destructive, so our
|
||||
* caller is still responsible for releasing any memory in the buffer they
|
||||
* gave to us
|
||||
*/
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(&buf, buffer))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
|
||||
/* if I am the HNP, just set things up so the cmd processor gets called.
|
||||
* We don't want to message ourselves as this can create circular logic
|
||||
* in the RML. Instead, this macro will set a zero-time event which will
|
||||
* cause the buffer to be processed by the cmd processor - probably will
|
||||
* fire right away, but that's okay
|
||||
* The macro makes a copy of the buffer, so it's okay to release it here
|
||||
*/
|
||||
if (orte_process_info.hnp) {
|
||||
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &buf, ORTE_RML_TAG_DAEMON, orte_daemon_cmd_processor);
|
||||
} else {
|
||||
/* otherwise, send it to the HNP for relay */
|
||||
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &buf, ORTE_RML_TAG_DAEMON, 0))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto CLEANUP;
|
||||
}
|
||||
rc = ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
CLEANUP:
|
||||
OBJ_DESTRUCT(&buf);
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
||||
static bool barrier_recvd;
|
||||
static bool barrier_timer;
|
||||
|
||||
static void barrier_recv(int status, orte_process_name_t* sender,
|
||||
opal_buffer_t *buffer,
|
||||
orte_rml_tag_t tag, void *cbdata)
|
||||
{
|
||||
/* flag as recvd */
|
||||
barrier_recvd = true;
|
||||
}
|
||||
|
||||
static void barrier_timer_recv(int status, orte_process_name_t* sender,
|
||||
opal_buffer_t *buffer,
|
||||
orte_rml_tag_t tag, void *cbdata)
|
||||
{
|
||||
barrier_timer = true;
|
||||
}
|
||||
|
||||
static int barrier(void)
|
||||
{
|
||||
opal_buffer_t buf;
|
||||
orte_daemon_cmd_flag_t command=ORTE_DAEMON_COLL_CMD;
|
||||
orte_grpcomm_coll_t coll_type=ORTE_GRPCOMM_BARRIER;
|
||||
int rc;
|
||||
struct timeval ompistart, ompistop;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
||||
"%s grpcomm:bad entering barrier",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
if (orte_timing && ORTE_PROC_MY_NAME->vpid == 0) {
|
||||
gettimeofday(&ompistart, NULL);
|
||||
}
|
||||
|
||||
/* everyone sends barrier to local daemon */
|
||||
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
||||
/* tell the daemon to collect the data */
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &command, 1, ORTE_DAEMON_CMD))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_DESTRUCT(&buf);
|
||||
return rc;
|
||||
}
|
||||
/* tell the daemon we are doing a barrier */
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &coll_type, 1, ORTE_GRPCOMM_COLL_T))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_DESTRUCT(&buf);
|
||||
return rc;
|
||||
}
|
||||
/* send to local daemon */
|
||||
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_DAEMON, &buf, ORTE_RML_TAG_DAEMON, 0))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_DESTRUCT(&buf);
|
||||
return rc;
|
||||
}
|
||||
OBJ_DESTRUCT(&buf);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
|
||||
"%s grpcomm:bad barrier sent",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* now receive the release. Be sure to do this in
|
||||
* a manner that allows us to return without being in a recv!
|
||||
*/
|
||||
barrier_recvd = false;
|
||||
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_BARRIER,
|
||||
ORTE_RML_NON_PERSISTENT, barrier_recv, NULL);
|
||||
if (rc != ORTE_SUCCESS) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
ORTE_PROGRESSED_WAIT(barrier_recvd, 0, 1);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
|
||||
"%s grpcomm:bad received barrier release",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
if (orte_timing) {
|
||||
if (ORTE_PROC_MY_NAME->vpid == 0) {
|
||||
/* setup a receive to hear when the rank=N proc has received the data
|
||||
* release - in most xcast schemes, this will always be the final recvr
|
||||
*/
|
||||
barrier_timer = false;
|
||||
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_COLLECTIVE_TIMER,
|
||||
ORTE_RML_NON_PERSISTENT, barrier_timer_recv, NULL);
|
||||
if (rc != ORTE_SUCCESS) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
ORTE_PROGRESSED_WAIT(barrier_timer, 0, 1);
|
||||
gettimeofday(&ompistop, NULL);
|
||||
opal_output(0, "%s time to complete barrier %ld usec",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(long int)((ompistop.tv_sec - ompistart.tv_sec)*1000000 +
|
||||
(ompistop.tv_usec - ompistart.tv_usec)));
|
||||
} else if (ORTE_PROC_MY_NAME->vpid == orte_process_info.num_procs-1) {
|
||||
/* if we are rank=N, send a message back to indicate
|
||||
* the xcast completed for timing purposes
|
||||
*/
|
||||
orte_process_name_t name;
|
||||
|
||||
name.jobid = ORTE_PROC_MY_NAME->jobid;
|
||||
name.vpid = 0;
|
||||
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
||||
if (0 > (rc = orte_rml.send_buffer(&name,&buf,ORTE_RML_TAG_COLLECTIVE_TIMER,0))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_DESTRUCT(&buf);
|
||||
return rc;
|
||||
}
|
||||
rc = ORTE_SUCCESS;
|
||||
OBJ_DESTRUCT(&buf);
|
||||
}
|
||||
}
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static opal_buffer_t *allgather_buf;
|
||||
static orte_std_cntr_t allgather_complete;
|
||||
|
||||
static void allgather_recv(int status, orte_process_name_t* sender,
|
||||
opal_buffer_t *buffer,
|
||||
orte_rml_tag_t tag, void *cbdata)
|
||||
{
|
||||
int rc;
|
||||
|
||||
/* xfer the data */
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(allgather_buf, buffer))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
}
|
||||
allgather_complete = true;
|
||||
}
|
||||
|
||||
static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
|
||||
{
|
||||
int rc;
|
||||
orte_daemon_cmd_flag_t command=ORTE_DAEMON_COLL_CMD;
|
||||
struct timeval ompistart, ompistop;
|
||||
opal_buffer_t coll;
|
||||
orte_grpcomm_coll_t coll_type=ORTE_GRPCOMM_ALLGATHER;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
||||
"%s grpcomm:bad entering allgather",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
if (orte_timing && ORTE_PROC_MY_NAME->vpid == 0) {
|
||||
gettimeofday(&ompistart, NULL);
|
||||
}
|
||||
|
||||
/* everyone sends data to their local daemon */
|
||||
OBJ_CONSTRUCT(&coll, opal_buffer_t);
|
||||
/* tell the daemon to collect the data */
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&coll, &command, 1, ORTE_DAEMON_CMD))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_DESTRUCT(&coll);
|
||||
return rc;
|
||||
}
|
||||
/* tell the daemon we are doing an allgather */
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&coll, &coll_type, 1, ORTE_GRPCOMM_COLL_T))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_DESTRUCT(&coll);
|
||||
return rc;
|
||||
}
|
||||
/* add our data to it */
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(&coll, sbuf))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_DESTRUCT(&coll);
|
||||
return rc;
|
||||
}
|
||||
/* send to local daemon */
|
||||
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_DAEMON, &coll, ORTE_RML_TAG_DAEMON, 0))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_DESTRUCT(&coll);
|
||||
return rc;
|
||||
}
|
||||
OBJ_DESTRUCT(&coll);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
|
||||
"%s grpcomm:bad allgather buffer sent",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* setup the buffer that will recv the results */
|
||||
allgather_buf = OBJ_NEW(opal_buffer_t);
|
||||
|
||||
/* now receive the final result. Be sure to do this in
|
||||
* a manner that allows us to return without being in a recv!
|
||||
*/
|
||||
allgather_complete = false;
|
||||
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER,
|
||||
ORTE_RML_NON_PERSISTENT, allgather_recv, NULL);
|
||||
if (rc != ORTE_SUCCESS) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
ORTE_PROGRESSED_WAIT(allgather_complete, 0, 1);
|
||||
|
||||
/* copy payload to the caller's buffer */
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(rbuf, allgather_buf))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_RELEASE(allgather_buf);
|
||||
return rc;
|
||||
}
|
||||
OBJ_RELEASE(allgather_buf);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
|
||||
"%s allgather buffer received",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
if (orte_timing) {
|
||||
if (ORTE_PROC_MY_NAME->vpid == 0) {
|
||||
/* setup a receive to hear when the rank=N proc has received the data
|
||||
* release - in most xcast schemes, this will always be the final recvr
|
||||
*/
|
||||
barrier_timer = false;
|
||||
rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_COLLECTIVE_TIMER,
|
||||
ORTE_RML_NON_PERSISTENT, barrier_timer_recv, NULL);
|
||||
if (ORTE_SUCCESS != rc) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
ORTE_PROGRESSED_WAIT(barrier_timer, 0, 1);
|
||||
gettimeofday(&ompistop, NULL);
|
||||
opal_output(0, "%s allgather: time to complete %ld usec",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(long int)((ompistop.tv_sec - ompistart.tv_sec)*1000000 +
|
||||
(ompistop.tv_usec - ompistart.tv_usec)));
|
||||
} else if (ORTE_PROC_MY_NAME->vpid == orte_process_info.num_procs-1) {
|
||||
/* if we are rank=N, send a message back to indicate
|
||||
* the xcast completed for timing purposes
|
||||
*/
|
||||
orte_process_name_t name;
|
||||
opal_buffer_t buf;
|
||||
|
||||
name.jobid = ORTE_PROC_MY_NAME->jobid;
|
||||
name.vpid = 0;
|
||||
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
||||
if (0 > (rc = orte_rml.send_buffer(&name,&buf,ORTE_RML_TAG_COLLECTIVE_TIMER,0))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
rc = ORTE_SUCCESS;
|
||||
OBJ_DESTRUCT(&buf);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
||||
"%s grpcomm:bad allgather completed",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
/*** MODEX SECTION ***/
|
||||
static int modex(opal_list_t *procs)
|
||||
{
|
||||
opal_buffer_t buf, rbuf;
|
||||
orte_std_cntr_t i, num_procs;
|
||||
orte_std_cntr_t cnt;
|
||||
orte_process_name_t proc_name;
|
||||
int rc;
|
||||
int32_t arch;
|
||||
bool modex_reqd; /* going to ignore this anyway */
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
||||
"%s grpcomm:bad: modex entered",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* setup the buffer that will actually be sent */
|
||||
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
||||
OBJ_CONSTRUCT(&rbuf, opal_buffer_t);
|
||||
|
||||
/* put our process name in the buffer so it can be unpacked later */
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* add our architecture - we always send it in this module! */
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &orte_process_info.arch, 1, OPAL_UINT32))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* pack the entries we have received */
|
||||
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_pack_modex_entries(&buf, &modex_reqd))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
|
||||
"%s grpcomm:bad:modex: executing allgather",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* exchange the buffer with the list of peers (if provided) or all my peers */
|
||||
if (NULL == procs) {
|
||||
if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather(&buf, &rbuf))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
} else {
|
||||
if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather_list(procs, &buf, &rbuf))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
|
||||
"%s grpcomm:bad:modex: processing modex info",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* process the results */
|
||||
/* extract the number of procs that put data in the buffer */
|
||||
cnt=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_procs, &cnt, ORTE_STD_CNTR))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
||||
"%s grpcomm:bad:modex: received %ld data bytes from %ld procs",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(long)(rbuf.pack_ptr - rbuf.unpack_ptr), (long)num_procs));
|
||||
|
||||
/* if the buffer doesn't have any more data, ignore it */
|
||||
if (0 >= (rbuf.pack_ptr - rbuf.unpack_ptr)) {
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* otherwise, process it */
|
||||
for (i=0; i < num_procs; i++) {
|
||||
/* unpack the process name */
|
||||
cnt=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &proc_name, &cnt, ORTE_NAME))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* unpack its architecture */
|
||||
cnt=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &arch, &cnt, OPAL_UINT32))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* update the arch in the ESS */
|
||||
if (ORTE_SUCCESS != (rc = orte_ess.update_arch(&proc_name, arch))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* update the modex database */
|
||||
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_update_modex_entries(&proc_name, &rbuf))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
}
|
||||
|
||||
cleanup:
|
||||
OBJ_DESTRUCT(&buf);
|
||||
OBJ_DESTRUCT(&rbuf);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
||||
"%s grpcomm:bad: modex completed",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
@ -55,6 +55,7 @@ ORTE_DECLSPEC extern bool mca_grpcomm_base_selected;
|
||||
ORTE_DECLSPEC extern opal_list_t mca_grpcomm_base_components_available;
|
||||
ORTE_DECLSPEC extern orte_grpcomm_base_component_t mca_grpcomm_base_selected_component;
|
||||
|
||||
#if !ORTE_DISABLE_FULL_SUPPORT
|
||||
|
||||
/*
|
||||
* Base functions
|
||||
@ -72,10 +73,11 @@ ORTE_DECLSPEC int orte_grpcomm_base_modex(opal_list_t *procs);
|
||||
ORTE_DECLSPEC int orte_grpcomm_base_purge_proc_attrs(void);
|
||||
ORTE_DECLSPEC int orte_grpcomm_base_modex_init(void);
|
||||
ORTE_DECLSPEC void orte_grpcomm_base_modex_finalize(void);
|
||||
ORTE_DECLSPEC int orte_grpcomm_base_pack_modex_entries(opal_buffer_t *buf, bool *modex_reqd);
|
||||
ORTE_DECLSPEC int orte_grpcomm_base_update_modex_entries(orte_process_name_t *proc_name,
|
||||
opal_buffer_t *rbuf);
|
||||
|
||||
/*
|
||||
* external API functions will be documented in the mca/grpcomm/grpcomm.h file
|
||||
*/
|
||||
#endif /* ORTE_DISABLE_FULL_SUPPORT */
|
||||
|
||||
END_C_DECLS
|
||||
#endif
|
||||
|
@ -358,191 +358,6 @@ int orte_grpcomm_base_get_proc_attr(const orte_process_name_t proc,
|
||||
}
|
||||
|
||||
|
||||
int orte_grpcomm_base_modex(opal_list_t *procs)
|
||||
{
|
||||
opal_buffer_t buf, rbuf;
|
||||
orte_std_cntr_t i, j, num_procs, num_recvd_entries;
|
||||
void *bytes = NULL;
|
||||
orte_std_cntr_t cnt;
|
||||
orte_process_name_t proc_name;
|
||||
modex_proc_data_t *proc_data;
|
||||
modex_attr_data_t *attr_data;
|
||||
int rc;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
||||
"%s grpcomm: modex entered",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* setup the buffer that will actually be sent */
|
||||
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
||||
OBJ_CONSTRUCT(&rbuf, opal_buffer_t);
|
||||
|
||||
/* put our process name in the buffer so it can be unpacked later */
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
||||
"%s modex: reporting %ld entries",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(long)num_entries));
|
||||
|
||||
/* put the number of entries into the buffer */
|
||||
OPAL_THREAD_LOCK(&mutex);
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &num_entries, 1, ORTE_STD_CNTR))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OPAL_THREAD_UNLOCK(&mutex);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* if there are entries, non-destructively copy the data across */
|
||||
if (0 < num_entries) {
|
||||
if (ORTE_SUCCESS != (opal_dss.copy_payload(&buf, modex_buffer))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OPAL_THREAD_UNLOCK(&mutex);
|
||||
goto cleanup;
|
||||
}
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&mutex);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
|
||||
"%s modex: executing allgather",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* exchange the buffer with the list of peers (if provided) or all my peers */
|
||||
if (NULL == procs) {
|
||||
if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather(&buf, &rbuf))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
} else {
|
||||
if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather_list(procs, &buf, &rbuf))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
|
||||
"%s modex: processing modex info",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* process the results */
|
||||
/* extract the number of procs that put data in the buffer */
|
||||
cnt=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_procs, &cnt, ORTE_STD_CNTR))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
||||
"%s modex: received %ld data bytes from %ld procs",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(long)(rbuf.pack_ptr - rbuf.unpack_ptr), (long)num_procs));
|
||||
|
||||
/* if the buffer doesn't have any more data, ignore it */
|
||||
if (0 >= (rbuf.pack_ptr - rbuf.unpack_ptr)) {
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* otherwise, process it */
|
||||
for (i=0; i < num_procs; i++) {
|
||||
/* unpack the process name */
|
||||
cnt=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &proc_name, &cnt, ORTE_NAME))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* look up the modex data structure */
|
||||
proc_data = modex_lookup_orte_proc(&proc_name);
|
||||
if (proc_data == NULL) {
|
||||
/* report the error */
|
||||
opal_output(0, "grpcomm_basic_modex: received modex info for unknown proc %s\n",
|
||||
ORTE_NAME_PRINT(&proc_name));
|
||||
rc = ORTE_ERR_NOT_FOUND;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* unpack the number of entries for this proc */
|
||||
cnt=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_recvd_entries, &cnt, ORTE_STD_CNTR))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
OPAL_THREAD_LOCK(&proc_data->modex_lock);
|
||||
|
||||
/*
|
||||
* Extract the attribute names and values
|
||||
*/
|
||||
for (j = 0; j < num_recvd_entries; j++) {
|
||||
size_t num_bytes;
|
||||
char *attr_name;
|
||||
|
||||
cnt = 1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &attr_name, &cnt, OPAL_STRING))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OPAL_THREAD_UNLOCK(&proc_data->modex_lock);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
cnt = 1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_bytes, &cnt, OPAL_SIZE))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OPAL_THREAD_UNLOCK(&proc_data->modex_lock);
|
||||
goto cleanup;
|
||||
}
|
||||
if (num_bytes != 0) {
|
||||
if (NULL == (bytes = malloc(num_bytes))) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
||||
rc = ORTE_ERR_OUT_OF_RESOURCE;
|
||||
OPAL_THREAD_UNLOCK(&proc_data->modex_lock);
|
||||
goto cleanup;
|
||||
}
|
||||
cnt = (orte_std_cntr_t) num_bytes;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, bytes, &cnt, OPAL_BYTE))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OPAL_THREAD_UNLOCK(&proc_data->modex_lock);
|
||||
goto cleanup;
|
||||
}
|
||||
num_bytes = cnt;
|
||||
} else {
|
||||
bytes = NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Lookup the corresponding modex structure
|
||||
*/
|
||||
if (NULL == (attr_data = modex_lookup_attr_data(proc_data,
|
||||
attr_name, true))) {
|
||||
opal_output(0, "grpcomm_basic_modex: modex_lookup_attr_data failed\n");
|
||||
OPAL_THREAD_UNLOCK(&proc_data->modex_lock);
|
||||
rc = ORTE_ERR_NOT_FOUND;
|
||||
goto cleanup;
|
||||
}
|
||||
if (NULL != attr_data->attr_data) {
|
||||
/* some pre-existing value must be here - release it */
|
||||
free(attr_data->attr_data);
|
||||
}
|
||||
attr_data->attr_data = bytes;
|
||||
attr_data->attr_data_size = num_bytes;
|
||||
proc_data->modex_received_data = true;
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&proc_data->modex_lock);
|
||||
}
|
||||
|
||||
cleanup:
|
||||
OBJ_DESTRUCT(&buf);
|
||||
OBJ_DESTRUCT(&rbuf);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
||||
"%s grpcomm: modex completed",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
int orte_grpcomm_base_purge_proc_attrs(void)
|
||||
{
|
||||
/*
|
||||
@ -562,3 +377,121 @@ int orte_grpcomm_base_purge_proc_attrs(void)
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
int orte_grpcomm_base_pack_modex_entries(opal_buffer_t *buf, bool *mdx_reqd)
|
||||
{
|
||||
int rc;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
||||
"%s grpcomm:base:pack_modex: reporting %ld entries",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(long)num_entries));
|
||||
|
||||
/* put the number of entries into the buffer */
|
||||
OPAL_THREAD_LOCK(&mutex);
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &num_entries, 1, ORTE_STD_CNTR))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OPAL_THREAD_UNLOCK(&mutex);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* if there are entries, non-destructively copy the data across */
|
||||
if (0 < num_entries) {
|
||||
if (ORTE_SUCCESS != (opal_dss.copy_payload(buf, modex_buffer))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OPAL_THREAD_UNLOCK(&mutex);
|
||||
goto cleanup;
|
||||
}
|
||||
}
|
||||
*mdx_reqd = true;
|
||||
|
||||
cleanup:
|
||||
OPAL_THREAD_UNLOCK(&mutex);
|
||||
return rc;
|
||||
}
|
||||
|
||||
int orte_grpcomm_base_update_modex_entries(orte_process_name_t *proc_name,
|
||||
opal_buffer_t *rbuf)
|
||||
{
|
||||
modex_proc_data_t *proc_data;
|
||||
modex_attr_data_t *attr_data;
|
||||
int rc = ORTE_SUCCESS;
|
||||
orte_std_cntr_t num_recvd_entries;
|
||||
orte_std_cntr_t cnt;
|
||||
void *bytes = NULL;
|
||||
orte_std_cntr_t j;
|
||||
|
||||
/* look up the modex data structure */
|
||||
proc_data = modex_lookup_orte_proc(proc_name);
|
||||
if (proc_data == NULL) {
|
||||
/* report the error */
|
||||
opal_output(0, "grpcomm:base:update_modex: received modex info for unknown proc %s\n",
|
||||
ORTE_NAME_PRINT(proc_name));
|
||||
return ORTE_ERR_NOT_FOUND;
|
||||
}
|
||||
|
||||
OPAL_THREAD_LOCK(&proc_data->modex_lock);
|
||||
|
||||
/* unpack the number of entries for this proc */
|
||||
cnt=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(rbuf, &num_recvd_entries, &cnt, ORTE_STD_CNTR))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/*
|
||||
* Extract the attribute names and values
|
||||
*/
|
||||
for (j = 0; j < num_recvd_entries; j++) {
|
||||
size_t num_bytes;
|
||||
char *attr_name;
|
||||
|
||||
cnt = 1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(rbuf, &attr_name, &cnt, OPAL_STRING))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
cnt = 1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(rbuf, &num_bytes, &cnt, OPAL_SIZE))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
if (num_bytes != 0) {
|
||||
if (NULL == (bytes = malloc(num_bytes))) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
|
||||
rc = ORTE_ERR_OUT_OF_RESOURCE;
|
||||
goto cleanup;
|
||||
}
|
||||
cnt = (orte_std_cntr_t) num_bytes;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(rbuf, bytes, &cnt, OPAL_BYTE))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
num_bytes = cnt;
|
||||
} else {
|
||||
bytes = NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Lookup the corresponding modex structure
|
||||
*/
|
||||
if (NULL == (attr_data = modex_lookup_attr_data(proc_data,
|
||||
attr_name, true))) {
|
||||
opal_output(0, "grpcomm:base:update_modex: modex_lookup_attr_data failed\n");
|
||||
rc = ORTE_ERR_NOT_FOUND;
|
||||
goto cleanup;
|
||||
}
|
||||
if (NULL != attr_data->attr_data) {
|
||||
/* some pre-existing value must be here - release it */
|
||||
free(attr_data->attr_data);
|
||||
}
|
||||
attr_data->attr_data = bytes;
|
||||
attr_data->attr_data_size = num_bytes;
|
||||
proc_data->modex_received_data = true;
|
||||
}
|
||||
|
||||
cleanup:
|
||||
OPAL_THREAD_UNLOCK(&proc_data->modex_lock);
|
||||
return rc;
|
||||
}
|
||||
|
@ -31,19 +31,6 @@
|
||||
|
||||
BEGIN_C_DECLS
|
||||
|
||||
/*
|
||||
* globals
|
||||
*/
|
||||
/*
|
||||
* globals needed within component
|
||||
*/
|
||||
typedef struct {
|
||||
orte_vpid_t xcast_linear_xover;
|
||||
orte_vpid_t xcast_binomial_xover;
|
||||
} orte_grpcomm_basic_globals_t;
|
||||
|
||||
extern orte_grpcomm_basic_globals_t orte_grpcomm_basic;
|
||||
|
||||
/*
|
||||
* Component open / close
|
||||
*/
|
||||
|
@ -71,44 +71,9 @@ orte_grpcomm_base_component_t mca_grpcomm_basic_component = {
|
||||
}
|
||||
};
|
||||
|
||||
/*
|
||||
* instantiate globals needed within basic component
|
||||
*/
|
||||
orte_grpcomm_basic_globals_t orte_grpcomm_basic;
|
||||
|
||||
/* Open the component */
|
||||
int orte_grpcomm_basic_open(void)
|
||||
{
|
||||
char *mode;
|
||||
mca_base_component_t *c = &mca_grpcomm_basic_component.base_version;
|
||||
int tmp;
|
||||
|
||||
mca_base_param_reg_int(c, "xcast_linear_xover",
|
||||
"Number of daemons where use of linear xcast mode is to begin",
|
||||
false, false, XCAST_LINEAR_XOVER_DEFAULT, &tmp);
|
||||
orte_grpcomm_basic.xcast_linear_xover = tmp;
|
||||
|
||||
mca_base_param_reg_int(c, "xcast_binomial_xover",
|
||||
"Number of daemons where use of binomial xcast mode is to begin",
|
||||
false, false, XCAST_BINOMIAL_XOVER_DEFAULT, &tmp);
|
||||
orte_grpcomm_basic.xcast_binomial_xover = tmp;
|
||||
|
||||
mca_base_param_reg_string(c, "xcast_mode",
|
||||
"Select xcast mode (\"linear\" | \"binomial\" | \"direct\")",
|
||||
false, false, "none", &mode);
|
||||
if (0 == strcmp(mode, "binomial")) {
|
||||
orte_grpcomm_basic.xcast_binomial_xover = 0;
|
||||
orte_grpcomm_basic.xcast_linear_xover = 0;
|
||||
} else if (0 == strcmp(mode, "linear")) {
|
||||
orte_grpcomm_basic.xcast_linear_xover = 0;
|
||||
orte_grpcomm_basic.xcast_binomial_xover = INT_MAX;
|
||||
} else if (0 == strcmp(mode, "direct")) {
|
||||
orte_grpcomm_basic.xcast_binomial_xover = INT_MAX;
|
||||
orte_grpcomm_basic.xcast_linear_xover = INT_MAX;
|
||||
} else if (0 != strcmp(mode, "none")) {
|
||||
opal_output(0, "grpcomm_basic_xcast_mode: unknown option %s - using defaults", mode);
|
||||
}
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
@ -119,7 +84,7 @@ int orte_grpcomm_basic_close(void)
|
||||
|
||||
int orte_grpcomm_basic_component_query(mca_base_module_t **module, int *priority)
|
||||
{
|
||||
/* we are the default, so set a low priority so we can be overridden */
|
||||
/* we are a lower-level default, so set a low priority so we can be overridden */
|
||||
*priority = 1;
|
||||
*module = (mca_base_module_t *)&orte_grpcomm_basic_module;
|
||||
return ORTE_SUCCESS;
|
||||
|
@ -27,19 +27,21 @@
|
||||
#endif /* HAVE_SYS_TIME_H */
|
||||
|
||||
#include "opal/threads/condition.h"
|
||||
#include "orte/util/show_help.h"
|
||||
#include "opal/util/bit_ops.h"
|
||||
|
||||
#include "opal/class/opal_hash_table.h"
|
||||
#include "orte/util/proc_info.h"
|
||||
#include "opal/dss/dss.h"
|
||||
|
||||
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "orte/mca/ess/ess.h"
|
||||
#include "orte/mca/odls/odls_types.h"
|
||||
#include "orte/mca/rml/rml.h"
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
#include "orte/util/name_fns.h"
|
||||
#include "orte/util/show_help.h"
|
||||
#include "orte/util/proc_info.h"
|
||||
#include "orte/orted/orted.h"
|
||||
#include "orte/runtime/orte_wait.h"
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
|
||||
#include "orte/mca/grpcomm/base/base.h"
|
||||
#include "grpcomm_basic.h"
|
||||
@ -53,6 +55,7 @@ static int xcast(orte_jobid_t job,
|
||||
orte_rml_tag_t tag);
|
||||
static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf);
|
||||
static int barrier(void);
|
||||
static int modex(opal_list_t *procs);
|
||||
|
||||
/* Module def */
|
||||
orte_grpcomm_base_module_t orte_grpcomm_basic_module = {
|
||||
@ -64,7 +67,7 @@ orte_grpcomm_base_module_t orte_grpcomm_basic_module = {
|
||||
barrier,
|
||||
orte_grpcomm_base_set_proc_attr,
|
||||
orte_grpcomm_base_get_proc_attr,
|
||||
orte_grpcomm_base_modex,
|
||||
modex,
|
||||
orte_grpcomm_base_purge_proc_attrs
|
||||
};
|
||||
|
||||
@ -447,3 +450,122 @@ static int allgather(opal_buffer_t *sbuf, opal_buffer_t *rbuf)
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/*** MODEX SECTION ***/
|
||||
static int modex(opal_list_t *procs)
|
||||
{
|
||||
opal_buffer_t buf, rbuf;
|
||||
orte_std_cntr_t i, num_procs;
|
||||
orte_std_cntr_t cnt;
|
||||
orte_process_name_t proc_name;
|
||||
int rc;
|
||||
int32_t arch;
|
||||
bool modex_reqd = false;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
||||
"%s grpcomm:basic: modex entered",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* setup the buffer that will actually be sent */
|
||||
OBJ_CONSTRUCT(&buf, opal_buffer_t);
|
||||
OBJ_CONSTRUCT(&rbuf, opal_buffer_t);
|
||||
|
||||
/* put our process name in the buffer so it can be unpacked later */
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* decide if we need to add the architecture to the modex */
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &orte_process_info.arch, 1, OPAL_UINT32))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* pack the entries we have received */
|
||||
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_pack_modex_entries(&buf, &modex_reqd))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (modex_reqd) {
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
|
||||
"%s grpcomm:basic:modex: executing allgather",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* exchange the buffer with the list of peers (if provided) or all my peers */
|
||||
if (NULL == procs) {
|
||||
if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather(&buf, &rbuf))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
} else {
|
||||
if (ORTE_SUCCESS != (rc = orte_grpcomm.allgather_list(procs, &buf, &rbuf))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
|
||||
"%s grpcomm:basic:modex: processing modex info",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* process the results */
|
||||
/* extract the number of procs that put data in the buffer */
|
||||
cnt=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &num_procs, &cnt, ORTE_STD_CNTR))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_output,
|
||||
"%s grpcomm:basic:modex: received %ld data bytes from %ld procs",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(long)(rbuf.pack_ptr - rbuf.unpack_ptr), (long)num_procs));
|
||||
|
||||
/* if the buffer doesn't have any more data, ignore it */
|
||||
if (0 >= (rbuf.pack_ptr - rbuf.unpack_ptr)) {
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* otherwise, process it */
|
||||
for (i=0; i < num_procs; i++) {
|
||||
/* unpack the process name */
|
||||
cnt=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &proc_name, &cnt, ORTE_NAME))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* unpack its architecture */
|
||||
cnt=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&rbuf, &arch, &cnt, OPAL_UINT32))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* update the arch in the ESS */
|
||||
if (ORTE_SUCCESS != (rc = orte_ess.update_arch(&proc_name, arch))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* update the modex database */
|
||||
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_update_modex_entries(&proc_name, &rbuf))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cleanup:
|
||||
OBJ_DESTRUCT(&buf);
|
||||
OBJ_DESTRUCT(&rbuf);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
|
||||
"%s grpcomm:basic: modex completed",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
|