b225366012
passed to make it all flow thru the opal/pmix "put/get" operations. Update the PMIx code to latest master to pickup some required behaviors. Remove the no-longer-required get_contact_info and set_contact_info from the RML layer. Add an MCA param to allow the ofi/rml component to route messages if desired. This is mainly for experimentation at this point as we aren't sure if routing wi ll be beneficial at large scales. Leave it "off" by default. Signed-off-by: Ralph Castain <rhc@open-mpi.org>
334 строки
12 KiB
C
334 строки
12 KiB
C
/*
|
|
* Copyright (c) 2004-2011 The University of Tennessee and The University
|
|
* of Tennessee Research Foundation. All rights
|
|
* reserved.
|
|
* Copyright (c) 2011-2013 Los Alamos National Security, LLC. All rights
|
|
* reserved.
|
|
* Copyright (c) 2013 Cisco Systems, Inc. All rights reserved.
|
|
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
|
|
* Copyright (c) 2015-2017 Research Organization for Information Science
|
|
* and Technology (RIST). All rights reserved.
|
|
* $COPYRIGHT$
|
|
*
|
|
* Additional copyrights may follow
|
|
*
|
|
* $HEADER$
|
|
*/
|
|
|
|
#include "orte_config.h"
|
|
|
|
#include <string.h>
|
|
|
|
#include "opal/class/opal_list.h"
|
|
#include "opal/dss/dss.h"
|
|
#include "orte/mca/mca.h"
|
|
#include "opal/mca/base/mca_base_component_repository.h"
|
|
#include "opal/util/argv.h"
|
|
#include "opal/util/output.h"
|
|
|
|
#include "orte/mca/rml/rml.h"
|
|
#include "orte/mca/state/state.h"
|
|
#include "orte/runtime/orte_wait.h"
|
|
#include "orte/util/name_fns.h"
|
|
#include "orte/util/threads.h"
|
|
|
|
#include "orte/mca/rml/base/base.h"
|
|
|
|
/*
|
|
* The stub API interface functions
|
|
*/
|
|
|
|
/** Open a conduit - check if the ORTE_RML_INCLUDE_COMP attribute is provided, this is */
|
|
/* a comma seperated list of components, try to open the conduit in this order. */
|
|
/* if the ORTE_RML_INCLUDE_COMP is not provided or this list was not able to open conduit */
|
|
/* call the open_conduit() of the component in priority order to see if they can use the */
|
|
/* attribute to open a conduit. */
|
|
/* Note: The component takes care of checking for duplicate and returning the previously */
|
|
/* opened module* in case of duplicates. Currently we are saving it in a new conduit_id */
|
|
/* even if it is duplicate. [ToDo] compare the module* received from component to see if */
|
|
/* already present in array and return the prev conduit_id instead of adding it again to array */
|
|
/* @param[in] attributes The attributes is a list of opal_value_t of type OPAL_STRING */
|
|
orte_rml_conduit_t orte_rml_API_open_conduit(opal_list_t *attributes)
|
|
{
|
|
orte_rml_base_active_t *active;
|
|
orte_rml_component_t *comp;
|
|
orte_rml_base_module_t *mod, *ourmod=NULL;
|
|
int rc;
|
|
|
|
opal_output_verbose(10,orte_rml_base_framework.framework_output,
|
|
"%s rml:base:open_conduit",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
|
|
|
/* bozo check - you cannot specify both include and exclude */
|
|
if (orte_get_attribute(attributes, ORTE_RML_INCLUDE_COMP_ATTRIB, NULL, OPAL_STRING) &&
|
|
orte_get_attribute(attributes, ORTE_RML_EXCLUDE_COMP_ATTRIB, NULL, OPAL_STRING)) {
|
|
// orte_show_help();
|
|
return ORTE_ERR_NOT_SUPPORTED;
|
|
}
|
|
|
|
/* cycle thru the actives in priority order and let each one see if they can support this request */
|
|
OPAL_LIST_FOREACH(active, &orte_rml_base.actives, orte_rml_base_active_t) {
|
|
comp = (orte_rml_component_t *)active->component;
|
|
if (NULL != comp->open_conduit) {
|
|
if (NULL != (mod = comp->open_conduit(attributes))) {
|
|
opal_output_verbose(2, orte_rml_base_framework.framework_output,
|
|
"%s rml:base:open_conduit Component %s provided a conduit",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
active->component->base.mca_component_name);
|
|
ourmod = mod;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
if (NULL != ourmod) {
|
|
/* we got an answer - store this conduit in our array */
|
|
rc = opal_pointer_array_add(&orte_rml_base.conduits, ourmod);
|
|
if (rc < 0) {
|
|
return ORTE_RML_CONDUIT_INVALID;
|
|
}
|
|
return rc;
|
|
}
|
|
/* we get here if nobody could support it */
|
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED);
|
|
return ORTE_RML_CONDUIT_INVALID;
|
|
}
|
|
|
|
|
|
|
|
/** Shutdown the communication system and clean up resources */
|
|
void orte_rml_API_close_conduit(orte_rml_conduit_t id)
|
|
{
|
|
orte_rml_base_module_t *mod;
|
|
orte_rml_component_t *comp;
|
|
|
|
opal_output_verbose(10,orte_rml_base_framework.framework_output,
|
|
"%s rml:base:close_conduit(%d)",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)id);
|
|
|
|
if( NULL != (mod = (orte_rml_base_module_t*)opal_pointer_array_get_item(&orte_rml_base.conduits, id))) {
|
|
comp = (orte_rml_component_t*)mod->component;
|
|
if (NULL != comp && NULL != comp->close_conduit) {
|
|
comp->close_conduit(mod);
|
|
}
|
|
opal_pointer_array_set_item(&orte_rml_base.conduits, id, NULL);
|
|
free(mod);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/** Ping process for connectivity check */
|
|
int orte_rml_API_ping(orte_rml_conduit_t conduit_id,
|
|
const char* contact_info,
|
|
const struct timeval* tv)
|
|
{
|
|
int rc = ORTE_ERR_UNREACH;
|
|
orte_rml_base_module_t *mod;
|
|
|
|
opal_output_verbose(10,orte_rml_base_framework.framework_output,
|
|
"%s rml:base:ping(conduit-%d)",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),conduit_id);
|
|
|
|
/* get the module */
|
|
if (NULL == (mod = (orte_rml_base_module_t*)opal_pointer_array_get_item(&orte_rml_base.conduits, conduit_id))) {
|
|
return rc;
|
|
}
|
|
if (NULL == mod->ping) {
|
|
return rc;
|
|
}
|
|
rc = mod->ping((struct orte_rml_base_module_t*)mod, contact_info, tv);
|
|
return rc;
|
|
}
|
|
|
|
|
|
/** Send non-blocking iovec message through a specific conduit*/
|
|
int orte_rml_API_send_nb(orte_rml_conduit_t conduit_id,
|
|
orte_process_name_t* peer,
|
|
struct iovec* msg,
|
|
int count,
|
|
orte_rml_tag_t tag,
|
|
orte_rml_callback_fn_t cbfunc,
|
|
void* cbdata)
|
|
{
|
|
int rc = ORTE_ERR_UNREACH;
|
|
orte_rml_base_module_t *mod;
|
|
|
|
opal_output_verbose(10,orte_rml_base_framework.framework_output,
|
|
"%s rml:base:send_nb() to peer %s through conduit %d",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
ORTE_NAME_PRINT(peer),conduit_id);
|
|
/* get the module */
|
|
if (NULL == (mod = (orte_rml_base_module_t*)opal_pointer_array_get_item(&orte_rml_base.conduits, conduit_id))) {
|
|
return rc;
|
|
}
|
|
if (NULL == mod->send_nb) {
|
|
return rc;
|
|
}
|
|
rc = mod->send_nb((struct orte_rml_base_module_t*)mod, peer, msg, count, tag, cbfunc, cbdata);
|
|
return rc;
|
|
}
|
|
|
|
/** Send non-blocking buffer message */
|
|
int orte_rml_API_send_buffer_nb(orte_rml_conduit_t conduit_id,
|
|
orte_process_name_t* peer,
|
|
struct opal_buffer_t* buffer,
|
|
orte_rml_tag_t tag,
|
|
orte_rml_buffer_callback_fn_t cbfunc,
|
|
void* cbdata)
|
|
{
|
|
int rc = ORTE_ERR_UNREACH;
|
|
orte_rml_base_module_t *mod;
|
|
|
|
opal_output_verbose(10,orte_rml_base_framework.framework_output,
|
|
"%s rml:base:send_buffer_nb() to peer %s through conduit %d",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
ORTE_NAME_PRINT(peer),conduit_id);
|
|
|
|
/* get the module */
|
|
if (NULL == (mod = (orte_rml_base_module_t*)opal_pointer_array_get_item(&orte_rml_base.conduits, conduit_id))) {
|
|
return rc;
|
|
}
|
|
if (NULL == mod->send_buffer_nb) {
|
|
return rc;
|
|
}
|
|
rc = mod->send_buffer_nb((struct orte_rml_base_module_t*)mod, peer, buffer, tag, cbfunc, cbdata);
|
|
return rc;
|
|
}
|
|
|
|
/** post a receive for an IOV message - this is done
|
|
* strictly in the base, and so it does not go to a module */
|
|
void orte_rml_API_recv_nb(orte_process_name_t* peer,
|
|
orte_rml_tag_t tag,
|
|
bool persistent,
|
|
orte_rml_callback_fn_t cbfunc,
|
|
void* cbdata)
|
|
{
|
|
orte_rml_recv_request_t *req;
|
|
|
|
opal_output_verbose(10, orte_rml_base_framework.framework_output,
|
|
"%s rml_recv_nb for peer %s tag %d",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
ORTE_NAME_PRINT(peer), tag);
|
|
|
|
/* push the request into the event base so we can add
|
|
* the receive to our list of posted recvs */
|
|
req = OBJ_NEW(orte_rml_recv_request_t);
|
|
req->post->buffer_data = false;
|
|
req->post->peer.jobid = peer->jobid;
|
|
req->post->peer.vpid = peer->vpid;
|
|
req->post->tag = tag;
|
|
req->post->persistent = persistent;
|
|
req->post->cbfunc.iov = cbfunc;
|
|
req->post->cbdata = cbdata;
|
|
ORTE_THREADSHIFT(req, orte_event_base, orte_rml_base_post_recv, ORTE_MSG_PRI);
|
|
}
|
|
|
|
/** Receive non-blocking buffer message */
|
|
void orte_rml_API_recv_buffer_nb(orte_process_name_t* peer,
|
|
orte_rml_tag_t tag,
|
|
bool persistent,
|
|
orte_rml_buffer_callback_fn_t cbfunc,
|
|
void* cbdata)
|
|
{
|
|
orte_rml_recv_request_t *req;
|
|
|
|
opal_output_verbose(10, orte_rml_base_framework.framework_output,
|
|
"%s rml_recv_buffer_nb for peer %s tag %d",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
ORTE_NAME_PRINT(peer), tag);
|
|
|
|
/* push the request into the event base so we can add
|
|
* the receive to our list of posted recvs */
|
|
req = OBJ_NEW(orte_rml_recv_request_t);
|
|
req->post->buffer_data = true;
|
|
req->post->peer.jobid = peer->jobid;
|
|
req->post->peer.vpid = peer->vpid;
|
|
req->post->tag = tag;
|
|
req->post->persistent = persistent;
|
|
req->post->cbfunc.buffer = cbfunc;
|
|
req->post->cbdata = cbdata;
|
|
ORTE_THREADSHIFT(req, orte_event_base, orte_rml_base_post_recv, ORTE_MSG_PRI);
|
|
}
|
|
|
|
/** Cancel posted non-blocking receive */
|
|
void orte_rml_API_recv_cancel(orte_process_name_t* peer, orte_rml_tag_t tag)
|
|
{
|
|
orte_rml_recv_request_t *req;
|
|
|
|
opal_output_verbose(10, orte_rml_base_framework.framework_output,
|
|
"%s rml_recv_cancel for peer %s tag %d",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
ORTE_NAME_PRINT(peer), tag);
|
|
|
|
ORTE_ACQUIRE_OBJECT(orte_event_base_active);
|
|
if (!orte_event_base_active) {
|
|
/* no event will be processed any more, so simply return. */
|
|
return;
|
|
}
|
|
|
|
/* push the request into the event base so we can remove
|
|
* the receive from our list of posted recvs */
|
|
req = OBJ_NEW(orte_rml_recv_request_t);
|
|
req->cancel = true;
|
|
req->post->peer.jobid = peer->jobid;
|
|
req->post->peer.vpid = peer->vpid;
|
|
req->post->tag = tag;
|
|
ORTE_THREADSHIFT(req, orte_event_base, orte_rml_base_post_recv, ORTE_MSG_PRI);
|
|
}
|
|
|
|
/** Purge information */
|
|
void orte_rml_API_purge(orte_process_name_t *peer)
|
|
{
|
|
orte_rml_base_module_t *mod;
|
|
int i;
|
|
|
|
for (i=0; i < orte_rml_base.conduits.size; i++) {
|
|
/* get the module */
|
|
if (NULL != (mod = (orte_rml_base_module_t*)opal_pointer_array_get_item(&orte_rml_base.conduits, i))) {
|
|
if (NULL != mod->purge) {
|
|
mod->purge(peer);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
int orte_rml_API_query_transports(opal_list_t *providers)
|
|
{
|
|
|
|
orte_rml_base_active_t *active;
|
|
orte_rml_pathway_t *p;
|
|
|
|
opal_output_verbose(10,orte_rml_base_framework.framework_output,
|
|
"%s rml:base:orte_rml_API_query_transports()",
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
|
|
|
/* cycle thru the actives */
|
|
OPAL_LIST_FOREACH(active, &orte_rml_base.actives, orte_rml_base_active_t) {
|
|
if (NULL != active->component->query_transports) {
|
|
opal_output_verbose(10,orte_rml_base_framework.framework_output,
|
|
"\n calling module: %s->query_transports() \n",
|
|
active->component->base.mca_component_name);
|
|
if (NULL != (p = active->component->query_transports())) {
|
|
/* pass the results across */
|
|
OBJ_RETAIN(p);
|
|
opal_list_append(providers, &p->super);
|
|
}
|
|
}
|
|
}
|
|
return ORTE_SUCCESS;
|
|
|
|
}
|
|
|
|
char* orte_rml_API_get_routed(orte_rml_conduit_t id)
|
|
{
|
|
orte_rml_base_module_t *mod;
|
|
|
|
/* get the module */
|
|
if (NULL != (mod = (orte_rml_base_module_t*)opal_pointer_array_get_item(&orte_rml_base.conduits, id))) {
|
|
return mod->routed;
|
|
}
|
|
|
|
return NULL;
|
|
}
|