1
1
This commit was SVN r3973.
Этот коммит содержится в:
Tim Woodall 2005-01-12 20:51:34 +00:00
родитель 8f19c4af03
Коммит 55325d1651
28 изменённых файлов: 2669 добавлений и 47 удалений

Просмотреть файл

@ -23,13 +23,17 @@ AM_CPPFLAGS = -I$(top_builddir)/src
# Source code files
headers = \
base.h
base.h \
iof_base_endpoint.h \
iof_base_fragment.h
libmca_iof_base_la_SOURCES = \
$(headers) \
iof_base_close.c \
iof_base_open.c \
iof_base_select.c
iof_base_endpoint.c \
iof_base_fragment.c \
iof_base_select.c
# Conditionally install the header files

Просмотреть файл

@ -22,6 +22,8 @@
#include "ompi_config.h"
#include "include/ompi.h"
#include "class/ompi_free_list.h"
#include "threads/condition.h"
#include "mca/mca.h"
#include "mca/iof/iof.h"
@ -30,12 +32,26 @@ extern "C" {
#endif
struct mca_iof_base_t {
int iof_output;
ompi_list_t iof_components_opened;
ompi_list_t iof_endpoints;
ompi_mutex_t iof_lock;
ompi_condition_t iof_condition;
ompi_free_list_t iof_fragments;
size_t iof_window_size;
ompi_process_name_t* iof_service;
};
typedef struct mca_iof_base_t mca_iof_base_t;
int mca_iof_base_open(void);
int mca_iof_base_close(void);
int mca_iof_base_select(bool* allow_multi_user_threads);
int mca_iof_base_select(bool* allow_multi_user_threads, bool* have_hidden_threads);
extern int mca_iof_base_output;
extern ompi_list_t mca_iof_base_components_opened;
extern mca_iof_base_t mca_iof_base;
#if defined(c_plusplus) || defined(__cplusplus)
}

Просмотреть файл

@ -22,15 +22,75 @@
#include "mca/base/base.h"
#include "mca/iof/iof.h"
#include "mca/iof/base/base.h"
#include "mca/iof/base/iof_base_endpoint.h"
static void mca_iof_base_timer_cb(int fd, short flags, void *cbdata)
{
int *flushed = (int*)cbdata;
OMPI_THREAD_LOCK(&mca_iof_base.iof_lock);
*flushed = 1;
ompi_condition_signal(&mca_iof_base.iof_condition);
OMPI_THREAD_UNLOCK(&mca_iof_base.iof_lock);
}
int mca_iof_base_close(void)
{
/* close any remaining opened components */
if (0 != ompi_list_get_size(&mca_iof_base_components_opened)) {
mca_base_components_close(mca_iof_base_output,
&mca_iof_base_components_opened, NULL);
ompi_list_item_t* item;
ompi_event_t ev;
struct timeval tv = { 0, 0 };
int flushed = 0;
int closed = 0;
/* flush any pending output */
fflush(NULL);
/* wait until event loop has been progressed at least once */
OMPI_THREAD_LOCK(&mca_iof_base.iof_lock);
ompi_evtimer_set(&ev, mca_iof_base_timer_cb, &flushed);
ompi_event_add(&ev, &tv);
while(flushed == 0)
ompi_condition_wait(&mca_iof_base.iof_condition, &mca_iof_base.iof_lock);
/* attempt to close all of the endpoints */
item = ompi_list_get_first(&mca_iof_base.iof_endpoints);
while(item != ompi_list_get_end(&mca_iof_base.iof_endpoints)) {
ompi_list_item_t* next = ompi_list_get_next(item);
mca_iof_base_endpoint_t* endpoint = (mca_iof_base_endpoint_t*)item;
mca_iof_base_endpoint_close(endpoint);
item = next;
}
/* wait for all to flush output and change to closed state */
while(closed != ompi_list_get_size(&mca_iof_base.iof_endpoints)) {
closed = 0;
for(item = ompi_list_get_first(&mca_iof_base.iof_endpoints);
item != ompi_list_get_end(&mca_iof_base.iof_endpoints);
item = ompi_list_get_next(item)) {
mca_iof_base_endpoint_t* endpoint = (mca_iof_base_endpoint_t*)item;
if(endpoint->ep_state == MCA_IOF_EP_CLOSED) {
closed++;
}
}
if(closed != ompi_list_get_size(&mca_iof_base.iof_endpoints)) {
ompi_condition_wait(&mca_iof_base.iof_condition, &mca_iof_base.iof_lock);
}
}
OMPI_THREAD_UNLOCK(&mca_iof_base.iof_lock);
/* shutdown any remaining opened components */
if (0 != ompi_list_get_size(&mca_iof_base.iof_components_opened)) {
mca_base_components_close(mca_iof_base.iof_output,
&mca_iof_base.iof_components_opened, NULL);
}
/* final cleanup of resources */
OMPI_THREAD_LOCK(&mca_iof_base.iof_lock);
while((item = ompi_list_remove_first(&mca_iof_base.iof_endpoints)) != NULL) {
OBJ_RELEASE(item);
}
OMPI_THREAD_UNLOCK(&mca_iof_base.iof_lock);
return OMPI_SUCCESS;
}

389
src/mca/iof/base/iof_base_endpoint.c Обычный файл
Просмотреть файл

@ -0,0 +1,389 @@
#include "ompi_config.h"
#include <stdlib.h>
#include <string.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <errno.h>
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_SYS_FCNTL_H
#include <sys/fcntl.h>
#endif
#include <netinet/in.h>
#include "util/output.h"
#include "mca/oob/base/base.h"
#include "mca/iof/base/base.h"
#include "iof_base_endpoint.h"
#include "iof_base_fragment.h"
/**
*
*/
static void mca_iof_base_endpoint_construct(mca_iof_base_endpoint_t* endpoint)
{
endpoint->ep_mode = 0;
endpoint->ep_state = MCA_IOF_EP_CLOSED;
endpoint->ep_seq = 0;
endpoint->ep_ack = 0;
memset(&endpoint->ep_event,0,sizeof(endpoint->ep_event));
OBJ_CONSTRUCT(&endpoint->ep_frags, ompi_list_t);
}
static void mca_iof_base_endpoint_destruct(mca_iof_base_endpoint_t* endpoint)
{
OBJ_DESTRUCT(&endpoint->ep_frags);
}
OBJ_CLASS_INSTANCE(
mca_iof_base_endpoint_t,
ompi_list_item_t,
mca_iof_base_endpoint_construct,
mca_iof_base_endpoint_destruct);
/*
*
*/
static void mca_iof_base_endpoint_send_cb(
int status,
ompi_process_name_t* peer,
struct iovec* msg,
int count,
int tag,
void* cbdata)
{
mca_iof_base_frag_t* frag = (mca_iof_base_frag_t*)cbdata;
mca_iof_base_endpoint_t* endpoint = frag->frag_owner;
ompi_list_remove_item(&endpoint->ep_frags, &frag->super);
MCA_IOF_BASE_FRAG_RETURN(frag);
}
/*
* Receive from pipe/pty/etc. and forward to the
* service.
*/
static void mca_iof_base_endpoint_recv_handler(int fd, short flags, void *cbdata)
{
mca_iof_base_endpoint_t* endpoint = (mca_iof_base_endpoint_t*)cbdata;
mca_iof_base_frag_t* frag;
mca_iof_base_header_t* hdr;
int rc;
/* allocate a fragment */
MCA_IOF_BASE_FRAG_ALLOC(frag,rc);
if(NULL == frag) {
return;
}
OMPI_THREAD_LOCK(&mca_iof_base.iof_lock);
frag->frag_owner = endpoint;
ompi_list_append(&endpoint->ep_frags, &frag->super);
/* read up to the fragment size */
rc = read(fd, frag->frag_data, sizeof(frag->frag_data));
if(rc <= 0) {
/* peer has closed the connection */
mca_iof_base_endpoint_closed(endpoint);
rc = 0;
}
frag->frag_iov[1].iov_len = frag->frag_len = rc;
/* fill in the header */
hdr = &frag->frag_hdr;
hdr->hdr_common.hdr_type = MCA_IOF_BASE_HDR_MSG;
hdr->hdr_msg.msg_src = endpoint->ep_name;
hdr->hdr_msg.msg_tag = endpoint->ep_tag;
hdr->hdr_msg.msg_seq = endpoint->ep_seq;
hdr->hdr_msg.msg_len = frag->frag_len;
MCA_IOF_BASE_HDR_MSG_HTON(hdr->hdr_msg);
/* if window size has been exceeded - disable forwarding */
endpoint->ep_seq += frag->frag_len;
if(MCA_IOF_BASE_SEQDIFF(endpoint->ep_seq,endpoint->ep_ack) > mca_iof_base.iof_window_size) {
ompi_event_del(&endpoint->ep_event);
}
OMPI_THREAD_LOCK(&mca_iof_base.iof_lock);
/* start non-blocking OOB call to forward received data */
rc = mca_oob_send_nb(
mca_iof_base.iof_service,
frag->frag_iov,
2,
MCA_OOB_TAG_IOF_SVC,
0,
mca_iof_base_endpoint_send_cb,
frag);
}
static void mca_iof_base_endpoint_send_handler(int sd, short flags, void *user)
{
mca_iof_base_endpoint_t* endpoint = (mca_iof_base_endpoint_t*)user;
OMPI_THREAD_LOCK(&mca_iof_base.iof_lock);
while(ompi_list_get_size(&endpoint->ep_frags)) {
mca_iof_base_frag_t* frag = (mca_iof_base_frag_t*)ompi_list_get_first(&endpoint->ep_frags);
int rc = write(endpoint->ep_fd, frag->frag_ptr, frag->frag_len);
if(rc < 0) {
mca_iof_base_endpoint_closed(endpoint);
OMPI_THREAD_UNLOCK(&mca_iof_base.iof_lock);
return;
}
frag->frag_len -= rc;
frag->frag_ptr += rc;
if(frag->frag_len > 0) {
break;
}
ompi_list_remove_item(&endpoint->ep_frags, &frag->super);
mca_iof_base_frag_ack(frag);
}
/* is there anything left to write? */
if(ompi_list_get_size(&endpoint->ep_frags) == 0) {
ompi_event_del(&endpoint->ep_event);
}
OMPI_THREAD_UNLOCK(&mca_iof_base.iof_lock);
}
/*
*
*/
mca_iof_base_endpoint_t* mca_iof_base_endpoint_lookup(
const ompi_process_name_t* proc,
mca_iof_base_mode_t mode,
int tag)
{
return NULL;
}
/*
*
*/
int mca_iof_base_endpoint_create(
const ompi_process_name_t* proc,
mca_iof_base_mode_t mode,
int tag,
int fd)
{
mca_iof_base_endpoint_t* endpoint;
int flags;
/* create local endpoint */
OMPI_THREAD_LOCK(&mca_iof_base.iof_lock);
if((endpoint = mca_iof_base_endpoint_lookup(proc,mode,tag)) != NULL) {
OMPI_THREAD_UNLOCK(&mca_iof_base.iof_lock);
return OMPI_EXISTS;
}
endpoint = OBJ_NEW(mca_iof_base_endpoint_t);
if(NULL == endpoint) {
OMPI_THREAD_UNLOCK(&mca_iof_base.iof_lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
endpoint->ep_name = *proc;
endpoint->ep_mode = mode;
endpoint->ep_tag = tag;
endpoint->ep_fd = fd;
/* set file descriptor to be non-blocking */
if((flags = fcntl(fd, F_GETFL, 0)) < 0) {
ompi_output(0, "mca_iof_base_endpoint_create: fcntl(F_GETFL) failed with errno=%d\n", errno);
} else {
flags |= O_NONBLOCK;
fcntl(fd, F_SETFL, flags);
}
/* setup event handler */
switch(mode) {
case MCA_IOF_SOURCE:
ompi_event_set(
&endpoint->ep_event,
endpoint->ep_fd,
OMPI_EV_READ|OMPI_EV_PERSIST,
mca_iof_base_endpoint_recv_handler,
endpoint);
ompi_event_add(&endpoint->ep_event, 0);
break;
case MCA_IOF_SINK:
ompi_event_set(
&endpoint->ep_event,
endpoint->ep_fd,
OMPI_EV_WRITE,
mca_iof_base_endpoint_send_handler,
endpoint);
break;
default:
ompi_output(0, "mca_iof_base_endpoint_create: invalid mode %d\n", mode);
return OMPI_ERR_BAD_PARAM;
}
ompi_list_append(&mca_iof_base.iof_endpoints, &endpoint->super);
OMPI_THREAD_UNLOCK(&mca_iof_base.iof_lock);
return OMPI_SUCCESS;
}
/*
*
*/
int mca_iof_base_endpoint_delete(
const ompi_process_name_t* proc,
ompi_ns_cmp_bitmask_t mask,
int tag)
{
return OMPI_ERROR;
}
/*
*
*/
int mca_iof_base_endpoint_close(mca_iof_base_endpoint_t* endpoint)
{
bool closed = false;
endpoint->ep_state = MCA_IOF_EP_CLOSING;
switch(endpoint->ep_mode) {
case MCA_IOF_SOURCE:
ompi_event_del(&endpoint->ep_event);
if(endpoint->ep_seq == endpoint->ep_ack) {
endpoint->ep_state = MCA_IOF_EP_CLOSED;
}
break;
case MCA_IOF_SINK:
if(ompi_list_get_size(&endpoint->ep_frags) == 0) {
endpoint->ep_state = MCA_IOF_EP_CLOSED;
}
break;
}
return OMPI_SUCCESS;
}
/*
* Peer has gone away - cleanup and signal SOH monitor.
*/
void mca_iof_base_endpoint_closed(mca_iof_base_endpoint_t* endpoint)
{
}
/*
*
*/
mca_iof_base_endpoint_t* mca_iof_base_endpoint_match(
const ompi_process_name_t* dst_name,
ompi_ns_cmp_bitmask_t dst_mask,
int dst_tag)
{
ompi_list_item_t* item;
OMPI_THREAD_LOCK(&mca_iof_base.iof_lock);
for(item = ompi_list_get_first(&mca_iof_base.iof_endpoints);
item != ompi_list_get_end(&mca_iof_base.iof_endpoints);
item = ompi_list_get_next(item)) {
mca_iof_base_endpoint_t* endpoint = (mca_iof_base_endpoint_t*)item;
if(ompi_name_server.compare(dst_mask,dst_name,&endpoint->ep_name) == 0) {
if(endpoint->ep_tag == dst_tag || endpoint->ep_tag == MCA_IOF_ANY || dst_tag == MCA_IOF_ANY) {
OBJ_RETAIN(endpoint);
OMPI_THREAD_UNLOCK(&mca_iof_base.iof_lock);
return endpoint;
}
}
}
OMPI_THREAD_UNLOCK(&mca_iof_base.iof_lock);
return NULL;
}
/*
*
*/
int mca_iof_base_endpoint_forward(
mca_iof_base_endpoint_t* endpoint,
const ompi_process_name_t* src,
mca_iof_base_msg_header_t* hdr,
const unsigned char* data)
{
mca_iof_base_frag_t* frag;
size_t len = hdr->msg_len;
int rc = 0;
/* allocate and initialize a fragment */
MCA_IOF_BASE_FRAG_ALLOC(frag, rc);
if(NULL == frag) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
OMPI_THREAD_LOCK(&mca_iof_base.iof_lock);
frag->frag_owner = endpoint;
frag->frag_src = *src;
frag->frag_hdr.hdr_msg = *hdr;
/* try to write w/out copying data */
if(ompi_list_get_size(&endpoint->ep_frags) == 0) {
rc = write(endpoint->ep_fd,data,len);
if(rc < 0) {
mca_iof_base_endpoint_closed(endpoint);
OMPI_THREAD_UNLOCK(&mca_iof_base.iof_lock);
return OMPI_SUCCESS;
}
}
frag->frag_len = len - rc;
if(frag->frag_len > 0) {
/* handle incomplete write */
frag->frag_ptr = frag->frag_data;
memcpy(frag->frag_ptr, data+rc, frag->frag_len);
ompi_list_append(&endpoint->ep_frags, &frag->super);
if(ompi_list_get_size(&endpoint->ep_frags) == 1) {
ompi_event_add(&endpoint->ep_event,0);
}
} else {
/* acknowledge fragment */
mca_iof_base_frag_ack(frag);
}
OMPI_THREAD_UNLOCK(&mca_iof_base.iof_lock);
return OMPI_SUCCESS;
}
/**
*
*/
int mca_iof_base_endpoint_ack(
mca_iof_base_endpoint_t* endpoint,
uint32_t seq)
{
bool window_closed, window_open;
OMPI_THREAD_LOCK(&mca_iof_base.iof_lock);
window_closed =
MCA_IOF_BASE_SEQDIFF(endpoint->ep_seq,endpoint->ep_ack) >= mca_iof_base.iof_window_size;
endpoint->ep_ack = seq;
window_open =
MCA_IOF_BASE_SEQDIFF(endpoint->ep_seq,endpoint->ep_ack) < mca_iof_base.iof_window_size;
/* if we are shutting down - cleanup endpoint */
if(endpoint->ep_state == MCA_IOF_EP_CLOSING) {
if(endpoint->ep_seq == endpoint->ep_ack) {
endpoint->ep_state = MCA_IOF_EP_CLOSED;
ompi_condition_signal(&mca_iof_base.iof_condition);
}
/* otherwise check to see if we can reenable forwarding */
} else if(window_closed && window_open) {
ompi_event_add(&endpoint->ep_event, 0);
}
OMPI_THREAD_UNLOCK(&mca_iof_base.iof_lock);
return OMPI_SUCCESS;
}

110
src/mca/iof/base/iof_base_endpoint.h Обычный файл
Просмотреть файл

@ -0,0 +1,110 @@
#ifndef _IOF_BASE_ENDPOINT_
#define _IOF_BASE_ENDPOINT_
#include "ompi_config.h"
#include "class/ompi_list.h"
#include "event/event.h"
#include "mca/iof/iof.h"
#include "mca/iof/base/iof_base_header.h"
enum {
MCA_IOF_EP_OPEN,
MCA_IOF_EP_CLOSING,
MCA_IOF_EP_CLOSED
};
struct mca_iof_base_endpoint_t {
ompi_list_item_t super;
mca_iof_base_mode_t ep_mode;
ompi_process_name_t ep_name;
int ep_tag;
int ep_fd;
int ep_state;
uint32_t ep_seq;
uint32_t ep_ack;
ompi_event_t ep_event;
ompi_list_t ep_frags;
};
typedef struct mca_iof_base_endpoint_t mca_iof_base_endpoint_t;
OBJ_CLASS_DECLARATION(mca_iof_base_endpoint_t);
/*
* Diff between two sequence numbers allowing for rollover
*/
#define MCA_IOF_BASE_SEQDIFF(s1,s2) \
((s1 > s2) ? (s1 - s2) : (s1 + (ULONG_MAX - s2)))
/**
*
*/
mca_iof_base_endpoint_t* mca_iof_base_endpoint_lookup(
const ompi_process_name_t* proc,
mca_iof_base_mode_t mode,
int tag);
/**
*
*/
int mca_iof_base_endpoint_create(
const ompi_process_name_t* name,
mca_iof_base_mode_t mode,
int tag,
int fd);
/**
*
*/
int mca_iof_base_endpoint_delete(
const ompi_process_name_t* name,
ompi_ns_cmp_bitmask_t mask,
int tag);
/*
*
*/
int mca_iof_base_endpoint_close(
mca_iof_base_endpoint_t* endpoint);
/**
*
*/
mca_iof_base_endpoint_t* mca_iof_base_endpoint_match(
const ompi_process_name_t* dst_name,
ompi_ns_cmp_bitmask_t dst_mask,
int dst_tag);
/**
*
*/
int mca_iof_base_endpoint_forward(
mca_iof_base_endpoint_t* endpoint,
const ompi_process_name_t* src,
mca_iof_base_msg_header_t* hdr,
const unsigned char* data);
/*
*
*/
void mca_iof_base_endpoint_closed(
mca_iof_base_endpoint_t* endpoint);
/**
*
*/
int mca_iof_base_endpoint_ack(
mca_iof_base_endpoint_t* endpoint,
uint32_t seq);
#endif

91
src/mca/iof/base/iof_base_fragment.c Обычный файл
Просмотреть файл

@ -0,0 +1,91 @@
#include "ompi_config.h"
#include <stdlib.h>
#include <string.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <errno.h>
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_SYS_FCNTL_H
#include <sys/fcntl.h>
#endif
#include <netinet/in.h>
#include "util/output.h"
#include "mca/oob/base/base.h"
#include "mca/iof/base/base.h"
#include "iof_base_endpoint.h"
#include "iof_base_fragment.h"
/**
*
*/
static void mca_iof_base_frag_construct(mca_iof_base_frag_t* frag)
{
frag->frag_owner = NULL;
frag->frag_len = 0;
frag->frag_iov[0].iov_base = &frag->frag_hdr;
frag->frag_iov[0].iov_len = sizeof(frag->frag_hdr);
frag->frag_iov[1].iov_base = frag->frag_data;
frag->frag_iov[1].iov_len = sizeof(frag->frag_data);
}
static void mca_iof_base_frag_destruct(mca_iof_base_frag_t* frag)
{
frag->frag_iov[0].iov_base = &frag->frag_hdr;
frag->frag_iov[0].iov_len = sizeof(frag->frag_hdr);
frag->frag_iov[1].iov_base = frag->frag_data;
frag->frag_iov[1].iov_len = sizeof(frag->frag_data);
}
OBJ_CLASS_INSTANCE(
mca_iof_base_frag_t,
ompi_list_item_t,
mca_iof_base_frag_construct,
mca_iof_base_frag_destruct);
/*
*
*/
static void mca_iof_base_frag_send_cb(
int status,
ompi_process_name_t* peer,
struct iovec* msg,
int count,
int tag,
void* cbdata)
{
mca_iof_base_frag_t* frag = (mca_iof_base_frag_t*)cbdata;
MCA_IOF_BASE_FRAG_RETURN(frag);
}
/*
*
*/
int mca_iof_base_frag_ack(mca_iof_base_frag_t* frag)
{
int rc;
frag->frag_hdr.hdr_common.hdr_type = MCA_IOF_BASE_HDR_ACK;
MCA_IOF_BASE_HDR_MSG_HTON(frag->frag_hdr.hdr_msg);
/* start non-blocking OOB call to forward header */
rc = mca_oob_send_nb(
&frag->frag_src,
frag->frag_iov,
1,
MCA_OOB_TAG_IOF_SVC,
0,
mca_iof_base_frag_send_cb,
frag);
if(rc != OMPI_SUCCESS) {
ompi_output(0, "mca_iof_base_frag_ack: mca_oob_send failed with status=%d\n", rc);
}
return rc;
}

56
src/mca/iof/base/iof_base_fragment.h Обычный файл
Просмотреть файл

@ -0,0 +1,56 @@
#ifndef _IOF_BASE_FRAGMENT_
#define _IOF_BASE_FRAGMENT_
#include "ompi_config.h"
#include "class/ompi_list.h"
#include "class/ompi_free_list.h"
#include "event/event.h"
#include "mca/iof/iof.h"
#include "mca/iof/base/base.h"
#include "mca/iof/base/iof_base_header.h"
/**
*
*/
struct mca_iof_base_frag_t {
ompi_list_item_t super;
mca_iof_base_header_t frag_hdr;
ompi_process_name_t frag_src;
unsigned char frag_data[MCA_IOF_BASE_MSG_MAX];
unsigned char* frag_ptr;
size_t frag_len;
struct iovec frag_iov[2];
struct mca_iof_base_endpoint_t* frag_owner;
};
typedef struct mca_iof_base_frag_t mca_iof_base_frag_t;
OBJ_CLASS_DECLARATION(mca_iof_base_frag_t);
/**
*
*/
#define MCA_IOF_BASE_FRAG_ALLOC(frag,rc) { \
ompi_list_item_t* item; \
OMPI_FREE_LIST_GET(&mca_iof_base.iof_fragments, item,rc); \
if((frag = (mca_iof_base_frag_t*)item) == NULL) { \
ompi_output(0, "MCA_IOF_BASE_FRAG_ALLOC failed with status=%d\n", rc); \
} \
}
#define MCA_IOF_BASE_FRAG_RETURN(frag) \
OMPI_FREE_LIST_RETURN(&mca_iof_base.iof_fragments, (ompi_list_item_t*)frag)
/**
* Send an acknowledgment to the peer that this fragment has been received.
*/
int mca_iof_base_frag_ack(mca_iof_base_frag_t*);
#endif

160
src/mca/iof/base/iof_base_header.h Обычный файл
Просмотреть файл

@ -0,0 +1,160 @@
#ifndef _IOF_BASE_HEADER_
#define _IOF_BASE_HEADER_
#include "ompi_config.h"
#include "mca/iof/iof.h"
#define MCA_IOF_BASE_HDR_MSG 0
#define MCA_IOF_BASE_HDR_ACK 1
#define MCA_IOF_BASE_HDR_PUB 2
#define MCA_IOF_BASE_HDR_UNPUB 3
#define MCA_IOF_BASE_HDR_SUB 4
#define MCA_IOF_BASE_HDR_UNSUB 5
/*
* Maximum size of msg
*/
#define MCA_IOF_BASE_MSG_MAX 2048
/**
* Convert process name from network to host byte order.
*
* @param name
*/
#define OMPI_PROCESS_NAME_NTOH(n) \
n.cellid = ntohl((n).cellid); \
n.jobid = ntohl((n).jobid); \
n.vpid = ntohl((n).vpid);
/**
* Convert process name from host to network byte order.
*
* @param name
*/
#define OMPI_PROCESS_NAME_HTON(n) \
n.cellid = htonl((n).cellid); \
n.jobid = htonl((n).jobid); \
n.vpid = htonl((n).vpid);
#define OMPI_PROCESS_NAME_ARGS(n) \
n.cellid,n.jobid,n.vpid
/**
*
*/
struct mca_iof_base_common_header_t {
uint8_t hdr_type;
uint8_t hdr_reserve;
int16_t hdr_status;
};
typedef struct mca_iof_base_common_header_t mca_iof_base_common_header_t;
#define MCA_IOF_BASE_HDR_CMN_NTOH(h) \
(h).hdr_status = ntohs((h).hdr_status)
#define MCA_IOF_BASE_HDR_CMN_HTON(h) \
(h).hdr_status = htons((h).hdr_status)
/**
*
*/
struct mca_iof_base_msg_header_t {
mca_iof_base_common_header_t hdr_common;
ompi_process_name_t msg_src;
int32_t msg_tag;
uint32_t msg_seq;
uint32_t msg_len;
};
typedef struct mca_iof_base_msg_header_t mca_iof_base_msg_header_t;
#define MCA_IOF_BASE_HDR_MSG_NTOH(h) \
MCA_IOF_BASE_HDR_CMN_NTOH((h).hdr_common); \
OMPI_PROCESS_NAME_NTOH((h).msg_src); \
(h).msg_tag = ntohl((h).msg_tag); \
(h).msg_seq = ntohl((h).msg_seq); \
(h).msg_len = ntohl((h).msg_len);
#define MCA_IOF_BASE_HDR_MSG_HTON(h) \
MCA_IOF_BASE_HDR_CMN_HTON((h).hdr_common); \
OMPI_PROCESS_NAME_HTON((h).msg_src); \
(h).msg_tag = htonl((h).msg_tag); \
(h).msg_seq = htonl((h).msg_seq); \
(h).msg_len = htonl((h).msg_len);
/**
* Publish/Unpublish
*/
struct mca_iof_base_pub_header_t {
mca_iof_base_common_header_t hdr_common;
ompi_process_name_t pub_name;
ompi_process_name_t pub_proxy;
int32_t pub_mask;
int32_t pub_tag;
};
typedef struct mca_iof_base_pub_header_t mca_iof_base_pub_header_t;
#define MCA_IOF_BASE_HDR_PUB_NTOH(h) \
MCA_IOF_BASE_HDR_CMN_NTOH((h).hdr_common); \
OMPI_PROCESS_NAME_NTOH((h).pub_proxy); \
OMPI_PROCESS_NAME_NTOH((h).pub_name); \
(h).pub_mask = ntohl((h).pub_mask); \
(h).pub_tag = ntohl((h).pub_tag);
#define MCA_IOF_BASE_HDR_PUB_HTON(h) \
MCA_IOF_BASE_HDR_CMN_HTON((h).hdr_common); \
OMPI_PROCESS_NAME_HTON((h).pub_name); \
OMPI_PROCESS_NAME_HTON((h).pub_proxy); \
(h).pub_mask = htonl((h).pub_mask); \
(h).pub_tag = htonl((h).pub_tag);
/**
* Subscription message.
*/
struct mca_iof_base_sub_header_t {
mca_iof_base_common_header_t hdr_common;
ompi_process_name_t src_name;
ompi_ns_cmp_bitmask_t src_mask;
int32_t src_tag;
ompi_process_name_t dst_name;
ompi_ns_cmp_bitmask_t dst_mask;
int32_t dst_tag;
};
typedef struct mca_iof_base_sub_header_t mca_iof_base_sub_header_t;
#define MCA_IOF_BASE_HDR_SUB_NTOH(h) \
MCA_IOF_BASE_HDR_CMN_NTOH((h).hdr_common); \
OMPI_PROCESS_NAME_NTOH((h).src_name); \
(h).src_tag = ntohl((h).src_tag); \
OMPI_PROCESS_NAME_NTOH((h).dst_name); \
(h).dst_tag = ntohl((h).dst_tag);
#define MCA_IOF_BASE_HDR_SUB_HTON(h) \
MCA_IOF_BASE_HDR_CMN_HTON((h).hdr_common); \
OMPI_PROCESS_NAME_HTON((h).src_name); \
(h).src_tag = htonl((h).src_tag); \
OMPI_PROCESS_NAME_HTON((h).dst_name); \
(h).dst_tag = htonl((h).dst_tag);
/**
*
*/
union mca_iof_base_header_t {
mca_iof_base_common_header_t hdr_common;
mca_iof_base_msg_header_t hdr_msg;
mca_iof_base_sub_header_t hdr_sub;
mca_iof_base_pub_header_t hdr_pub;
};
typedef union mca_iof_base_header_t mca_iof_base_header_t;
#endif

Просмотреть файл

@ -21,7 +21,8 @@
#include "mca/base/mca_base_param.h"
#include "mca/iof/iof.h"
#include "mca/iof/base/base.h"
#include "mca/iof/base/iof_base_header.h"
#include "mca/iof/base/iof_base_fragment.h"
/*
* The following file was created by configure. It contains extern
@ -35,8 +36,8 @@
/*
* Global variables
*/
int mca_iof_base_output = -1;
ompi_list_t mca_iof_base_components_opened;
mca_iof_base_t mca_iof_base;
/**
@ -45,15 +46,45 @@ ompi_list_t mca_iof_base_components_opened;
*/
int mca_iof_base_open(void)
{
/* Open up all available components */
int id;
int int_value;
char* str_value;
if (OMPI_SUCCESS !=
mca_base_components_open("iof", 0, mca_iof_base_static_components,
&mca_iof_base_components_opened)) {
return OMPI_ERROR;
}
/* Initialize globals */
OBJ_CONSTRUCT(&mca_iof_base.iof_components_opened, ompi_list_t);
OBJ_CONSTRUCT(&mca_iof_base.iof_endpoints, ompi_list_t);
OBJ_CONSTRUCT(&mca_iof_base.iof_lock, ompi_mutex_t);
OBJ_CONSTRUCT(&mca_iof_base.iof_condition, ompi_condition_t);
OBJ_CONSTRUCT(&mca_iof_base.iof_fragments, ompi_free_list_t);
/* All done */
return OMPI_SUCCESS;
/* lookup common parameters */
id = mca_base_param_register_int("iof","base","window_size",NULL,MCA_IOF_BASE_MSG_MAX << 1);
mca_base_param_lookup_int(id,&int_value);
mca_iof_base.iof_window_size = int_value;
id = mca_base_param_register_string("iof","base","service",NULL,"0.0.0");
mca_base_param_lookup_int(id,&str_value);
mca_iof_base.iof_service = ompi_name_server.convert_string_to_process_name(str_value);
/* initialize free list */
ompi_free_list_init(
&mca_iof_base.iof_fragments,
sizeof(mca_iof_base_frag_t),
OBJ_CLASS(mca_iof_base_frag_t),
0, /* number to initially allocate */
-1, /* maximum elements to allocate */
32, /* number per allocation */
NULL); /* optional memory pool */
/* Open up all available components */
if (OMPI_SUCCESS !=
mca_base_components_open("iof", 0, mca_iof_base_static_components,
&mca_iof_base.iof_components_opened)) {
return OMPI_ERROR;
}
/* All done */
return OMPI_SUCCESS;
}

Просмотреть файл

@ -20,46 +20,51 @@
#include "mca/iof/iof.h"
#include "mca/iof/base/base.h"
mca_iof_base_module_t mca_iof;
/**
* Call the init function on all available components to find out if
* they want to run. Select the single component with the highest
* priority.
*/
int mca_iof_base_select(bool *allow_multi_user_threads)
int mca_iof_base_select(bool *allow_multi_user_threads, bool* have_hidden_threads)
{
ompi_list_item_t *item;
mca_base_component_list_item_t *cli;
int selected_priority = -1;
mca_iof_base_component_t *selected_component = NULL;
mca_iof_base_module_t *selected_module = NULL;
bool selected_threading;
bool selected_allow_user;
bool selected_have_hidden;
/* Traverse the list of opened modules; call their init functions. */
for(item = ompi_list_get_first(&mca_iof_base_components_opened);
item != ompi_list_get_end(&mca_iof_base_components_opened);
for(item = ompi_list_get_first(&mca_iof_base.iof_components_opened);
item != ompi_list_get_end(&mca_iof_base.iof_components_opened);
item = ompi_list_get_next(item)) {
mca_iof_base_component_t* component;
cli = (mca_base_component_list_item_t *) item;
component = (mca_iof_base_component_t *) cli->cli_component;
ompi_output_verbose(10, mca_iof_base_output,
ompi_output_verbose(10, mca_iof_base.iof_output,
"mca_iof_base_select: initializing %s component %s",
component->iof_version.mca_type_name,
component->iof_version.mca_component_name);
if (NULL == component->iof_init) {
ompi_output_verbose(10, mca_iof_base_output,
ompi_output_verbose(10, mca_iof_base.iof_output,
"mca_iof_base_select: no init function; ignoring component");
} else {
bool threading;
bool allow_user;
bool have_hidden;
int priority;
mca_iof_base_module_t* module = component->iof_init(&priority, &threading);
mca_iof_base_module_t* module = component->iof_init(&priority, &allow_user, &have_hidden);
/* If the component didn't initialize, remove it from the opened
list and remove it from the component repository */
if (NULL == module) {
ompi_output_verbose(10, mca_iof_base_output,
ompi_output_verbose(10, mca_iof_base.iof_output,
"mca_iof_base_select: init returned failure");
continue;
}
@ -68,23 +73,25 @@ int mca_iof_base_select(bool *allow_multi_user_threads)
selected_priority = priority;
selected_component = component;
selected_module = module;
selected_threading = threading;
selected_allow_user = allow_user;
selected_have_hidden = have_hidden;
}
}
}
/* unload all components that were not selected */
item = ompi_list_get_first(&mca_iof_base_components_opened);
while(item != ompi_list_get_end(&mca_iof_base_components_opened)) {
item = ompi_list_get_first(&mca_iof_base.iof_components_opened);
while(item != ompi_list_get_end(&mca_iof_base.iof_components_opened)) {
ompi_list_item_t* next = ompi_list_get_next(item);
mca_iof_base_component_t* component = (mca_iof_base_component_t *) cli->cli_component;
mca_iof_base_component_t* component;
cli = (mca_base_component_list_item_t *) item;
component = (mca_iof_base_component_t *) cli->cli_component;
if(component != selected_component) {
ompi_output_verbose(10, mca_iof_base_output,
ompi_output_verbose(10, mca_iof_base.iof_output,
"mca_iof_base_select: module %s unloaded",
component->iof_version.mca_component_name);
mca_base_component_repository_release((mca_base_component_t *) component);
ompi_list_remove_item(&mca_iof_base_components_opened, item);
ompi_list_remove_item(&mca_iof_base.iof_components_opened, item);
OBJ_RELEASE(item);
}
item = next;
@ -92,7 +99,8 @@ int mca_iof_base_select(bool *allow_multi_user_threads)
/* setup reference to selected module */
if(NULL != selected_module) {
*allow_multi_user_threads = selected_threading;
*allow_multi_user_threads = selected_allow_user;
*have_hidden_threads = selected_have_hidden;
mca_iof = *selected_module;
}
return OMPI_SUCCESS;

Просмотреть файл

@ -178,7 +178,8 @@ OMPI_DECLSPEC extern mca_iof_base_module_t mca_iof;
typedef mca_iof_base_module_t* (*mca_iof_base_component_init_fn_t)(
int *priority,
bool *thread_support
bool *allow_user_threads,
bool *have_hidden_threads
);
struct mca_iof_base_component_1_0_0_t {

Просмотреть файл

@ -31,7 +31,9 @@ endif
proxy_SOURCES = \
iof_proxy.c \
iof_proxy.h \
iof_proxy_component.c
iof_proxy_component.c \
iof_proxy_svc.c \
iof_proxy_svc.h
mcacomponentdir = $(libdir)/openmpi
mcacomponent_LTLIBRARIES = $(component_install)

Просмотреть файл

@ -18,7 +18,12 @@
#include "include/constants.h"
#include "util/output.h"
#include "mca/iof/iof.h"
#include "mca/oob/oob.h"
#include "mca/iof/iof.h"
#include "mca/iof/base/iof_base_endpoint.h"
#include "iof_proxy.h"
#include "iof_proxy_svc.h"
mca_iof_base_module_t mca_iof_proxy_module = {
@ -49,7 +54,22 @@ int mca_iof_proxy_publish(
mca_iof_base_tag_t tag,
int fd)
{
return OMPI_ERROR;
int rc;
/* publish to server */
if(mode == MCA_IOF_SINK) {
rc = mca_iof_proxy_svc_publish(name,tag);
if(rc != OMPI_SUCCESS)
return rc;
}
/* setup a local endpoint to reflect registration */
rc = mca_iof_base_endpoint_create(
name,
mode,
tag,
fd);
return rc;
}
@ -68,7 +88,20 @@ int mca_iof_proxy_unpublish(
ompi_ns_cmp_bitmask_t mask,
mca_iof_base_tag_t tag)
{
return OMPI_ERROR;
int rc;
/* cleanup server */
mca_iof_proxy_svc_unpublish(
name,
mask,
tag);
/* setup a local endpoint to reflect registration */
rc = mca_iof_base_endpoint_delete(
name,
mask,
tag);
return rc;
}
@ -88,7 +121,28 @@ int mca_iof_proxy_push(
mca_iof_base_tag_t dst_tag,
int fd)
{
return OMPI_ERROR;
int rc;
/* send a subscription to server on behalf of the destination */
rc = mca_iof_proxy_svc_subscribe(
MCA_OOB_NAME_SELF,
OMPI_NS_CMP_ALL,
dst_tag,
dst_name,
dst_mask,
dst_tag
);
if(rc != OMPI_SUCCESS)
return rc;
/* setup a local endpoint to reflect registration */
rc = mca_iof_base_endpoint_create(
MCA_OOB_NAME_SELF,
MCA_IOF_SOURCE,
dst_tag,
fd);
return rc;
}
@ -108,9 +162,26 @@ int mca_iof_proxy_pull(
mca_iof_base_tag_t src_tag,
int fd)
{
return OMPI_ERROR;
}
/* setup a local endpoint */
int rc;
rc = mca_iof_base_endpoint_create(
MCA_OOB_NAME_SELF,
MCA_IOF_SOURCE,
src_tag,
fd);
if(rc != OMPI_SUCCESS)
return rc;
/* send a subscription message to the server */
rc = mca_iof_proxy_svc_subscribe(
src_name,
src_mask,
src_tag,
MCA_OOB_NAME_SELF,
OMPI_NS_CMP_ALL,
src_tag);
return rc;
}
/**
* Setup buffering for a specified set of endpoints.
@ -138,7 +209,19 @@ int mca_iof_proxy_subscribe(
mca_iof_base_callback_fn_t cb,
void* cbdata)
{
return OMPI_ERROR;
int rc;
/* create a local registration to reflect the callback */
/* send a subscription message to the service */
rc = mca_iof_proxy_svc_subscribe(
src_name,
src_mask,
src_tag,
MCA_OOB_NAME_SELF,
OMPI_NS_CMP_ALL,
src_tag);
return rc;
}
int mca_iof_proxy_unsubscribe(
@ -146,6 +229,18 @@ int mca_iof_proxy_unsubscribe(
ompi_ns_cmp_bitmask_t src_mask,
mca_iof_base_tag_t src_tag)
{
int rc;
/* send an unsubscribe message to the service */
rc = mca_iof_proxy_svc_unsubscribe(
src_name,
src_mask,
src_tag,
MCA_OOB_NAME_SELF,
OMPI_NS_CMP_ALL,
src_tag);
/* remove local callback */
return OMPI_ERROR;
}

Просмотреть файл

@ -127,6 +127,8 @@ int mca_iof_proxy_unsubscribe(
*/
struct mca_iof_proxy_component_t {
mca_iof_base_component_t super;
int proxy_debug;
struct iovec proxy_iov[1];
};
typedef struct mca_iof_proxy_component_t mca_iof_proxy_component_t;

Просмотреть файл

@ -13,16 +13,25 @@
*/
#include "ompi_config.h"
#include "util/proc_info.h"
#include "util/output.h"
#include "runtime/ompi_progress.h"
#include "mca/oob/base/base.h"
#include "mca/base/base.h"
#include "mca/base/mca_base_param.h"
#include "mca/iof/base/iof_base_endpoint.h"
#include "iof_proxy.h"
#include "iof_proxy_svc.h"
/*
* Local functions
*/
static int mca_iof_proxy_open(void);
static mca_iof_base_module_t* mca_iof_proxy_init(int* priority, bool *allow_multi_user_threads);
static int mca_iof_proxy_close(void);
static mca_iof_base_module_t* mca_iof_proxy_init(
int* priority,
bool *allow_multi_user_threads,
bool *have_hidden_threads);
mca_iof_proxy_component_t mca_iof_proxy_component = {
@ -41,7 +50,7 @@ mca_iof_proxy_component_t mca_iof_proxy_component = {
0, /* MCA component minor version */
0, /* MCA component release version */
mca_iof_proxy_open, /* component open */
NULL
mca_iof_proxy_close /* component close */
},
/* Next the MCA v1.0.0 component meta data */
@ -80,15 +89,49 @@ static int mca_iof_proxy_param_register_int(
*/
static int mca_iof_proxy_open(void)
{
mca_iof_proxy_component.proxy_debug = mca_iof_proxy_param_register_int("debug",1);
return OMPI_SUCCESS;
}
static mca_iof_base_module_t*
mca_iof_proxy_init(int* priority, bool *allow_multi_user_threads)
mca_iof_proxy_init(int* priority, bool *allow_multi_user_threads, bool *have_hidden_threads)
{
int rc;
if(ompi_process_info.seed == true)
return NULL;
*priority = 1;
*allow_multi_user_threads = true;
*have_hidden_threads = false;
/* post receive with oob */
mca_iof_proxy_component.proxy_iov[0].iov_base = NULL;
mca_iof_proxy_component.proxy_iov[0].iov_len = 0;
rc = mca_oob_recv_nb(
MCA_OOB_NAME_ANY,
mca_iof_proxy_component.proxy_iov,
1,
MCA_OOB_TAG_IOF_SVC,
MCA_OOB_ALLOC,
mca_iof_proxy_svc_recv,
NULL
);
if(rc != OMPI_SUCCESS) {
ompi_output(0, "mca_iof_proxy_init: unable to post non-blocking recv");
return NULL;
}
return &mca_iof_proxy_module;
}
/**
*
*/
static int mca_iof_proxy_close(void)
{
return mca_oob_recv_cancel(MCA_OOB_NAME_ANY, MCA_OOB_TAG_IOF_SVC);
}

256
src/mca/iof/proxy/iof_proxy_svc.c Обычный файл
Просмотреть файл

@ -0,0 +1,256 @@
#include "ompi_config.h"
#include "util/output.h"
#include "mca/oob/oob.h"
#include "mca/iof/base/base.h"
#include "mca/iof/base/iof_base_header.h"
#include "mca/iof/base/iof_base_endpoint.h"
#include "iof_proxy.h"
#include "iof_proxy_svc.h"
/**
*
*/
static void mca_iof_proxy_svc_msg(
const ompi_process_name_t* src,
mca_iof_base_msg_header_t* msg,
unsigned char* data);
static void mca_iof_proxy_svc_ack(
const ompi_process_name_t* src,
mca_iof_base_msg_header_t* msg);
/**
*
*/
int mca_iof_proxy_svc_publish(
const ompi_process_name_t* name,
int tag)
{
mca_iof_base_header_t hdr;
struct iovec iov;
int rc;
hdr.hdr_common.hdr_type = MCA_IOF_BASE_HDR_PUB;
hdr.hdr_common.hdr_status = 0;
hdr.hdr_pub.pub_name = *name;
hdr.hdr_pub.pub_proxy = mca_oob_name_self;
hdr.hdr_pub.pub_mask = OMPI_NS_CMP_ALL;
hdr.hdr_pub.pub_tag = tag;
MCA_IOF_BASE_HDR_PUB_NTOH(hdr.hdr_pub);
iov.iov_base = &hdr;
iov.iov_len = sizeof(hdr);
rc = mca_oob_send(
mca_iof_base.iof_service,
&iov,
1,
MCA_OOB_TAG_IOF_SVC,
0);
if(rc < 0) {
ompi_output(0, "mca_iof_proxy_svc_publish: mca_oob_send failed with status=%d\n", rc);
return rc;
}
return OMPI_SUCCESS;
}
/**
*
*/
int mca_iof_proxy_svc_unpublish(
const ompi_process_name_t* name,
ompi_ns_cmp_bitmask_t mask,
int tag)
{
mca_iof_base_header_t hdr;
struct iovec iov;
int rc;
hdr.hdr_common.hdr_type = MCA_IOF_BASE_HDR_PUB;
hdr.hdr_common.hdr_status = 0;
hdr.hdr_pub.pub_name = *name;
hdr.hdr_pub.pub_proxy = mca_oob_name_self;
hdr.hdr_pub.pub_mask = mask;
hdr.hdr_pub.pub_tag = tag;
MCA_IOF_BASE_HDR_PUB_NTOH(hdr.hdr_pub);
iov.iov_base = &hdr;
iov.iov_len = sizeof(hdr);
rc = mca_oob_send(
mca_iof_base.iof_service,
&iov,
1,
MCA_OOB_TAG_IOF_SVC,
0);
if(rc < 0) {
ompi_output(0, "mca_iof_proxy_svc_unpublish: mca_oob_send failed with status=%d\n", rc);
return rc;
}
return OMPI_SUCCESS;
}
/**
*
*/
int mca_iof_proxy_svc_subscribe(
const ompi_process_name_t* src_name,
ompi_ns_cmp_bitmask_t src_mask,
int src_tag,
const ompi_process_name_t* dst_name,
ompi_ns_cmp_bitmask_t dst_mask,
int dst_tag
)
{
mca_iof_base_header_t hdr;
struct iovec iov;
int rc;
hdr.hdr_common.hdr_type = MCA_IOF_BASE_HDR_SUB;
hdr.hdr_common.hdr_status = 0;
hdr.hdr_sub.src_name = *src_name;
hdr.hdr_sub.src_mask = src_mask;
hdr.hdr_sub.src_tag = src_tag;
hdr.hdr_sub.dst_name = *dst_name;
hdr.hdr_sub.dst_mask = dst_mask;
hdr.hdr_sub.dst_tag = dst_tag;
MCA_IOF_BASE_HDR_SUB_NTOH(hdr.hdr_sub);
iov.iov_base = &hdr;
iov.iov_len = sizeof(hdr);
rc = mca_oob_send(
mca_iof_base.iof_service,
&iov,
1,
MCA_OOB_TAG_IOF_SVC,
0);
if(rc < 0) {
ompi_output(0, "mca_iof_proxy_svc_subscribe: mca_oob_send failed with status=%d\n", rc);
return rc;
}
return OMPI_SUCCESS;
}
int mca_iof_proxy_svc_unsubscribe(
const ompi_process_name_t* src_name,
ompi_ns_cmp_bitmask_t src_mask,
int src_tag,
const ompi_process_name_t* dst_name,
ompi_ns_cmp_bitmask_t dst_mask,
int dst_tag
)
{
mca_iof_base_header_t hdr;
struct iovec iov;
int rc;
hdr.hdr_common.hdr_type = MCA_IOF_BASE_HDR_UNSUB;
hdr.hdr_sub.src_name = *src_name;
hdr.hdr_sub.src_mask = src_mask;
hdr.hdr_sub.src_tag = src_tag;
hdr.hdr_sub.dst_name = *dst_name;
hdr.hdr_sub.dst_mask = dst_mask;
hdr.hdr_sub.dst_tag = dst_tag;
MCA_IOF_BASE_HDR_SUB_NTOH(hdr.hdr_sub);
iov.iov_base = &hdr;
iov.iov_len = sizeof(hdr);
rc = mca_oob_send(
mca_iof_base.iof_service,
&iov,
1,
MCA_OOB_TAG_IOF_SVC,
0);
if(rc < 0) {
ompi_output(0, "mca_iof_proxy_svc_unsubscribe: mca_oob_send failed with status=%d\n", rc);
return rc;
}
return OMPI_SUCCESS;
}
void mca_iof_proxy_svc_recv(
int status,
ompi_process_name_t* src,
struct iovec* msg,
int count,
int tag,
void* cbdata)
{
mca_iof_base_header_t* hdr = (mca_iof_base_header_t*)msg->iov_base;
int rc;
switch(hdr->hdr_common.hdr_type) {
case MCA_IOF_BASE_HDR_MSG:
MCA_IOF_BASE_HDR_MSG_NTOH(hdr->hdr_msg);
mca_iof_proxy_svc_msg(src,&hdr->hdr_msg,(unsigned char*)(hdr+1));
break;
case MCA_IOF_BASE_HDR_ACK:
MCA_IOF_BASE_HDR_MSG_NTOH(hdr->hdr_msg);
mca_iof_proxy_svc_ack(src,&hdr->hdr_msg);
break;
default:
break;
}
free(hdr);
/* repost receive */
mca_iof_proxy_component.proxy_iov[0].iov_base = NULL;
mca_iof_proxy_component.proxy_iov[0].iov_len = 0;
rc = mca_oob_recv_nb(
MCA_OOB_NAME_ANY,
mca_iof_proxy_component.proxy_iov,
1,
MCA_OOB_TAG_IOF_SVC,
MCA_OOB_ALLOC,
mca_iof_proxy_svc_recv,
NULL
);
if(rc != OMPI_SUCCESS) {
ompi_output(0, "mca_iof_proxy_svc_recv: unable to post non-blocking recv");
return;
}
}
static void mca_iof_proxy_svc_msg(
const ompi_process_name_t* src,
mca_iof_base_msg_header_t* msg,
unsigned char* data)
{
mca_iof_base_endpoint_t* endpoint;
endpoint = mca_iof_base_endpoint_match(MCA_OOB_NAME_ANY, OMPI_NS_CMP_NONE, msg->msg_tag);
if(endpoint != NULL) {
mca_iof_base_endpoint_forward(endpoint,src,msg,data);
OBJ_RELEASE(endpoint);
}
}
/**
*
*/
static void mca_iof_proxy_svc_ack(
const ompi_process_name_t* src,
mca_iof_base_msg_header_t* msg)
{
mca_iof_base_endpoint_t* endpoint;
endpoint = mca_iof_base_endpoint_match(&msg->msg_src, OMPI_NS_CMP_ALL, msg->msg_tag);
if(endpoint != NULL) {
mca_iof_base_endpoint_ack(endpoint,msg->msg_seq + msg->msg_len);
OBJ_RELEASE(endpoint);
}
}

86
src/mca/iof/proxy/iof_proxy_svc.h Обычный файл
Просмотреть файл

@ -0,0 +1,86 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* @file
*/
#ifndef MCA_IOF_PROXY_SVC_H
#define MCA_IOF_PROXY_SVC_H
#include "ompi_config.h"
#include "mca/iof/iof.h"
#include "mca/ns/ns.h"
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
/**
*
*/
int mca_iof_proxy_svc_publish(
const ompi_process_name_t* name,
int tag
);
int mca_iof_proxy_svc_unpublish(
const ompi_process_name_t* name,
ompi_ns_cmp_bitmask_t mask,
int tag
);
/**
*
*/
int mca_iof_proxy_svc_subscribe(
const ompi_process_name_t* src_name,
ompi_ns_cmp_bitmask_t src_mask,
int src_tag,
const ompi_process_name_t* dst_name,
ompi_ns_cmp_bitmask_t dst_mask,
int dst_tag
);
/**
*
*/
int mca_iof_proxy_svc_unsubscribe(
const ompi_process_name_t* src_name,
ompi_ns_cmp_bitmask_t src_mask,
int src_tag,
const ompi_process_name_t* dst_name,
ompi_ns_cmp_bitmask_t dst_mask,
int dst_tag
);
/**
*
*/
void mca_iof_proxy_svc_recv(
int status,
ompi_process_name_t* peer,
struct iovec* msg,
int count,
int tag,
void* cbdata);
#if defined(c_plusplus) || defined(__cplusplus)
};
#endif
#endif

51
src/mca/iof/svc/Makefile.am Обычный файл
Просмотреть файл

@ -0,0 +1,51 @@
#
# Copyright (c) 2004-2005 The Trustees of Indiana University.
# All rights reserved.
# Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
# All rights reserved.
# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
# University of Stuttgart. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# Use the top-level Makefile.options
include $(top_ompi_srcdir)/config/Makefile.options
# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
# (for static builds).
if OMPI_BUILD_iof_svc_DSO
component_noinst =
component_install = mca_iof_svc.la
else
component_noinst = libmca_iof_svc.la
component_install =
endif
svc_SOURCES = \
iof_svc.c \
iof_svc.h \
iof_svc_component.c \
iof_svc_proxy.h \
iof_svc_proxy.c \
iof_svc_publish.h \
iof_svc_publish.c \
iof_svc_subscript.h \
iof_svc_subscript.c
mcacomponentdir = $(libdir)/openmpi
mcacomponent_LTLIBRARIES = $(component_install)
mca_iof_svc_la_SOURCES = $(svc_SOURCES)
mca_iof_svc_la_LIBADD =
mca_iof_svc_la_LDFLAGS = -module -avoid-version
noinst_LTLIBRARIES = $(component_noinst)
libmca_iof_svc_la_SOURCES = $(svc_SOURCES)
libmca_iof_svc_la_LIBADD =
libmca_iof_svc_la_LDFLAGS = -module -avoid-version

20
src/mca/iof/svc/configure.params Обычный файл
Просмотреть файл

@ -0,0 +1,20 @@
# -*- shell-script -*-
#
# Copyright (c) 2004-2005 The Trustees of Indiana University.
# All rights reserved.
# Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
# All rights reserved.
# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
# University of Stuttgart. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# Specific to this module
PARAM_INIT_FILE=iof_proxy.c
PARAM_CONFIG_HEADER_FILE="iof_config.h"
PARAM_CONFIG_FILES="Makefile"

232
src/mca/iof/svc/iof_svc.c Обычный файл
Просмотреть файл

@ -0,0 +1,232 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include "include/constants.h"
#include "util/output.h"
#include "mca/oob/base/base.h"
#include "mca/iof/base/iof_base_endpoint.h"
#include "iof_svc.h"
#include "iof_svc_publish.h"
#include "iof_svc_subscript.h"
mca_iof_base_module_t mca_iof_svc_module = {
mca_iof_svc_publish,
mca_iof_svc_unpublish,
mca_iof_svc_push,
mca_iof_svc_pull,
mca_iof_svc_subscribe,
mca_iof_svc_unsubscribe
};
/**
* Publish a local file descriptor as an endpoint that is logically
* associated with the specified process name (e.g. master side of a
* pipe/pty connected to a child process)
*
* @param name
* @param mode
* @param tag
* @param fd
*
*/
int mca_iof_svc_publish(
const ompi_process_name_t* name,
mca_iof_base_mode_t mode,
mca_iof_base_tag_t tag,
int fd)
{
int rc;
/* setup a local endpoint to reflect registration */
rc = mca_iof_base_endpoint_create(
name,
mode,
tag,
fd);
if(rc != OMPI_SUCCESS) {
return rc;
}
/* publish endpoint */
if(mode == MCA_IOF_SINK) {
rc = mca_iof_svc_publish_create(
name,
&mca_oob_name_self,
OMPI_NS_CMP_ALL,
tag);
}
return rc;
}
/**
* Remove all registrations matching the specified process
* name, mask and tag values.
*
* @param name
* @param mask
* @param tag
*
*/
int mca_iof_svc_unpublish(
const ompi_process_name_t* name,
ompi_ns_cmp_bitmask_t mask,
mca_iof_base_tag_t tag)
{
int rc;
rc = mca_iof_svc_publish_delete(
name,
MCA_OOB_NAME_SELF,
mask,
tag);
if(rc != OMPI_SUCCESS)
return rc;
/* setup a local endpoint to reflect registration */
rc = mca_iof_base_endpoint_delete(
name,
mask,
tag);
return rc;
}
/**
* Explicitly push data from the specified file descriptor
* to the indicated set of peers.
*
* @param dst_name Name used to qualify set of peers.
* @param dst_mask Mask that specified how name is interpreted.
* @param dst_tag Match a specific peer endpoint.
* @param fd Local file descriptor.
*/
int mca_iof_svc_push(
const ompi_process_name_t* dst_name,
ompi_ns_cmp_bitmask_t dst_mask,
mca_iof_base_tag_t dst_tag,
int fd)
{
int rc;
/* setup a subscription */
rc = mca_iof_svc_subscript_create(
MCA_OOB_NAME_SELF,
OMPI_NS_CMP_ALL,
dst_tag,
dst_name,
dst_mask,
dst_tag);
if(rc != OMPI_SUCCESS)
return rc;
/* setup a local endpoint to reflect registration */
rc = mca_iof_base_endpoint_create(
MCA_OOB_NAME_SELF,
MCA_IOF_SOURCE,
dst_tag,
fd);
return rc;
}
/**
* Explicitly pull data from the specified set of peers
* and dump to the indicated file descriptor.
*
* @param dst_name Name used to qualify set of peers.
* @param dst_mask Mask that specified how name is interpreted.
* @param dst_tag Match a specific peer endpoint.
* @param fd Local file descriptor.
*/
int mca_iof_svc_pull(
const ompi_process_name_t* src_name,
ompi_ns_cmp_bitmask_t src_mask,
mca_iof_base_tag_t src_tag,
int fd)
{
int rc;
/* setup a local endpoint */
rc = mca_iof_base_endpoint_create(
MCA_OOB_NAME_SELF,
MCA_IOF_SINK,
src_tag,
fd);
if(rc != OMPI_SUCCESS)
return rc;
/* create a subscription */
rc = mca_iof_svc_subscript_create(
src_name,
src_mask,
src_tag,
MCA_OOB_NAME_SELF,
OMPI_NS_CMP_ALL,
src_tag);
return rc;
}
/**
* Setup buffering for a specified set of endpoints.
*/
int mca_iof_svc_buffer(
const ompi_process_name_t* src_name,
ompi_ns_cmp_bitmask_t src_mask,
mca_iof_base_tag_t src_tag,
size_t buffer_size)
{
/* send a message to the server indicating this set of connections should be buffered */
return OMPI_ERROR;
}
/*
* Subscribe to receive a callback on receipt of data
* from a specified set of peers.
*/
int mca_iof_svc_subscribe(
const ompi_process_name_t* src_name,
ompi_ns_cmp_bitmask_t src_mask,
mca_iof_base_tag_t src_tag,
mca_iof_base_callback_fn_t cb,
void* cbdata)
{
/* setup local callback on receipt of data */
/* setup local subscription */
return OMPI_ERROR;
}
int mca_iof_svc_unsubscribe(
const ompi_process_name_t* src_name,
ompi_ns_cmp_bitmask_t src_mask,
mca_iof_base_tag_t src_tag)
{
/* cleanup any local resouces associated with this subscription */
return OMPI_ERROR;
}

153
src/mca/iof/svc/iof_svc.h Обычный файл
Просмотреть файл

@ -0,0 +1,153 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* @file
*/
#ifndef MCA_IOF_SVC_H
#define MCA_IOF_SVC_H
#include "mca/iof/iof.h"
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifndef HAVE_UIO_H
#include <sys/uio.h>
#endif
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
/**
* Publish a local file descriptor as an endpoint that is logically
* associated with the specified process name (e.g. master side of a
* pipe/pty connected to a child process)
*
* @param name
* @param mode
* @param tag
* @param fd
*
*/
int mca_iof_svc_publish(
const ompi_process_name_t* name,
mca_iof_base_mode_t mode,
mca_iof_base_tag_t tag,
int fd
);
/**
* Remove all registrations matching the specified process
* name, mask and tag values.
*
* @param name
* @param mask
* @param tag
*
*/
int mca_iof_svc_unpublish(
const ompi_process_name_t* name,
ompi_ns_cmp_bitmask_t mask,
mca_iof_base_tag_t tag
);
/**
* Explicitly push data from the specified file descriptor
* to the indicated set of peers.
*
* @param dst_name Name used to qualify set of peers.
* @param dst_mask Mask that specified how name is interpreted.
* @param dst_tag Match a specific peer endpoint.
* @param fd Local file descriptor.
*/
int mca_iof_svc_push(
const ompi_process_name_t* dst_name,
ompi_ns_cmp_bitmask_t dst_mask,
mca_iof_base_tag_t dst_tag,
int fd
);
/**
* Explicitly pull data from the specified set of peers
* and dump to the indicated file descriptor.
*
* @param dst_name Name used to qualify set of peers.
* @param dst_mask Mask that specified how name is interpreted.
* @param dst_tag Match a specific peer endpoint.
* @param fd Local file descriptor.
*/
int mca_iof_svc_pull(
const ompi_process_name_t* src_name,
ompi_ns_cmp_bitmask_t src_mask,
mca_iof_base_tag_t src_tag,
int fd
);
/**
* Setup buffering for a specified set of endpoints.
*/
int mca_iof_svc_buffer(
const ompi_process_name_t* src_name,
ompi_ns_cmp_bitmask_t src_mask,
mca_iof_base_tag_t src_tag,
size_t buffer_size
);
/*
* Subscribe to receive a callback on receipt of data
* from a specified set of peers.
*/
int mca_iof_svc_subscribe(
const ompi_process_name_t* src_name,
ompi_ns_cmp_bitmask_t src_mask,
mca_iof_base_tag_t src_tag,
mca_iof_base_callback_fn_t cb,
void* cbdata
);
int mca_iof_svc_unsubscribe(
const ompi_process_name_t* src_name,
ompi_ns_cmp_bitmask_t src_mask,
mca_iof_base_tag_t src_tag
);
/**
* IOF svc Component
*/
struct mca_iof_svc_component_t {
mca_iof_base_component_t super;
int svc_debug;
ompi_list_t svc_published;
ompi_list_t svc_subscribed;
ompi_mutex_t svc_lock;
struct iovec svc_iov[1];
};
typedef struct mca_iof_svc_component_t mca_iof_svc_component_t;
OMPI_COMP_EXPORT extern mca_iof_svc_component_t mca_iof_svc_component;
OMPI_COMP_EXPORT extern mca_iof_base_module_t mca_iof_svc_module;
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
#endif

146
src/mca/iof/svc/iof_svc_component.c Обычный файл
Просмотреть файл

@ -0,0 +1,146 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "util/proc_info.h"
#include "util/output.h"
#include "mca/base/base.h"
#include "mca/base/mca_base_param.h"
#include "mca/oob/oob.h"
#include "iof_svc.h"
#include "iof_svc_proxy.h"
/*
* Local functions
*/
static int mca_iof_svc_open(void);
static int mca_iof_svc_close(void);
static mca_iof_base_module_t* mca_iof_svc_init(
int* priority,
bool *allow_multi_user_threads,
bool *have_hidden_threads);
mca_iof_svc_component_t mca_iof_svc_component = {
{
/* First, the mca_base_component_t struct containing meta
information about the component itself */
{
/* Indicate that we are a iof v1.0.0 component (which also
implies a specific MCA version) */
MCA_IOF_BASE_VERSION_1_0_0,
"svc", /* MCA component name */
1, /* MCA component major version */
0, /* MCA component minor version */
0, /* MCA component release version */
mca_iof_svc_open, /* component open */
mca_iof_svc_close /* component close */
},
/* Next the MCA v1.0.0 component meta data */
{
/* Whether the component is checkpointable or not */
false
},
mca_iof_svc_init
}
};
static char* mca_iof_svc_param_register_string(
const char* param_name,
const char* default_value)
{
char *param_value;
int id = mca_base_param_register_string("iof","svc",param_name,NULL,default_value);
mca_base_param_lookup_string(id, &param_value);
return param_value;
}
static int mca_iof_svc_param_register_int(
const char* param_name,
int default_value)
{
int id = mca_base_param_register_int("iof","svc",param_name,NULL,default_value);
int param_value = default_value;
mca_base_param_lookup_int(id,&param_value);
return param_value;
}
/**
* component open/close/init function
*/
static int mca_iof_svc_open(void)
{
mca_iof_svc_component.svc_debug = mca_iof_svc_param_register_int("debug", 1);
OBJ_CONSTRUCT(&mca_iof_svc_component.svc_subscribed, ompi_list_t);
OBJ_CONSTRUCT(&mca_iof_svc_component.svc_published, ompi_list_t);
OBJ_CONSTRUCT(&mca_iof_svc_component.svc_lock, ompi_mutex_t);
return OMPI_SUCCESS;
}
static int mca_iof_svc_close(void)
{
ompi_list_item_t* item;
OMPI_THREAD_LOCK(&mca_iof_svc_component.svc_lock);
while((item = ompi_list_remove_first(&mca_iof_svc_component.svc_subscribed)) != NULL) {
OBJ_RELEASE(item);
}
while((item = ompi_list_remove_first(&mca_iof_svc_component.svc_published)) != NULL) {
OBJ_RELEASE(item);
}
OMPI_THREAD_UNLOCK(&mca_iof_svc_component.svc_lock);
mca_oob_recv_cancel(MCA_OOB_NAME_ANY, MCA_OOB_TAG_IOF_SVC);
}
static mca_iof_base_module_t*
mca_iof_svc_init(int* priority, bool *allow_multi_user_threads, bool *have_hidden_threads)
{
int rc;
if(ompi_process_info.seed == false)
return NULL;
*priority = 1;
*allow_multi_user_threads = true;
*have_hidden_threads = false;
/* post non-blocking recv */
mca_iof_svc_component.svc_iov[0].iov_base = NULL;
mca_iof_svc_component.svc_iov[0].iov_len = 0;
rc = mca_oob_recv_nb(
MCA_OOB_NAME_ANY,
mca_iof_svc_component.svc_iov,
1,
MCA_OOB_TAG_IOF_SVC,
MCA_OOB_ALLOC,
mca_iof_svc_proxy_recv,
NULL
);
if(rc != OMPI_SUCCESS) {
ompi_output(0, "mca_iof_svc_init: unable to post non-blocking recv");
return NULL;
}
return &mca_iof_svc_module;
}

244
src/mca/iof/svc/iof_svc_proxy.c Обычный файл
Просмотреть файл

@ -0,0 +1,244 @@
#include "ompi_config.h"
#include "util/output.h"
#include "mca/oob/oob.h"
#include "mca/iof/base/iof_base_header.h"
#include "mca/iof/base/iof_base_endpoint.h"
#include "mca/iof/base/iof_base_fragment.h"
#include "iof_svc.h"
#include "iof_svc_proxy.h"
#include "iof_svc_publish.h"
#include "iof_svc_subscript.h"
static void mca_iof_svc_proxy_msg(const ompi_process_name_t*, mca_iof_base_msg_header_t*, unsigned char*);
static void mca_iof_svc_proxy_ack(const ompi_process_name_t*, mca_iof_base_msg_header_t*);
static void mca_iof_svc_proxy_pub(const ompi_process_name_t*, mca_iof_base_pub_header_t*);
static void mca_iof_svc_proxy_unpub(const ompi_process_name_t*, mca_iof_base_pub_header_t*);
static void mca_iof_svc_proxy_sub(const ompi_process_name_t*, mca_iof_base_sub_header_t*);
static void mca_iof_svc_proxy_unsub(const ompi_process_name_t*, mca_iof_base_sub_header_t*);
/**
* Callback function from OOB on receipt of IOF request.
*
* @param status (IN) Completion status.
* @param peer (IN) Opaque name of peer process.
* @param msg (IN) Array of iovecs describing user buffers and lengths.
* @param count (IN) Number of elements in iovec array.
* @param tag (IN) User defined tag for matching send/recv.
* @param cbdata (IN) User data.
*/
void mca_iof_svc_proxy_recv(
int status,
ompi_process_name_t* peer,
struct iovec* iov,
int count,
int tag,
void* cbdata)
{
int rc;
mca_iof_base_header_t* hdr = (mca_iof_base_header_t*)iov[0].iov_base;
if(status < 0) {
ompi_output(0, "mca_iof_svc_recv: receive failed with status: %d", status);
goto done;
}
switch(hdr->hdr_common.hdr_type) {
case MCA_IOF_BASE_HDR_MSG:
MCA_IOF_BASE_HDR_MSG_NTOH(hdr->hdr_msg);
mca_iof_svc_proxy_msg(peer, &hdr->hdr_msg,
((unsigned char*)iov[0].iov_base)+sizeof(mca_iof_base_header_t));
break;
case MCA_IOF_BASE_HDR_ACK:
MCA_IOF_BASE_HDR_MSG_NTOH(hdr->hdr_msg);
mca_iof_svc_proxy_ack(peer, &hdr->hdr_msg);
break;
case MCA_IOF_BASE_HDR_PUB:
MCA_IOF_BASE_HDR_PUB_NTOH(hdr->hdr_pub);
mca_iof_svc_proxy_pub(peer, &hdr->hdr_pub);
break;
case MCA_IOF_BASE_HDR_UNPUB:
MCA_IOF_BASE_HDR_PUB_NTOH(hdr->hdr_pub);
mca_iof_svc_proxy_unpub(peer, &hdr->hdr_pub);
break;
case MCA_IOF_BASE_HDR_SUB:
MCA_IOF_BASE_HDR_SUB_NTOH(hdr->hdr_sub);
mca_iof_svc_proxy_sub(peer, &hdr->hdr_sub);
break;
case MCA_IOF_BASE_HDR_UNSUB:
MCA_IOF_BASE_HDR_SUB_NTOH(hdr->hdr_sub);
mca_iof_svc_proxy_unsub(peer, &hdr->hdr_sub);
break;
default:
ompi_output(0, "mca_iof_svc_recv: invalid message type: %d\n", hdr->hdr_common.hdr_type);
break;
}
done:
free(hdr);
mca_iof_svc_component.svc_iov[0].iov_base = NULL;
mca_iof_svc_component.svc_iov[0].iov_len = 0;
rc = mca_oob_recv_nb(
MCA_OOB_NAME_ANY,
mca_iof_svc_component.svc_iov,
1,
MCA_OOB_TAG_IOF_SVC,
MCA_OOB_ALLOC,
mca_iof_svc_proxy_recv,
NULL
);
if(rc != OMPI_SUCCESS) {
ompi_output(0, "mca_iof_svc_proxy_recv: unable to post non-blocking recv");
return;
}
}
/**
*
*/
static void mca_iof_svc_proxy_msg(
const ompi_process_name_t* src,
mca_iof_base_msg_header_t* hdr,
unsigned char* data)
{
ompi_list_item_t* item;
if(mca_iof_svc_component.svc_debug > 1) {
ompi_output(0, "mca_iof_svc_proxy_msg");
}
/* dispatch based on subscription list */
OMPI_THREAD_LOCK(&mca_iof_svc_component.svc_lock);
for(item = ompi_list_get_first(&mca_iof_svc_component.svc_subscribed);
item != ompi_list_get_end(&mca_iof_svc_component.svc_subscribed);
item = ompi_list_get_next(item)) {
mca_iof_svc_subscript_t* sub = (mca_iof_svc_subscript_t*)item;
/* tags match */
if(sub->src_tag != hdr->msg_tag && hdr->msg_tag != MCA_IOF_ANY)
continue;
/* source match */
if(ompi_name_server.compare(sub->src_mask,&sub->src_name,&hdr->msg_src) == 0) {
mca_iof_svc_subscript_forward(sub,src,hdr,data);
}
}
OMPI_THREAD_LOCK(&mca_iof_svc_component.svc_lock);
}
/**
*
*/
static void mca_iof_svc_proxy_ack(
const ompi_process_name_t* src,
mca_iof_base_msg_header_t* hdr)
{
if(mca_iof_svc_component.svc_debug > 1) {
ompi_output(0, "mca_iof_svc_proxy_ack");
}
}
/**
*
*/
static void mca_iof_svc_proxy_pub(
const ompi_process_name_t* src,
mca_iof_base_pub_header_t* hdr)
{
int rc;
if(mca_iof_svc_component.svc_debug > 1) {
ompi_output(0, "mca_iof_svc_proxy_pub");
}
rc = mca_iof_svc_publish_create(
&hdr->pub_name,
&hdr->pub_proxy,
hdr->pub_mask,
hdr->pub_tag);
if(rc != OMPI_SUCCESS) {
ompi_output(0, "mca_iof_svc_pub: mca_iof_svc_publish_create failed with status=%d\n", rc);
}
}
/*
*
*/
static void mca_iof_svc_proxy_unpub(
const ompi_process_name_t* src,
mca_iof_base_pub_header_t* hdr)
{
int rc;
if(mca_iof_svc_component.svc_debug > 1) {
ompi_output(0, "mca_iof_svc_proxy_unpub");
}
rc = mca_iof_svc_publish_delete(
&hdr->pub_name,
&hdr->pub_proxy,
hdr->pub_mask,
hdr->pub_tag);
if(rc != OMPI_SUCCESS) {
ompi_output(0, "mca_iof_svc_proxy_unpub: mca_iof_svc_publish_delete failed with status=%d\n", rc);
}
}
/**
*
*/
static void mca_iof_svc_proxy_sub(
const ompi_process_name_t* src,
mca_iof_base_sub_header_t* hdr)
{
int rc;
if(mca_iof_svc_component.svc_debug > 1) {
ompi_output(0, "mca_iof_svc_proxy_sub");
}
rc = mca_iof_svc_subscript_create(
&hdr->src_name,
hdr->src_mask,
hdr->src_tag,
&hdr->dst_name,
hdr->dst_mask,
hdr->dst_tag);
if(rc != OMPI_SUCCESS) {
ompi_output(0, "mca_iof_svc_proxy_sub: mca_iof_svc_subcript_create failed with status=%d\n", rc);
}
}
/**
*
*/
static void mca_iof_svc_proxy_unsub(
const ompi_process_name_t* src,
mca_iof_base_sub_header_t* hdr)
{
int rc;
if(mca_iof_svc_component.svc_debug > 1) {
ompi_output(0, "mca_iof_svc_proxy_unsub");
}
rc = mca_iof_svc_subscript_delete(
&hdr->src_name,
hdr->src_mask,
hdr->src_tag,
&hdr->dst_name,
hdr->dst_mask,
hdr->dst_tag);
if(rc != OMPI_SUCCESS) {
ompi_output(0, "mca_iof_svc_proxy_unsub: mca_iof_svc_subcript_delete failed with status=%d\n", rc);
}
}

53
src/mca/iof/svc/iof_svc_proxy.h Обычный файл
Просмотреть файл

@ -0,0 +1,53 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* @file
*/
#ifndef MCA_IOF_SVC_PROXY_H
#define MCA_IOF_SVC_PROXY_H
#include "mca/iof/iof.h"
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
#include "ompi_config.h"
#include "mca/iof/iof.h"
#include "mca/iof/base/iof_base_header.h"
/**
* Callback function from OOB on receipt of IOF request.
*
* @param status (IN) Completion status.
* @param peer (IN) Opaque name of peer process.
* @param msg (IN) Array of iovecs describing user buffers and lengths.
* @param count (IN) Number of elements in iovec array.
* @param tag (IN) User defined tag for matching send/recv.
* @param cbdata (IN) User data.
*/
void mca_iof_svc_proxy_recv(
int status,
ompi_process_name_t* peer,
struct iovec* msg,
int count,
int tag,
void* cbdata);
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
#endif

63
src/mca/iof/svc/iof_svc_publish.c Обычный файл
Просмотреть файл

@ -0,0 +1,63 @@
#include "ompi_config.h"
#include "util/output.h"
#include "mca/oob/oob.h"
#include "mca/iof/base/iof_base_header.h"
#include "iof_svc.h"
#include "iof_svc_proxy.h"
#include "iof_svc_publish.h"
static void mca_iof_svc_publish_construct(mca_iof_svc_publish_t* publish)
{
}
static void mca_iof_svc_publish_destruct(mca_iof_svc_publish_t* publish)
{
}
OBJ_CLASS_INSTANCE(
mca_iof_svc_publish_t,
ompi_list_item_t,
mca_iof_svc_publish_construct,
mca_iof_svc_publish_destruct);
/**
*
*/
int mca_iof_svc_publish_create(
const ompi_process_name_t *pub_name,
const ompi_process_name_t *pub_proxy,
ompi_ns_cmp_bitmask_t pub_mask,
mca_iof_base_tag_t pub_tag)
{
mca_iof_svc_publish_t* pub = OBJ_NEW(mca_iof_svc_publish_t);
pub->pub_name = *pub_name;
pub->pub_proxy = *pub_proxy;
pub->pub_mask = pub_mask;
pub->pub_tag = pub_tag;
pub->pub_endpoint = mca_iof_base_endpoint_match(pub_name,pub_mask,pub_tag);
ompi_list_append(&mca_iof_svc_component.svc_published, &pub->super);
return OMPI_SUCCESS;
}
/**
*
*/
int mca_iof_svc_publish_delete(
const ompi_process_name_t *pub_name,
const ompi_process_name_t *pub_proxy,
ompi_ns_cmp_bitmask_t pub_mask,
mca_iof_base_tag_t pub_tag)
{
return OMPI_SUCCESS;
}

50
src/mca/iof/svc/iof_svc_publish.h Обычный файл
Просмотреть файл

@ -0,0 +1,50 @@
#ifndef MCA_IOF_SVC_PUBLISH_H
#define MCA_IOF_SVC_PUBLISH_H
#include "ompi_config.h"
#include "mca/iof/iof.h"
#include "mca/iof/base/base.h"
#include "mca/iof/base/iof_base_endpoint.h"
#include "iof_svc.h"
/**
*
*/
struct mca_iof_svc_publish_t {
ompi_list_item_t super;
ompi_process_name_t pub_name;
ompi_process_name_t pub_proxy;
ompi_ns_cmp_bitmask_t pub_mask;
mca_iof_base_tag_t pub_tag;
mca_iof_base_endpoint_t* pub_endpoint;
};
typedef struct mca_iof_svc_publish_t mca_iof_svc_publish_t;
OBJ_CLASS_DECLARATION(mca_iof_svc_publish_t);
/**
*
*/
int mca_iof_svc_publish_create(
const ompi_process_name_t* pub_name,
const ompi_process_name_t* pub_proxy,
ompi_ns_cmp_bitmask_t pub_mask,
mca_iof_base_tag_t pub_tag);
/**
*
*/
int mca_iof_svc_publish_delete(
const ompi_process_name_t* pub_name,
const ompi_process_name_t* pub_proxy,
ompi_ns_cmp_bitmask_t pub_mask,
mca_iof_base_tag_t pub_tag);
#endif

133
src/mca/iof/svc/iof_svc_subscript.c Обычный файл
Просмотреть файл

@ -0,0 +1,133 @@
#include "ompi_config.h"
#include "util/output.h"
#include "mca/oob/oob.h"
#include "mca/iof/base/iof_base_header.h"
#include "mca/iof/base/iof_base_fragment.h"
#include "iof_svc.h"
#include "iof_svc_proxy.h"
#include "iof_svc_publish.h"
#include "iof_svc_subscript.h"
static void mca_iof_svc_subscript_construct(mca_iof_svc_subscript_t* subscript)
{
}
static void mca_iof_svc_subscript_destruct(mca_iof_svc_subscript_t* subscript)
{
}
OBJ_CLASS_INSTANCE(
mca_iof_svc_subscript_t,
ompi_list_item_t,
mca_iof_svc_subscript_construct,
mca_iof_svc_subscript_destruct);
/**
*
*/
int mca_iof_svc_subscript_create(
const ompi_process_name_t *src_name,
ompi_ns_cmp_bitmask_t src_mask,
mca_iof_base_tag_t src_tag,
const ompi_process_name_t *dst_name,
ompi_ns_cmp_bitmask_t dst_mask,
mca_iof_base_tag_t dst_tag)
{
mca_iof_svc_subscript_t* sub = OBJ_NEW(mca_iof_svc_subscript_t);
sub->src_name = *src_name;
sub->src_mask = src_mask;
sub->src_tag = src_tag;
sub->dst_name = *dst_name;
sub->dst_mask = dst_mask;
sub->dst_tag = dst_tag;
sub->sub_endpoint = mca_iof_base_endpoint_match(&sub->dst_name, sub->dst_mask, sub->dst_tag);
ompi_list_append(&mca_iof_svc_component.svc_subscribed, &sub->super);
return OMPI_SUCCESS;
}
/**
*
*/
int mca_iof_svc_subscript_delete(
const ompi_process_name_t *src_name,
ompi_ns_cmp_bitmask_t src_mask,
mca_iof_base_tag_t src_tag,
const ompi_process_name_t *dst_name,
ompi_ns_cmp_bitmask_t dst_mask,
mca_iof_base_tag_t dst_tag)
{
return OMPI_SUCCESS;
}
/*
*
*/
static void mca_iof_svc_subscript_send_cb(
int status,
ompi_process_name_t* peer,
struct iovec* msg,
int count,
int tag,
void* cbdata)
{
mca_iof_base_frag_t* frag = (mca_iof_base_frag_t*)frag;
MCA_IOF_BASE_FRAG_RETURN(frag);
}
/**
*
*/
int mca_iof_svc_subscript_forward(
mca_iof_svc_subscript_t* sub,
const ompi_process_name_t* src,
mca_iof_base_msg_header_t* hdr,
const unsigned char* data)
{
ompi_list_item_t* item;
for(item = ompi_list_get_first(&mca_iof_svc_component.svc_published);
item != ompi_list_get_end(&mca_iof_svc_component.svc_published);
item = ompi_list_get_next(item)) {
mca_iof_svc_publish_t* pub = (mca_iof_svc_publish_t*)item;
int rc;
if(sub->dst_tag != pub->pub_tag && pub->pub_tag != MCA_IOF_ANY)
continue;
if(pub->pub_endpoint != NULL) {
rc = mca_iof_base_endpoint_forward(pub->pub_endpoint,src,hdr,data);
} else {
/* forward */
mca_iof_base_frag_t* frag;
MCA_IOF_BASE_FRAG_ALLOC(frag,rc);
frag->frag_hdr.hdr_msg = *hdr;
frag->frag_len = frag->frag_hdr.hdr_msg.msg_len;
frag->frag_iov[1].iov_len = frag->frag_len;
memcpy(frag->frag_data, data, frag->frag_len);
rc = mca_oob_send_nb(
&pub->pub_proxy,
frag->frag_iov,
2,
MCA_OOB_TAG_IOF_SVC,
0,
mca_iof_svc_subscript_send_cb,
frag);
}
if(rc != OMPI_SUCCESS) {
return rc;
}
}
if(sub->sub_endpoint != NULL) {
return mca_iof_base_endpoint_forward(sub->sub_endpoint,src,hdr,data);
}
return OMPI_SUCCESS;
}

67
src/mca/iof/svc/iof_svc_subscript.h Обычный файл
Просмотреть файл

@ -0,0 +1,67 @@
#ifndef MCA_IOF_SVC_SUBSCRIPT_H
#define MCA_IOF_SVC_SUBSCRIPT_H
/**
*
*/
struct mca_iof_svc_subscript_t {
ompi_list_item_t super;
ompi_process_name_t src_name;
ompi_ns_cmp_bitmask_t src_mask;
mca_iof_base_tag_t src_tag;
ompi_process_name_t dst_name;
ompi_ns_cmp_bitmask_t dst_mask;
mca_iof_base_tag_t dst_tag;
mca_iof_base_endpoint_t* sub_endpoint;
};
typedef struct mca_iof_svc_subscript_t mca_iof_svc_subscript_t;
OBJ_CLASS_DECLARATION(mca_iof_svc_subscript_t);
/**
*
*/
mca_iof_svc_subscript_t* mca_iof_svc_subscript_lookup(
const ompi_process_name_t* src
);
/**
*
*/
int mca_iof_svc_subscript_create(
const ompi_process_name_t *src_name,
ompi_ns_cmp_bitmask_t src_mask,
mca_iof_base_tag_t src_tag,
const ompi_process_name_t *dst_name,
ompi_ns_cmp_bitmask_t dst_mask,
mca_iof_base_tag_t dst_tag);
/**
*
*/
int mca_iof_svc_subscript_delete(
const ompi_process_name_t *src_name,
ompi_ns_cmp_bitmask_t src_mask,
mca_iof_base_tag_t src_tag,
const ompi_process_name_t *dst_name,
ompi_ns_cmp_bitmask_t dst_mask,
mca_iof_base_tag_t dst_tag);
/**
*
*/
int mca_iof_svc_subscript_forward(
mca_iof_svc_subscript_t* subscript,
const ompi_process_name_t* src,
mca_iof_base_msg_header_t* hdr,
const unsigned char* data);
#endif