413 строки
18 KiB
C
413 строки
18 KiB
C
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
|
|
/*
|
|
* Copyright (c) 2004-2008 The Trustees of Indiana University and Indiana
|
|
* University Research and Technology
|
|
* Corporation. All rights reserved.
|
|
* Copyright (c) 2004-2005 The University of Tennessee and The University
|
|
* of Tennessee Research Foundation. All rights
|
|
* reserved.
|
|
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
|
|
* University of Stuttgart. All rights reserved.
|
|
* Copyright (c) 2004-2005 The Regents of the University of California.
|
|
* All rights reserved.
|
|
* Copyright (c) 2007-2014 Los Alamos National Security, LLC. All rights
|
|
* reserved.
|
|
* Copyright (c) 2014 -2015 Intel, Inc. All rights reserved.
|
|
* $COPYRIGHT$
|
|
*
|
|
* Additional copyrights may follow
|
|
*
|
|
* $HEADER$
|
|
*/
|
|
|
|
/**
|
|
* @file
|
|
*
|
|
* RML Framework maintenence interface
|
|
*
|
|
* Interface for starting / stopping / controlling the RML framework,
|
|
* as well as support for modifying RML datatypes.
|
|
*
|
|
* @note The only RML datatype exposed to the user is the RML tag.
|
|
* This will always be an integral value, so the only datatype support
|
|
* really required is the internal DSS functions for packing /
|
|
* unpacking / comparing tags. The user should never need to deal
|
|
* with these.
|
|
*/
|
|
|
|
#ifndef MCA_RML_BASE_H
|
|
#define MCA_RML_BASE_H
|
|
|
|
#include "orte_config.h"
|
|
|
|
#include "opal/dss/dss_types.h"
|
|
#include "orte/mca/mca.h"
|
|
#include "opal/util/timings.h"
|
|
#include "opal/class/opal_pointer_array.h"
|
|
|
|
#include "orte/runtime/orte_globals.h"
|
|
|
|
#include "orte/mca/rml/rml.h"
|
|
|
|
|
|
BEGIN_C_DECLS
|
|
|
|
OPAL_TIMING_DECLARE_EXT(ORTE_DECLSPEC, tm_rml)
|
|
|
|
/*
|
|
* MCA Framework
|
|
*/
|
|
ORTE_DECLSPEC extern mca_base_framework_t orte_rml_base_framework;
|
|
/* select a component */
|
|
ORTE_DECLSPEC int orte_rml_base_select(void);
|
|
|
|
/**
|
|
* Post receive to get updates regarding contact information
|
|
*
|
|
* Post a non-blocking receive (likely during orte_init()) to receive
|
|
* updated contact information from the HNP when it becomes available.
|
|
* This should be called in any process that needs such updates, and
|
|
* the receive will continue to get update callbacks until
|
|
* orte_rml_base_comm_stop() is called.
|
|
*/
|
|
ORTE_DECLSPEC void orte_rml_base_comm_start(void);
|
|
|
|
|
|
/**
|
|
* Stop receiving contact information updates
|
|
*
|
|
* Shut down the receive posted during orte_rml_base_comm_start(),
|
|
* likely during orte_finalize().
|
|
*/
|
|
ORTE_DECLSPEC void orte_rml_base_comm_stop(void);
|
|
|
|
|
|
/* a global struct containing framework-level values */
|
|
typedef struct {
|
|
opal_list_t posted_recvs;
|
|
opal_list_t unmatched_msgs;
|
|
opal_pointer_array_t open_channels;
|
|
#if OPAL_ENABLE_TIMING
|
|
bool timing;
|
|
#endif
|
|
} orte_rml_base_t;
|
|
ORTE_DECLSPEC extern orte_rml_base_t orte_rml_base;
|
|
|
|
|
|
/**
|
|
* List of components that are available to the RML
|
|
*
|
|
* List of components that are currently available to the RML
|
|
* framework. Useable between calls to orte_rml_base_open() and
|
|
* orte_rml_base_close().
|
|
*
|
|
* @note This list should not be used by code outside the RML base.
|
|
*/
|
|
ORTE_DECLSPEC extern opal_list_t orte_rml_base_components;
|
|
|
|
|
|
/**
|
|
* Component structure for the selected RML component
|
|
*
|
|
* Component structure pointer for the currently selected RML
|
|
* component. Useable between calls to orte_rml_base_select() and
|
|
* orte_rml_base_close().
|
|
* @note This pointer should not be used outside the RML base. It is
|
|
* available outside the RML base only for the F/T component.
|
|
*/
|
|
ORTE_DECLSPEC extern orte_rml_component_t *orte_rml_component;
|
|
|
|
typedef enum {
|
|
orte_rml_channel_opening = 0,
|
|
orte_rml_channel_open = 1,
|
|
orte_rml_channel_closing = 2,
|
|
orte_rml_channel_closed = 3,
|
|
}orte_rml_channel_state_t;
|
|
|
|
/**
|
|
* RML channel structure.
|
|
* The RML only needs basic channel information as the rest of the book keeping information
|
|
* is stored in the QoS module specific channel object.
|
|
* It contains a pointer to the QoS module that handles requests on the channel.
|
|
* It contains a pointer to a struct that contains the QoS specific channel data.
|
|
*/
|
|
typedef struct {
|
|
opal_list_item_t super;
|
|
orte_rml_channel_num_t channel_num; // the channel number reference (exposed to the user).
|
|
orte_process_name_t peer; // the other end point (peer) of the channel
|
|
orte_rml_channel_num_t peer_channel; // peer channel number
|
|
void * qos; // pointer to QoS component specific module
|
|
void * qos_channel_ptr; // pointer to QoS component specific channel struct
|
|
orte_rml_channel_state_t state; // channel state
|
|
bool recv; // set to true if this is a receive (peer opened) channel. (Default is send channel)
|
|
} orte_rml_channel_t;
|
|
OBJ_CLASS_DECLARATION(orte_rml_channel_t);
|
|
|
|
|
|
/* structure to send RML messages - used internally */
|
|
typedef struct {
|
|
opal_list_item_t super;
|
|
orte_process_name_t dst; // targeted recipient
|
|
orte_process_name_t origin;
|
|
int status; // returned status on send
|
|
orte_rml_tag_t tag; // targeted tag
|
|
|
|
/* user's send callback functions and data */
|
|
union {
|
|
orte_rml_callback_fn_t iov;
|
|
orte_rml_buffer_callback_fn_t buffer;
|
|
orte_rml_send_channel_callback_fn_t iov_chan;
|
|
orte_rml_send_buffer_channel_callback_fn_t buf_chan;
|
|
} cbfunc;
|
|
void *cbdata;
|
|
|
|
/* pointer to the user's iovec array */
|
|
struct iovec *iov;
|
|
int count;
|
|
/* pointer to the user's buffer */
|
|
opal_buffer_t *buffer;
|
|
/*** TODO : need to move channel specific data to a channel struct */
|
|
/* pointer to the channel object */
|
|
orte_rml_channel_t *channel;
|
|
/* destination channel number */
|
|
orte_rml_channel_num_t dst_channel;
|
|
/* msg seq number */
|
|
uint32_t seq_num;
|
|
/* pointer to raw data for cross-transport
|
|
* transfers
|
|
*/
|
|
char *data;
|
|
} orte_rml_send_t;
|
|
OBJ_CLASS_DECLARATION(orte_rml_send_t);
|
|
|
|
/* structure to send RML channel open messages - used internally */
|
|
typedef struct {
|
|
opal_list_item_t super;
|
|
/* peer process */
|
|
orte_process_name_t dst;
|
|
/* msg send status */
|
|
int status;
|
|
/* channel object */
|
|
orte_rml_channel_t *channel;
|
|
/* attributes of the channel */
|
|
opal_list_t *qos_attributes;
|
|
/* user's callback function */
|
|
orte_rml_channel_callback_fn_t cbfunc;
|
|
/* user's cbdata */
|
|
void *cbdata;
|
|
} orte_rml_open_channel_t;
|
|
OBJ_CLASS_DECLARATION(orte_rml_open_channel_t);
|
|
|
|
/* structure to send RML channel close messages - used internally */
|
|
typedef struct {
|
|
opal_list_item_t super;
|
|
/* msg send status */
|
|
int status;
|
|
/* channel object */
|
|
orte_rml_channel_t *channel;
|
|
/* user's callback function */
|
|
orte_rml_channel_callback_fn_t cbfunc;
|
|
/* user's cbdata */
|
|
void *cbdata;
|
|
} orte_rml_close_channel_t;
|
|
OBJ_CLASS_DECLARATION(orte_rml_close_channel_t);
|
|
|
|
/* define an object for transferring send requests to the event lib */
|
|
typedef struct {
|
|
opal_object_t super;
|
|
opal_event_t ev;
|
|
union {
|
|
orte_rml_send_t send;
|
|
orte_rml_open_channel_t open_channel;
|
|
orte_rml_close_channel_t close_channel;
|
|
}post;
|
|
} orte_rml_send_request_t;
|
|
OBJ_CLASS_DECLARATION(orte_rml_send_request_t);
|
|
|
|
/* structure to recv RML messages - used internally */
|
|
typedef struct {
|
|
opal_list_item_t super;
|
|
opal_event_t ev;
|
|
orte_process_name_t sender; // sender
|
|
orte_rml_tag_t tag; // targeted tag
|
|
orte_rml_channel_num_t channel_num; // channel number
|
|
uint32_t seq_num; //sequence number
|
|
struct iovec iov; // the recvd data
|
|
} orte_rml_recv_t;
|
|
OBJ_CLASS_DECLARATION(orte_rml_recv_t);
|
|
|
|
typedef struct {
|
|
opal_list_item_t super;
|
|
bool buffer_data;
|
|
orte_process_name_t peer;
|
|
orte_rml_tag_t tag;
|
|
bool persistent;
|
|
union {
|
|
orte_rml_callback_fn_t iov;
|
|
orte_rml_buffer_callback_fn_t buffer;
|
|
} cbfunc;
|
|
void *cbdata;
|
|
} orte_rml_posted_recv_t;
|
|
OBJ_CLASS_DECLARATION(orte_rml_posted_recv_t);
|
|
|
|
/* define an object for transferring recv requests to the list of posted recvs */
|
|
typedef struct {
|
|
opal_object_t super;
|
|
opal_event_t ev;
|
|
bool cancel;
|
|
orte_rml_posted_recv_t *post;
|
|
} orte_rml_recv_request_t;
|
|
OBJ_CLASS_DECLARATION(orte_rml_recv_request_t);
|
|
|
|
#define ORTE_RML_POST_MESSAGE(p, t, c, s, b, l) \
|
|
do { \
|
|
orte_rml_recv_t *msg; \
|
|
opal_output_verbose(5, orte_rml_base_framework.framework_output, \
|
|
"%s Message posted at %s:%d", \
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \
|
|
__FILE__, __LINE__); \
|
|
msg = OBJ_NEW(orte_rml_recv_t); \
|
|
msg->sender.jobid = (p)->jobid; \
|
|
msg->sender.vpid = (p)->vpid; \
|
|
msg->tag = (t); \
|
|
msg->channel_num = (c); \
|
|
msg->seq_num = (s); \
|
|
msg->iov.iov_base = (IOVBASE_TYPE*)(b); \
|
|
msg->iov.iov_len = (l); \
|
|
/* setup the event */ \
|
|
opal_event_set(orte_event_base, &msg->ev, -1, \
|
|
OPAL_EV_WRITE, \
|
|
orte_rml_base_process_msg, msg); \
|
|
opal_event_set_priority(&msg->ev, ORTE_MSG_PRI); \
|
|
opal_event_active(&msg->ev, OPAL_EV_WRITE, 1); \
|
|
} while(0);
|
|
|
|
#define ORTE_RML_ACTIVATE_MESSAGE(m) \
|
|
do { \
|
|
/* setup the event */ \
|
|
opal_event_set(orte_event_base, &(m)->ev, -1, \
|
|
OPAL_EV_WRITE, \
|
|
orte_rml_base_process_msg, (m)); \
|
|
opal_event_set_priority(&(m)->ev, ORTE_MSG_PRI); \
|
|
opal_event_active(&(m)->ev, OPAL_EV_WRITE, 1); \
|
|
} while(0);
|
|
|
|
/*
|
|
reactivates rcv msg on the unposted rcvd list when a match occurs
|
|
need a different path as the QoS recv processing was already done
|
|
for this process
|
|
*/
|
|
#define ORTE_RML_REACTIVATE_MESSAGE(m) \
|
|
do { \
|
|
/* setup the event */ \
|
|
opal_event_set(orte_event_base, &(m)->ev, -1, \
|
|
OPAL_EV_WRITE, \
|
|
orte_rml_base_reprocess_msg, (m)); \
|
|
opal_event_set_priority(&(m)->ev, ORTE_MSG_PRI); \
|
|
opal_event_active(&(m)->ev, OPAL_EV_WRITE, 1); \
|
|
} while(0);
|
|
|
|
#define ORTE_RML_SEND_COMPLETE(m) \
|
|
do { \
|
|
opal_output_verbose(5, orte_rml_base_framework.framework_output, \
|
|
"%s-%s Send message complete at %s:%d", \
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \
|
|
ORTE_NAME_PRINT(&((m)->dst)), \
|
|
__FILE__, __LINE__); \
|
|
if( NULL == (m)->channel) { \
|
|
if (NULL != (m)->iov) { \
|
|
if (NULL != (m)->cbfunc.iov) { \
|
|
(m)->cbfunc.iov((m)->status, \
|
|
&((m)->dst), \
|
|
(m)->iov, (m)->count, \
|
|
(m)->tag, (m)->cbdata); \
|
|
} \
|
|
} else { \
|
|
/* non-blocking buffer send */ \
|
|
(m)->cbfunc.buffer((m)->status, &((m)->origin), \
|
|
(m)->buffer, \
|
|
(m)->tag, (m)->cbdata); \
|
|
} \
|
|
} else { \
|
|
if (NULL != (m)->iov) { \
|
|
if (NULL != (m)->cbfunc.iov_chan) { \
|
|
(m)->cbfunc.iov_chan((m)->status, \
|
|
(m)->channel->channel_num, \
|
|
(m)->iov, (m)->count, \
|
|
(m)->tag, (m)->cbdata); \
|
|
} \
|
|
} else { \
|
|
/* non-blocking buffer send */ \
|
|
(m)->cbfunc.buf_chan((m)->status, \
|
|
(m)->channel->channel_num, \
|
|
(m)->buffer, \
|
|
(m)->tag, (m)->cbdata); \
|
|
} \
|
|
} \
|
|
OBJ_RELEASE(m); \
|
|
}while(0);
|
|
|
|
|
|
#define ORTE_RML_OPEN_CHANNEL_COMPLETE(m) \
|
|
do { \
|
|
opal_output_verbose(5, orte_rml_base_framework.framework_output, \
|
|
"%s-%s open channel message complete at %s:%d", \
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \
|
|
ORTE_NAME_PRINT(&((m)->dst)), \
|
|
__FILE__, __LINE__); \
|
|
/* call the callback function */ \
|
|
(m)->cbfunc((m)->status, (m)->channel->channel_num, \
|
|
&((m)->dst), \
|
|
NULL, (m)->cbdata) ; \
|
|
}while(0);
|
|
|
|
#define ORTE_RML_CLOSE_CHANNEL_COMPLETE(m) \
|
|
do { \
|
|
opal_output_verbose(5, orte_rml_base_framework.framework_output, \
|
|
"%s-%d close channel message complete at %s:%d", \
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \
|
|
(m)->channel->channel_num, \
|
|
__FILE__, __LINE__); \
|
|
/* call the callback function */ \
|
|
(m)->cbfunc((m)->status, (m)->channel->channel_num, \
|
|
NULL, NULL, (m)->cbdata) ; \
|
|
}while(0);
|
|
/*
|
|
* This is the base priority for a RML wrapper component
|
|
* If there exists more than one wrapper, then the one with
|
|
* the lowest priority wins.
|
|
*/
|
|
#define RML_SELECT_WRAPPER_PRIORITY -128
|
|
|
|
#define ORTE_RML_INVALID_CHANNEL_NUM 1599
|
|
ORTE_DECLSPEC orte_rml_channel_t * orte_rml_base_get_channel (orte_rml_channel_num_t chan_num);
|
|
|
|
|
|
/* common implementations */
|
|
ORTE_DECLSPEC void orte_rml_base_post_recv(int sd, short args, void *cbdata);
|
|
ORTE_DECLSPEC void orte_rml_base_process_msg(int fd, short flags, void *cbdata);
|
|
ORTE_DECLSPEC void orte_rml_base_process_error(int fd, short flags, void *cbdata);
|
|
ORTE_DECLSPEC void orte_rml_base_open_channel(int fd, short flags, void *cbdata);
|
|
ORTE_DECLSPEC void orte_rml_base_close_channel(int fd, short flags, void *cbdata);
|
|
ORTE_DECLSPEC void orte_rml_base_open_channel_send_callback ( int status, orte_process_name_t* sender,
|
|
opal_buffer_t* buffer, orte_rml_tag_t tag,
|
|
void* cbdata);
|
|
ORTE_DECLSPEC void orte_rml_base_open_channel_resp_callback (int status, orte_process_name_t* peer,
|
|
struct opal_buffer_t* buffer, orte_rml_tag_t tag,
|
|
void* cbdata);
|
|
ORTE_DECLSPEC void orte_rml_base_open_channel_reply_send_callback ( int status, orte_process_name_t* sender,
|
|
opal_buffer_t* buffer, orte_rml_tag_t tag,
|
|
void* cbdata);
|
|
ORTE_DECLSPEC void orte_rml_base_prep_send_channel (orte_rml_channel_t *channel,
|
|
orte_rml_send_t *send);
|
|
ORTE_DECLSPEC int orte_rml_base_process_recv_channel (orte_rml_channel_t *channel,
|
|
orte_rml_recv_t *recv);
|
|
ORTE_DECLSPEC void orte_rml_base_close_channel_send_callback ( int status, orte_process_name_t* sender,
|
|
opal_buffer_t* buffer, orte_rml_tag_t tag, void* cbdata);
|
|
ORTE_DECLSPEC void orte_rml_base_send_close_channel ( orte_rml_close_channel_t *close_chan);
|
|
ORTE_DECLSPEC void orte_rml_base_reprocess_msg(int fd, short flags, void *cbdata);
|
|
ORTE_DECLSPEC void orte_rml_base_complete_recv_msg (orte_rml_recv_t **recv_msg);
|
|
END_C_DECLS
|
|
|
|
#endif /* MCA_RML_BASE_H */
|