Adding commit for multiple plugin loading support in RML
Этот коммит содержится в:
родитель
7b73c868d5
Коммит
0188c3cf81
@ -82,8 +82,21 @@ ORTE_DECLSPEC void orte_rml_base_comm_start(void);
|
||||
ORTE_DECLSPEC void orte_rml_base_comm_stop(void);
|
||||
|
||||
|
||||
/*
|
||||
* globals that might be needed
|
||||
*/
|
||||
/* adding element to hold the active modules and components */
|
||||
typedef struct {
|
||||
opal_list_item_t super;
|
||||
int pri;
|
||||
orte_rml_base_module_t *module;
|
||||
mca_base_component_t *component;
|
||||
} orte_rml_base_active_t;
|
||||
OBJ_CLASS_DECLARATION(orte_rml_base_active_t);
|
||||
|
||||
/* a global struct containing framework-level values */
|
||||
typedef struct {
|
||||
opal_list_t actives; /* list to hold the active plugins */
|
||||
opal_list_t posted_recvs;
|
||||
opal_list_t unmatched_msgs;
|
||||
opal_pointer_array_t open_channels;
|
||||
@ -106,17 +119,6 @@ ORTE_DECLSPEC extern orte_rml_base_t orte_rml_base;
|
||||
ORTE_DECLSPEC extern opal_list_t orte_rml_base_components;
|
||||
|
||||
|
||||
/**
|
||||
* Component structure for the selected RML component
|
||||
*
|
||||
* Component structure pointer for the currently selected RML
|
||||
* component. Useable between calls to orte_rml_base_select() and
|
||||
* orte_rml_base_close().
|
||||
* @note This pointer should not be used outside the RML base. It is
|
||||
* available outside the RML base only for the F/T component.
|
||||
*/
|
||||
ORTE_DECLSPEC extern orte_rml_component_t *orte_rml_component;
|
||||
|
||||
typedef enum {
|
||||
orte_rml_channel_opening = 0,
|
||||
orte_rml_channel_open = 1,
|
||||
@ -407,6 +409,66 @@ ORTE_DECLSPEC void orte_rml_base_close_channel_send_callback ( int status, orte_
|
||||
ORTE_DECLSPEC void orte_rml_base_send_close_channel ( orte_rml_close_channel_t *close_chan);
|
||||
ORTE_DECLSPEC void orte_rml_base_reprocess_msg(int fd, short flags, void *cbdata);
|
||||
ORTE_DECLSPEC void orte_rml_base_complete_recv_msg (orte_rml_recv_t **recv_msg);
|
||||
|
||||
|
||||
/* Stub API interfaces to cycle through active plugins and call highest priority */
|
||||
ORTE_DECLSPEC int orte_rml_API_enable_comm(void);
|
||||
ORTE_DECLSPEC int orte_rml_API_finalize(void);
|
||||
ORTE_DECLSPEC char* orte_rml_API_get_contact_info(void);
|
||||
ORTE_DECLSPEC void orte_rml_API_set_contact_info(const char *contact_info);
|
||||
ORTE_DECLSPEC int orte_rml_API_ping(const char* contact_info, const struct timeval* tv);
|
||||
ORTE_DECLSPEC int orte_rml_API_send_nb(orte_process_name_t* peer,struct iovec* msg,
|
||||
int count, orte_rml_tag_t tag,orte_rml_callback_fn_t cbfunc,void* cbdata);
|
||||
ORTE_DECLSPEC int orte_rml_API_send_buffer_nb(orte_process_name_t* peer,
|
||||
struct opal_buffer_t* buffer,
|
||||
orte_rml_tag_t tag,
|
||||
orte_rml_buffer_callback_fn_t cbfunc,
|
||||
void* cbdata);
|
||||
ORTE_DECLSPEC void orte_rml_API_recv_nb(orte_process_name_t* peer,
|
||||
orte_rml_tag_t tag,
|
||||
bool persistent,
|
||||
orte_rml_callback_fn_t cbfunc,
|
||||
void* cbdata);
|
||||
|
||||
ORTE_DECLSPEC void orte_rml_API_recv_buffer_nb(orte_process_name_t* peer,
|
||||
orte_rml_tag_t tag,
|
||||
bool persistent,
|
||||
orte_rml_buffer_callback_fn_t cbfunc,
|
||||
void* cbdata);
|
||||
|
||||
ORTE_DECLSPEC void orte_rml_API_recv_cancel(orte_process_name_t* peer, orte_rml_tag_t tag);
|
||||
|
||||
ORTE_DECLSPEC int orte_rml_API_add_exception_handler(orte_rml_exception_callback_t cbfunc);
|
||||
|
||||
ORTE_DECLSPEC int orte_rml_API_del_exception_handler(orte_rml_exception_callback_t cbfunc);
|
||||
|
||||
ORTE_DECLSPEC int orte_rml_API_ft_event(int state);
|
||||
|
||||
ORTE_DECLSPEC void orte_rml_API_purge(orte_process_name_t *peer);
|
||||
|
||||
ORTE_DECLSPEC int orte_rml_API_open_channel(orte_process_name_t* peer,
|
||||
opal_list_t *qos_attributes,
|
||||
orte_rml_channel_callback_fn_t cbfunc,
|
||||
void* cbdata);
|
||||
|
||||
ORTE_DECLSPEC int orte_rml_API_send_channel_nb(orte_rml_channel_num_t channel,
|
||||
struct iovec* msg,
|
||||
int count,
|
||||
orte_rml_tag_t tag,
|
||||
orte_rml_send_channel_callback_fn_t cbfunc,
|
||||
void* cbdata);
|
||||
|
||||
ORTE_DECLSPEC int orte_rml_API_send_buffer_channel_nb(orte_rml_channel_num_t channel,
|
||||
struct opal_buffer_t * buffer,
|
||||
orte_rml_tag_t tag,
|
||||
orte_rml_send_buffer_channel_callback_fn_t cbfunc,
|
||||
void* cbdata);
|
||||
|
||||
ORTE_DECLSPEC int orte_rml_API_close_channel(orte_rml_channel_num_t channel_num,
|
||||
orte_rml_channel_callback_fn_t cbfunc,
|
||||
void* cbdata);
|
||||
|
||||
|
||||
END_C_DECLS
|
||||
|
||||
#endif /* MCA_RML_BASE_H */
|
||||
|
@ -36,7 +36,29 @@
|
||||
* component's public mca_base_component_t struct. */
|
||||
#include "orte/mca/rml/base/static-components.h"
|
||||
|
||||
orte_rml_module_t orte_rml = {0};
|
||||
|
||||
/* Initialising stub fns in the global var used by other modules */
|
||||
orte_rml_API_module_t orte_rml = {
|
||||
orte_rml_API_enable_comm,
|
||||
orte_rml_API_finalize,
|
||||
orte_rml_API_get_contact_info,
|
||||
orte_rml_API_set_contact_info,
|
||||
orte_rml_API_ping,
|
||||
orte_rml_API_send_nb,
|
||||
orte_rml_API_send_buffer_nb,
|
||||
orte_rml_API_recv_nb,
|
||||
orte_rml_API_recv_buffer_nb,
|
||||
orte_rml_API_recv_cancel,
|
||||
orte_rml_API_add_exception_handler,
|
||||
orte_rml_API_del_exception_handler,
|
||||
orte_rml_API_ft_event,
|
||||
orte_rml_API_purge,
|
||||
orte_rml_API_open_channel,
|
||||
orte_rml_API_send_channel_nb,
|
||||
orte_rml_API_send_buffer_channel_nb,
|
||||
orte_rml_API_close_channel
|
||||
};
|
||||
|
||||
orte_rml_base_t orte_rml_base = {{{0}}};
|
||||
OPAL_TIMING_DECLARE(tm_rml)
|
||||
|
||||
@ -89,6 +111,17 @@ static int orte_rml_base_close(void)
|
||||
{
|
||||
bool active;
|
||||
|
||||
orte_rml_base_active_t *active_module;
|
||||
|
||||
/*close the active modules */
|
||||
OPAL_LIST_FOREACH(active_module, &orte_rml_base.actives, orte_rml_base_active_t)
|
||||
{
|
||||
if (NULL != active_module->module->base_finalize) {
|
||||
active_module->module->base_finalize();
|
||||
}
|
||||
}
|
||||
OPAL_LIST_DESTRUCT(&orte_rml_base.actives)
|
||||
|
||||
/* because the RML posted recvs list is in a separate
|
||||
* async thread for apps, we can't just destruct it here.
|
||||
* Instead, we push it into that event thread and destruct
|
||||
@ -115,6 +148,8 @@ static int orte_rml_base_close(void)
|
||||
static int orte_rml_base_open(mca_base_open_flag_t flags)
|
||||
{
|
||||
/* Initialize globals */
|
||||
/* construct object for holding the active plugin modules */
|
||||
OBJ_CONSTRUCT(&orte_rml_base.actives, opal_list_t);
|
||||
OBJ_CONSTRUCT(&orte_rml_base.posted_recvs, opal_list_t);
|
||||
OBJ_CONSTRUCT(&orte_rml_base.unmatched_msgs, opal_list_t);
|
||||
OBJ_CONSTRUCT(&orte_rml_base.open_channels, opal_pointer_array_t);
|
||||
@ -131,18 +166,445 @@ MCA_BASE_FRAMEWORK_DECLARE(orte, rml, "ORTE Run-Time Messaging Layer",
|
||||
orte_rml_base_register, orte_rml_base_open, orte_rml_base_close,
|
||||
mca_rml_base_static_components, 0);
|
||||
|
||||
OBJ_CLASS_INSTANCE(orte_rml_base_active_t,
|
||||
opal_list_item_t,
|
||||
NULL, NULL);
|
||||
|
||||
/*
|
||||
* The stub API interface implementation that cycles through the active list and
|
||||
* calls into the plugin with highest priority that implements it.
|
||||
*/
|
||||
|
||||
/** Enable communication once a process name has been assigned */
|
||||
int orte_rml_API_enable_comm(void)
|
||||
{
|
||||
int rc = ORTE_ERROR;
|
||||
opal_buffer_t *buf;
|
||||
orte_rml_base_active_t *active;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1,orte_rml_base_framework.framework_output,
|
||||
"%s rml:base:enable_comm calling the respective plugin that implements this",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* cycle thru the actives and see who can send it */
|
||||
OPAL_LIST_FOREACH(active, &orte_rml_base.actives, orte_rml_base_active_t) {
|
||||
if (NULL != active->module->base_enable_comm) {
|
||||
if (ORTE_SUCCESS == (rc = active->module->base_enable_comm())) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
/** Shutdown the communication system and clean up resources */
|
||||
int orte_rml_API_finalize(void)
|
||||
{
|
||||
int rc = ORTE_ERROR;
|
||||
opal_buffer_t *buf;
|
||||
orte_rml_base_active_t *active;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1,orte_rml_base_framework.framework_output,
|
||||
"%s rml:base:finalize() - calling the respective plugin that implements this",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* cycle thru the actives and see who can send it */
|
||||
OPAL_LIST_FOREACH(active, &orte_rml_base.actives, orte_rml_base_active_t) {
|
||||
if (NULL != active->module->base_finalize) {
|
||||
if (ORTE_SUCCESS == (rc = active->module->base_finalize())) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return rc;
|
||||
|
||||
}
|
||||
|
||||
/** Get contact information for local process */
|
||||
char* orte_rml_API_get_contact_info(void)
|
||||
{
|
||||
char* rc = NULL;
|
||||
opal_buffer_t *buf;
|
||||
orte_rml_base_active_t *active;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1,orte_rml_base_framework.framework_output,
|
||||
"%s rml:base:get_contact_info() - calling the respective plugin that implements this",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* cycle thru the actives and see who can send it */
|
||||
OPAL_LIST_FOREACH(active, &orte_rml_base.actives, orte_rml_base_active_t) {
|
||||
if (NULL != active->module->base_get_contact_info) {
|
||||
if (ORTE_SUCCESS == (rc = active->module->base_get_contact_info())) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
/** Set contact information for remote process */
|
||||
void orte_rml_API_set_contact_info(const char *contact_info)
|
||||
{
|
||||
opal_buffer_t *buf;
|
||||
orte_rml_base_active_t *active;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1,orte_rml_base_framework.framework_output,
|
||||
"%s rml:base:set_contact_info() - calling the respective plugin that implements this",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* cycle thru the actives and see who can send it */
|
||||
OPAL_LIST_FOREACH(active, &orte_rml_base.actives, orte_rml_base_active_t) {
|
||||
if (NULL != active->module->base_set_contact_info) {
|
||||
return active->module->base_set_contact_info(contact_info);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/** Ping process for connectivity check */
|
||||
int orte_rml_API_ping(const char* contact_info, const struct timeval* tv) {
|
||||
int rc = ORTE_ERROR;
|
||||
opal_buffer_t *buf;
|
||||
orte_rml_base_active_t *active;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1,orte_rml_base_framework.framework_output,
|
||||
"%s rml:base:ping() - calling the respective plugin that implements this",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* cycle thru the actives and see who can send it */
|
||||
OPAL_LIST_FOREACH(active, &orte_rml_base.actives, orte_rml_base_active_t) {
|
||||
if (NULL != active->module->base_ping) {
|
||||
if (ORTE_SUCCESS == (rc = active->module->base_ping(contact_info,tv))) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
/** Send non-blocking iovec message */
|
||||
int orte_rml_API_send_nb(orte_process_name_t* peer,
|
||||
struct iovec* msg,
|
||||
int count,
|
||||
orte_rml_tag_t tag,
|
||||
orte_rml_callback_fn_t cbfunc,
|
||||
void* cbdata)
|
||||
{
|
||||
int rc = ORTE_ERROR;
|
||||
opal_buffer_t *buf;
|
||||
orte_rml_base_active_t *active;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1,orte_rml_base_framework.framework_output,
|
||||
"%s rml:base:send_nb() - calling the respective plugin that implements this",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* cycle thru the actives and see who can send it */
|
||||
OPAL_LIST_FOREACH(active, &orte_rml_base.actives, orte_rml_base_active_t) {
|
||||
if (NULL != active->module->base_send_nb) {
|
||||
if (ORTE_SUCCESS == (rc = active->module->base_send_nb(peer,msg,count,tag,cbfunc,cbdata))) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
/** Send non-blocking buffer message */
|
||||
int orte_rml_API_send_buffer_nb(orte_process_name_t* peer,
|
||||
struct opal_buffer_t* buffer,
|
||||
orte_rml_tag_t tag,
|
||||
orte_rml_buffer_callback_fn_t cbfunc,
|
||||
void* cbdata)
|
||||
{
|
||||
int rc = ORTE_ERROR;
|
||||
opal_buffer_t *buf;
|
||||
orte_rml_base_active_t *active;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1,orte_rml_base_framework.framework_output,
|
||||
"%s rml:base:send_buffer_nb() - calling the respective plugin that implements this",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* cycle thru the actives and see who can send it */
|
||||
OPAL_LIST_FOREACH(active, &orte_rml_base.actives, orte_rml_base_active_t) {
|
||||
if (NULL != active->module->base_send_buffer_nb) {
|
||||
if (ORTE_SUCCESS == (rc = active->module->base_send_buffer_nb(peer,buffer,tag,cbfunc,cbdata))) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
||||
/** Receive non-blocking iovec message */
|
||||
void orte_rml_API_recv_nb(orte_process_name_t* peer,
|
||||
orte_rml_tag_t tag,
|
||||
bool persistent,
|
||||
orte_rml_callback_fn_t cbfunc,
|
||||
void* cbdata)
|
||||
{
|
||||
opal_buffer_t *buf;
|
||||
orte_rml_base_active_t *active;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1,orte_rml_base_framework.framework_output,
|
||||
"%s rml:base:recv_nb() - calling the respective plugin that implements this",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* cycle thru the actives and see who can send it */
|
||||
OPAL_LIST_FOREACH(active, &orte_rml_base.actives, orte_rml_base_active_t) {
|
||||
if (NULL != active->module->base_recv_nb) {
|
||||
return active->module->base_recv_nb(peer,tag,persistent,cbfunc,cbdata);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Receive non-blocking buffer message */
|
||||
void orte_rml_API_recv_buffer_nb(orte_process_name_t* peer,
|
||||
orte_rml_tag_t tag,
|
||||
bool persistent,
|
||||
orte_rml_buffer_callback_fn_t cbfunc,
|
||||
void* cbdata)
|
||||
{ opal_buffer_t *buf;
|
||||
orte_rml_base_active_t *active;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1,orte_rml_base_framework.framework_output,
|
||||
"%s rml:base:recv_buffer_nb() - calling the respective plugin that implements this",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* cycle thru the actives and see who can send it */
|
||||
OPAL_LIST_FOREACH(active, &orte_rml_base.actives, orte_rml_base_active_t) {
|
||||
if (NULL != active->module->base_recv_buffer_nb) {
|
||||
return active->module->base_recv_buffer_nb(peer,tag,persistent,cbfunc,cbdata);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Cancel posted non-blocking receive */
|
||||
void orte_rml_API_recv_cancel(orte_process_name_t* peer, orte_rml_tag_t tag)
|
||||
{
|
||||
opal_buffer_t *buf;
|
||||
orte_rml_base_active_t *active;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1,orte_rml_base_framework.framework_output,
|
||||
"%s rml:base:recv_cancel() - calling the respective plugin that implements this",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* cycle thru the actives and see who can send it */
|
||||
OPAL_LIST_FOREACH(active, &orte_rml_base.actives, orte_rml_base_active_t) {
|
||||
if (NULL != active->module->base_recv_cancel) {
|
||||
return active->module->base_recv_cancel(peer,tag);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/** Add callback for communication exception */
|
||||
int orte_rml_API_add_exception_handler(orte_rml_exception_callback_t cbfunc)
|
||||
{
|
||||
int rc = ORTE_ERROR;
|
||||
opal_buffer_t *buf;
|
||||
orte_rml_base_active_t *active;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1,orte_rml_base_framework.framework_output,
|
||||
"%s rml:base:add_exception_handler() - calling the respective plugin that implements this",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* cycle thru the actives and see who can send it */
|
||||
OPAL_LIST_FOREACH(active, &orte_rml_base.actives, orte_rml_base_active_t) {
|
||||
if (NULL != active->module->base_add_exception_handler) {
|
||||
if (ORTE_SUCCESS == (rc = active->module->base_add_exception_handler(cbfunc))) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return rc;
|
||||
|
||||
}
|
||||
|
||||
/** Delete callback for communication exception */
|
||||
int orte_rml_API_del_exception_handler(orte_rml_exception_callback_t cbfunc)
|
||||
{
|
||||
int rc = ORTE_ERROR;
|
||||
opal_buffer_t *buf;
|
||||
orte_rml_base_active_t *active;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1,orte_rml_base_framework.framework_output,
|
||||
"%s rml:base:del_exception_handler() - calling the respective plugin that implements this",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* cycle thru the actives and see who can send it */
|
||||
OPAL_LIST_FOREACH(active, &orte_rml_base.actives, orte_rml_base_active_t) {
|
||||
if (NULL != active->module->base_del_exception_handler) {
|
||||
if (ORTE_SUCCESS == (rc = active->module->base_del_exception_handler(cbfunc))) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return rc;
|
||||
|
||||
}
|
||||
|
||||
/** Fault tolerance handler */
|
||||
int orte_rml_API_ft_event(int state)
|
||||
{
|
||||
int rc = ORTE_ERROR;
|
||||
opal_buffer_t *buf;
|
||||
orte_rml_base_active_t *active;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1,orte_rml_base_framework.framework_output,
|
||||
"%s rml:base:ft_event() - calling the respective plugin that implements this",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* cycle thru the actives and see who can send it */
|
||||
OPAL_LIST_FOREACH(active, &orte_rml_base.actives, orte_rml_base_active_t) {
|
||||
if (NULL != active->module->base_ft_event) {
|
||||
if (ORTE_SUCCESS == (rc = active->module->base_ft_event(state))) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return rc;
|
||||
|
||||
}
|
||||
|
||||
|
||||
/** Purge information */
|
||||
void orte_rml_API_purge(orte_process_name_t *peer)
|
||||
{
|
||||
opal_buffer_t *buf;
|
||||
orte_rml_base_active_t *active;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1,orte_rml_base_framework.framework_output,
|
||||
"%s rml:base:purge() - calling the respective plugin that implements this",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* cycle thru the actives and see who can send it */
|
||||
OPAL_LIST_FOREACH(active, &orte_rml_base.actives, orte_rml_base_active_t) {
|
||||
if (NULL != active->module->base_purge) {
|
||||
return active->module->base_purge(peer);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/** Open a qos messaging channel to a peer*/
|
||||
int orte_rml_API_open_channel(orte_process_name_t* peer,
|
||||
opal_list_t *qos_attributes,
|
||||
orte_rml_channel_callback_fn_t cbfunc,
|
||||
void* cbdata)
|
||||
{
|
||||
int rc = ORTE_ERROR;
|
||||
opal_buffer_t *buf;
|
||||
orte_rml_base_active_t *active;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1,orte_rml_base_framework.framework_output,
|
||||
"%s rml:base:open_channel() - calling the respective plugin that implements this",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* cycle thru the actives and see who can send it */
|
||||
OPAL_LIST_FOREACH(active, &orte_rml_base.actives, orte_rml_base_active_t) {
|
||||
if (NULL != active->module->base_open_channel) {
|
||||
if (ORTE_SUCCESS == (rc = active->module->base_open_channel(peer,qos_attributes,cbfunc,cbdata))) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return rc;
|
||||
|
||||
}
|
||||
|
||||
/** send a non blocking iovec message over a channel */
|
||||
int orte_rml_API_send_channel_nb(orte_rml_channel_num_t channel,
|
||||
struct iovec* msg,
|
||||
int count,
|
||||
orte_rml_tag_t tag,
|
||||
orte_rml_send_channel_callback_fn_t cbfunc,
|
||||
void* cbdata)
|
||||
{
|
||||
int rc = ORTE_ERROR;
|
||||
opal_buffer_t *buf;
|
||||
orte_rml_base_active_t *active;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1,orte_rml_base_framework.framework_output,
|
||||
"%s rml:base:send_channel_nb() - calling the respective plugin that implements this",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* cycle thru the actives and see who can send it */
|
||||
OPAL_LIST_FOREACH(active, &orte_rml_base.actives, orte_rml_base_active_t) {
|
||||
if (NULL != active->module->base_send_channel_nb) {
|
||||
if (ORTE_SUCCESS == (rc = active->module->base_send_channel_nb(channel,msg,count,tag,cbfunc,cbdata))) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
/** send a non blocking buffer message over a channel */
|
||||
int orte_rml_API_send_buffer_channel_nb(orte_rml_channel_num_t channel,
|
||||
struct opal_buffer_t * buffer,
|
||||
orte_rml_tag_t tag,
|
||||
orte_rml_send_buffer_channel_callback_fn_t cbfunc,
|
||||
void* cbdata)
|
||||
{
|
||||
int rc = ORTE_ERROR;
|
||||
opal_buffer_t *buf;
|
||||
orte_rml_base_active_t *active;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1,orte_rml_base_framework.framework_output,
|
||||
"%s rml:base:send_buffer_channel_nb() - calling the respective plugin that implements this",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* cycle thru the actives and see who can send it */
|
||||
OPAL_LIST_FOREACH(active, &orte_rml_base.actives, orte_rml_base_active_t) {
|
||||
if (NULL != active->module->base_send_buffer_channel_nb) {
|
||||
if (ORTE_SUCCESS == (rc = active->module->base_send_buffer_channel_nb(channel,buffer,tag,cbfunc,cbdata))) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
/** close a qos messaging channel */
|
||||
int orte_rml_API_close_channel(orte_rml_channel_num_t channel_num,
|
||||
orte_rml_channel_callback_fn_t cbfunc,
|
||||
void* cbdata)
|
||||
{
|
||||
int rc = ORTE_ERROR;
|
||||
opal_buffer_t *buf;
|
||||
orte_rml_base_active_t *active;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((1,orte_rml_base_framework.framework_output,
|
||||
"%s rml:base:close_channel() - calling the respective plugin that implements this",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
|
||||
/* cycle thru the actives and see who can send it */
|
||||
OPAL_LIST_FOREACH(active, &orte_rml_base.actives, orte_rml_base_active_t) {
|
||||
if (NULL != active->module->base_close_channel) {
|
||||
if (ORTE_SUCCESS == (rc = active->module->base_close_channel(channel_num,cbfunc,cbdata))) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
/**
|
||||
* Function for selecting one component(plugin) from all those that are
|
||||
* available.
|
||||
*/
|
||||
int orte_rml_base_select(void)
|
||||
{
|
||||
opal_list_item_t *item, *next;
|
||||
mca_base_component_list_item_t *cli;
|
||||
|
||||
int selected_priority = -1;
|
||||
orte_rml_component_t *selected_component = NULL;
|
||||
orte_rml_module_t *selected_module = NULL;
|
||||
mca_base_component_list_item_t *cli=NULL;
|
||||
mca_base_component_t *component=NULL;
|
||||
mca_base_module_t *module=NULL;
|
||||
orte_rml_base_module_t *nmodule;
|
||||
orte_rml_base_active_t *newmodule, *mod;
|
||||
int rc, priority;
|
||||
bool inserted;
|
||||
|
||||
orte_rml_component_t *wrapper_component = NULL;
|
||||
bool return_silent=false;
|
||||
|
||||
if (selected) {
|
||||
return ORTE_SUCCESS;
|
||||
@ -150,74 +612,71 @@ int orte_rml_base_select(void)
|
||||
selected = true;
|
||||
|
||||
OPAL_LIST_FOREACH(cli, &orte_rml_base_framework.framework_components, mca_base_component_list_item_t ) {
|
||||
orte_rml_component_t* component;
|
||||
component = (orte_rml_component_t *) cli->cli_component;
|
||||
component = (mca_base_component_t *) cli->cli_component;
|
||||
|
||||
opal_output_verbose(10, orte_rml_base_framework.framework_output,
|
||||
"orte_rml_base_select: initializing %s component %s",
|
||||
component->rml_version.mca_type_name,
|
||||
component->rml_version.mca_component_name);
|
||||
"orte_rml_base_select: Initializing %s component %s",
|
||||
component->mca_type_name,
|
||||
component->mca_component_name);
|
||||
|
||||
if (NULL == component->rml_init) {
|
||||
if (NULL == ((orte_rml_component_t *)component)->rml_init) {
|
||||
opal_output_verbose(10, orte_rml_base_framework.framework_output,
|
||||
"orte_rml_base_select: no init function; ignoring component");
|
||||
"orte_rml_base_select: no init function; ignoring component [%s]",component->mca_component_name);
|
||||
} else {
|
||||
int priority = 0;
|
||||
|
||||
orte_rml_module_t* module = component->rml_init(&priority);
|
||||
module = (mca_base_module_t *) ((orte_rml_component_t *)component)->rml_init(&priority);
|
||||
if (NULL == module) {
|
||||
opal_output_verbose(10, orte_rml_base_framework.framework_output,
|
||||
"orte_rml_base_select: init returned failure");
|
||||
if (priority < 0) {
|
||||
return_silent = true;
|
||||
}
|
||||
"orte_rml_base_select: init returned failure [%s]",component->mca_component_name);
|
||||
continue;
|
||||
}
|
||||
|
||||
if(NULL != orte_rml_base_wrapper &&
|
||||
/* If this is a wrapper component then save it for later */
|
||||
if (NULL != orte_rml_base_wrapper &&
|
||||
// If this is a wrapper component then save it for later
|
||||
RML_SELECT_WRAPPER_PRIORITY >= priority) {
|
||||
if( 0 == strncmp(component->rml_version.mca_component_name,
|
||||
if ( 0 == strncmp(component->mca_component_name,
|
||||
orte_rml_base_wrapper,
|
||||
strlen(orte_rml_base_wrapper) ) ) {
|
||||
wrapper_component = component;
|
||||
}
|
||||
} else if (priority > selected_priority) {
|
||||
/* Otherwise this is a normal module and subject to normal selection */
|
||||
if (NULL != selected_module && NULL != selected_module->finalize) {
|
||||
selected_module->finalize();
|
||||
}
|
||||
selected_priority = priority;
|
||||
selected_component = component;
|
||||
selected_module = module;
|
||||
wrapper_component = (orte_rml_component_t *) component;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Unload all components that were not selected
|
||||
*/
|
||||
OPAL_LIST_FOREACH_SAFE(item, next, &orte_rml_base_framework.framework_components, opal_list_item_t) {
|
||||
mca_base_component_list_item_t *cli = (mca_base_component_list_item_t *) item;
|
||||
orte_rml_component_t* component = (orte_rml_component_t *) cli->cli_component;
|
||||
|
||||
/* Keep it if it is the wrapper component */
|
||||
if ((component == wrapper_component) || (component == selected_component)) {
|
||||
else
|
||||
{
|
||||
/* This is normal plugin component - based on priority add it to the actives list */
|
||||
nmodule = (orte_rml_base_module_t*) module;
|
||||
/* if the module fails to init, skip it */
|
||||
if (NULL == nmodule->base_enable_comm || ORTE_SUCCESS != nmodule->base_enable_comm()) {
|
||||
continue;
|
||||
}
|
||||
/* Not the selected component */
|
||||
opal_output_verbose(10, orte_rml_base_framework.framework_output,
|
||||
"orte_rml_base_select: module %s unloaded",
|
||||
component->rml_version.mca_component_name);
|
||||
opal_list_remove_item(&orte_rml_base_framework.framework_components, item);
|
||||
mca_base_component_repository_release((mca_base_component_t *) component);
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
|
||||
/* setup reference to selected module */
|
||||
if (NULL != selected_module) {
|
||||
orte_rml = *selected_module;
|
||||
orte_rml_component = selected_component;
|
||||
/* add to the list of selected modules */
|
||||
newmodule = OBJ_NEW(orte_rml_base_active_t);
|
||||
newmodule->pri = priority;
|
||||
newmodule->module = nmodule;
|
||||
newmodule->component = component;
|
||||
|
||||
/* maintain priority order */
|
||||
inserted = false;
|
||||
OPAL_LIST_FOREACH(mod, &orte_rml_base.actives, orte_rml_base_active_t) {
|
||||
if (priority > mod->pri) {
|
||||
opal_list_insert_pos(&orte_rml_base.actives,
|
||||
(opal_list_item_t*)mod, &newmodule->super);
|
||||
inserted = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!inserted) {
|
||||
/* must be lowest priority - add to end */
|
||||
opal_list_append(&orte_rml_base.actives, &newmodule->super);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (4 < opal_output_get_verbosity(orte_rml_base_framework.framework_output)) {
|
||||
opal_output(0, "%s: Final rml priorities", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
/* show the prioritized list */
|
||||
OPAL_LIST_FOREACH(mod, &orte_rml_base.actives, orte_rml_base_active_t) {
|
||||
opal_output(0, "\tComponent: %s Priority: %d", mod->component->mca_component_name, mod->pri);
|
||||
}
|
||||
}
|
||||
|
||||
/* If a wrapper component was requested then
|
||||
@ -227,12 +686,6 @@ int orte_rml_base_select(void)
|
||||
wrapper_component->rml_init(NULL);
|
||||
}
|
||||
|
||||
if (NULL == selected_component) {
|
||||
if (return_silent) {
|
||||
return ORTE_ERR_SILENT;
|
||||
}
|
||||
return ORTE_ERROR;
|
||||
}
|
||||
/* Post a persistent recieve for open channel request */
|
||||
orte_rml.recv_buffer_nb (ORTE_NAME_WILDCARD, ORTE_RML_TAG_OPEN_CHANNEL_REQ,
|
||||
ORTE_RML_PERSISTENT, orte_rml_open_channel_recv_callback,
|
||||
@ -241,6 +694,7 @@ int orte_rml_base_select(void)
|
||||
orte_rml.recv_buffer_nb (ORTE_NAME_WILDCARD, ORTE_RML_TAG_CLOSE_CHANNEL_REQ,
|
||||
ORTE_RML_PERSISTENT, orte_rml_close_channel_recv_callback,
|
||||
NULL);
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -37,7 +37,7 @@
|
||||
BEGIN_C_DECLS
|
||||
|
||||
typedef struct {
|
||||
struct orte_rml_module_t super;
|
||||
struct orte_rml_base_module_t super;
|
||||
opal_list_t exceptions;
|
||||
opal_list_t queued_routing_messages;
|
||||
opal_event_t *timer_event;
|
||||
|
@ -51,7 +51,7 @@
|
||||
#include "orte/mca/oob/base/base.h"
|
||||
#include "rml_oob.h"
|
||||
|
||||
static orte_rml_module_t* rml_oob_init(int* priority);
|
||||
static orte_rml_base_module_t* rml_oob_init(int* priority);
|
||||
static int rml_oob_open(void);
|
||||
static int rml_oob_close(void);
|
||||
|
||||
@ -80,31 +80,31 @@ orte_rml_component_t mca_rml_oob_component = {
|
||||
|
||||
orte_rml_oob_module_t orte_rml_oob_module = {
|
||||
{
|
||||
.enable_comm = orte_rml_oob_init,
|
||||
.finalize = orte_rml_oob_fini,
|
||||
orte_rml_oob_init,
|
||||
orte_rml_oob_fini,
|
||||
|
||||
.get_contact_info = orte_rml_oob_get_uri,
|
||||
.set_contact_info = orte_rml_oob_set_uri,
|
||||
orte_rml_oob_get_uri,
|
||||
orte_rml_oob_set_uri,
|
||||
|
||||
.ping = orte_rml_oob_ping,
|
||||
orte_rml_oob_ping,
|
||||
|
||||
.send_nb = orte_rml_oob_send_nb,
|
||||
.send_buffer_nb = orte_rml_oob_send_buffer_nb,
|
||||
orte_rml_oob_send_nb,
|
||||
orte_rml_oob_send_buffer_nb,
|
||||
|
||||
.recv_nb = orte_rml_oob_recv_nb,
|
||||
.recv_buffer_nb = orte_rml_oob_recv_buffer_nb,
|
||||
orte_rml_oob_recv_nb,
|
||||
orte_rml_oob_recv_buffer_nb,
|
||||
|
||||
.recv_cancel = orte_rml_oob_recv_cancel,
|
||||
orte_rml_oob_recv_cancel,
|
||||
|
||||
.add_exception_handler = orte_rml_oob_add_exception,
|
||||
.del_exception_handler = orte_rml_oob_del_exception,
|
||||
.ft_event = orte_rml_oob_ft_event,
|
||||
.purge = orte_rml_oob_purge,
|
||||
orte_rml_oob_add_exception,
|
||||
orte_rml_oob_del_exception,
|
||||
orte_rml_oob_ft_event,
|
||||
orte_rml_oob_purge,
|
||||
|
||||
.open_channel = orte_rml_oob_open_channel,
|
||||
.send_channel_nb = orte_rml_oob_send_channel_nb,
|
||||
.send_buffer_channel_nb = orte_rml_oob_send_buffer_channel_nb,
|
||||
.close_channel = orte_rml_oob_close_channel
|
||||
orte_rml_oob_open_channel,
|
||||
orte_rml_oob_send_channel_nb,
|
||||
orte_rml_oob_send_buffer_channel_nb,
|
||||
orte_rml_oob_close_channel
|
||||
}
|
||||
};
|
||||
|
||||
@ -124,7 +124,8 @@ rml_oob_close(void)
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
static orte_rml_module_t*
|
||||
|
||||
static orte_rml_base_module_t*
|
||||
rml_oob_init(int* priority)
|
||||
{
|
||||
if (init_done) {
|
||||
|
@ -57,7 +57,8 @@ BEGIN_C_DECLS
|
||||
|
||||
|
||||
struct opal_buffer_t;
|
||||
struct orte_rml_module_t;
|
||||
struct orte_rml_base_module_t;
|
||||
struct orte_rml_API_module_t;
|
||||
typedef struct {
|
||||
opal_object_t super;
|
||||
orte_process_name_t name;
|
||||
@ -109,8 +110,7 @@ ORTE_DECLSPEC void orte_rml_close_channel_recv_callback(int status,
|
||||
* @retval NULL An error occurred and initialization did not occur
|
||||
* @retval non-NULL The module was successfully initialized
|
||||
*/
|
||||
typedef struct orte_rml_module_t* (*orte_rml_component_init_fn_t)(int *priority);
|
||||
|
||||
typedef struct orte_rml_base_module_t* (*orte_rml_component_init_fn_t)(int *priority);
|
||||
|
||||
/**
|
||||
* RML component interface
|
||||
@ -594,13 +594,75 @@ typedef int (*orte_rml_module_close_channel_fn_t)( orte_rml_channel_num_t channe
|
||||
|
||||
|
||||
/**
|
||||
* RML module interface
|
||||
* RML module interface - internal modules
|
||||
*
|
||||
* Module interface for the internal plugins. The base_select() will
|
||||
* put these modules in a global list (actives) based on priority to
|
||||
* allow for loading multiple plugins.
|
||||
*/
|
||||
struct orte_rml_base_module_t {
|
||||
/** Enable communication once a process name has been assigned */
|
||||
orte_rml_module_enable_comm_fn_t base_enable_comm;
|
||||
/** Shutdown the communication system and clean up resources */
|
||||
orte_rml_module_finalize_fn_t base_finalize;
|
||||
|
||||
/** Get contact information for local process */
|
||||
orte_rml_module_get_contact_info_fn_t base_get_contact_info;
|
||||
/** Set contact information for remote process */
|
||||
orte_rml_module_set_contact_info_fn_t base_set_contact_info;
|
||||
|
||||
/** Ping process for connectivity check */
|
||||
orte_rml_module_ping_fn_t base_ping;
|
||||
|
||||
/** Send non-blocking iovec message */
|
||||
orte_rml_module_send_nb_fn_t base_send_nb;
|
||||
/** Send non-blocking buffer message */
|
||||
orte_rml_module_send_buffer_nb_fn_t base_send_buffer_nb;
|
||||
|
||||
/** Receive non-blocking iovec message */
|
||||
orte_rml_module_recv_nb_fn_t base_recv_nb;
|
||||
|
||||
/** Receive non-blocking buffer message */
|
||||
orte_rml_module_recv_buffer_nb_fn_t base_recv_buffer_nb;
|
||||
|
||||
/** Cancel posted non-blocking receive */
|
||||
orte_rml_module_recv_cancel_fn_t base_recv_cancel;
|
||||
|
||||
/** Add callback for communication exception */
|
||||
orte_rml_module_exception_fn_t base_add_exception_handler;
|
||||
/** Delete callback for communication exception */
|
||||
orte_rml_module_exception_fn_t base_del_exception_handler;
|
||||
|
||||
/** Fault tolerance handler */
|
||||
orte_rml_module_ft_event_fn_t base_ft_event;
|
||||
|
||||
/** Purge information */
|
||||
orte_rml_module_purge_fn_t base_purge;
|
||||
|
||||
/** Open a qos messaging channel to a peer*/
|
||||
orte_rml_module_open_channel_fn_t base_open_channel;
|
||||
|
||||
/** send a non blocking iovec message over a channel */
|
||||
orte_rml_module_send_channel_nb_fn_t base_send_channel_nb;
|
||||
|
||||
/** send a non blocking buffer message over a channel */
|
||||
orte_rml_module_send_buffer_channel_nb_fn_t base_send_buffer_channel_nb;
|
||||
|
||||
/** close a qos messaging channel */
|
||||
orte_rml_module_close_channel_fn_t base_close_channel;
|
||||
};
|
||||
/** Convienence typedef */
|
||||
typedef struct orte_rml_base_module_t orte_rml_base_module_t;
|
||||
|
||||
|
||||
/**
|
||||
* RML module interface - external API modules
|
||||
*
|
||||
* Module interface to the RML communication system. A global
|
||||
* instance of this module, orte_rml, provices an interface into the
|
||||
* active RML interface.
|
||||
*/
|
||||
struct orte_rml_module_t {
|
||||
struct orte_rml_API_module_t {
|
||||
/** Enable communication once a process name has been assigned */
|
||||
orte_rml_module_enable_comm_fn_t enable_comm;
|
||||
/** Shutdown the communication system and clean up resources */
|
||||
@ -652,10 +714,10 @@ struct orte_rml_module_t {
|
||||
orte_rml_module_close_channel_fn_t close_channel;
|
||||
};
|
||||
/** Convienence typedef */
|
||||
typedef struct orte_rml_module_t orte_rml_module_t;
|
||||
typedef struct orte_rml_API_module_t orte_rml_API_module_t;
|
||||
|
||||
/** Interface for RML communication */
|
||||
ORTE_DECLSPEC extern orte_rml_module_t orte_rml;
|
||||
ORTE_DECLSPEC extern orte_rml_API_module_t orte_rml;
|
||||
|
||||
|
||||
/* ******************************************************************** */
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user