806 строки
30 KiB
C
806 строки
30 KiB
C
/*
|
|
* Copyright (c) 2004-2006 The Trustees of Indiana University and Indiana
|
|
* University Research and Technology
|
|
* Corporation. All rights reserved.
|
|
* Copyright (c) 2004-2011 The University of Tennessee and The University
|
|
* of Tennessee Research Foundation. All rights
|
|
* reserved.
|
|
* Copyright (c) 2004-2006 High Performance Computing Center Stuttgart,
|
|
* University of Stuttgart. All rights reserved.
|
|
* Copyright (c) 2004-2006 The Regents of the University of California.
|
|
* All rights reserved.
|
|
* Copyright (c) 2006-2014 Cisco Systems, Inc. All rights reserved.
|
|
* Copyright (c) 2012 Los Alamos National Security, LLC. All rights
|
|
* reserved.
|
|
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved
|
|
* Copyright (c) 2014 Research Organization for Information Science
|
|
* and Technology (RIST). All rights reserved.
|
|
* $COPYRIGHT$
|
|
*
|
|
* Additional copyrights may follow
|
|
*
|
|
* $HEADER$
|
|
*/
|
|
|
|
#include "ompi_config.h"
|
|
|
|
#include <string.h>
|
|
#include <strings.h>
|
|
|
|
#include "ompi/constants.h"
|
|
#include "opal/datatype/opal_convertor.h"
|
|
#include "opal/threads/mutex.h"
|
|
#include "opal/dss/dss.h"
|
|
#include "opal/util/arch.h"
|
|
#include "opal/util/show_help.h"
|
|
#include "opal/mca/dstore/dstore.h"
|
|
#include "opal/mca/hwloc/base/base.h"
|
|
#include "opal/mca/pmix/pmix.h"
|
|
|
|
#include "ompi/proc/proc.h"
|
|
#include "ompi/datatype/ompi_datatype.h"
|
|
#include "ompi/runtime/mpiruntime.h"
|
|
#include "ompi/runtime/params.h"
|
|
|
|
static opal_list_t ompi_proc_list;
|
|
static opal_mutex_t ompi_proc_lock;
|
|
ompi_proc_t* ompi_proc_local_proc = NULL;
|
|
|
|
static void ompi_proc_construct(ompi_proc_t* proc);
|
|
static void ompi_proc_destruct(ompi_proc_t* proc);
|
|
|
|
OBJ_CLASS_INSTANCE(
|
|
ompi_proc_t,
|
|
opal_proc_t,
|
|
ompi_proc_construct,
|
|
ompi_proc_destruct
|
|
);
|
|
|
|
|
|
void ompi_proc_construct(ompi_proc_t* proc)
|
|
{
|
|
bzero(proc->proc_endpoints, sizeof(proc->proc_endpoints));
|
|
|
|
/* By default all processors are supposedly having the same architecture as me. Thus,
|
|
* by default we run in a homogeneous environment. Later, when the RTE can tell us
|
|
* the arch of the remote nodes, we will have to set the convertors to the correct
|
|
* architecture.
|
|
*/
|
|
OBJ_RETAIN( ompi_mpi_local_convertor );
|
|
proc->super.proc_convertor = ompi_mpi_local_convertor;
|
|
}
|
|
|
|
|
|
void ompi_proc_destruct(ompi_proc_t* proc)
|
|
{
|
|
/* As all the convertors are created with OBJ_NEW we can just call OBJ_RELEASE. All, except
|
|
* the local convertor, will get destroyed at some point here. If the reference count is correct
|
|
* the local convertor (who has the reference count increased in the datatype) will not get
|
|
* destroyed here. It will be destroyed later when the ompi_datatype_finalize is called.
|
|
*/
|
|
OBJ_RELEASE( proc->super.proc_convertor );
|
|
if (NULL != proc->super.proc_hostname) {
|
|
free(proc->super.proc_hostname);
|
|
}
|
|
OPAL_THREAD_LOCK(&ompi_proc_lock);
|
|
opal_list_remove_item(&ompi_proc_list, (opal_list_item_t*)proc);
|
|
OPAL_THREAD_UNLOCK(&ompi_proc_lock);
|
|
}
|
|
|
|
|
|
int ompi_proc_init(void)
|
|
{
|
|
ompi_vpid_t i;
|
|
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
|
|
int ret;
|
|
#endif
|
|
|
|
OBJ_CONSTRUCT(&ompi_proc_list, opal_list_t);
|
|
OBJ_CONSTRUCT(&ompi_proc_lock, opal_mutex_t);
|
|
|
|
/* create proc structures and find self */
|
|
for( i = 0; i < ompi_process_info.num_procs; i++ ) {
|
|
ompi_proc_t *proc = OBJ_NEW(ompi_proc_t);
|
|
opal_list_append(&ompi_proc_list, (opal_list_item_t*)proc);
|
|
|
|
OMPI_CAST_RTE_NAME(&proc->super.proc_name)->jobid = OMPI_PROC_MY_NAME->jobid;
|
|
OMPI_CAST_RTE_NAME(&proc->super.proc_name)->vpid = i;
|
|
|
|
if (i == OMPI_PROC_MY_NAME->vpid) {
|
|
ompi_proc_local_proc = proc;
|
|
proc->super.proc_flags = OPAL_PROC_ALL_LOCAL;
|
|
proc->super.proc_hostname = strdup(ompi_process_info.nodename);
|
|
proc->super.proc_arch = opal_local_arch;
|
|
/* Register the local proc with OPAL */
|
|
opal_proc_local_set(&proc->super);
|
|
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
|
|
/* add our arch to the modex */
|
|
OPAL_MODEX_SEND_STRING(ret, PMIX_SYNC_REQD, PMIX_REMOTE, OPAL_DSTORE_ARCH,
|
|
&proc->super.proc_arch, OPAL_UINT32);
|
|
if (OPAL_SUCCESS != ret) {
|
|
return ret;
|
|
}
|
|
#endif
|
|
}
|
|
}
|
|
|
|
return OMPI_SUCCESS;
|
|
}
|
|
|
|
|
|
/**
|
|
* The process creation is split into two steps. The second step
|
|
* is the important one, it sets the properties of the remote
|
|
* process, such as architecture, node name and locality flags.
|
|
*
|
|
* This function is to be called __only__ after the modex exchange
|
|
* has been performed, in order to allow the modex to carry the data
|
|
* instead of requiring the runtime to provide it.
|
|
*/
|
|
int ompi_proc_complete_init(void)
|
|
{
|
|
ompi_proc_t *proc;
|
|
int ret, errcode = OMPI_SUCCESS;
|
|
opal_list_t myvals;
|
|
opal_value_t *kv;
|
|
|
|
OPAL_THREAD_LOCK(&ompi_proc_lock);
|
|
|
|
OPAL_LIST_FOREACH(proc, &ompi_proc_list, ompi_proc_t) {
|
|
if (OMPI_CAST_RTE_NAME(&proc->super.proc_name)->vpid != OMPI_PROC_MY_NAME->vpid) {
|
|
/* get the locality information - do not use modex recv for
|
|
* this request as that will automatically cause the hostname
|
|
* to be loaded as well. All RTEs are required to provide this
|
|
* information at startup for procs on our node. Thus, not
|
|
* finding the info indicates that the proc is non-local.
|
|
*/
|
|
OBJ_CONSTRUCT(&myvals, opal_list_t);
|
|
if (OMPI_SUCCESS != (ret = opal_dstore.fetch(opal_dstore_internal,
|
|
(opal_identifier_t*)&proc->super.proc_name,
|
|
OPAL_DSTORE_LOCALITY, &myvals))) {
|
|
proc->super.proc_flags = OPAL_PROC_NON_LOCAL;
|
|
} else {
|
|
kv = (opal_value_t*)opal_list_get_first(&myvals);
|
|
proc->super.proc_flags = kv->data.uint16;
|
|
}
|
|
OPAL_LIST_DESTRUCT(&myvals);
|
|
|
|
if (ompi_process_info.num_procs < ompi_direct_modex_cutoff) {
|
|
/* IF the number of procs falls below the specified cutoff,
|
|
* then we assume the job is small enough that retrieving
|
|
* the hostname (which will typically cause retrieval of
|
|
* ALL modex info for this proc) will have no appreciable
|
|
* impact on launch scaling
|
|
*/
|
|
OPAL_MODEX_RECV_VALUE(ret, OPAL_DSTORE_HOSTNAME, (opal_proc_t*)&proc->super,
|
|
(char**)&(proc->super.proc_hostname), OPAL_STRING);
|
|
if (OPAL_SUCCESS != ret) {
|
|
errcode = ret;
|
|
break;
|
|
}
|
|
} else {
|
|
/* just set the hostname to NULL for now - we'll fill it in
|
|
* as modex_recv's are called for procs we will talk to, thus
|
|
* avoiding retrieval of ALL modex info for this proc until
|
|
* required. Transports that delay calling modex_recv until
|
|
* first message will therefore scale better than those that
|
|
* call modex_recv on all procs during init.
|
|
*/
|
|
proc->super.proc_hostname = NULL;
|
|
}
|
|
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
|
|
/* get the remote architecture - this might force a modex except
|
|
* for those environments where the RM provides it */
|
|
{
|
|
uint32_t *ui32ptr;
|
|
ui32ptr = &(proc->super.proc_arch);
|
|
OPAL_MODEX_RECV_VALUE(ret, OPAL_DSTORE_ARCH, (opal_proc_t*)&proc->super,
|
|
(void**)&ui32ptr, OPAL_UINT32);
|
|
if (OPAL_SUCCESS == ret) {
|
|
/* if arch is different than mine, create a new convertor for this proc */
|
|
if (proc->super.proc_arch != opal_local_arch) {
|
|
OBJ_RELEASE(proc->super.proc_convertor);
|
|
proc->super.proc_convertor = opal_convertor_create(proc->super.proc_arch, 0);
|
|
}
|
|
} else if (OMPI_ERR_NOT_IMPLEMENTED == ret) {
|
|
proc->super.proc_arch = opal_local_arch;
|
|
} else {
|
|
errcode = ret;
|
|
break;
|
|
}
|
|
}
|
|
#else
|
|
/* must be same arch as my own */
|
|
proc->super.proc_arch = opal_local_arch;
|
|
#endif
|
|
}
|
|
}
|
|
OPAL_THREAD_UNLOCK(&ompi_proc_lock);
|
|
return errcode;
|
|
}
|
|
|
|
int ompi_proc_finalize (void)
|
|
{
|
|
opal_list_item_t *item;
|
|
|
|
/* Unregister the local proc from OPAL */
|
|
opal_proc_local_set(NULL);
|
|
|
|
/* remove all items from list and destroy them. Since we cannot know
|
|
* the reference count of the procs for certain, it is possible that
|
|
* a single OBJ_RELEASE won't drive the count to zero, and hence will
|
|
* not release the memory. Accordingly, we cycle through the list here,
|
|
* calling release on each item.
|
|
*
|
|
* This will cycle until it forces the reference count of each item
|
|
* to zero, thus causing the destructor to run - which will remove
|
|
* the item from the list!
|
|
*
|
|
* We cannot do this under the thread lock as the destructor will
|
|
* call it when removing the item from the list. However, this function
|
|
* is ONLY called from MPI_Finalize, and all threads are prohibited from
|
|
* calling an MPI function once ANY thread has called MPI_Finalize. Of
|
|
* course, multiple threads are allowed to call MPI_Finalize, so this
|
|
* function may get called multiple times by various threads. We believe
|
|
* it is thread safe to do so...though it may not -appear- to be so
|
|
* without walking through the entire list/destructor sequence.
|
|
*/
|
|
while (opal_list_get_end(&ompi_proc_list) != (item = opal_list_get_first(&ompi_proc_list))) {
|
|
OBJ_RELEASE(item);
|
|
}
|
|
/* now destruct the list and thread lock */
|
|
OBJ_DESTRUCT(&ompi_proc_list);
|
|
OBJ_DESTRUCT(&ompi_proc_lock);
|
|
|
|
return OMPI_SUCCESS;
|
|
}
|
|
|
|
ompi_proc_t** ompi_proc_world(size_t *size)
|
|
{
|
|
ompi_proc_t **procs;
|
|
ompi_proc_t *proc;
|
|
size_t count = 0;
|
|
ompi_rte_cmp_bitmask_t mask;
|
|
ompi_process_name_t my_name;
|
|
|
|
/* check bozo case */
|
|
if (NULL == ompi_proc_local_proc) {
|
|
return NULL;
|
|
}
|
|
mask = OMPI_RTE_CMP_JOBID;
|
|
my_name = *OMPI_CAST_RTE_NAME(&ompi_proc_local_proc->super.proc_name);
|
|
|
|
/* First count how many match this jobid */
|
|
OPAL_THREAD_LOCK(&ompi_proc_lock);
|
|
for (proc = (ompi_proc_t*)opal_list_get_first(&ompi_proc_list);
|
|
proc != (ompi_proc_t*)opal_list_get_end(&ompi_proc_list);
|
|
proc = (ompi_proc_t*)opal_list_get_next(proc)) {
|
|
if (OPAL_EQUAL == ompi_rte_compare_name_fields(mask, OMPI_CAST_RTE_NAME(&proc->super.proc_name), &my_name)) {
|
|
++count;
|
|
}
|
|
}
|
|
|
|
/* allocate an array */
|
|
procs = (ompi_proc_t**) malloc(count * sizeof(ompi_proc_t*));
|
|
if (NULL == procs) {
|
|
OPAL_THREAD_UNLOCK(&ompi_proc_lock);
|
|
return NULL;
|
|
}
|
|
|
|
/* now save only the procs that match this jobid */
|
|
count = 0;
|
|
for (proc = (ompi_proc_t*)opal_list_get_first(&ompi_proc_list);
|
|
proc != (ompi_proc_t*)opal_list_get_end(&ompi_proc_list);
|
|
proc = (ompi_proc_t*)opal_list_get_next(proc)) {
|
|
if (OPAL_EQUAL == ompi_rte_compare_name_fields(mask, &proc->super.proc_name, &my_name)) {
|
|
/* DO NOT RETAIN THIS OBJECT - the reference count on this
|
|
* object will be adjusted by external callers. The intent
|
|
* here is to allow the reference count to drop to zero if
|
|
* the app no longer desires to communicate with this proc.
|
|
* For example, the proc may call comm_disconnect on all
|
|
* communicators involving this proc. In such cases, we want
|
|
* the proc object to be removed from the list. By not incrementing
|
|
* the reference count here, we allow this to occur.
|
|
*
|
|
* We don't implement that yet, but we are still safe for now as
|
|
* the OBJ_NEW in ompi_proc_init owns the initial reference
|
|
* count which cannot be released until ompi_proc_finalize is
|
|
* called.
|
|
*/
|
|
procs[count++] = proc;
|
|
}
|
|
}
|
|
OPAL_THREAD_UNLOCK(&ompi_proc_lock);
|
|
|
|
*size = count;
|
|
return procs;
|
|
}
|
|
|
|
|
|
ompi_proc_t** ompi_proc_all(size_t* size)
|
|
{
|
|
ompi_proc_t **procs =
|
|
(ompi_proc_t**) malloc(opal_list_get_size(&ompi_proc_list) * sizeof(ompi_proc_t*));
|
|
ompi_proc_t *proc;
|
|
size_t count = 0;
|
|
|
|
if (NULL == procs) {
|
|
return NULL;
|
|
}
|
|
|
|
OPAL_THREAD_LOCK(&ompi_proc_lock);
|
|
for(proc = (ompi_proc_t*)opal_list_get_first(&ompi_proc_list);
|
|
proc != (ompi_proc_t*)opal_list_get_end(&ompi_proc_list);
|
|
proc = (ompi_proc_t*)opal_list_get_next(proc)) {
|
|
/* We know this isn't consistent with the behavior in ompi_proc_world,
|
|
* but we are leaving the RETAIN for now because the code using this function
|
|
* assumes that the results need to be released when done. It will
|
|
* be cleaned up later as the "fix" will impact other places in
|
|
* the code
|
|
*/
|
|
OBJ_RETAIN(proc);
|
|
procs[count++] = proc;
|
|
}
|
|
OPAL_THREAD_UNLOCK(&ompi_proc_lock);
|
|
*size = count;
|
|
return procs;
|
|
}
|
|
|
|
|
|
ompi_proc_t** ompi_proc_self(size_t* size)
|
|
{
|
|
ompi_proc_t **procs = (ompi_proc_t**) malloc(sizeof(ompi_proc_t*));
|
|
if (NULL == procs) {
|
|
return NULL;
|
|
}
|
|
/* We know this isn't consistent with the behavior in ompi_proc_world,
|
|
* but we are leaving the RETAIN for now because the code using this function
|
|
* assumes that the results need to be released when done. It will
|
|
* be cleaned up later as the "fix" will impact other places in
|
|
* the code
|
|
*/
|
|
OBJ_RETAIN(ompi_proc_local_proc);
|
|
*procs = ompi_proc_local_proc;
|
|
*size = 1;
|
|
return procs;
|
|
}
|
|
|
|
ompi_proc_t * ompi_proc_find ( const ompi_process_name_t * name )
|
|
{
|
|
ompi_proc_t *proc, *rproc=NULL;
|
|
ompi_rte_cmp_bitmask_t mask;
|
|
|
|
/* return the proc-struct which matches this jobid+process id */
|
|
mask = OMPI_RTE_CMP_JOBID | OMPI_RTE_CMP_VPID;
|
|
OPAL_THREAD_LOCK(&ompi_proc_lock);
|
|
for(proc = (ompi_proc_t*)opal_list_get_first(&ompi_proc_list);
|
|
proc != (ompi_proc_t*)opal_list_get_end(&ompi_proc_list);
|
|
proc = (ompi_proc_t*)opal_list_get_next(proc)) {
|
|
if (OPAL_EQUAL == ompi_rte_compare_name_fields(mask, &proc->super.proc_name, name)) {
|
|
rproc = proc;
|
|
break;
|
|
}
|
|
}
|
|
OPAL_THREAD_UNLOCK(&ompi_proc_lock);
|
|
|
|
return rproc;
|
|
}
|
|
|
|
|
|
int ompi_proc_refresh(void)
|
|
{
|
|
ompi_proc_t *proc = NULL;
|
|
opal_list_item_t *item = NULL;
|
|
ompi_vpid_t i = 0;
|
|
int ret=OMPI_SUCCESS;
|
|
opal_list_t myvals;
|
|
opal_value_t *kv;
|
|
|
|
OPAL_THREAD_LOCK(&ompi_proc_lock);
|
|
|
|
for( item = opal_list_get_first(&ompi_proc_list), i = 0;
|
|
item != opal_list_get_end(&ompi_proc_list);
|
|
item = opal_list_get_next(item), ++i ) {
|
|
proc = (ompi_proc_t*)item;
|
|
|
|
/* Does not change: proc->super.proc_name.vpid */
|
|
OMPI_CAST_RTE_NAME(&proc->super.proc_name)->jobid = OMPI_PROC_MY_NAME->jobid;
|
|
|
|
/* Make sure to clear the local flag before we set it below */
|
|
proc->super.proc_flags = 0;
|
|
|
|
if (i == OMPI_PROC_MY_NAME->vpid) {
|
|
ompi_proc_local_proc = proc;
|
|
proc->super.proc_flags = OPAL_PROC_ALL_LOCAL;
|
|
proc->super.proc_hostname = ompi_process_info.nodename;
|
|
proc->super.proc_arch = opal_local_arch;
|
|
opal_proc_local_set(&proc->super);
|
|
} else {
|
|
/* get the locality information - do not use modex recv for
|
|
* this request as that will automatically cause the hostname
|
|
* to be loaded as well. All RTEs are required to provide this
|
|
* information at startup for procs on our node. Thus, not
|
|
* finding the info indicates that the proc is non-local.
|
|
*/
|
|
OBJ_CONSTRUCT(&myvals, opal_list_t);
|
|
if (OMPI_SUCCESS != (ret = opal_dstore.fetch(opal_dstore_internal,
|
|
(opal_identifier_t*)&proc->super.proc_name,
|
|
OPAL_DSTORE_LOCALITY, &myvals))) {
|
|
proc->super.proc_flags = OPAL_PROC_NON_LOCAL;
|
|
} else {
|
|
kv = (opal_value_t*)opal_list_get_first(&myvals);
|
|
proc->super.proc_flags = kv->data.uint16;
|
|
}
|
|
OPAL_LIST_DESTRUCT(&myvals);
|
|
|
|
if (ompi_process_info.num_procs < ompi_direct_modex_cutoff) {
|
|
/* IF the number of procs falls below the specified cutoff,
|
|
* then we assume the job is small enough that retrieving
|
|
* the hostname (which will typically cause retrieval of
|
|
* ALL modex info for this proc) will have no appreciable
|
|
* impact on launch scaling
|
|
*/
|
|
OPAL_MODEX_RECV_VALUE(ret, OPAL_DSTORE_HOSTNAME, (opal_proc_t*)&proc->super,
|
|
(char**)&(proc->super.proc_hostname), OPAL_STRING);
|
|
if (OMPI_SUCCESS != ret) {
|
|
break;
|
|
}
|
|
} else {
|
|
/* just set the hostname to NULL for now - we'll fill it in
|
|
* as modex_recv's are called for procs we will talk to, thus
|
|
* avoiding retrieval of ALL modex info for this proc until
|
|
* required. Transports that delay calling modex_recv until
|
|
* first message will therefore scale better than those that
|
|
* call modex_recv on all procs during init.
|
|
*/
|
|
proc->super.proc_hostname = NULL;
|
|
}
|
|
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
|
|
{
|
|
/* get the remote architecture */
|
|
uint32_t* uiptr = &(proc->super.proc_arch);
|
|
OPAL_MODEX_RECV_VALUE(ret, OPAL_DSTORE_ARCH, (opal_proc_t*)&proc->super,
|
|
(void**)&uiptr, OPAL_UINT32);
|
|
if (OMPI_SUCCESS != ret) {
|
|
break;
|
|
}
|
|
/* if arch is different than mine, create a new convertor for this proc */
|
|
if (proc->super.proc_arch != opal_local_arch) {
|
|
OBJ_RELEASE(proc->super.proc_convertor);
|
|
proc->super.proc_convertor = opal_convertor_create(proc->super.proc_arch, 0);
|
|
}
|
|
}
|
|
#else
|
|
/* must be same arch as my own */
|
|
proc->super.proc_arch = opal_local_arch;
|
|
#endif
|
|
}
|
|
}
|
|
|
|
OPAL_THREAD_UNLOCK(&ompi_proc_lock);
|
|
|
|
return ret;
|
|
}
|
|
|
|
int
|
|
ompi_proc_pack(ompi_proc_t **proclist, int proclistsize,
|
|
bool full_info,
|
|
opal_buffer_t* buf)
|
|
{
|
|
int i, rc;
|
|
|
|
OPAL_THREAD_LOCK(&ompi_proc_lock);
|
|
|
|
/* cycle through the provided array, packing the OMPI level
|
|
* data for each proc. This data may or may not be included
|
|
* in any subsequent modex operation, so we include it here
|
|
* to ensure completion of a connect/accept handshake. See
|
|
* the ompi/mca/dpm framework for an example of where and how
|
|
* this info is used.
|
|
*
|
|
* Eventually, we will review the procedures that call this
|
|
* function to see if duplication of communication can be
|
|
* reduced. For now, just go ahead and pack the info so it
|
|
* can be sent.
|
|
*/
|
|
for (i=0; i<proclistsize; i++) {
|
|
rc = opal_dss.pack(buf, &(proclist[i]->super.proc_name), 1, OMPI_NAME);
|
|
if(rc != OPAL_SUCCESS) {
|
|
OMPI_ERROR_LOG(rc);
|
|
OPAL_THREAD_UNLOCK(&ompi_proc_lock);
|
|
return rc;
|
|
}
|
|
if (full_info) {
|
|
int32_t num_entries;
|
|
opal_value_t *kv;
|
|
opal_list_t data;
|
|
|
|
/* fetch all info we know about the peer - while
|
|
* the remote procs may already know some of it, we cannot
|
|
* be certain they do. So we must include a full dump of
|
|
* everything we know about this proc
|
|
*/
|
|
OBJ_CONSTRUCT(&data, opal_list_t);
|
|
rc = opal_dstore.fetch(opal_dstore_internal,
|
|
(opal_identifier_t*)&proclist[i]->super.proc_name,
|
|
NULL, &data);
|
|
if (OPAL_SUCCESS != rc) {
|
|
OMPI_ERROR_LOG(rc);
|
|
num_entries = 0;
|
|
} else {
|
|
/* count the number of entries we will send */
|
|
num_entries = opal_list_get_size(&data);
|
|
}
|
|
|
|
/* put the number of entries into the buffer */
|
|
rc = opal_dss.pack(buf, &num_entries, 1, OPAL_INT32);
|
|
if (OPAL_SUCCESS != rc) {
|
|
OMPI_ERROR_LOG(rc);
|
|
break;
|
|
}
|
|
|
|
/* if there are entries, store them */
|
|
while (NULL != (kv = (opal_value_t*)opal_list_remove_first(&data))) {
|
|
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &kv, 1, OPAL_VALUE))) {
|
|
OMPI_ERROR_LOG(rc);
|
|
break;
|
|
}
|
|
OBJ_RELEASE(kv);
|
|
}
|
|
OBJ_DESTRUCT(&data);
|
|
|
|
} else {
|
|
rc = opal_dss.pack(buf, &(proclist[i]->super.proc_arch), 1, OPAL_UINT32);
|
|
if(rc != OPAL_SUCCESS) {
|
|
OMPI_ERROR_LOG(rc);
|
|
OPAL_THREAD_UNLOCK(&ompi_proc_lock);
|
|
return rc;
|
|
}
|
|
rc = opal_dss.pack(buf, &(proclist[i]->super.proc_hostname), 1, OPAL_STRING);
|
|
if(rc != OPAL_SUCCESS) {
|
|
OMPI_ERROR_LOG(rc);
|
|
OPAL_THREAD_UNLOCK(&ompi_proc_lock);
|
|
return rc;
|
|
}
|
|
}
|
|
}
|
|
OPAL_THREAD_UNLOCK(&ompi_proc_lock);
|
|
return OMPI_SUCCESS;
|
|
}
|
|
|
|
static ompi_proc_t *
|
|
ompi_proc_find_and_add(const ompi_process_name_t * name, bool* isnew)
|
|
{
|
|
ompi_proc_t *proc, *rproc = NULL;
|
|
ompi_rte_cmp_bitmask_t mask;
|
|
|
|
/* return the proc-struct which matches this jobid+process id */
|
|
mask = OMPI_RTE_CMP_JOBID | OMPI_RTE_CMP_VPID;
|
|
OPAL_THREAD_LOCK(&ompi_proc_lock);
|
|
for(proc = (ompi_proc_t*)opal_list_get_first(&ompi_proc_list);
|
|
proc != (ompi_proc_t*)opal_list_get_end(&ompi_proc_list);
|
|
proc = (ompi_proc_t*)opal_list_get_next(proc)) {
|
|
if (OPAL_EQUAL == ompi_rte_compare_name_fields(mask, &proc->super.proc_name, name)) {
|
|
rproc = proc;
|
|
*isnew = false;
|
|
break;
|
|
}
|
|
}
|
|
|
|
/* if we didn't find this proc in the list, create a new
|
|
* proc_t and append it to the list
|
|
*/
|
|
if (NULL == rproc) {
|
|
*isnew = true;
|
|
rproc = OBJ_NEW(ompi_proc_t);
|
|
if (NULL != rproc) {
|
|
opal_list_append(&ompi_proc_list, (opal_list_item_t*)rproc);
|
|
*OMPI_CAST_RTE_NAME(&rproc->super.proc_name) = *name;
|
|
}
|
|
/* caller had better fill in the rest of the proc, or there's
|
|
going to be pain later... */
|
|
}
|
|
|
|
OPAL_THREAD_UNLOCK(&ompi_proc_lock);
|
|
|
|
return rproc;
|
|
}
|
|
|
|
|
|
int
|
|
ompi_proc_unpack(opal_buffer_t* buf,
|
|
int proclistsize, ompi_proc_t ***proclist,
|
|
bool full_info,
|
|
int *newproclistsize, ompi_proc_t ***newproclist)
|
|
{
|
|
int i;
|
|
size_t newprocs_len = 0;
|
|
ompi_proc_t **plist=NULL, **newprocs = NULL;
|
|
opal_list_t myvals;
|
|
opal_value_t *kv;
|
|
|
|
/* do not free plist *ever*, since it is used in the remote group
|
|
structure of a communicator */
|
|
plist = (ompi_proc_t **) calloc (proclistsize, sizeof (ompi_proc_t *));
|
|
if ( NULL == plist ) {
|
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
|
}
|
|
/* free this on the way out */
|
|
newprocs = (ompi_proc_t **) calloc (proclistsize, sizeof (ompi_proc_t *));
|
|
if (NULL == newprocs) {
|
|
free(plist);
|
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
|
}
|
|
|
|
/* cycle through the array of provided procs and unpack
|
|
* their info - as packed by ompi_proc_pack
|
|
*/
|
|
for ( i=0; i<proclistsize; i++ ){
|
|
int32_t count=1;
|
|
ompi_process_name_t new_name;
|
|
uint32_t new_arch;
|
|
char *new_hostname;
|
|
bool isnew = false;
|
|
int rc;
|
|
|
|
rc = opal_dss.unpack(buf, &new_name, &count, OMPI_NAME);
|
|
if (rc != OPAL_SUCCESS) {
|
|
OMPI_ERROR_LOG(rc);
|
|
free(plist);
|
|
free(newprocs);
|
|
return rc;
|
|
}
|
|
if (!full_info) {
|
|
rc = opal_dss.unpack(buf, &new_arch, &count, OPAL_UINT32);
|
|
if (rc != OPAL_SUCCESS) {
|
|
OMPI_ERROR_LOG(rc);
|
|
free(plist);
|
|
free(newprocs);
|
|
return rc;
|
|
}
|
|
rc = opal_dss.unpack(buf, &new_hostname, &count, OPAL_STRING);
|
|
if (rc != OPAL_SUCCESS) {
|
|
OMPI_ERROR_LOG(rc);
|
|
free(plist);
|
|
free(newprocs);
|
|
return rc;
|
|
}
|
|
}
|
|
/* see if this proc is already on our ompi_proc_list */
|
|
plist[i] = ompi_proc_find_and_add(&new_name, &isnew);
|
|
if (isnew) {
|
|
/* if not, then it was added, so update the values
|
|
* in the proc_t struct with the info that was passed
|
|
* to us
|
|
*/
|
|
newprocs[newprocs_len++] = plist[i];
|
|
|
|
if (full_info) {
|
|
int32_t num_recvd_entries;
|
|
int32_t cnt;
|
|
int32_t j;
|
|
|
|
/* unpack the number of entries for this proc */
|
|
cnt = 1;
|
|
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &num_recvd_entries, &cnt, OPAL_INT32))) {
|
|
OMPI_ERROR_LOG(rc);
|
|
break;
|
|
}
|
|
|
|
/*
|
|
* Extract the attribute names and values
|
|
*/
|
|
for (j = 0; j < num_recvd_entries; j++) {
|
|
cnt = 1;
|
|
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &kv, &cnt, OPAL_VALUE))) {
|
|
OMPI_ERROR_LOG(rc);
|
|
break;
|
|
}
|
|
/* if this is me, ignore the data - we already have it in the db */
|
|
if (OPAL_EQUAL != ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL,
|
|
OMPI_PROC_MY_NAME, &new_name)) {
|
|
/* store it in the database */
|
|
if (OPAL_SUCCESS != (rc = opal_dstore.store(opal_dstore_internal,
|
|
(opal_identifier_t*)&new_name, kv))) {
|
|
OMPI_ERROR_LOG(rc);
|
|
}
|
|
}
|
|
OBJ_RELEASE(kv);
|
|
}
|
|
/* RHC: compute locality */
|
|
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
|
|
OBJ_CONSTRUCT(&myvals, opal_list_t);
|
|
rc = opal_dstore.fetch(opal_dstore_internal,
|
|
(opal_identifier_t*)&new_name,
|
|
OPAL_DSTORE_ARCH, &myvals);
|
|
if( OPAL_SUCCESS == rc ) {
|
|
kv = (opal_value_t*)opal_list_get_first(&myvals);
|
|
new_arch = kv->data.uint32;
|
|
} else {
|
|
new_arch = opal_local_arch;
|
|
}
|
|
OPAL_LIST_DESTRUCT(&myvals);
|
|
#else
|
|
new_arch = opal_local_arch;
|
|
#endif
|
|
if (ompi_process_info.num_procs < ompi_direct_modex_cutoff) {
|
|
/* retrieve the hostname */
|
|
OBJ_CONSTRUCT(&myvals, opal_list_t);
|
|
rc = opal_dstore.fetch(opal_dstore_internal,
|
|
(opal_identifier_t*)&new_name,
|
|
OPAL_DSTORE_HOSTNAME, &myvals);
|
|
if( OPAL_SUCCESS == rc ) {
|
|
kv = (opal_value_t*)opal_list_get_first(&myvals);
|
|
new_hostname = strdup(kv->data.string);
|
|
} else {
|
|
new_hostname = NULL;
|
|
}
|
|
OPAL_LIST_DESTRUCT(&myvals);
|
|
} else {
|
|
/* just set the hostname to NULL for now - we'll fill it in
|
|
* as modex_recv's are called for procs we will talk to
|
|
*/
|
|
new_hostname = NULL;
|
|
}
|
|
}
|
|
/* update all the values */
|
|
plist[i]->super.proc_arch = new_arch;
|
|
/* if arch is different than mine, create a new convertor for this proc */
|
|
if (plist[i]->super.proc_arch != opal_local_arch) {
|
|
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
|
|
OBJ_RELEASE(plist[i]->super.proc_convertor);
|
|
plist[i]->super.proc_convertor = opal_convertor_create(plist[i]->super.proc_arch, 0);
|
|
#else
|
|
opal_show_help("help-mpi-runtime.txt",
|
|
"heterogeneous-support-unavailable",
|
|
true, ompi_process_info.nodename,
|
|
new_hostname == NULL ? "<hostname unavailable>" :
|
|
new_hostname);
|
|
free(plist);
|
|
free(newprocs);
|
|
return OMPI_ERR_NOT_SUPPORTED;
|
|
#endif
|
|
}
|
|
|
|
if (0 == strcmp(ompi_proc_local_proc->super.proc_hostname, new_hostname)) {
|
|
plist[i]->super.proc_flags |= (OPAL_PROC_ON_NODE | OPAL_PROC_ON_CU | OPAL_PROC_ON_CLUSTER);
|
|
}
|
|
|
|
/* Save the hostname */
|
|
plist[i]->super.proc_hostname = new_hostname;
|
|
|
|
} else {
|
|
if (full_info) {
|
|
int32_t num_recvd_entries;
|
|
int32_t j, cnt;
|
|
|
|
/* discard all keys: they are already locally known */
|
|
cnt = 1;
|
|
if (OPAL_SUCCESS == (rc = opal_dss.unpack(buf, &num_recvd_entries, &cnt, OPAL_INT32))) {
|
|
for (j = 0; j < num_recvd_entries; j++) {
|
|
opal_value_t *kv;
|
|
cnt = 1;
|
|
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &kv, &cnt, OPAL_VALUE))) {
|
|
OMPI_ERROR_LOG(rc);
|
|
continue;
|
|
}
|
|
OBJ_RELEASE(kv);
|
|
}
|
|
} else {
|
|
OMPI_ERROR_LOG(rc);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if (NULL != newproclistsize) *newproclistsize = newprocs_len;
|
|
if (NULL != newproclist) {
|
|
*newproclist = newprocs;
|
|
} else if (newprocs != NULL) {
|
|
free(newprocs);
|
|
}
|
|
|
|
*proclist = plist;
|
|
return OMPI_SUCCESS;
|
|
}
|