1
1
This commit was SVN r3985.
Этот коммит содержится в:
Tim Woodall 2005-01-13 15:27:28 +00:00
родитель 75ceb0a326
Коммит 434d6059a1
10 изменённых файлов: 143 добавлений и 63 удалений

Просмотреть файл

@ -41,7 +41,7 @@ int mca_iof_base_close(void)
ompi_event_t ev;
struct timeval tv = { 0, 0 };
int flushed = 0;
int closed = 0;
size_t closed = 0;
/* flush any pending output */
fflush(NULL);

Просмотреть файл

@ -19,7 +19,7 @@
#include "iof_base_fragment.h"
/**
*
* Construct/Destructor
*/
static void mca_iof_base_endpoint_construct(mca_iof_base_endpoint_t* endpoint)
@ -45,7 +45,7 @@ OBJ_CLASS_INSTANCE(
/*
*
* Callback when non-blocking OOB send completes.
*/
static void mca_iof_base_endpoint_send_cb(
@ -64,11 +64,10 @@ static void mca_iof_base_endpoint_send_cb(
/*
* Receive from pipe/pty/etc. and forward to the
* service.
* Callback when data is available on the endpoint to read.
*/
static void mca_iof_base_endpoint_recv_handler(int fd, short flags, void *cbdata)
static void mca_iof_base_endpoint_read_handler(int fd, short flags, void *cbdata)
{
mca_iof_base_endpoint_t* endpoint = (mca_iof_base_endpoint_t*)cbdata;
mca_iof_base_frag_t* frag;
@ -88,6 +87,13 @@ static void mca_iof_base_endpoint_recv_handler(int fd, short flags, void *cbdata
/* read up to the fragment size */
rc = read(fd, frag->frag_data, sizeof(frag->frag_data));
if(rc <= 0) {
/* non-blocking */
if(rc < 0 && errno == EAGAIN) {
MCA_IOF_BASE_FRAG_RETURN(frag);
OMPI_THREAD_UNLOCK(&mca_iof_base.iof_lock);
return;
}
/* peer has closed the connection */
mca_iof_base_endpoint_closed(endpoint);
rc = 0;
@ -108,7 +114,7 @@ static void mca_iof_base_endpoint_recv_handler(int fd, short flags, void *cbdata
if(MCA_IOF_BASE_SEQDIFF(endpoint->ep_seq,endpoint->ep_ack) > mca_iof_base.iof_window_size) {
ompi_event_del(&endpoint->ep_event);
}
OMPI_THREAD_LOCK(&mca_iof_base.iof_lock);
OMPI_THREAD_UNLOCK(&mca_iof_base.iof_lock);
/* start non-blocking OOB call to forward received data */
rc = mca_oob_send_nb(
@ -122,15 +128,29 @@ static void mca_iof_base_endpoint_recv_handler(int fd, short flags, void *cbdata
}
static void mca_iof_base_endpoint_send_handler(int sd, short flags, void *user)
/**
* Callback when the endpoint is available for write.
*/
static void mca_iof_base_endpoint_write_handler(int sd, short flags, void *user)
{
mca_iof_base_endpoint_t* endpoint = (mca_iof_base_endpoint_t*)user;
ompi_list_t completed;
ompi_process_name_t last = mca_oob_name_any;
OBJ_CONSTRUCT(&completed, ompi_list_t);
/*
* step through the list of queued fragments and attempt to write
* until the output descriptor would block
*/
OMPI_THREAD_LOCK(&mca_iof_base.iof_lock);
while(ompi_list_get_size(&endpoint->ep_frags)) {
mca_iof_base_frag_t* frag = (mca_iof_base_frag_t*)ompi_list_get_first(&endpoint->ep_frags);
int rc = write(endpoint->ep_fd, frag->frag_ptr, frag->frag_len);
if(rc < 0) {
if(errno == EAGAIN)
break;
mca_iof_base_endpoint_closed(endpoint);
OMPI_THREAD_UNLOCK(&mca_iof_base.iof_lock);
return;
@ -152,10 +172,11 @@ static void mca_iof_base_endpoint_send_handler(int sd, short flags, void *user)
}
/*
*
* Lookup existing endpoint matching parameters
* supplied to create.
*/
mca_iof_base_endpoint_t* mca_iof_base_endpoint_lookup(
static mca_iof_base_endpoint_t* mca_iof_base_endpoint_lookup(
const ompi_process_name_t* proc,
mca_iof_base_mode_t mode,
int tag)
@ -165,7 +186,7 @@ mca_iof_base_endpoint_t* mca_iof_base_endpoint_lookup(
/*
*
* Create a local endpoint.
*/
int mca_iof_base_endpoint_create(
@ -177,7 +198,6 @@ int mca_iof_base_endpoint_create(
mca_iof_base_endpoint_t* endpoint;
int flags;
/* create local endpoint */
OMPI_THREAD_LOCK(&mca_iof_base.iof_lock);
if((endpoint = mca_iof_base_endpoint_lookup(proc,mode,tag)) != NULL) {
OMPI_THREAD_UNLOCK(&mca_iof_base.iof_lock);
@ -208,7 +228,7 @@ int mca_iof_base_endpoint_create(
&endpoint->ep_event,
endpoint->ep_fd,
OMPI_EV_READ|OMPI_EV_PERSIST,
mca_iof_base_endpoint_recv_handler,
mca_iof_base_endpoint_read_handler,
endpoint);
ompi_event_add(&endpoint->ep_event, 0);
break;
@ -216,8 +236,8 @@ int mca_iof_base_endpoint_create(
ompi_event_set(
&endpoint->ep_event,
endpoint->ep_fd,
OMPI_EV_WRITE,
mca_iof_base_endpoint_send_handler,
OMPI_EV_WRITE|OMPI_EV_PERSIST,
mca_iof_base_endpoint_write_handler,
endpoint);
break;
default:
@ -232,7 +252,7 @@ int mca_iof_base_endpoint_create(
/*
*
* Close one or more matching endpoints.
*/
int mca_iof_base_endpoint_delete(
@ -249,7 +269,6 @@ int mca_iof_base_endpoint_delete(
int mca_iof_base_endpoint_close(mca_iof_base_endpoint_t* endpoint)
{
bool closed = false;
endpoint->ep_state = MCA_IOF_EP_CLOSING;
switch(endpoint->ep_mode) {
case MCA_IOF_SOURCE:
@ -276,7 +295,7 @@ void mca_iof_base_endpoint_closed(mca_iof_base_endpoint_t* endpoint)
}
/*
*
* Lookup endpoint based on destination process name/mask/tag.
*/
mca_iof_base_endpoint_t* mca_iof_base_endpoint_match(
@ -303,7 +322,9 @@ mca_iof_base_endpoint_t* mca_iof_base_endpoint_match(
}
/*
*
* Forward data out the endpoint as the destination
* is available. Queue incomplete fragments in order
* received and process as the destination becomes available.
*/
int mca_iof_base_endpoint_forward(
@ -316,13 +337,18 @@ int mca_iof_base_endpoint_forward(
size_t len = hdr->msg_len;
int rc = 0;
if(endpoint->ep_mode != MCA_IOF_SINK) {
return OMPI_ERR_BAD_PARAM;
}
/* allocate and initialize a fragment */
MCA_IOF_BASE_FRAG_ALLOC(frag, rc);
if(NULL == frag) {
return OMPI_ERR_OUT_OF_RESOURCE;
return OMPI_ERR_OUT_OF_RESOURCE;
}
OMPI_THREAD_LOCK(&mca_iof_base.iof_lock);
endpoint->ep_seq = hdr->msg_seq + hdr->msg_len;
frag->frag_owner = endpoint;
frag->frag_src = *src;
frag->frag_hdr.hdr_msg = *hdr;
@ -348,6 +374,7 @@ int mca_iof_base_endpoint_forward(
}
} else {
/* acknowledge fragment */
endpoint->ep_ack = frag->frag_hdr.hdr_msg.msg_seq + frag->frag_hdr.hdr_msg.msg_len;
mca_iof_base_frag_ack(frag);
}
OMPI_THREAD_UNLOCK(&mca_iof_base.iof_lock);
@ -356,7 +383,9 @@ int mca_iof_base_endpoint_forward(
/**
*
* Update the acknowledged sequence number. If forwarding had
* previously been disabled as the window closed, and the window
* is now open, re-enable forwarding.
*/
int mca_iof_base_endpoint_ack(
@ -379,7 +408,7 @@ int mca_iof_base_endpoint_ack(
ompi_condition_signal(&mca_iof_base.iof_condition);
}
/* otherwise check to see if we can reenable forwarding */
/* otherwise check to see if we need to reenable forwarding */
} else if(window_closed && window_open) {
ompi_event_add(&endpoint->ep_event, 0);
}

Просмотреть файл

@ -13,6 +13,10 @@ enum {
MCA_IOF_EP_CLOSED
};
/**
* Structure that represents a published endpoint.
*/
struct mca_iof_base_endpoint_t {
ompi_list_item_t super;
mca_iof_base_mode_t ep_mode;
@ -38,16 +42,12 @@ OBJ_CLASS_DECLARATION(mca_iof_base_endpoint_t);
/**
*
*/
mca_iof_base_endpoint_t* mca_iof_base_endpoint_lookup(
const ompi_process_name_t* proc,
mca_iof_base_mode_t mode,
int tag);
/**
*
* Create a local endpoint.
*
* @param name Process name corresponding to endpoint.
* @param mode Source or sink of data (exclusive).
* @param tag Logical tag for matching.
* @aram fd Local file descriptor corresponding to endpoint.
*/
int mca_iof_base_endpoint_create(
@ -57,7 +57,12 @@ int mca_iof_base_endpoint_create(
int fd);
/**
* Delete all local endpoints matching the specified
* name/mask/tag parameters.
*
* @paran name Process name corresponding to one or more endpoint(s).
* @param mask Mask used for name comparisons.
* @param tag Tag for matching endpoints.
*/
int mca_iof_base_endpoint_delete(
@ -65,15 +70,16 @@ int mca_iof_base_endpoint_delete(
ompi_ns_cmp_bitmask_t mask,
int tag);
/*
*
/**
* Disable forwarding through the specified endpoint.
*/
int mca_iof_base_endpoint_close(
mca_iof_base_endpoint_t* endpoint);
/**
*
* Attempt to match an endpoint based on the destination
* process name/mask/tag.
*/
mca_iof_base_endpoint_t* mca_iof_base_endpoint_match(
@ -82,7 +88,7 @@ mca_iof_base_endpoint_t* mca_iof_base_endpoint_match(
int dst_tag);
/**
*
* Forward the specified message out the endpoint.
*/
int mca_iof_base_endpoint_forward(
@ -92,16 +98,17 @@ int mca_iof_base_endpoint_forward(
const unsigned char* data);
/*
*
* Callback when peer has closed endpoint.
*/
void mca_iof_base_endpoint_closed(
mca_iof_base_endpoint_t* endpoint);
/**
*
* Callback when the specified sequence has been
* acknowledged.
*/
int mca_iof_base_endpoint_ack(
mca_iof_base_endpoint_t* endpoint,
uint32_t seq);

Просмотреть файл

@ -42,7 +42,7 @@
n.cellid,n.jobid,n.vpid
/**
*
* Fields common to all headers.
*/
struct mca_iof_base_common_header_t {
@ -60,7 +60,7 @@ typedef struct mca_iof_base_common_header_t mca_iof_base_common_header_t;
/**
*
* Header for data.
*/
struct mca_iof_base_msg_header_t {
@ -144,7 +144,7 @@ typedef struct mca_iof_base_sub_header_t mca_iof_base_sub_header_t;
(h).dst_tag = htonl((h).dst_tag);
/**
*
* Union of all header types.
*/
union mca_iof_base_header_t {

Просмотреть файл

@ -63,7 +63,7 @@ int mca_iof_base_open(void)
mca_iof_base.iof_window_size = int_value;
id = mca_base_param_register_string("iof","base","service",NULL,"0.0.0");
mca_base_param_lookup_int(id,&str_value);
mca_base_param_lookup_string(id,&str_value);
mca_iof_base.iof_service = ompi_name_server.convert_string_to_process_name(str_value);
/* initialize free list */

Просмотреть файл

@ -8,8 +8,8 @@
#include "iof_proxy_svc.h"
/**
*
/*
* Local function prototypes.
*/
static void mca_iof_proxy_svc_msg(
@ -22,8 +22,9 @@ static void mca_iof_proxy_svc_ack(
mca_iof_base_msg_header_t* msg);
/**
*
/*
* Publish the availability of a local endpoint
* to the servver.
*/
int mca_iof_proxy_svc_publish(
@ -59,8 +60,8 @@ int mca_iof_proxy_svc_publish(
}
/**
*
/*
* Remove published endpoint from the server.
*/
int mca_iof_proxy_svc_unpublish(
@ -97,8 +98,9 @@ int mca_iof_proxy_svc_unpublish(
}
/**
*
/*
* Subscribe one or more destination process(es) to
* one/more source process.
*/
int mca_iof_proxy_svc_subscribe(
@ -141,6 +143,10 @@ int mca_iof_proxy_svc_subscribe(
}
/*
* Remove subscription message from the server.
*/
int mca_iof_proxy_svc_unsubscribe(
const ompi_process_name_t* src_name,
ompi_ns_cmp_bitmask_t src_mask,
@ -180,6 +186,11 @@ int mca_iof_proxy_svc_unsubscribe(
}
/*
* Handle receipt of data/ack messages from the server
* and forward on to the appropriate endpoint.
*/
void mca_iof_proxy_svc_recv(
int status,
ompi_process_name_t* src,
@ -225,6 +236,10 @@ void mca_iof_proxy_svc_recv(
}
/*
* Forward data message to the matching endpoint.
*/
static void mca_iof_proxy_svc_msg(
const ompi_process_name_t* src,
mca_iof_base_msg_header_t* msg,
@ -239,7 +254,7 @@ static void mca_iof_proxy_svc_msg(
}
/**
*
* Forward ack message to the matching endpoint.
*/
static void mca_iof_proxy_svc_ack(

Просмотреть файл

@ -26,7 +26,7 @@ extern "C" {
#endif
/**
*
* Publish endpoint to forwarding service.
*/
int mca_iof_proxy_svc_publish(
@ -41,7 +41,9 @@ int mca_iof_proxy_svc_unpublish(
);
/**
*
* Subscribe one/more destination processes as
* specified by the process name/mask to one/more
* source processes.
*/
int mca_iof_proxy_svc_subscribe(
@ -54,7 +56,8 @@ int mca_iof_proxy_svc_subscribe(
);
/**
*
* Remove subscription from forwarding
* service.
*/
int mca_iof_proxy_svc_unsubscribe(
@ -67,7 +70,8 @@ int mca_iof_proxy_svc_unsubscribe(
);
/**
*
* Dispatch messages received from forwarding
* service.
*/
void mca_iof_proxy_svc_recv(

Просмотреть файл

@ -129,7 +129,7 @@ static void mca_iof_svc_proxy_msg(
mca_iof_svc_subscript_forward(sub,src,hdr,data);
}
}
OMPI_THREAD_LOCK(&mca_iof_svc_component.svc_lock);
OMPI_THREAD_UNLOCK(&mca_iof_svc_component.svc_lock);
}
/**

Просмотреть файл

@ -12,11 +12,14 @@
static void mca_iof_svc_subscript_construct(mca_iof_svc_subscript_t* subscript)
{
subscript->sub_endpoint = NULL;
}
static void mca_iof_svc_subscript_destruct(mca_iof_svc_subscript_t* subscript)
{
if(subscript->sub_endpoint != NULL)
OBJ_RELEASE(subscript->sub_endpoint);
}
@ -27,7 +30,7 @@ OBJ_CLASS_INSTANCE(
mca_iof_svc_subscript_destruct);
/**
*
* Create a subscription/forwarding entry.
*/
int mca_iof_svc_subscript_create(
@ -46,14 +49,16 @@ int mca_iof_svc_subscript_create(
sub->dst_mask = dst_mask;
sub->dst_tag = dst_tag;
sub->sub_endpoint = mca_iof_base_endpoint_match(&sub->dst_name, sub->dst_mask, sub->dst_tag);
OMPI_THREAD_LOCK(&mca_iof_svc_component.svc_lock);
ompi_list_append(&mca_iof_svc_component.svc_subscribed, &sub->super);
OMPI_THREAD_UNLOCK(&mca_iof_svc_component.svc_lock);
return OMPI_SUCCESS;
}
/**
*
* Delete all matching subscriptions.
*/
int mca_iof_svc_subscript_delete(
const ompi_process_name_t *src_name,
ompi_ns_cmp_bitmask_t src_mask,
@ -62,12 +67,30 @@ int mca_iof_svc_subscript_delete(
ompi_ns_cmp_bitmask_t dst_mask,
mca_iof_base_tag_t dst_tag)
{
ompi_list_item_t *item;
OMPI_THREAD_LOCK(&mca_iof_svc_component.svc_lock);
item = ompi_list_get_first(&mca_iof_svc_component.svc_subscribed);
while(item != ompi_list_get_end(&mca_iof_svc_component.svc_subscribed)) {
ompi_list_item_t* next = ompi_list_get_next(item);
mca_iof_svc_subscript_t* sub = (mca_iof_svc_subscript_t*)item;
if (sub->src_mask == src_mask &&
ompi_name_server.compare(sub->src_mask,&sub->src_name,src_name) == 0 &&
sub->src_tag == src_tag &&
sub->dst_mask == dst_mask &&
ompi_name_server.compare(sub->dst_mask,&sub->dst_name,dst_name) == 0 &&
sub->dst_tag == dst_tag) {
ompi_list_remove_item(&mca_iof_svc_component.svc_subscribed, item);
OBJ_RELEASE(item);
}
item = next;
}
OMPI_THREAD_UNLOCK(&mca_iof_svc_component.svc_lock);
return OMPI_SUCCESS;
}
/*
*
* Callback on send completion. Release send resources (fragment).
*/
static void mca_iof_svc_subscript_send_cb(
@ -83,7 +106,8 @@ static void mca_iof_svc_subscript_send_cb(
}
/**
*
* Check for matching endpoints that have been published to the
* server. Forward data out each matching endpoint.
*/
int mca_iof_svc_subscript_forward(

Просмотреть файл

@ -3,7 +3,8 @@
/**
*
* A subscription maps data from a specified set
* of source endpoints to one or more destination(s).
*/
struct mca_iof_svc_subscript_t {