
Move modex from pml base to general ompi runtime, since it's used by more
than just the PML/BTLs these days.  Also clean up the code so that it
handles the situation where not all nodes register information for a given
node (rather than just spinning until that node sends information, like
we do today).

Includes r15234 and r15265 from the /tmp/bwb-modex branch.

This commit was SVN r15310.

The following SVN revisions from the original message are invalid or
inconsistent and therefore were not cross-referenced:
  r15234
  r15265
This commit is contained in:
Brian Barrett 2007-07-09 17:16:34 +00:00
parent b212cf4dae
commit 8b9e8054fd
37 changed files with 1145 additions and 1034 deletions
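
For reference, the renaming maps the old PML-base modex entry points onto new runtime-level ones, and the header moves with them (all of these are visible in the diff below):

    mca_pml_base_modex_send()     -> ompi_modex_send()
    mca_pml_base_modex_recv()     -> ompi_modex_recv()
    mca_pml_base_modex_init()     -> ompi_modex_init()
    mca_pml_base_modex_finalize() -> ompi_modex_finalize()
    mca_pml_base_modex_exchange() -> ompi_modex_subscribe_job(jobid)
    ompi/mca/pml/base/pml_base_module_exchange.h -> ompi/runtime/ompi_module_exchange.h

A minimal usage sketch follows, assuming a hypothetical component "example" and an invented address struct; the send/recv signatures match the call sites changed below. With the cleanup described in the message, callers should be prepared for a peer that never published data for their component, rather than assuming the receive blocks until data shows up.

/* illustration only -- the "example" names are not part of this commit */
#include "ompi_config.h"
#include "ompi/constants.h"
#include "opal/mca/mca.h"
#include "ompi/runtime/ompi_module_exchange.h"

struct example_addr_t {
    uint32_t addr;      /* network-ordered address blob published to peers */
    uint16_t port;
};

static int example_publish(mca_base_component_t *component,
                           struct example_addr_t *local)
{
    /* formerly mca_pml_base_modex_send() */
    return ompi_modex_send(component, local, sizeof(*local));
}

static int example_lookup(mca_base_component_t *component,
                          struct ompi_proc_t *peer,
                          struct example_addr_t **remote)
{
    size_t size;

    /* formerly mca_pml_base_modex_recv(); the returned buffer is a copy
       owned by the caller */
    int rc = ompi_modex_recv(component, peer, (void **) remote, &size);
    if (OMPI_SUCCESS != rc || sizeof(**remote) != size) {
        return OMPI_ERROR;  /* peer published nothing usable for us */
    }
    return OMPI_SUCCESS;
}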

View File

@ -35,7 +35,6 @@
#include "ompi/mca/bml/base/bml_base_btl.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/pml/base/base.h"
#include "ompi/mca/pml/base/pml_base_module_exchange.h"
#include "orte/mca/smr/smr.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/gpr/gpr.h"

View File

@ -45,8 +45,8 @@
* information required by its peers. An example would be the TCP
* listen port opened by the TCP module for incoming connection
* requests. This information is published to peers via the
* mca_pml_base_modex_send() interface. Note that peer information is not
* guaranteed to be available via mca_pml_base_modex_recv() during the
* ompi_modex_send() interface. Note that peer information is not
* guaranteed to be available via ompi_modex_recv() during the
* module's init function. However, it will be available during
* BTL selection (mca_btl_base_add_proc_fn_t()).
*
@ -267,7 +267,7 @@ typedef struct mca_btl_base_header_t mca_btl_base_header_t;
* the physical devices that are available for the given transport,
* and create a BTL module to represent each device. Any addressing
* information required by peers to reach the device should be published
* during this function via the mca_pml_base_modex_send() interface.
* during this function via the ompi_modex_send() interface.
*
*/
@ -346,9 +346,9 @@ typedef int (*mca_btl_base_module_finalize_fn_t)(
*
* The mca_btl_base_module_add_procs_fn_t() is called by the PML to
* determine the set of BTLs that should be used to reach each process.
* Any addressing information exported by the peer via the mca_pml_base_modex_send()
* Any addressing information exported by the peer via the ompi_modex_send()
* function should be available during this call via the corresponding
* mca_pml_base_modex_recv() function. The BTL may utilize this information to
* ompi_modex_recv() function. The BTL may utilize this information to
* determine reachability of each peer process.
*
* For each process that is reachable by the BTL, the bit corresponding to the index
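
To make the add_procs() contract above concrete, here is a trimmed sketch; the "example" BTL names are hypothetical and the real prototype also takes the module, the endpoint array, and the reachability bitmap (see the proc-create functions changed below), but the modex part looks like this:

/* assumes the same headers as the sketch near the top of this page */
static int example_btl_add_procs(size_t nprocs, struct ompi_proc_t **procs)
{
    size_t i;

    for (i = 0; i < nprocs; ++i) {
        struct example_addr_t *addrs;   /* blob the peer exported */
        size_t size;

        /* fetch whatever the peer published with ompi_modex_send()
           during its component initialization */
        int rc = ompi_modex_recv(&example_component.super.btl_version,
                                 procs[i], (void **) &addrs, &size);
        if (OMPI_SUCCESS != rc || 0 == size) {
            /* peer exported nothing for this BTL: leave its bit in the
               reachability bitmap unset and move on */
            continue;
        }
        /* ... build an endpoint from addrs and mark procs[i] reachable ... */
        free(addrs);
    }
    return OMPI_SUCCESS;
}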

View File

@ -44,7 +44,7 @@
#include "ompi/datatype/convertor.h"
#include "btl_gm_endpoint.h"
#include "orte/util/proc_info.h"
#include "ompi/mca/pml/base/pml_base_module_exchange.h"
#include "ompi/runtime/ompi_module_exchange.h"
#if OMPI_ENABLE_PROGRESS_THREADS
@ -476,7 +476,7 @@ mca_btl_gm_modex_send(void)
MCA_BTL_GM_ADDR_HTON(addrs[i]);
}
}
rc = mca_pml_base_modex_send (&mca_btl_gm_component.super.btl_version, addrs, size);
rc = ompi_modex_send (&mca_btl_gm_component.super.btl_version, addrs, size);
if (NULL != addrs) {
free (addrs);
}

View File

@ -19,7 +19,7 @@
#include "ompi_config.h"
#include "opal/class/opal_hash_table.h"
#include "ompi/mca/pml/base/pml_base_module_exchange.h"
#include "ompi/runtime/ompi_module_exchange.h"
#ifdef HAVE_INTTYPES_H
#include <inttypes.h>
@ -122,13 +122,13 @@ mca_btl_gm_proc_t* mca_btl_gm_proc_create(ompi_proc_t* ompi_proc)
gm_proc->proc_guid = ompi_proc->proc_name;
/* query for the peer address info */
rc = mca_pml_base_modex_recv(
rc = ompi_modex_recv(
&mca_btl_gm_component.super.btl_version,
ompi_proc,
(void*)&gm_proc->proc_addrs,
&size);
if(OMPI_SUCCESS != rc) {
opal_output(0, "[%s:%d] mca_pml_base_modex_recv failed for peer [%ld,%ld,%ld]",
opal_output(0, "[%s:%d] ompi_modex_recv failed for peer [%ld,%ld,%ld]",
__FILE__,__LINE__,ORTE_NAME_ARGS(&ompi_proc->proc_name));
OBJ_RELEASE(gm_proc);
return NULL;

View File

@ -49,7 +49,7 @@
#include "ompi/datatype/convertor.h"
#include "ompi/mca/mpool/rdma/mpool_rdma.h"
#include "btl_mvapi_endpoint.h"
#include "ompi/mca/pml/base/pml_base_module_exchange.h"
#include "ompi/runtime/ompi_module_exchange.h"
static int mvapi_reg_mr(void *reg_data, void *base, size_t size,
mca_mpool_base_registration_t *reg);
@ -277,7 +277,7 @@ mca_btl_mvapi_modex_send(void)
ports[i] = btl->port_info;
}
}
rc = mca_pml_base_modex_send (&mca_btl_mvapi_component.super.btl_version, ports, size);
rc = ompi_modex_send (&mca_btl_mvapi_component.super.btl_version, ports, size);
if (NULL != ports) {
free (ports);
}

View File

@ -19,7 +19,7 @@
#include "ompi_config.h"
#include "opal/class/opal_hash_table.h"
#include "ompi/mca/pml/base/pml_base_module_exchange.h"
#include "ompi/runtime/ompi_module_exchange.h"
#include "btl_mvapi.h"
#include "btl_mvapi_proc.h"
@ -130,7 +130,7 @@ mca_btl_mvapi_proc_t* mca_btl_mvapi_proc_create(ompi_proc_t* ompi_proc)
mvapi_proc->proc_guid = ompi_proc->proc_name;
/* query for the peer address info */
rc = mca_pml_base_modex_recv(
rc = ompi_modex_recv(
&mca_btl_mvapi_component.super.btl_version,
ompi_proc,
(void*)&mvapi_proc->proc_ports,
@ -140,7 +140,7 @@ mca_btl_mvapi_proc_t* mca_btl_mvapi_proc_create(ompi_proc_t* ompi_proc)
if(OMPI_SUCCESS != rc) {
opal_output(0, "[%s:%d] mca_pml_base_modex_recv failed for peer [%ld,%ld,%ld]",
opal_output(0, "[%s:%d] ompi_modex_recv failed for peer [%ld,%ld,%ld]",
__FILE__,__LINE__,ORTE_NAME_ARGS(&ompi_proc->proc_name));
OBJ_RELEASE(mvapi_proc);
return NULL;

View File

@ -28,7 +28,7 @@
#include "opal/mca/base/mca_base_param.h"
#include "orte/mca/errmgr/errmgr.h"
#include "ompi/mca/pml/base/pml_base_module_exchange.h"
#include "ompi/runtime/ompi_module_exchange.h"
#include "ompi/mca/btl/base/btl_base_error.h"
#include "ompi/mca/common/mx/common_mx.h"
@ -427,7 +427,7 @@ mca_btl_base_module_t** mca_btl_mx_component_init(int *num_btl_modules,
if (enable_progress_threads) {
opal_output( 0, "mca_btl_mx_component_init: progress threads requested but not supported");
mca_pml_base_modex_send(&mca_btl_mx_component.super.btl_version,
ompi_modex_send(&mca_btl_mx_component.super.btl_version,
NULL, 0);
return NULL;
}
@ -446,7 +446,8 @@ mca_btl_base_module_t** mca_btl_mx_component_init(int *num_btl_modules,
/* First check if MX is available ... */
if( OMPI_SUCCESS != ompi_common_mx_initialize() ) {
mca_pml_base_modex_send( &mca_btl_mx_component.super.btl_version, NULL, 0 );
ompi_modex_send(&mca_btl_mx_component.super.btl_version,
NULL, 0);
return NULL;
}
@ -480,14 +481,14 @@ mca_btl_base_module_t** mca_btl_mx_component_init(int *num_btl_modules,
&mca_btl_mx_component.mx_num_btls, sizeof(uint32_t))) != MX_SUCCESS ) {
opal_output( 0, "mca_btl_mx_component_init: mx_get_info(MX_NIC_COUNT) failed with status %d(%s)\n",
status, mx_strerror(status) );
mca_pml_base_modex_send(&mca_btl_mx_component.super.btl_version,
ompi_modex_send(&mca_btl_mx_component.super.btl_version,
NULL, 0);
return NULL;
}
if (0 == mca_btl_mx_component.mx_num_btls) {
mca_btl_base_error_no_nics("Myrinet/MX", "NIC");
mca_pml_base_modex_send(&mca_btl_mx_component.super.btl_version,
ompi_modex_send(&mca_btl_mx_component.super.btl_version,
NULL, 0);
return NULL;
}
@ -552,8 +553,8 @@ mca_btl_base_module_t** mca_btl_mx_component_init(int *num_btl_modules,
}
/* publish the MX addresses via the MCA framework */
mca_pml_base_modex_send( &mca_btl_mx_component.super.btl_version, mx_addrs,
sizeof(mca_btl_mx_addr_t) * mca_btl_mx_component.mx_num_btls );
ompi_modex_send(&mca_btl_mx_component.super.btl_version, mx_addrs,
sizeof(mca_btl_mx_addr_t) * mca_btl_mx_component.mx_num_btls);
free( nic_addrs );
free( mx_addrs );

View File

@ -19,7 +19,7 @@
#include "ompi_config.h"
#include "opal/class/opal_hash_table.h"
#include "ompi/mca/pml/base/pml_base_module_exchange.h"
#include "ompi/runtime/ompi_module_exchange.h"
#include "btl_mx.h"
#include "btl_mx_proc.h"
@ -118,7 +118,7 @@ mca_btl_mx_proc_t* mca_btl_mx_proc_create(ompi_proc_t* ompi_proc)
}
/* query for the peer address info */
rc = mca_pml_base_modex_recv( &mca_btl_mx_component.super.btl_version,
rc = ompi_modex_recv( &mca_btl_mx_component.super.btl_version,
ompi_proc, (void*)&mx_peers, &size );
if( OMPI_SUCCESS != rc ) {
opal_output( 0, "mca_pml_base_modex_recv failed for peer [%ld,%ld,%ld]",

View File

@ -32,7 +32,7 @@
#include "orte/mca/errmgr/errmgr.h"
#include "ompi/mca/btl/base/base.h"
#include "ompi/mca/mpool/rdma/mpool_rdma.h"
#include "ompi/mca/pml/base/pml_base_module_exchange.h"
#include "ompi/runtime/ompi_module_exchange.h"
#include "btl_ofud.h"
#include "btl_ofud_frag.h"
@ -224,7 +224,7 @@ static int mca_btl_ud_modex_send(void)
}
}
rc = mca_pml_base_modex_send(
rc = ompi_modex_send(
&mca_btl_ofud_component.super.btl_version, addrs, size);
if(NULL != addrs) {
free(addrs);

View File

@ -18,7 +18,9 @@
* $HEADER$
*/
#include "ompi/mca/pml/base/pml_base_module_exchange.h"
#include "ompi_config.h"
#include "ompi/runtime/ompi_module_exchange.h"
#include "btl_ofud.h"
#include "btl_ofud_proc.h"
@ -119,13 +121,13 @@ mca_btl_ud_proc_t* mca_btl_ud_proc_create(ompi_proc_t* ompi_proc)
/* query for the peer address info */
rc = mca_pml_base_modex_recv(&mca_btl_ofud_component.super.btl_version,
rc = ompi_modex_recv(&mca_btl_ofud_component.super.btl_version,
ompi_proc, (void*)&module_proc->proc_addrs,
&size);
if(OMPI_SUCCESS != rc) {
opal_output(0,
"[%s:%d] mca_pml_base_modex_recv failed for peer [%ld,%ld,%ld]",
"[%s:%d] ompi_modex_recv failed for peer [%ld,%ld,%ld]",
__FILE__,__LINE__,ORTE_NAME_ARGS(&ompi_proc->proc_name));
OBJ_RELEASE(module_proc);
return NULL;

View File

@ -54,7 +54,7 @@
#include <errno.h>
#include <string.h> /* for strerror()*/
#include "ompi/mca/pml/base/pml_base_module_exchange.h"
#include "ompi/runtime/ompi_module_exchange.h"
/*
* Local functions
@ -191,7 +191,7 @@ static int btl_openib_modex_send(void)
#endif
}
}
rc = mca_pml_base_modex_send (&mca_btl_openib_component.super.btl_version, ports, size);
rc = ompi_modex_send (&mca_btl_openib_component.super.btl_version, ports, size);
if (NULL != ports) {
free (ports);
}

View File

@ -19,7 +19,7 @@
#include "ompi_config.h"
#include "opal/class/opal_hash_table.h"
#include "ompi/mca/pml/base/pml_base_module_exchange.h"
#include "ompi/runtime/ompi_module_exchange.h"
#include "ompi/datatype/dt_arch.h"
#include "btl_openib.h"
@ -126,7 +126,7 @@ mca_btl_openib_proc_t* mca_btl_openib_proc_create(ompi_proc_t* ompi_proc)
/* query for the peer address info */
rc = mca_pml_base_modex_recv(
rc = ompi_modex_recv(
&mca_btl_openib_component.super.btl_version,
ompi_proc,
(void*)&module_proc->proc_ports,
@ -136,7 +136,7 @@ mca_btl_openib_proc_t* mca_btl_openib_proc_create(ompi_proc_t* ompi_proc)
if(OMPI_SUCCESS != rc) {
opal_output(0, "[%s:%d] mca_pml_base_modex_recv failed for peer [%ld,%ld,%ld]",
opal_output(0, "[%s:%d] ompi_modex_recv failed for peer [%ld,%ld,%ld]",
__FILE__,__LINE__,ORTE_NAME_ARGS(&ompi_proc->proc_name));
OBJ_RELEASE(module_proc);
return NULL;

View File

@ -46,7 +46,7 @@
#include "orte/util/proc_info.h"
#include "ompi/mca/pml/pml.h"
#include "opal/mca/base/mca_base_param.h"
#include "ompi/mca/pml/base/pml_base_module_exchange.h"
#include "ompi/runtime/ompi_module_exchange.h"
#include "ompi/mca/mpool/base/base.h"
#include "ompi/mca/common/sm/common_sm_mmap.h"
#include "ompi/mca/btl/base/btl_base_error.h"

View File

@ -54,7 +54,7 @@
#include "ompi/mca/btl/btl.h"
#include "opal/mca/base/mca_base_param.h"
#include "ompi/mca/pml/base/pml_base_module_exchange.h"
#include "ompi/runtime/ompi_module_exchange.h"
#include "orte/mca/errmgr/errmgr.h"
#include "ompi/mca/mpool/base/base.h"
#include "ompi/mca/btl/base/btl_base_error.h"
@ -703,8 +703,8 @@ static int mca_btl_tcp_component_exchange(void)
#endif
} /* end of for opal_ifbegin() */
} /* end of for tcp_num_btls */
rc = mca_pml_base_modex_send(&mca_btl_tcp_component.super.btl_version,
addrs, xfer_size);
rc = ompi_modex_send(&mca_btl_tcp_component.super.btl_version,
addrs, xfer_size);
free(addrs);
} /* end if */
return rc;

View File

@ -27,7 +27,7 @@
#include "orte/class/orte_proc_table.h"
#include "ompi/mca/btl/base/btl_base_error.h"
#include "ompi/mca/pml/base/pml_base_module_exchange.h"
#include "ompi/runtime/ompi_module_exchange.h"
#include "ompi/datatype/dt_arch.h"
#include "opal/util/if.h"
#include "opal/util/net.h"
@ -108,7 +108,7 @@ mca_btl_tcp_proc_t* mca_btl_tcp_proc_create(ompi_proc_t* ompi_proc)
OPAL_THREAD_UNLOCK(&mca_btl_tcp_component.tcp_lock);
/* lookup tcp parameters exported by this proc */
rc = mca_pml_base_modex_recv( &mca_btl_tcp_component.super.btl_version,
rc = ompi_modex_recv( &mca_btl_tcp_component.super.btl_version,
ompi_proc,
(void**)&btl_proc->proc_addrs,
&size );

View File

@ -27,7 +27,7 @@
#include "ompi/mca/btl/btl.h"
#include "opal/mca/base/mca_base_param.h"
#include "ompi/mca/pml/base/pml_base_module_exchange.h"
#include "ompi/runtime/ompi_module_exchange.h"
#include "orte/mca/errmgr/errmgr.h"
#include "ompi/mca/mpool/base/base.h"
#include "btl_template.h"

View File

@ -19,7 +19,7 @@
#include "ompi_config.h"
#include "opal/class/opal_hash_table.h"
#include "ompi/mca/pml/base/pml_base_module_exchange.h"
#include "ompi/runtime/ompi_module_exchange.h"
#include "btl_template.h"
#include "btl_template_proc.h"

View File

@ -46,7 +46,7 @@
#include "ompi/datatype/convertor.h"
#include "btl_udapl_endpoint.h"
#include "orte/util/proc_info.h"
#include "ompi/mca/pml/base/pml_base_module_exchange.h"
#include "ompi/runtime/ompi_module_exchange.h"
/*
* Local Functions
@ -190,7 +190,7 @@ mca_btl_udapl_modex_send(void)
}
}
rc = mca_pml_base_modex_send(
rc = ompi_modex_send(
&mca_btl_udapl_component.super.btl_version, addrs, size);
if (NULL != addrs) {
free (addrs);

View File

@ -21,7 +21,7 @@
#include "ompi_config.h"
#include "opal/class/opal_hash_table.h"
#include "ompi/mca/pml/base/pml_base_module_exchange.h"
#include "ompi/runtime/ompi_module_exchange.h"
#include "btl_udapl.h"
#include "btl_udapl_endpoint.h"
@ -123,13 +123,13 @@ mca_btl_udapl_proc_t* mca_btl_udapl_proc_create(ompi_proc_t* ompi_proc)
udapl_proc->proc_guid = ompi_proc->proc_name;
/* query for the peer address info */
rc = mca_pml_base_modex_recv(
rc = ompi_modex_recv(
&mca_btl_udapl_component.super.btl_version,
ompi_proc,
(void*)&udapl_proc->proc_addrs,
&size);
if(OMPI_SUCCESS != rc) {
opal_output(0, "[%s:%d] mca_pml_base_modex_recv failed for peer [%lu,%lu,%lu]",
opal_output(0, "[%s:%d] ompi_modex_recv failed for peer [%ld,%ld,%ld]",
__FILE__,__LINE__,ORTE_NAME_ARGS(&ompi_proc->proc_name));
OBJ_RELEASE(udapl_proc);
return NULL;

View File

@ -25,7 +25,7 @@
#include "opal/mca/base/mca_base_param.h"
#include "ompi/proc/proc.h"
#include "ompi/constants.h"
#include "ompi/mca/pml/base/pml_base_module_exchange.h"
#include "ompi/runtime/ompi_module_exchange.h"
#ifdef __APPLE__
static char *ptl_ifname = "en0";
@ -153,7 +153,7 @@ ompi_common_portals_initialize(void)
}
}
ret = mca_pml_base_modex_send(&portals_component,
ret = ompi_modex_send(&portals_component,
&info, sizeof(ptl_process_id_t));
if (OMPI_SUCCESS != ret) {
return ret;
@ -201,14 +201,14 @@ ompi_common_portals_ni_initialize(ptl_handle_ni_t *ni_handle)
for (i = 0 ; i < nprocs ; ++i) {
if (proc_self == procs[i]) my_rid = i;
ret = mca_pml_base_modex_recv(&portals_component,
ret = ompi_modex_recv(&portals_component,
procs[i], (void**) &info, &size);
if (OMPI_SUCCESS != ret) {
opal_output(0, "%5d: mca_pml_base_modex_recv failed: %d",
opal_output(0, "%5d: ompi_modex_recv failed: %d",
getpid(), ret);
return ret;
} else if (sizeof(ptl_process_id_t) != size) {
opal_output(0, "%5d: mca_pml_base_modex_recv returned size %d, expected %d",
opal_output(0, "%5d: ompi_modex_recv returned size %d, expected %d",
getpid(), size, sizeof(ptl_process_id_t));
return OMPI_ERROR;
}
@ -288,14 +288,14 @@ ompi_common_portals_get_procs(size_t nprocs,
ptl_process_id_t *info;
for (i = 0 ; i < nprocs ; ++i) {
ret = mca_pml_base_modex_recv(&portals_component,
ret = ompi_modex_recv(&portals_component,
procs[i], (void**) &info, &size);
if (OMPI_SUCCESS != ret) {
opal_output(0, "%5d: mca_pml_base_modex_recv failed: %d",
opal_output(0, "%5d: ompi_modex_recv failed: %d",
getpid(), ret);
return ret;
} else if (sizeof(ptl_process_id_t) != size) {
opal_output(0, "%5d: mca_pml_base_modex_recv returned size %d, expected %d",
opal_output(0, "%5d: ompi_modex_recv returned size %d, expected %d",
getpid(), size, sizeof(ptl_process_id_t));
return OMPI_ERROR;
}

View File

@ -45,7 +45,6 @@
#include "ompi/mca/bml/base/base.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/pml/base/base.h"
#include "ompi/mca/pml/base/pml_base_module_exchange.h"
#include "ompi/mca/pml/base/pml_base_request.h"
/******************

View File

@ -175,7 +175,6 @@
#include "ompi/datatype/dt_arch.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/pml/base/base.h"
#include "ompi/mca/pml/base/pml_base_module_exchange.h"
#include "ompi/mca/pml/base/pml_base_request.h"
#include "ompi/mca/crcp/crcp.h"
#include "ompi/mca/crcp/base/base.h"

View File

@ -124,8 +124,8 @@ typedef int (*mca_mtl_base_module_finalize_fn_t)(struct mca_mtl_base_module_t* m
* The mca_mtl_base_module_add_procs_fn_t() is used by the PML to
* notify the MTL that new processes are connected to the current
* process. Any addressing information exported by the peer via the
* mca_pml_base_modex_send() function should be available during this
* call via the corresponding mca_pml_base_modex_recv() function. The
* ompi_modex_send() function should be available during this
* call via the corresponding ompi_modex_recv() function. The
* MTL may utilize this information to determine reachability of each
* peer process.
*
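
The MTL side uses the same interface at module-init time; a condensed sketch modeled on the mtl_mx and mtl_psm changes below (the "example" MTL and the uint64_t endpoint id are invented for illustration):

/* assumes the headers from the earlier sketch plus opal/util/output.h */
static int example_mtl_module_init(uint64_t my_endpoint_id)
{
    /* publish this process' endpoint id; peers read it back with
       ompi_modex_recv() from their add_procs() */
    if (OMPI_SUCCESS !=
        ompi_modex_send(&example_mtl_component.super.mtl_version,
                        &my_endpoint_id, sizeof(my_endpoint_id))) {
        opal_output(0, "example mtl: failed to publish endpoint id");
        return OMPI_ERROR;
    }
    return OMPI_SUCCESS;
}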

View File

@ -23,7 +23,7 @@
#include "ompi/mca/mtl/mtl.h"
#include "ompi/communicator/communicator.h"
#include "opal/class/opal_list.h"
#include "ompi/mca/pml/base/pml_base_module_exchange.h"
#include "ompi/runtime/ompi_module_exchange.h"
#include "ompi/mca/mtl/base/mtl_base_datatype.h"
#include "ompi/mca/common/mx/common_mx.h"
#include "mtl_mx.h"
@ -109,7 +109,7 @@ int ompi_mtl_mx_module_init(){
mca_pml_base_modex_send( &mca_mtl_mx_component.super.mtl_version,
ompi_modex_send( &mca_mtl_mx_component.super.mtl_version,
&ompi_mtl_mx.mx_addr,
sizeof(mca_mtl_mx_addr_t));

View File

@ -29,7 +29,7 @@
#include "mtl_mx.h"
#include "mtl_mx_types.h"
#include "mtl_mx_endpoint.h"
#include "ompi/mca/pml/base/pml_base_module_exchange.h"
#include "ompi/runtime/ompi_module_exchange.h"
/*
* Initialize state of the endpoint instance.
@ -70,7 +70,7 @@ mca_mtl_mx_endpoint_t* mca_mtl_mx_endpoint_create(ompi_proc_t* ompi_proc) {
mx_return_t mx_return;
int num_retry = 0;
/* get the remote proc's address (only one) */
rc = mca_pml_base_modex_recv(&mca_mtl_mx_component.super.mtl_version,
rc = ompi_modex_recv(&mca_mtl_mx_component.super.mtl_version,
ompi_proc, (void**)&mx_peer, &size);
if( rc != OMPI_SUCCESS || size != sizeof(mca_mtl_mx_addr_t)) {
return NULL;

View File

@ -22,7 +22,7 @@
#include "ompi/mca/mtl/mtl.h"
#include "ompi/communicator/communicator.h"
#include "opal/class/opal_list.h"
#include "ompi/mca/pml/base/pml_base_module_exchange.h"
#include "ompi/runtime/ompi_module_exchange.h"
#include "ompi/mca/mtl/base/mtl_base_datatype.h"
#include "ompi/proc/proc.h"
@ -131,7 +131,7 @@ int ompi_mtl_psm_module_init() {
ompi_mtl_psm.mq = mq;
if (OMPI_SUCCESS !=
mca_pml_base_modex_send( &mca_mtl_psm_component.super.mtl_version,
ompi_modex_send( &mca_mtl_psm_component.super.mtl_version,
&ompi_mtl_psm.epid,
sizeof(psm_epid_t))) {
opal_output(0, "Open MPI couldn't send PSM epid to head node process");
@ -233,7 +233,7 @@ ompi_mtl_psm_add_procs(struct mca_mtl_base_module_t *mtl,
/* Get the epids for all the processes from modex */
for (i = 0; i < (int) nprocs; i++) {
rc = mca_pml_base_modex_recv(&mca_mtl_psm_component.super.mtl_version,
rc = ompi_modex_recv(&mca_mtl_psm_component.super.mtl_version,
procs[i], (void**)&epid, &size);
if (rc != OMPI_SUCCESS || size != sizeof(psm_epid_t))
return OMPI_ERROR;

View File

@ -30,7 +30,7 @@
#include "mtl_psm.h"
#include "mtl_psm_types.h"
#include "mtl_psm_endpoint.h"
#include "ompi/mca/pml/base/pml_base_module_exchange.h"
#include "ompi/runtime/ompi_module_exchange.h"
/*
* Initialize state of the endpoint instance.

View File

@ -19,7 +19,6 @@
headers += \
base/base.h \
base/pml_base_bsend.h \
base/pml_base_module_exchange.h \
base/pml_base_request.h \
base/pml_base_recvreq.h \
base/pml_base_sendreq.h
@ -27,7 +26,6 @@ headers += \
libmca_pml_la_SOURCES += \
base/pml_base_bsend.c \
base/pml_base_close.c \
base/pml_base_module_exchange.c \
base/pml_base_open.c \
base/pml_base_recvreq.c \
base/pml_base_request.c \

View File

@ -1,763 +0,0 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006-2007 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "ompi/proc/proc.h"
#include "opal/threads/condition.h"
#include "opal/util/output.h"
#include "orte/util/proc_info.h"
#include "orte/class/orte_proc_table.h"
#include "orte/dss/dss.h"
#include "opal/mca/mca.h"
#include "opal/mca/base/base.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/schema/schema.h"
#include "orte/mca/gpr/gpr.h"
#include "orte/mca/gpr/base/base.h"
#include "orte/mca/ns/ns.h"
#include "ompi/constants.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/pml/base/pml_base_module_exchange.h"
/* MODEX DESIGN
*
* Modex data is always associated with a given ompi_proc_t. However,
* because modex data is received from the GPR for entire jobids, it
* is possible that the modex callback will receive data for a process
* not yet in the ompi_proc_all() list of processes. This information
* must be kept for later use, because if accept/connect causes the
* proc to be added to the ompi_proc_all() list, the subscription to
* the modex information can not be reliably fired without causing a
* potential connection storm. Therefore, we use an orte_proc_table
* backing store to contain all modex information. Backpointers are
* provided from the ompi_proc_t structure to improve lookup
* performance in the common case.
*
* While we could add the now discovered proc into the ompi_proc_all()
* list, this has some problems, in that we don't have the
* architecture and hostname information needed to properly fill in
* the ompi_proc_t structure and we don't want to cause GPR
* communication to get it when we don't really need to know anything
* about the remote proc.
*
*/
/**
* callback data for modex
*/
struct mca_pml_base_modex_cb_t {
opal_list_item_t super;
mca_base_component_t *component;
mca_pml_base_modex_cb_fn_t cbfunc;
void *cbdata;
};
typedef struct mca_pml_base_modex_cb_t mca_pml_base_modex_cb_t;
OBJ_CLASS_INSTANCE(mca_pml_base_modex_cb_t,
opal_list_item_t,
NULL,
NULL);
/**
* mca_pml_base_modex_module_t
*
* Data for a specific proc and module.
*/
struct mca_pml_base_modex_module_t {
opal_list_item_t super;
mca_base_component_t component;
void *module_data;
size_t module_data_size;
bool module_data_avail;
opal_list_t module_cbs;
opal_condition_t module_data_cond;
};
typedef struct mca_pml_base_modex_module_t mca_pml_base_modex_module_t;
static void
mca_pml_base_modex_module_construct(mca_pml_base_modex_module_t * module)
{
OBJ_CONSTRUCT(&module->module_data_cond, opal_condition_t);
OBJ_CONSTRUCT(&module->module_cbs, opal_list_t);
memset(&module->component, 0, sizeof(module->component));
module->module_data = NULL;
module->module_data_size = 0;
module->module_data_avail = false;
}
static void
mca_pml_base_modex_module_destruct(mca_pml_base_modex_module_t * module)
{
OBJ_DESTRUCT(&module->module_data_cond);
}
OBJ_CLASS_INSTANCE(mca_pml_base_modex_module_t,
opal_list_item_t,
mca_pml_base_modex_module_construct,
mca_pml_base_modex_module_destruct);
/**
* mca_pml_base_modex_t
*
* List of modules (mca_pml_base_modex_module_t) for which data has been
* received from peers.
*/
struct mca_pml_base_modex_t {
opal_list_item_t super;
opal_mutex_t modex_lock;
opal_list_t modex_modules;
};
typedef struct mca_pml_base_modex_t mca_pml_base_modex_t;
static void
mca_pml_base_modex_construct(mca_pml_base_modex_t * modex)
{
OBJ_CONSTRUCT(&modex->modex_lock, opal_mutex_t);
OBJ_CONSTRUCT(&modex->modex_modules, opal_list_t);
}
static void
mca_pml_base_modex_destruct(mca_pml_base_modex_t * modex)
{
OBJ_DESTRUCT(&modex->modex_modules);
OBJ_DESTRUCT(&modex->modex_lock);
}
OBJ_CLASS_INSTANCE(mca_pml_base_modex_t,
opal_object_t,
mca_pml_base_modex_construct,
mca_pml_base_modex_destruct);
/**
* mca_pml_base_modex_subscription_t
*
* Track segments we have subscribed to.
*/
struct mca_pml_base_modex_subscription_t {
opal_list_item_t item;
orte_jobid_t jobid;
};
typedef struct mca_pml_base_modex_subscription_t mca_pml_base_modex_subscription_t;
OBJ_CLASS_INSTANCE(mca_pml_base_modex_subscription_t,
opal_list_item_t,
NULL,
NULL);
/**
* Globals to track the list of subscriptions.
*/
static opal_list_t mca_pml_base_modex_subscriptions;
static opal_hash_table_t mca_pml_base_modex_data;
static opal_mutex_t mca_pml_base_modex_lock;
/**
* Initialize global state.
*/
int
mca_pml_base_modex_init(void)
{
OBJ_CONSTRUCT(&mca_pml_base_modex_data, opal_hash_table_t);
OBJ_CONSTRUCT(&mca_pml_base_modex_subscriptions, opal_list_t);
OBJ_CONSTRUCT(&mca_pml_base_modex_lock, opal_mutex_t);
opal_hash_table_init(&mca_pml_base_modex_data, 256);
return OMPI_SUCCESS;
}
/**
* Cleanup global state.
*/
int
mca_pml_base_modex_finalize(void)
{
opal_list_item_t *item;
opal_hash_table_remove_all(&mca_pml_base_modex_data);
OBJ_DESTRUCT(&mca_pml_base_modex_data);
while (NULL != (item = opal_list_remove_first(&mca_pml_base_modex_subscriptions)))
OBJ_RELEASE(item);
OBJ_DESTRUCT(&mca_pml_base_modex_subscriptions);
OBJ_DESTRUCT(&mca_pml_base_modex_lock);
return OMPI_SUCCESS;
}
/**
* Look to see if there is any data associated with a specified module.
*/
static mca_pml_base_modex_module_t *
mca_pml_base_modex_lookup_module(mca_pml_base_modex_t * modex,
mca_base_component_t * component)
{
mca_pml_base_modex_module_t *modex_module;
for (modex_module = (mca_pml_base_modex_module_t *) opal_list_get_first(&modex->modex_modules);
modex_module != (mca_pml_base_modex_module_t *) opal_list_get_end(&modex->modex_modules);
modex_module = (mca_pml_base_modex_module_t *) opal_list_get_next(modex_module)) {
if (mca_base_component_compatible(&modex_module->component, component) == 0) {
return modex_module;
}
}
return NULL;
}
/**
* Create a placeholder for data associated with the specified module.
*/
static mca_pml_base_modex_module_t *
mca_pml_base_modex_create_module(mca_pml_base_modex_t * modex,
mca_base_component_t * component)
{
mca_pml_base_modex_module_t *modex_module;
if (NULL == (modex_module = mca_pml_base_modex_lookup_module(modex, component))) {
modex_module = OBJ_NEW(mca_pml_base_modex_module_t);
if (NULL != modex_module) {
modex_module->component = *component;
opal_list_append(&modex->modex_modules, (opal_list_item_t *) modex_module);
}
}
return modex_module;
}
/**
* Callback for registry notifications.
*/
static void
mca_pml_base_modex_registry_callback(orte_gpr_notify_data_t * data,
void *cbdata)
{
orte_std_cntr_t i, j, k;
orte_gpr_value_t **values, *value;
orte_gpr_keyval_t **keyval;
orte_process_name_t *proc_name;
mca_pml_base_modex_t *modex;
mca_pml_base_modex_module_t *modex_module;
mca_base_component_t component;
int rc;
ompi_proc_t *proc;
/* process the callback */
values = (orte_gpr_value_t **) (data->values)->addr;
for (i = 0, k = 0; k < data->cnt &&
i < (data->values)->size; i++) {
if (NULL != values[i]) {
k++;
value = values[i];
if (0 < value->cnt) { /* needs to be at least one keyval */
/* Find the process name in the keyvals */
keyval = value->keyvals;
for (j = 0; j < value->cnt; j++) {
if (0 != strcmp(keyval[j]->key, ORTE_PROC_NAME_KEY)) continue;
/* this is the process name - extract it */
if (ORTE_SUCCESS != orte_dss.get((void**)&proc_name, keyval[j]->value, ORTE_NAME)) {
opal_output(0, "mca_pml_base_modex_registry_callback: unable to extract process name\n");
return; /* nothing we can do */
}
goto GOTNAME;
}
opal_output(0, "mca_pml_base_modex_registry_callback: unable to find process name in notify message\n");
return; /* if the name wasn't here, there is nothing we can do */
GOTNAME:
/* look up the modex data structure */
OPAL_THREAD_LOCK(&mca_pml_base_modex_lock);
modex = (mca_pml_base_modex_t*)orte_hash_table_get_proc(&mca_pml_base_modex_data, proc_name);
if (modex == NULL) {
/* create a modex data structure for this proc */
modex = OBJ_NEW(mca_pml_base_modex_t);
if (NULL == modex) {
opal_output(0, "mca_pml_base_modex_registry_callback: unable to allocate mca_pml_base_modex_t\n");
OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock);
return;
}
orte_hash_table_set_proc(&mca_pml_base_modex_data, proc_name, modex);
}
OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock);
OPAL_THREAD_LOCK(&modex->modex_lock);
proc = NULL;
/*
* Extract the component name and version from the keyval object's key
* Could be multiple keyvals returned since there is one for each
* component type/name/version - process them all
*/
keyval = value->keyvals;
for (j = 0; j < value->cnt; j++) {
orte_buffer_t buffer;
opal_list_item_t *item;
char *ptr;
void *bytes = NULL;
orte_std_cntr_t cnt;
size_t num_bytes;
orte_byte_object_t *bo;
if (strcmp(keyval[j]->key, OMPI_MODEX_KEY) != 0)
continue;
OBJ_CONSTRUCT(&buffer, orte_buffer_t);
if (ORTE_SUCCESS != (rc = orte_dss.get((void **) &bo, keyval[j]->value, ORTE_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
continue;
}
if (ORTE_SUCCESS != (rc = orte_dss.load(&buffer, bo->bytes, bo->size))) {
ORTE_ERROR_LOG(rc);
continue;
}
cnt = 1;
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, &ptr, &cnt, ORTE_STRING))) {
ORTE_ERROR_LOG(rc);
continue;
}
strcpy(component.mca_type_name, ptr);
free(ptr);
cnt = 1;
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, &ptr, &cnt, ORTE_STRING))) {
ORTE_ERROR_LOG(rc);
continue;
}
strcpy(component.mca_component_name, ptr);
free(ptr);
cnt = 1;
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer,
&component.mca_component_major_version, &cnt, ORTE_INT32))) {
ORTE_ERROR_LOG(rc);
continue;
}
cnt = 1;
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer,
&component.mca_component_minor_version, &cnt, ORTE_INT32))) {
ORTE_ERROR_LOG(rc);
continue;
}
cnt = 1;
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, &num_bytes, &cnt, ORTE_SIZE))) {
ORTE_ERROR_LOG(rc);
continue;
}
if (num_bytes != 0) {
if (NULL == (bytes = malloc(num_bytes))) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
continue;
}
cnt = (orte_std_cntr_t) num_bytes;
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, bytes, &cnt, ORTE_BYTE))) {
ORTE_ERROR_LOG(rc);
continue;
}
num_bytes = cnt;
} else {
bytes = NULL;
}
/*
* Lookup the corresponding modex structure
*/
if (NULL == (modex_module = mca_pml_base_modex_create_module(modex, &component))) {
opal_output(0, "mca_pml_base_modex_registry_callback: mca_pml_base_modex_create_module failed\n");
OBJ_RELEASE(data);
OPAL_THREAD_UNLOCK(&modex->modex_lock);
OBJ_RELEASE(modex);
return;
}
modex_module->module_data = bytes;
modex_module->module_data_size = num_bytes;
modex_module->module_data_avail = true;
opal_condition_signal(&modex_module->module_data_cond);
if (opal_list_get_size(&modex_module->module_cbs)) {
if (NULL == proc) {
proc = ompi_proc_find(proc_name);
}
if (NULL != proc) {
OPAL_THREAD_LOCK(&proc->proc_lock);
/* call any registered callbacks */
for (item = opal_list_get_first(&modex_module->module_cbs);
item != opal_list_get_end(&modex_module->module_cbs);
item = opal_list_get_next(item)) {
mca_pml_base_modex_cb_t *cb = (mca_pml_base_modex_cb_t *) item;
cb->cbfunc(cb->component, proc, bytes, num_bytes, cb->cbdata);
}
OPAL_THREAD_UNLOCK(&proc->proc_lock);
}
}
}
OPAL_THREAD_UNLOCK(&modex->modex_lock);
} /* if value[i]->cnt > 0 */
} /* if value[i] != NULL */
}
}
/**
* Make sure we have subscribed to this segment.
*/
static int
mca_pml_base_modex_subscribe(orte_process_name_t * name)
{
char *segment, *sub_name, *trig_name;
orte_gpr_subscription_id_t sub_id;
orte_jobid_t jobid;
opal_list_item_t *item;
mca_pml_base_modex_subscription_t *subscription;
int rc;
char *keys[] = {
ORTE_PROC_NAME_KEY,
OMPI_MODEX_KEY,
NULL
};
/* check for an existing subscription */
OPAL_THREAD_LOCK(&mca_pml_base_modex_lock);
if (!opal_list_is_empty(&mca_pml_base_modex_subscriptions)) {
for (item = opal_list_get_first(&mca_pml_base_modex_subscriptions);
item != opal_list_get_end(&mca_pml_base_modex_subscriptions);
item = opal_list_get_next(item)) {
subscription = (mca_pml_base_modex_subscription_t *) item;
if (subscription->jobid == name->jobid) {
OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock);
return OMPI_SUCCESS;
}
}
}
OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock);
/* otherwise - subscribe to get this jobid's contact info */
jobid = name->jobid;
if (ORTE_SUCCESS != (rc = orte_schema.get_std_subscription_name(&sub_name,
OMPI_MODEX_SUBSCRIPTION, jobid))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* attach to the stage-1 standard trigger */
if (ORTE_SUCCESS != (rc = orte_schema.get_std_trigger_name(&trig_name,
ORTE_STG1_TRIGGER, jobid))) {
ORTE_ERROR_LOG(rc);
free(sub_name);
return rc;
}
/* define the segment */
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) {
ORTE_ERROR_LOG(rc);
free(sub_name);
free(trig_name);
return rc;
}
if (jobid != orte_process_info.my_name->jobid) {
if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_N(&sub_id, NULL, NULL,
ORTE_GPR_NOTIFY_ADD_ENTRY |
ORTE_GPR_NOTIFY_VALUE_CHG |
ORTE_GPR_NOTIFY_PRE_EXISTING,
ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR | ORTE_GPR_STRIPPED,
segment,
NULL, /* look at all
* containers on this
* segment */
2, keys,
mca_pml_base_modex_registry_callback, NULL))) {
ORTE_ERROR_LOG(rc);
free(sub_name);
free(trig_name);
free(segment);
return rc;
}
} else {
if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_N(&sub_id, trig_name, sub_name,
ORTE_GPR_NOTIFY_ADD_ENTRY |
ORTE_GPR_NOTIFY_VALUE_CHG |
ORTE_GPR_NOTIFY_STARTS_AFTER_TRIG,
ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR | ORTE_GPR_STRIPPED,
segment,
NULL, /* look at all
* containers on this
* segment */
2, keys,
mca_pml_base_modex_registry_callback, NULL))) {
ORTE_ERROR_LOG(rc);
free(sub_name);
free(trig_name);
free(segment);
return rc;
}
}
free(sub_name);
free(trig_name);
free(segment);
/* add this jobid to our list of subscriptions */
OPAL_THREAD_LOCK(&mca_pml_base_modex_lock);
subscription = OBJ_NEW(mca_pml_base_modex_subscription_t);
subscription->jobid = name->jobid;
opal_list_append(&mca_pml_base_modex_subscriptions, &subscription->item);
OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock);
return OMPI_SUCCESS;
}
/**
* Store the data associated with the specified module in the
* gpr. Note that the gpr is in a mode where it caches
* individual puts during startup and sends them as an aggregate
* command.
*/
int
mca_pml_base_modex_send(mca_base_component_t * source_component,
const void *data,
size_t size)
{
orte_jobid_t jobid;
int rc;
orte_buffer_t buffer;
orte_std_cntr_t i, num_tokens;
char *ptr, *segment, **tokens;
orte_byte_object_t bo;
orte_data_value_t value = ORTE_DATA_VALUE_EMPTY;
jobid = ORTE_PROC_MY_NAME->jobid;
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_schema.get_proc_tokens(&tokens,
&num_tokens, orte_process_info.my_name))) {
ORTE_ERROR_LOG(rc);
free(segment);
return rc;
}
OBJ_CONSTRUCT(&buffer, orte_buffer_t);
ptr = source_component->mca_type_name;
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &ptr, 1, ORTE_STRING))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
ptr = source_component->mca_component_name;
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &ptr, 1, ORTE_STRING))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &source_component->mca_component_major_version, 1, ORTE_INT32))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &source_component->mca_component_minor_version, 1, ORTE_INT32))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &size, 1, ORTE_SIZE))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (0 != size) {
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, (void *) data, size, ORTE_BYTE))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
}
if (ORTE_SUCCESS != (rc = orte_dss.unload(&buffer, (void **) &(bo.bytes), &(bo.size)))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
OBJ_DESTRUCT(&buffer);
/* setup the data_value structure to hold the byte object */
if (ORTE_SUCCESS != (rc = orte_dss.set(&value, (void *) &bo, ORTE_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
rc = orte_gpr.put_1(ORTE_GPR_TOKENS_AND | ORTE_GPR_KEYS_OR,
segment, tokens, OMPI_MODEX_KEY, &value);
cleanup:
free(segment);
for (i = 0; i < num_tokens; i++) {
free(tokens[i]);
tokens[i] = NULL;
}
if (NULL != tokens)
free(tokens);
return rc;
}
/**
* Retrieve the data for the specified module from the source process.
*/
int
mca_pml_base_modex_recv(mca_base_component_t * component,
ompi_proc_t * proc,
void **buffer,
size_t * size)
{
mca_pml_base_modex_t *modex;
mca_pml_base_modex_module_t *modex_module;
/* make sure we could possibly have modex data */
if (0 == strcmp(orte_gpr_base_selected_component.gpr_version.mca_component_name,
"null")) {
return OMPI_ERR_NOT_IMPLEMENTED;
}
/* check the proc for cached data */
if (NULL == (modex = (mca_pml_base_modex_t *) proc->proc_modex)) {
/* see if we already have data for this proc... */
OPAL_THREAD_LOCK(&mca_pml_base_modex_lock);
modex = (mca_pml_base_modex_t*)orte_hash_table_get_proc(&mca_pml_base_modex_data, &proc->proc_name);
if (NULL == modex) {
/* create an empty modex data... */
modex = OBJ_NEW(mca_pml_base_modex_t);
if (NULL == modex) {
opal_output(0, "mca_pml_base_modex_recv: unable to allocate mca_pml_base_modex_t\n");
OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
orte_hash_table_set_proc(&mca_pml_base_modex_data, &proc->proc_name, modex);
OBJ_RETAIN(modex);
proc->proc_modex = &modex->super.super;
OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock);
/* verify that we have subscribed to this segment */
mca_pml_base_modex_subscribe(&proc->proc_name);
} else {
/* create a backpointer from the proc to the modex data */
OBJ_RETAIN(modex);
proc->proc_modex = &modex->super.super;
OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock);
}
}
OPAL_THREAD_LOCK(&modex->modex_lock);
/* lookup/create the module */
if (NULL == (modex_module = mca_pml_base_modex_create_module(modex, component))) {
OPAL_THREAD_UNLOCK(&modex->modex_lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* wait until data is available */
while (modex_module->module_data_avail == false) {
opal_condition_wait(&modex_module->module_data_cond, &modex->modex_lock);
}
/* copy the data out to the user */
if (modex_module->module_data_size == 0) {
*buffer = NULL;
*size = 0;
} else {
void *copy = malloc(modex_module->module_data_size);
if (copy == NULL) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
memcpy(copy, modex_module->module_data, modex_module->module_data_size);
*buffer = copy;
*size = modex_module->module_data_size;
}
OPAL_THREAD_UNLOCK(&modex->modex_lock);
return OMPI_SUCCESS;
}
/**
*
*/
int
mca_pml_base_modex_recv_nb(mca_base_component_t * component,
ompi_proc_t * proc,
mca_pml_base_modex_cb_fn_t cbfunc,
void *cbdata)
{
mca_pml_base_modex_t *modex;
mca_pml_base_modex_module_t *module;
mca_pml_base_modex_cb_t *cb;
/* check the proc for cached data */
if (NULL == (modex = (mca_pml_base_modex_t *) proc->proc_modex)) {
/* see if we already have data for this proc... */
OPAL_THREAD_LOCK(&mca_pml_base_modex_lock);
modex = (mca_pml_base_modex_t*)orte_hash_table_get_proc(&mca_pml_base_modex_data, &proc->proc_name);
if (NULL == modex) {
/* create an empty modex data... */
modex = OBJ_NEW(mca_pml_base_modex_t);
if (NULL == modex) {
opal_output(0, "mca_pml_base_modex_recv: unable to allocate mca_pml_base_modex_t\n");
OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
orte_hash_table_set_proc(&mca_pml_base_modex_data, &proc->proc_name, modex);
OBJ_RETAIN(modex);
proc->proc_modex = &modex->super.super;
OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock);
/* verify that we have subscribed to this segment */
mca_pml_base_modex_subscribe(&proc->proc_name);
} else {
/* create a backpointer from the proc to the modex data */
OBJ_RETAIN(modex);
proc->proc_modex = &modex->super.super;
OPAL_THREAD_UNLOCK(&mca_pml_base_modex_lock);
}
}
OPAL_THREAD_LOCK(&modex->modex_lock);
/* lookup/create the module */
if (NULL == (module = mca_pml_base_modex_create_module(modex, component))) {
OPAL_THREAD_UNLOCK(&modex->modex_lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* register the callback */
cb = OBJ_NEW(mca_pml_base_modex_cb_t);
cb->component = component;
cb->cbfunc = cbfunc;
cb->cbdata = cbdata;
opal_list_append(&module->module_cbs, (opal_list_item_t *) cb);
OPAL_THREAD_UNLOCK(&modex->modex_lock);
return OMPI_SUCCESS;
}
/**
* Subscribe to the segment corresponding
* to this job.
*/
int
mca_pml_base_modex_exchange(void)
{
return mca_pml_base_modex_subscribe(orte_process_info.my_name);
}

View File

@ -1,183 +0,0 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/** @file **/
#ifndef MCA_OMPI_MODULE_EXCHANGE_H
#define MCA_OMPI_MODULE_EXCHANGE_H
#include "ompi_config.h"
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#include "opal/mca/mca.h"
struct ompi_proc_t;
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
/**
* Send a module-specific buffer to all other corresponding MCA
* modules in peer processes.
*
* @param source_component A pointer to this module's component
* structure (i.e., mca_base_component_t)
* @param buffer A pointer to the beginning of the buffer to send.
* @param size Number of bytes of each instance in the buffer.
* @param count Number of instances in the buffer.
*
* @retval OMPI_SUCCESS On success
* @retval OMPI_ERROR On failure
*
* This function takes a contiguous buffer of network-ordered data
* and makes it available to all other MCA processes during the
* selection process. Modules sent by one source_component can only
* be received by a corresponding module with the same
* source_component in peer processes.
*
* Two components are "corresponding" if:
*
* - they share the same major and minor MCA version number
* - they have the same type name string
* - they share the same major and minor type version number
* - they have the same component name string
* - they share the same major and minor component version number
*
* This function is intended to be used during MCA module
* initialization \em before \em selection (the selection process is
* defined differently for each component type). Each module will
* provide a buffer containing meta information and/or parameters
* that it wants to share with its corresponding modules in peer
* processes. This information typically contains location /
* contact information for establishing communication between
* processes (in a manner that is specific to that module). For
* example, a TCP-based module could provide its IP address and TCP
* port where it is waiting on listen(). The peer process receiving
* this buffer can therefore open a socket to the indicated IP
* address and TCP port.
*
* During the selection process, the MCA framework will effectively
* perform an "allgather" operation of all modex buffers; every
* buffer will be available to every peer process (see
* mca_pml_base_modex_recv()).
*
* Note that the buffer should not be modified after invoking this
* function; the MCA framework may asynchronously send it to a
* process peer at any time.
*
* Note again that the buffer contents is transparent to the MCA
* framework -- it \em must already either be in network order or be
* in some format that peer processes will be able to read it,
* regardless of pointer sizes or endian bias.
*/
OMPI_DECLSPEC int mca_pml_base_modex_send(mca_base_component_t *source_component,
const void *buffer, size_t size);
/**
* Receive a module-specific buffer from a corresponding MCA module
* in a specific peer process.
*
* @param dest_component A pointer to this module's component struct
* (i.e., mca_base_component_t instance).
* @param source_proc Peer process to receive from.
* @param buffer A pointer to a (void*) that will be filled with a
* pointer to the received buffer.
* @param size Pointer to a size_t that will be filled with the
* number of bytes of each instance in the buffer.
* @param count Pointer to an int that will be filled with the
* number of instances in the buffer.
*
* @retval OMPI_SUCCESS If a corresponding module buffer is found and
* is successfully returned to the caller.
* @retval OMPI_ERR_OUT_OF_RESOURCE If no corresponding module buffer is found,
* or if an error occurs while returning the buffer to the caller.
*
* This is the corresponding "get" call to mca_pml_base_modex_send().
* After selection, modules can call this function to receive the
* buffer sent by their corresponding module on the process
* source_proc.
*
* If a buffer from a corresponding module is found, buffer will be
* filled with a pointer to a copy of the buffer that was sent by
* the peer process. It is the caller's responsibility to free this
* buffer. size will be filled in with the number of instances in
* the buffer, and count will be filled in with the number of
* instances. The total number of bytes in the buffer is (size *
* count). See the explanation in mca_pml_base_modex_send() for why the
* number of bytes is split into two parts.
*/
OMPI_DECLSPEC int mca_pml_base_modex_recv(mca_base_component_t *dest_component,
struct ompi_proc_t *source_proc,
void **buffer, size_t *size);
/**
* Register to receive a callback on change to module specific data.
*
* @param dest_component A pointer to this module's component struct
* (i.e., mca_base_component_t instance).
* @param source_proc Peer process to receive from.
* @param buffer A pointer to a (void*) that will be filled with a
* pointer to the received buffer.
* @param size Pointer to a size_t that will be filled with the
* number of bytes of each instance in the buffer.
* @param count Pointer to an int that will be filled with the
* number of instances in the buffer.
*
* @retval OMPI_SUCCESS If a corresponding module buffer is found and
* is successfully returned to the caller.
* @retval OMPI_ERR_OUT_OF_RESOURCE If no corresponding module buffer is found,
* or if an error occurs while returning the buffer to the caller.
*
*/
typedef void (*mca_pml_base_modex_cb_fn_t)(
mca_base_component_t *component,
struct ompi_proc_t* proc,
void* buffer,
size_t size,
void* cbdata);
OMPI_DECLSPEC int mca_pml_base_modex_recv_nb(
mca_base_component_t *component,
struct ompi_proc_t* proc,
mca_pml_base_modex_cb_fn_t cbfunc,
void* cbdata);
/*
* Called to subscribe to registry.
*/
OMPI_DECLSPEC int mca_pml_base_modex_exchange(void);
/**
*
*/
OMPI_DECLSPEC int mca_pml_base_modex_init(void);
/**
*
*/
OMPI_DECLSPEC int mca_pml_base_modex_finalize(void);
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
#endif /* MCA_OMPI_MODULE_EXCHANGE_H */

View File

@ -29,7 +29,7 @@
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/pml/base/base.h"
#include "ompi/proc/proc.h"
#include "ompi/mca/pml/base/pml_base_module_exchange.h"
#include "ompi/runtime/ompi_module_exchange.h"
typedef struct opened_component_t {
opal_list_item_t super;
@ -294,7 +294,7 @@ static mca_base_component_t pml_base_component = {
int
mca_pml_base_pml_selected(const char *name)
{
return mca_pml_base_modex_send(&pml_base_component, name, strlen(name) + 1);
return ompi_modex_send(&pml_base_component, name, strlen(name) + 1);
}
int
@ -309,7 +309,7 @@ mca_pml_base_pml_check_selected(const char *my_pml,
for (i = 0 ; i < nprocs ; ++i) {
if (ompi_proc_local() == procs[i]) continue;
ret = mca_pml_base_modex_recv(&pml_base_component,
ret = ompi_modex_recv(&pml_base_component,
procs[i],
(void**) &remote_pml, &size);
/* if modex isn't implemented, then just assume all is well... */

View File

@ -39,7 +39,7 @@
#include "orte/mca/errmgr/errmgr.h"
#include "ompi/runtime/ompi_cr.h"
#include "ompi/mca/pml/base/pml_base_module_exchange.h"
#include "ompi/runtime/ompi_module_exchange.h"
#include "orte/mca/smr/smr.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/gpr/gpr.h"
@ -426,7 +426,7 @@ int mca_pml_ob1_ft_event( int state )
*/
opal_output_verbose(10, ompi_cr_output,
"pml:ob1: ft_event(Restart): Restart Modex information");
if (OMPI_SUCCESS != (ret = mca_pml_base_modex_finalize())) {
if (OMPI_SUCCESS != (ret = ompi_modex_finalize())) {
opal_output(0,
"pml:ob1: ft_event(Restart): modex_finalize Failed %d",
ret);
@ -440,7 +440,7 @@ int mca_pml_ob1_ft_event( int state )
}
}
if (OMPI_SUCCESS != (ret = mca_pml_base_modex_init())) {
if (OMPI_SUCCESS != (ret = ompi_modex_init())) {
opal_output(0,
"pml:ob1: ft_event(Restart): modex_init Failed %d",
ret);
@ -473,11 +473,11 @@ int mca_pml_ob1_ft_event( int state )
}
else if(OPAL_CRS_RESTART == state) {
/*
* Re-exchange the Modex, and go through the stage gates
* Re-subscribe to the modex information
*/
if (OMPI_SUCCESS != (ret = mca_pml_base_modex_exchange())) {
if (OMPI_SUCCESS != (ret = ompi_modex_subscribe_job(ORTE_PROC_MY_NAME->jobid))) {
opal_output(0,
"pml:ob1: ft_event(Restart): modex_exchange Failed %d",
"pml:ob1: ft_event(Restart): Failed to subscribe to the modex information %d",
ret);
return ret;
}

View File

@ -23,8 +23,10 @@ dist_pkgdata_DATA += runtime/help-mpi-runtime.txt
headers += \
runtime/mpiruntime.h \
runtime/params.h \
runtime/ompi_cr.h \
runtime/params.h \
runtime/ompi_cr.h
runtime/ompi_module_exchange.h
libmpi_la_SOURCES += \
runtime/ompi_mpi_abort.c \
@ -32,4 +34,5 @@ libmpi_la_SOURCES += \
runtime/ompi_mpi_finalize.c \
runtime/ompi_mpi_params.c \
runtime/ompi_mpi_preconnect.c \
runtime/ompi_cr.c
runtime/ompi_cr.c \
runtime/ompi_module_exchange.c

ompi/runtime/ompi_module_exchange.c (new file, 836 lines added)
View File

@ -0,0 +1,836 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006-2007 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "ompi/proc/proc.h"
#include "opal/threads/condition.h"
#include "opal/util/output.h"
#include "orte/util/proc_info.h"
#include "orte/class/orte_proc_table.h"
#include "orte/dss/dss.h"
#include "opal/mca/mca.h"
#include "opal/mca/base/base.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/schema/schema.h"
#include "orte/mca/gpr/gpr.h"
#include "orte/mca/gpr/base/base.h"
#include "orte/mca/ns/ns.h"
#include "ompi/constants.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/runtime/ompi_module_exchange.h"
/**
* @file
*
* MODEX DESIGN
*
* Modex data is always associated with a given orte process name, in
* an orte hash table. A backpointer is kept on an ompi_proc_t for
* fast access. The hash table is necessary because modex data is
* received from the GPR for entire jobids and when working with
* dynamic processes, it is possible we will receive data for a
* process not yet in the ompi_proc_all() list of processes. This
* information must be kept for later use, because if accept/connect
* causes the proc to be added to the ompi_proc_all() list, the
* subscription to the modex information can not be reliably fired
* without causing a potential connection storm. Therefore, we use an
* orte_proc_table backing store to contain all modex information.
* Backpointers are provided from the ompi_proc_t structure to improve
* lookup performance in the common case.
*
* While we could add the now discovered proc into the ompi_proc_all()
* list, this has some problems, in that we don't have the
* architecture and hostname information needed to properly fill in
* the ompi_proc_t structure and we don't want to cause GPR
* communication to get it when we don't really need to know anything
* about the remote proc.
*
* All data put into the modex (or received from the modex) is
* associated with a given proc,component pair. The data structures
* to maintain this data look something like:
*
* orte_hash_table_t ompi_modex_data -> list of ompi_modex_proc_t objects
*
* +-----------------------------+
* | ompi_modex_proc_data_t |
* | - opal_list_item_t |
* +-----------------------------+
* | opal_mutex_t modex_lock |
* | opal_condition_t modex_cond |
* | bool modex_received_data | 1
* | opal_list_t modules | ---------+
* +-----------------------------+ |
* * |
* +--------------------------------+ <--------+
* | ompi_modex_module_data_t |
* | - opal_list_item_t |
* +--------------------------------+
* | mca_base_component_t component |
* | void *module_data |
* | size_t module_data_size | 1
* | opal_list_t module_cbs | ---------+
* +--------------------------------+ |
* * |
* +---------------------------+ <--------+
* | ompi_modex_cb_t |
* | - opal_list_item_t |
* +---------------------------+
* | ompi_modex_cb_fn_t cbfunc |
* | void *cbdata |
* +---------------------------+
*
* In order to maintain subscriptions to the registry for modex
* information, a list of all active subscriptions is maintained as a
* list (ompi_modex_subscriptions) of ompi_modex_subscription_t
* structures. The structure contains the jobid used in the
* subscription.
*/
/**
* Modex data for a particular orte process
*
* Locking infrastructure and list of module data for a given orte
* process name. The name association is maintained in the
* ompi_modex_proc_list hash table.
*/
struct ompi_modex_proc_data_t {
/** Structure can be put on lists (including in hash tables) */
opal_list_item_t super;
/* Lock held whenever the modex data for this proc is being
modified */
opal_mutex_t modex_lock;
/* Condition variable used when blocking on data from this
process. Should be signalled whenever data is updated for this
process. */
opal_condition_t modex_cond;
/* True if modex data has ever been received from this process,
false otherwise. */
bool modex_received_data;
/* List of ompi_modex_module_data_t structures containing all data
received from this process, sorted by component name. */
opal_list_t modex_module_data;
};
typedef struct ompi_modex_proc_data_t ompi_modex_proc_data_t;
static void
ompi_modex_construct(ompi_modex_proc_data_t * modex)
{
OBJ_CONSTRUCT(&modex->modex_lock, opal_mutex_t);
OBJ_CONSTRUCT(&modex->modex_cond, opal_condition_t);
modex->modex_received_data = false;
OBJ_CONSTRUCT(&modex->modex_module_data, opal_list_t);
}
static void
ompi_modex_destruct(ompi_modex_proc_data_t * modex)
{
OBJ_DESTRUCT(&modex->modex_module_data);
OBJ_DESTRUCT(&modex->modex_cond);
OBJ_DESTRUCT(&modex->modex_lock);
}
OBJ_CLASS_INSTANCE(ompi_modex_proc_data_t, opal_object_t,
ompi_modex_construct, ompi_modex_destruct);
/**
* Modex data for a particular component name
*
* Container for data for a particular proc,component pair. This
* structure should be contained in the modules list in an
* ompi_modex_proc_data_t structure to maintain an association with a
* given proc. The list is then searched for a matching component
* name.
*
* While searching the list or reading from (or writing to) this
* structure, the lock in the proc_data_t should be held.
*/
struct ompi_modex_module_data_t {
/** Structure can be put on lists */
opal_list_item_t super;
/** Component information for this data */
mca_base_component_t component;
/** Binary blob of data associated with this proc,component pair */
void *module_data;
/** Size (in bytes) of module_data */
size_t module_data_size;
/** callbacks that should be fired when module_data changes. */
opal_list_t module_cbs;
};
typedef struct ompi_modex_module_data_t ompi_modex_module_data_t;
static void
ompi_modex_module_construct(ompi_modex_module_data_t * module)
{
memset(&module->component, 0, sizeof(module->component));
module->module_data = NULL;
module->module_data_size = 0;
OBJ_CONSTRUCT(&module->module_cbs, opal_list_t);
}
static void
ompi_modex_module_destruct(ompi_modex_module_data_t * module)
{
opal_list_item_t *item;
while (NULL != (item = opal_list_remove_first(&module->module_cbs))) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&module->module_cbs);
}
OBJ_CLASS_INSTANCE(ompi_modex_module_data_t,
opal_list_item_t,
ompi_modex_module_construct,
ompi_modex_module_destruct);
/**
* Callback data for modex updates
*
* Data container for update callbacks that should be fired whenever a
* given proc,component pair has a modex data update.
*/
struct ompi_modex_cb_t {
opal_list_item_t super;
ompi_modex_cb_fn_t cbfunc;
void *cbdata;
};
typedef struct ompi_modex_cb_t ompi_modex_cb_t;
OBJ_CLASS_INSTANCE(ompi_modex_cb_t,
opal_list_item_t,
NULL,
NULL);
/**
* Container for segment subscription data
*
 * Tracks segments we have subscribed to.  Any jobid segment we are
 * subscribed to for updates has one of these containers on the
 * ompi_modex_subscriptions list.
*/
struct ompi_modex_subscription_t {
opal_list_item_t item;
orte_jobid_t jobid;
};
typedef struct ompi_modex_subscription_t ompi_modex_subscription_t;
OBJ_CLASS_INSTANCE(ompi_modex_subscription_t,
opal_list_item_t,
NULL,
NULL);
/**
* Global modex list for tracking subscriptions
*
* A list of ompi_modex_subscription_t structures, each representing a
* jobid to which we have subscribed for modex updates.
*
* \note The ompi_modex_lock mutex should be held whenever this list
* is being updated or searched.
*/
static opal_list_t ompi_modex_subscriptions;
/**
* Global modex list of proc data
*
 * Global hash table associating orte_process_name_t values with an
* ompi_modex_proc_data_t container.
*
* \note The ompi_modex_lock mutex should be held whenever this list
* is being updated or searched.
*/
static opal_hash_table_t ompi_modex_data;
/**
* Global modex lock
*
 * Global lock for modex usage, particularly protecting the
* ompi_modex_subscriptions list and the ompi_modex_data hash table.
*/
static opal_mutex_t ompi_modex_lock;
int
ompi_modex_init(void)
{
OBJ_CONSTRUCT(&ompi_modex_data, opal_hash_table_t);
OBJ_CONSTRUCT(&ompi_modex_subscriptions, opal_list_t);
OBJ_CONSTRUCT(&ompi_modex_lock, opal_mutex_t);
opal_hash_table_init(&ompi_modex_data, 256);
return OMPI_SUCCESS;
}
int
ompi_modex_finalize(void)
{
opal_list_item_t *item;
opal_hash_table_remove_all(&ompi_modex_data);
OBJ_DESTRUCT(&ompi_modex_data);
while (NULL != (item = opal_list_remove_first(&ompi_modex_subscriptions)))
OBJ_RELEASE(item);
OBJ_DESTRUCT(&ompi_modex_subscriptions);
OBJ_DESTRUCT(&ompi_modex_lock);
return OMPI_SUCCESS;
}
/**
* Find data for a given component in a given modex_proc_data_t
* container.
*
* Find data for a given component in a given modex_proc_data_t
* container. The proc_data's modex_lock must be held during this
* search.
*/
static ompi_modex_module_data_t *
ompi_modex_lookup_module(ompi_modex_proc_data_t *proc_data,
mca_base_component_t *component,
bool create_if_not_found)
{
ompi_modex_module_data_t *module_data = NULL;
for (module_data = (ompi_modex_module_data_t *) opal_list_get_first(&proc_data->modex_module_data);
module_data != (ompi_modex_module_data_t *) opal_list_get_end(&proc_data->modex_module_data);
module_data = (ompi_modex_module_data_t *) opal_list_get_next(module_data)) {
if (mca_base_component_compatible(&module_data->component, component) == 0) {
return module_data;
}
}
if (create_if_not_found) {
module_data = OBJ_NEW(ompi_modex_module_data_t);
if (NULL == module_data) return NULL;
memcpy(&module_data->component, component, sizeof(mca_base_component_t));
opal_list_append(&proc_data->modex_module_data, &module_data->super);
return module_data;
}
return NULL;
}
/**
* Find ompi_modex_proc_data_t container associated with given
* orte_process_name_t.
*
* Find ompi_modex_proc_data_t container associated with given
* orte_process_name_t. The global lock should *NOT* be held when
* calling this function.
*/
static ompi_modex_proc_data_t*
ompi_modex_lookup_orte_proc(orte_process_name_t *orte_proc)
{
ompi_modex_proc_data_t *proc_data;
OPAL_THREAD_LOCK(&ompi_modex_lock);
proc_data = (ompi_modex_proc_data_t*)
orte_hash_table_get_proc(&ompi_modex_data, orte_proc);
if (NULL == proc_data) {
/* The proc clearly exists, so create a modex structure
for it and try to subscribe */
proc_data = OBJ_NEW(ompi_modex_proc_data_t);
if (NULL == proc_data) {
opal_output(0, "ompi_modex_lookup_orte_proc: unable to allocate ompi_modex_proc_data_t\n");
OPAL_THREAD_UNLOCK(&ompi_modex_lock);
return NULL;
}
orte_hash_table_set_proc(&ompi_modex_data, orte_proc, proc_data);
}
OPAL_THREAD_UNLOCK(&ompi_modex_lock);
return proc_data;
}
/**
* Find ompi_modex_proc_data_t container associated with given ompi_proc_t
*
* Find ompi_modex_proc_data_t container associated with given
* ompi_proc_t. The global lock should *NOT* be held when calling
* this function.
*/
static ompi_modex_proc_data_t*
ompi_modex_lookup_proc(ompi_proc_t *proc)
{
ompi_modex_proc_data_t *proc_data =
(ompi_modex_proc_data_t *) proc->proc_modex;
if (NULL == proc_data) {
proc_data = ompi_modex_lookup_orte_proc(&proc->proc_name);
if (NULL == proc_data) return NULL;
/* set the association with the ompi_proc, if not already done. */
OPAL_THREAD_LOCK(&ompi_modex_lock);
if (NULL == proc->proc_modex) {
OBJ_RETAIN(proc_data);
proc->proc_modex = &proc_data->super.super;
/* verify that we have subscribed to this segment */
ompi_modex_subscribe_job(proc->proc_name.jobid);
}
OPAL_THREAD_UNLOCK(&ompi_modex_lock);
}
return proc_data;
}
/**
* Callback for registry notifications.
*/
static void
ompi_modex_registry_callback(orte_gpr_notify_data_t * data,
void *cbdata)
{
orte_std_cntr_t i, j, k;
orte_gpr_value_t **values, *value;
orte_gpr_keyval_t **keyval;
orte_process_name_t *proc_name;
ompi_modex_proc_data_t *proc_data;
ompi_modex_module_data_t *module_data;
mca_base_component_t component;
int rc;
/* process the callback */
values = (orte_gpr_value_t **) (data->values)->addr;
for (i = 0, k = 0; k < data->cnt &&
i < (data->values)->size; i++) {
if (NULL != values[i]) {
k++;
value = values[i];
if (0 < value->cnt) { /* needs to be at least one keyval */
/* Find the process name in the keyvals */
keyval = value->keyvals;
for (j = 0; j < value->cnt; j++) {
if (0 != strcmp(keyval[j]->key, ORTE_PROC_NAME_KEY)) continue;
/* this is the process name - extract it */
if (ORTE_SUCCESS != orte_dss.get((void**)&proc_name, keyval[j]->value, ORTE_NAME)) {
opal_output(0, "ompi_modex_registry_callback: unable to extract process name\n");
return; /* nothing we can do */
}
goto GOTNAME;
}
opal_output(0, "ompi_modex_registry_callback: unable to find process name in notify message\n");
return; /* if the name wasn't here, there is nothing we can do */
GOTNAME:
/* look up the modex data structure */
proc_data = ompi_modex_lookup_orte_proc(proc_name);
if (proc_data == NULL) continue;
OPAL_THREAD_LOCK(&proc_data->modex_lock);
/*
* Extract the component name and version from the keyval object's key
* Could be multiple keyvals returned since there is one for each
* component type/name/version - process them all
*/
keyval = value->keyvals;
for (j = 0; j < value->cnt; j++) {
orte_buffer_t buffer;
opal_list_item_t *item;
char *ptr;
void *bytes = NULL;
orte_std_cntr_t cnt;
size_t num_bytes;
orte_byte_object_t *bo;
if (strcmp(keyval[j]->key, OMPI_MODEX_KEY) != 0)
continue;
OBJ_CONSTRUCT(&buffer, orte_buffer_t);
if (ORTE_SUCCESS != (rc = orte_dss.get((void **) &bo, keyval[j]->value, ORTE_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
continue;
}
if (ORTE_SUCCESS != (rc = orte_dss.load(&buffer, bo->bytes, bo->size))) {
ORTE_ERROR_LOG(rc);
continue;
}
cnt = 1;
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, &ptr, &cnt, ORTE_STRING))) {
ORTE_ERROR_LOG(rc);
continue;
}
strcpy(component.mca_type_name, ptr);
free(ptr);
cnt = 1;
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, &ptr, &cnt, ORTE_STRING))) {
ORTE_ERROR_LOG(rc);
continue;
}
strcpy(component.mca_component_name, ptr);
free(ptr);
cnt = 1;
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer,
&component.mca_component_major_version, &cnt, ORTE_INT32))) {
ORTE_ERROR_LOG(rc);
continue;
}
cnt = 1;
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer,
&component.mca_component_minor_version, &cnt, ORTE_INT32))) {
ORTE_ERROR_LOG(rc);
continue;
}
cnt = 1;
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, &num_bytes, &cnt, ORTE_SIZE))) {
ORTE_ERROR_LOG(rc);
continue;
}
if (num_bytes != 0) {
if (NULL == (bytes = malloc(num_bytes))) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
continue;
}
cnt = (orte_std_cntr_t) num_bytes;
if (ORTE_SUCCESS != (rc = orte_dss.unpack(&buffer, bytes, &cnt, ORTE_BYTE))) {
ORTE_ERROR_LOG(rc);
continue;
}
num_bytes = cnt;
} else {
bytes = NULL;
}
/*
* Lookup the corresponding modex structure
*/
if (NULL == (module_data = ompi_modex_lookup_module(proc_data,
&component,
true))) {
opal_output(0, "ompi_modex_registry_callback: ompi_modex_lookup_module failed\n");
OBJ_RELEASE(data);
OPAL_THREAD_UNLOCK(&proc_data->modex_lock);
return;
}
module_data->module_data = bytes;
module_data->module_data_size = num_bytes;
proc_data->modex_received_data = true;
opal_condition_signal(&proc_data->modex_cond);
if (opal_list_get_size(&module_data->module_cbs)) {
ompi_proc_t *proc = ompi_proc_find(proc_name);
if (NULL != proc) {
OPAL_THREAD_LOCK(&proc->proc_lock);
/* call any registered callbacks */
for (item = opal_list_get_first(&module_data->module_cbs);
item != opal_list_get_end(&module_data->module_cbs);
item = opal_list_get_next(item)) {
ompi_modex_cb_t *cb = (ompi_modex_cb_t *) item;
cb->cbfunc(&module_data->component,
proc, bytes, num_bytes, cb->cbdata);
}
OPAL_THREAD_UNLOCK(&proc->proc_lock);
}
}
}
OPAL_THREAD_UNLOCK(&proc_data->modex_lock);
} /* if value[i]->cnt > 0 */
} /* if value[i] != NULL */
}
}
int
ompi_modex_subscribe_job(orte_jobid_t jobid)
{
char *segment, *sub_name, *trig_name;
orte_gpr_subscription_id_t sub_id;
opal_list_item_t *item;
ompi_modex_subscription_t *subscription;
int rc;
char *keys[] = {
ORTE_PROC_NAME_KEY,
OMPI_MODEX_KEY,
NULL
};
/* check for an existing subscription */
OPAL_THREAD_LOCK(&ompi_modex_lock);
for (item = opal_list_get_first(&ompi_modex_subscriptions) ;
item != opal_list_get_end(&ompi_modex_subscriptions) ;
item = opal_list_get_next(item)) {
subscription = (ompi_modex_subscription_t *) item;
if (subscription->jobid == jobid) {
OPAL_THREAD_UNLOCK(&ompi_modex_lock);
return OMPI_SUCCESS;
}
}
OPAL_THREAD_UNLOCK(&ompi_modex_lock);
/* otherwise - subscribe to get this jobid's contact info */
if (ORTE_SUCCESS != (rc = orte_schema.get_std_subscription_name(&sub_name,
OMPI_MODEX_SUBSCRIPTION, jobid))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* attach to the stage-1 standard trigger */
if (ORTE_SUCCESS != (rc = orte_schema.get_std_trigger_name(&trig_name,
ORTE_STG1_TRIGGER, jobid))) {
ORTE_ERROR_LOG(rc);
free(sub_name);
return rc;
}
/* define the segment */
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) {
ORTE_ERROR_LOG(rc);
free(sub_name);
free(trig_name);
return rc;
}
if (jobid != orte_process_info.my_name->jobid) {
if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_N(&sub_id, NULL, NULL,
ORTE_GPR_NOTIFY_ADD_ENTRY |
ORTE_GPR_NOTIFY_VALUE_CHG |
ORTE_GPR_NOTIFY_PRE_EXISTING,
ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR | ORTE_GPR_STRIPPED,
segment,
NULL, /* look at all
* containers on this
* segment */
2, keys,
ompi_modex_registry_callback, NULL))) {
ORTE_ERROR_LOG(rc);
free(sub_name);
free(trig_name);
free(segment);
return rc;
}
} else {
if (ORTE_SUCCESS != (rc = orte_gpr.subscribe_N(&sub_id, trig_name, sub_name,
ORTE_GPR_NOTIFY_ADD_ENTRY |
ORTE_GPR_NOTIFY_VALUE_CHG |
ORTE_GPR_NOTIFY_STARTS_AFTER_TRIG,
ORTE_GPR_KEYS_OR | ORTE_GPR_TOKENS_OR | ORTE_GPR_STRIPPED,
segment,
NULL, /* look at all
* containers on this
* segment */
2, keys,
ompi_modex_registry_callback, NULL))) {
ORTE_ERROR_LOG(rc);
free(sub_name);
free(trig_name);
free(segment);
return rc;
}
}
free(sub_name);
free(trig_name);
free(segment);
/* add this jobid to our list of subscriptions */
OPAL_THREAD_LOCK(&ompi_modex_lock);
subscription = OBJ_NEW(ompi_modex_subscription_t);
subscription->jobid = jobid;
opal_list_append(&ompi_modex_subscriptions, &subscription->item);
OPAL_THREAD_UNLOCK(&ompi_modex_lock);
return OMPI_SUCCESS;
}
int
ompi_modex_send(mca_base_component_t * source_component,
const void *data,
size_t size)
{
orte_jobid_t jobid;
int rc;
orte_buffer_t buffer;
orte_std_cntr_t i, num_tokens;
char *ptr, *segment, **tokens;
orte_byte_object_t bo;
orte_data_value_t value = ORTE_DATA_VALUE_EMPTY;
/* get location in GPR for the data */
jobid = ORTE_PROC_MY_NAME->jobid;
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_schema.get_proc_tokens(&tokens,
&num_tokens, orte_process_info.my_name))) {
ORTE_ERROR_LOG(rc);
free(segment);
return rc;
}
OBJ_CONSTRUCT(&buffer, orte_buffer_t);
/* Pack the component name information into the buffer */
ptr = source_component->mca_type_name;
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &ptr, 1, ORTE_STRING))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
ptr = source_component->mca_component_name;
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &ptr, 1, ORTE_STRING))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &source_component->mca_component_major_version, 1, ORTE_INT32))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &source_component->mca_component_minor_version, 1, ORTE_INT32))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, &size, 1, ORTE_SIZE))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* Pack the actual data into the buffer */
if (0 != size) {
if (ORTE_SUCCESS != (rc = orte_dss.pack(&buffer, (void *) data, size, ORTE_BYTE))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
}
if (ORTE_SUCCESS != (rc = orte_dss.unload(&buffer, (void **) &(bo.bytes), &(bo.size)))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
OBJ_DESTRUCT(&buffer);
/* setup the data_value structure to hold the byte object */
if (ORTE_SUCCESS != (rc = orte_dss.set(&value, (void *) &bo, ORTE_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* Put data in registry */
rc = orte_gpr.put_1(ORTE_GPR_TOKENS_AND | ORTE_GPR_KEYS_OR,
segment, tokens, OMPI_MODEX_KEY, &value);
cleanup:
free(segment);
for (i = 0; i < num_tokens; i++) {
free(tokens[i]);
tokens[i] = NULL;
}
if (NULL != tokens)
free(tokens);
return rc;
}
int
ompi_modex_recv(mca_base_component_t * component,
ompi_proc_t * proc,
void **buffer,
size_t * size)
{
ompi_modex_proc_data_t *proc_data;
ompi_modex_module_data_t *module_data;
/* make sure we could possibly have modex data */
if (0 == strcmp(orte_gpr_base_selected_component.gpr_version.mca_component_name,
"null")) {
return OMPI_ERR_NOT_IMPLEMENTED;
}
proc_data = ompi_modex_lookup_proc(proc);
if (NULL == proc_data) return OMPI_ERR_NOT_FOUND;
OPAL_THREAD_LOCK(&proc_data->modex_lock);
/* wait until data is available */
while (proc_data->modex_received_data == false) {
opal_condition_wait(&proc_data->modex_cond, &proc_data->modex_lock);
}
/* look up module */
module_data = ompi_modex_lookup_module(proc_data, component, false);
/* copy the data out to the user */
if ((NULL == module_data) ||
(module_data->module_data_size == 0)) {
*buffer = NULL;
*size = 0;
} else {
void *copy = malloc(module_data->module_data_size);
if (copy == NULL) {
OPAL_THREAD_UNLOCK(&proc_data->modex_lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
memcpy(copy, module_data->module_data, module_data->module_data_size);
*buffer = copy;
*size = module_data->module_data_size;
}
OPAL_THREAD_UNLOCK(&proc_data->modex_lock);
return OMPI_SUCCESS;
}
int
ompi_modex_recv_nb(mca_base_component_t *component,
ompi_proc_t *proc,
ompi_modex_cb_fn_t cbfunc,
void *cbdata)
{
ompi_modex_proc_data_t *proc_data;
ompi_modex_module_data_t *module_data;
ompi_modex_cb_t *cb;
proc_data = ompi_modex_lookup_proc(proc);
if (NULL == proc_data) return OMPI_ERR_NOT_FOUND;
OPAL_THREAD_LOCK(&proc_data->modex_lock);
/* lookup / create module */
module_data = ompi_modex_lookup_module(proc_data, component, true);
if (NULL == module_data) {
OPAL_THREAD_UNLOCK(&proc_data->modex_lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* register the callback */
cb = OBJ_NEW(ompi_modex_cb_t);
cb->cbfunc = cbfunc;
cb->cbdata = cbdata;
opal_list_append(&module_data->module_cbs, &cb->super);
OPAL_THREAD_UNLOCK(&proc_data->modex_lock);
return OMPI_SUCCESS;
}

220
ompi/runtime/ompi_module_exchange.h Normal file
View file

@ -0,0 +1,220 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006-2007 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/** @file
* Open MPI module-related data transfer mechanism
*
 * A mechanism for publishing module-related data during global
 * initialization.  Known simply as the "modex", this interface
 * lets modules share data, particularly data related to module
 * availability on the system.
*
* The modex system is tightly integrated into the general run-time
* initialization system and takes advantage of global update periods
* to minimize the amount of network traffic. All updates are also
* stored in the general purpose registry, and can be read at any time
 * during the life of the process.  Care should be taken not to call
 * the blocking receive during the first stage of global
 * initialization, as data will not yet be available and the process
 * will likely hang.
*
* @note For the purpose of this interface, two components are
* "corresponding" if:
* - they share the same major and minor MCA version number
* - they have the same type name string
* - they share the same major and minor type version number
* - they have the same component name string
* - they share the same major and minor component version number
*/
#ifndef MCA_OMPI_MODULE_EXCHANGE_H
#define MCA_OMPI_MODULE_EXCHANGE_H
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#include "orte/mca/ns/ns_types.h"
struct mca_base_component_t;
struct ompi_proc_t;
BEGIN_C_DECLS
/**
* Send a module-specific buffer to all other corresponding MCA
* modules in peer processes
*
 * This function takes a contiguous buffer of network-ordered data
 * and makes it available to the corresponding MCA modules in all peer
 * processes during the selection process.  A buffer sent by one
 * source_component can only be received by a corresponding module
 * with the same component name.
*
 * This function is intended to be used during MCA module
* initialization \em before \em selection (the selection process is
* defined differently for each component type). Each module will
* provide a buffer containing meta information and/or parameters
* that it wants to share with its corresponding modules in peer
* processes. This information typically contains location /
* contact information for establishing communication between
* processes (in a manner that is specific to that module). For
* example, a TCP-based module could provide its IP address and TCP
* port where it is waiting on listen(). The peer process receiving
* this buffer can therefore open a socket to the indicated IP
* address and TCP port.
*
* During the selection process, the MCA framework will effectively
* perform an "allgather" operation of all modex buffers; every
* buffer will be available to every peer process (see
* ompi_modex_recv()).
*
* The buffer is copied during the send call and may be modified or
* free()'ed immediately after the return from this function call.
*
 * @note Buffer contents are opaque to the MCA framework -- the data
 * \em must already either be in network order or be in some format
 * that peer processes will be able to read regardless of pointer
 * sizes or endian bias.
*
* @param source_component A pointer to this module's component
* structure
* @param buffer A pointer to the beginning of the buffer to send
* @param size Number of bytes in the buffer
*
* @retval OMPI_SUCCESS On success
* @retval OMPI_ERROR An unspecified error occurred
*/
OMPI_DECLSPEC int ompi_modex_send(struct mca_base_component_t *source_component,
const void *buffer, size_t size);
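As a usage illustration (a minimal sketch only, not part of this header): a transport component might publish its contact information during module init roughly as follows.  The "foo" component, the foo_modex_addr_t layout, and the helper name are hypothetical.

/* Hypothetical example: publish a transport address via the modex. */
#include <stdint.h>
#include <arpa/inet.h>                        /* htonl(), htons() */
#include "ompi/runtime/ompi_module_exchange.h"

/* hypothetical address blob; all fields kept in network byte order */
struct foo_modex_addr_t {
    uint32_t ipaddr;
    uint16_t tcp_port;
};

static int foo_publish_contact_info(struct mca_base_component_t *component,
                                    uint32_t ipaddr, uint16_t tcp_port)
{
    struct foo_modex_addr_t addr;
    addr.ipaddr = htonl(ipaddr);
    addr.tcp_port = htons(tcp_port);
    /* ompi_modex_send() copies the buffer, so a stack variable is fine */
    return ompi_modex_send(component, &addr, sizeof(addr));
}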
/**
* Receive a module-specific buffer from a corresponding MCA module
* in a specific peer process
*
* This is the corresponding "get" call to ompi_modex_send().
* After selection, modules can call this function to receive the
* buffer sent by their corresponding module on the process
* source_proc.
*
* If a buffer from a corresponding module is found, buffer will be
* filled with a pointer to a copy of the buffer that was sent by
* the peer process. It is the caller's responsibility to free this
* buffer. The size will be filled in with the total size of the
* buffer.
*
* @note If the modex system has received information from a given
* process, but has not yet received information for the given
* component, ompi_modex_recv() will return no data. This
 * cannot happen to a process that has gone through the normal
 * startup procedure, but if you believe it can happen with your
* component, you should use ompi_modex_recv_nb() to receive updates
* when the information becomes available.
*
* @param dest_component A pointer to this module's component struct
* @param source_proc Peer process to receive from
* @param buffer A pointer to a (void*) that will be filled
* with a pointer to the received buffer
* @param size Pointer to a size_t that will be filled with
* the number of bytes in the buffer
*
* @retval OMPI_SUCCESS If a corresponding module buffer is found and
* successfully returned to the caller.
* @retval OMPI_ERR_NOT_IMPLEMENTED Modex support is not available in
* this build of Open MPI (systems like the Cray XT)
* @retval OMPI_ERR_OUT_OF_RESOURCE No memory could be allocated for the
* buffer.
*/
OMPI_DECLSPEC int ompi_modex_recv(struct mca_base_component_t *dest_component,
struct ompi_proc_t *source_proc,
void **buffer, size_t *size);
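A corresponding sketch for the receive side, typically used during BTL selection / add_procs.  Again hypothetical: the helper name and foo_modex_addr_t come from the ompi_modex_send() example above.

/* Hypothetical example: fetch the blob a peer published for this component. */
#include <stdlib.h>                           /* free() */
#include <string.h>                           /* memcpy() */
#include "ompi/constants.h"                   /* OMPI_SUCCESS, OMPI_ERR_NOT_FOUND */

static int foo_lookup_peer_addr(struct mca_base_component_t *component,
                                struct ompi_proc_t *peer,
                                struct foo_modex_addr_t *addr_out)
{
    void *blob = NULL;
    size_t size = 0;
    int rc = ompi_modex_recv(component, peer, &blob, &size);
    if (OMPI_SUCCESS != rc) {
        return rc;
    }
    if (sizeof(*addr_out) != size) {
        /* peer published nothing for this component, or something we
         * do not understand */
        if (NULL != blob) free(blob);
        return OMPI_ERR_NOT_FOUND;
    }
    memcpy(addr_out, blob, size);
    free(blob);                               /* we own the returned copy */
    return OMPI_SUCCESS;
}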
typedef void (*ompi_modex_cb_fn_t)(struct mca_base_component_t *component,
struct ompi_proc_t* proc,
void* buffer,
size_t size,
void* cbdata);
/**
* Register to receive a callback on change to module specific data.
*
* The non-blocking version of ompi_modex_recv(). All information
* about ompi_modex_recv() applies to ompi_modex_recv_nb(), with the
* exception of what happens when data is available for the given peer
* process but not the specified module. In that case, no callback
* will be fired until data is available.
*
* @param component A pointer to this module's component struct
* @param proc Peer process to receive from
* @param cbfunc Callback function when data is available,
* of type ompi_modex_cb_fn_t
* @param cbdata Opaque callback data to pass to cbfunc
*
* @retval OMPI_SUCCESS Success
* @retval OMPI_ERR_OUT_OF_RESOURCE No memory could be allocated
* for internal data structures
*/
OMPI_DECLSPEC int ompi_modex_recv_nb(struct mca_base_component_t *component,
struct ompi_proc_t* proc,
ompi_modex_cb_fn_t cbfunc,
void* cbdata);
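A sketch of the non-blocking variant: register a callback that fires once the peer's data for this component shows up.  The callback body and helper name are hypothetical.

/* Hypothetical example: be notified when a peer publishes data later. */
static void foo_addr_update(struct mca_base_component_t *component,
                            struct ompi_proc_t *proc,
                            void *buffer, size_t size, void *cbdata)
{
    /* buffer/size describe the peer's newly published blob; cbdata is
     * whatever was passed to ompi_modex_recv_nb() below */
}

static int foo_watch_peer(struct mca_base_component_t *component,
                          struct ompi_proc_t *peer)
{
    return ompi_modex_recv_nb(component, peer, foo_addr_update, NULL);
}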
/**
* Subscribe to resource updates for a specific job
*
* Generally called during process initialization, after all the data
* has been loaded into the module exchange system, but before the
* data is actually used.
*
 * Intended to help start-up scalability by delaying the subscription
 * to job updates until all local data is in the system (so updates do
 * not fire along the way), while still launching the asynchronous
 * request for remote data before it is actually needed later in init.
*
* This function is probably not useful outside of application
* initialization code.
*/
OMPI_DECLSPEC int ompi_modex_subscribe_job(orte_jobid_t jobid);
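For orientation, a rough sketch of the expected call ordering during start-up and shutdown, mirroring the ompi_mpi_init()/ompi_mpi_finalize() changes later in this commit (error handling omitted):

/* Sketch of the expected ordering (error handling omitted): */
ompi_modex_init();                                    /* before components publish data */
/* ... frameworks open; components call ompi_modex_send() during init ... */
ompi_modex_subscribe_job(ORTE_PROC_MY_NAME->jobid);   /* after all local data is loaded */
/* ... selection runs; components call ompi_modex_recv() / ompi_modex_recv_nb() ... */
ompi_modex_finalize();                                /* during MPI finalization */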
/**
* Initialize the modex system
*
* Allocate memory for the local data cache and initialize the
* module exchange system. Does not cause communication nor any
* subscriptions to be placed on the registry.
*/
OMPI_DECLSPEC int ompi_modex_init(void);
/**
* Finalize the modex system
*
* Release any memory associated with the modex system, remove all
* subscriptions on the GPR and end all non-blocking update triggers
* currently available on the system.
*/
OMPI_DECLSPEC int ompi_modex_finalize(void);
END_C_DECLS
#endif /* MCA_OMPI_MODULE_EXCHANGE_H */

View file

@ -64,7 +64,7 @@
#include "ompi/info/info.h"
#include "ompi/runtime/mpiruntime.h"
#include "ompi/attribute/attribute.h"
#include "ompi/mca/pml/base/pml_base_module_exchange.h"
#include "ompi/runtime/ompi_module_exchange.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/pml/base/base.h"
#include "ompi/mca/osc/base/base.h"
@ -264,7 +264,7 @@ int ompi_mpi_finalize(void)
}
/* free module exchange resources */
if (OMPI_SUCCESS != (ret = mca_pml_base_modex_finalize())) {
if (OMPI_SUCCESS != (ret = ompi_modex_finalize())) {
return ret;
}

View file

@ -73,7 +73,7 @@
#include "ompi/mca/mpool/base/base.h"
#include "ompi/mca/mpool/mpool.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/pml/base/pml_base_module_exchange.h"
#include "ompi/runtime/ompi_module_exchange.h"
#include "ompi/mca/pml/base/base.h"
#include "ompi/mca/osc/base/base.h"
#include "ompi/mca/coll/coll.h"
@ -400,8 +400,8 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
/* Initialize module exchange */
if (OMPI_SUCCESS != (ret = mca_pml_base_modex_init())) {
error = "mca_pml_base_modex_init() failed";
if (OMPI_SUCCESS != (ret = ompi_modex_init())) {
error = "ompi_modex_init() failed";
goto error;
}
@ -506,8 +506,8 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
goto error;
}
/* do module exchange */
if (OMPI_SUCCESS != (ret = mca_pml_base_modex_exchange())) {
error = "mca_pml_base_modex_exchange() failed";
if (OMPI_SUCCESS != (ret = ompi_modex_subscribe_job(ORTE_PROC_MY_NAME->jobid))) {
error = "ompi_modex_subscribe_job() failed";
goto error;
}