1
1

added support for callback on receipt of I/O

This commit was SVN r8084.
Этот коммит содержится в:
Tim Woodall 2005-11-10 04:49:51 +00:00
родитель 3699c924bd
Коммит 0b0d7f56c1
6 изменённых файлов: 207 добавлений и 30 удалений

Просмотреть файл

@ -39,6 +39,7 @@ static void orte_iof_base_endpoint_construct(orte_iof_base_endpoint_t* endpoint)
endpoint->ep_fd = -1; endpoint->ep_fd = -1;
memset(&endpoint->ep_event,0,sizeof(endpoint->ep_event)); memset(&endpoint->ep_event,0,sizeof(endpoint->ep_event));
OBJ_CONSTRUCT(&endpoint->ep_frags, opal_list_t); OBJ_CONSTRUCT(&endpoint->ep_frags, opal_list_t);
OBJ_CONSTRUCT(&endpoint->ep_callbacks, opal_list_t);
} }
static void orte_iof_base_endpoint_destruct(orte_iof_base_endpoint_t* endpoint) static void orte_iof_base_endpoint_destruct(orte_iof_base_endpoint_t* endpoint)
@ -47,6 +48,7 @@ static void orte_iof_base_endpoint_destruct(orte_iof_base_endpoint_t* endpoint)
opal_event_del(&endpoint->ep_event); opal_event_del(&endpoint->ep_event);
} }
OBJ_DESTRUCT(&endpoint->ep_frags); OBJ_DESTRUCT(&endpoint->ep_frags);
OBJ_DESTRUCT(&endpoint->ep_callbacks);
} }
OBJ_CLASS_INSTANCE( OBJ_CLASS_INSTANCE(
@ -55,6 +57,27 @@ OBJ_CLASS_INSTANCE(
orte_iof_base_endpoint_construct, orte_iof_base_endpoint_construct,
orte_iof_base_endpoint_destruct); orte_iof_base_endpoint_destruct);
/**
* Construct/Destructor
*/
static void orte_iof_base_callback_construct(orte_iof_base_callback_t* cb)
{
cb->cb_func = 0;
cb->cb_data = NULL;
}
static void orte_iof_base_callback_destruct(orte_iof_base_callback_t* cb)
{
}
OBJ_CLASS_INSTANCE(
orte_iof_base_callback_t,
opal_list_item_t,
orte_iof_base_callback_construct,
orte_iof_base_callback_destruct);
/* /*
* Callback when non-blocking OOB send completes. * Callback when non-blocking OOB send completes.
@ -243,14 +266,15 @@ int orte_iof_base_endpoint_create(
endpoint->ep_tag = tag; endpoint->ep_tag = tag;
endpoint->ep_fd = fd; endpoint->ep_fd = fd;
/* set file descriptor to be non-blocking */ /* set to non-blocking */
if((flags = fcntl(fd, F_GETFL, 0)) < 0) { if((flags = fcntl(fd, F_GETFL, 0)) < 0) {
opal_output(0, "orte_iof_base_endpoint_create: fcntl(F_GETFL) failed with errno=%d\n", errno); opal_output(0, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n",
__FILE__, __LINE__, errno);
} else { } else {
flags |= O_NONBLOCK; flags |= O_NONBLOCK;
fcntl(fd, F_SETFL, flags); fcntl(fd, F_SETFL, flags);
} }
/* setup event handler */ /* setup event handler */
switch(mode) { switch(mode) {
case ORTE_IOF_SOURCE: case ORTE_IOF_SOURCE:
@ -378,6 +402,7 @@ int orte_iof_base_endpoint_forward(
orte_iof_base_msg_header_t* hdr, orte_iof_base_msg_header_t* hdr,
const unsigned char* data) const unsigned char* data)
{ {
opal_list_item_t* item;
orte_iof_base_frag_t* frag; orte_iof_base_frag_t* frag;
size_t len = hdr->msg_len; size_t len = hdr->msg_len;
int rc = 0; int rc = 0;
@ -397,26 +422,45 @@ int orte_iof_base_endpoint_forward(
frag->frag_src = *src; frag->frag_src = *src;
frag->frag_hdr.hdr_msg = *hdr; frag->frag_hdr.hdr_msg = *hdr;
/* try to write w/out copying data */ /* call any registered callbacks */
if(opal_list_get_size(&endpoint->ep_frags) == 0) { for(item = opal_list_get_first(&endpoint->ep_callbacks);
rc = write(endpoint->ep_fd,data,len); item != opal_list_get_end(&endpoint->ep_callbacks);
if(rc < 0 && (errno != EAGAIN && errno != EINTR)) { item = opal_list_get_next(item)) {
orte_iof_base_endpoint_closed(endpoint); orte_iof_base_callback_t* cb = (orte_iof_base_callback_t*)item;
OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock); cb->cb_func(
return ORTE_SUCCESS; &hdr->msg_src,
} hdr->msg_tag,
} cb->cb_data,
data,
hdr->msg_len);
}
frag->frag_len = len - rc; if(endpoint->ep_fd >= 0) {
if(frag->frag_len > 0) { /* try to write w/out copying data */
/* handle incomplete write */ if(opal_list_get_size(&endpoint->ep_frags) == 0) {
frag->frag_ptr = frag->frag_data; rc = write(endpoint->ep_fd,data,len);
memcpy(frag->frag_ptr, data+rc, frag->frag_len); if(rc < 0 && (errno != EAGAIN && errno != EINTR)) {
opal_list_append(&endpoint->ep_frags, &frag->super); orte_iof_base_endpoint_closed(endpoint);
if(opal_list_get_size(&endpoint->ep_frags) == 1) { OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);
opal_event_add(&endpoint->ep_event,0); return ORTE_SUCCESS;
}
}
frag->frag_len = len - rc;
if(frag->frag_len > 0) {
/* handle incomplete write */
frag->frag_ptr = frag->frag_data;
memcpy(frag->frag_ptr, data+rc, frag->frag_len);
opal_list_append(&endpoint->ep_frags, &frag->super);
if(opal_list_get_size(&endpoint->ep_frags) == 1) {
opal_event_add(&endpoint->ep_event,0);
}
OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);
} else {
OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);
/* acknowledge fragment */
orte_iof_base_frag_ack(frag);
} }
OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);
} else { } else {
OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock); OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);
/* acknowledge fragment */ /* acknowledge fragment */
@ -426,6 +470,71 @@ int orte_iof_base_endpoint_forward(
} }
/**
* Register a callback
*/
int orte_iof_base_callback_create(
const orte_process_name_t* proc,
int tag,
orte_iof_base_callback_fn_t cbfunc,
void *cbdata)
{
orte_iof_base_callback_t* cb = OBJ_NEW(orte_iof_base_callback_t);
orte_iof_base_endpoint_t* endpoint;
if(NULL == cb)
return ORTE_ERR_OUT_OF_RESOURCE;
OPAL_THREAD_LOCK(&orte_iof_base.iof_lock);
if((endpoint = orte_iof_base_endpoint_lookup(proc,ORTE_IOF_SINK,tag)) == NULL) {
endpoint = OBJ_NEW(orte_iof_base_endpoint_t);
if(NULL == endpoint) {
OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
endpoint->ep_name = *proc;
endpoint->ep_mode = ORTE_IOF_SINK;
endpoint->ep_tag = tag;
endpoint->ep_fd = -1;
opal_list_append(&orte_iof_base.iof_endpoints, &endpoint->super);
} else {
OBJ_RELEASE(endpoint);
}
cb->cb_func = cbfunc;
cb->cb_data = cbdata;
opal_list_append(&endpoint->ep_callbacks, (opal_list_item_t*)cb);
OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);
return ORTE_SUCCESS;
}
/**
* Remove a callback
*/
int orte_iof_base_callback_delete(
const orte_process_name_t* proc,
int tag)
{
orte_iof_base_endpoint_t* endpoint;
opal_list_item_t* item;
OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);
if(NULL == (endpoint = orte_iof_base_endpoint_lookup(proc,ORTE_IOF_SINK, tag))) {
OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);
return ORTE_ERR_NOT_FOUND;
}
while(NULL != (item = opal_list_remove_first(&endpoint->ep_callbacks))) {
OBJ_RELEASE(item);
}
OBJ_RELEASE(endpoint);
OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);
return ORTE_SUCCESS;
}
/** /**
* Update the acknowledged sequence number. If forwarding had * Update the acknowledged sequence number. If forwarding had
* previously been disabled as the window closed, and the window * previously been disabled as the window closed, and the window

Просмотреть файл

@ -13,6 +13,19 @@ enum {
ORTE_IOF_EP_CLOSED ORTE_IOF_EP_CLOSED
}; };
/**
* Structure store callbacks
*/
struct orte_iof_base_callback_t {
opal_list_item_t super;
orte_iof_base_callback_fn_t cb_func;
void* cb_data;
};
typedef struct orte_iof_base_callback_t orte_iof_base_callback_t;
OBJ_CLASS_DECLARATION(orte_iof_base_callback_t);
/** /**
* Structure that represents a published endpoint. * Structure that represents a published endpoint.
*/ */
@ -28,6 +41,7 @@ struct orte_iof_base_endpoint_t {
uint32_t ep_ack; uint32_t ep_ack;
opal_event_t ep_event; opal_event_t ep_event;
opal_list_t ep_frags; opal_list_t ep_frags;
opal_list_t ep_callbacks;
}; };
typedef struct orte_iof_base_endpoint_t orte_iof_base_endpoint_t; typedef struct orte_iof_base_endpoint_t orte_iof_base_endpoint_t;
@ -56,6 +70,25 @@ int orte_iof_base_endpoint_create(
int tag, int tag,
int fd); int fd);
/**
* Associate a callback on receipt of data.
*
* @param name Process name corresponding to endpoint.
* @param cbfunc Logical tag for matching.
* @aram cbdata Local file descriptor corresponding to endpoint.
*/
int orte_iof_base_callback_create(
const orte_process_name_t *name,
int tag,
orte_iof_base_callback_fn_t cbfunc,
void* cbdata);
int orte_iof_base_callback_delete(
const orte_process_name_t *name,
int tag);
/** /**
* Delete all local endpoints matching the specified * Delete all local endpoints matching the specified
* name/mask/tag parameters. * name/mask/tag parameters.

Просмотреть файл

@ -135,7 +135,7 @@ typedef int (*orte_iof_base_buffer_fn_t)(
* from a specified set of peers. * from a specified set of peers.
*/ */
typedef int (*orte_iof_base_callback_fn_t)( typedef void (*orte_iof_base_callback_fn_t)(
orte_process_name_t* src_name, orte_process_name_t* src_name,
orte_iof_base_tag_t src_tag, orte_iof_base_tag_t src_tag,
void *cbdata, void *cbdata,

Просмотреть файл

@ -237,12 +237,15 @@ int orte_iof_proxy_subscribe(
const orte_process_name_t* src_name, const orte_process_name_t* src_name,
orte_ns_cmp_bitmask_t src_mask, orte_ns_cmp_bitmask_t src_mask,
orte_iof_base_tag_t src_tag, orte_iof_base_tag_t src_tag,
orte_iof_base_callback_fn_t cb, orte_iof_base_callback_fn_t cbfunc,
void* cbdata) void* cbdata)
{ {
int rc; int rc;
/* create a local registration to reflect the callback */ /* create a local registration to reflect the callback */
rc = orte_iof_base_callback_create(ORTE_RML_NAME_SELF,src_tag,cbfunc,cbdata);
if(rc != OMPI_SUCCESS)
return rc;
/* send a subscription message to the service */ /* send a subscription message to the service */
rc = orte_iof_proxy_svc_subscribe( rc = orte_iof_proxy_svc_subscribe(
@ -270,8 +273,10 @@ int orte_iof_proxy_unsubscribe(
ORTE_RML_NAME_SELF, ORTE_RML_NAME_SELF,
ORTE_NS_CMP_ALL, ORTE_NS_CMP_ALL,
src_tag); src_tag);
if(rc != OMPI_SUCCESS)
return rc;
/* remove local callback */ /* remove local callback */
return ORTE_ERROR; return orte_iof_base_callback_delete(src_name,src_tag);
} }

Просмотреть файл

@ -216,15 +216,28 @@ int orte_iof_svc_buffer(
*/ */
int orte_iof_svc_subscribe( int orte_iof_svc_subscribe(
const orte_process_name_t* src_name, const orte_process_name_t* src_name,
orte_ns_cmp_bitmask_t src_mask, orte_ns_cmp_bitmask_t src_mask,
orte_iof_base_tag_t src_tag, orte_iof_base_tag_t src_tag,
orte_iof_base_callback_fn_t cb, orte_iof_base_callback_fn_t cbfunc,
void* cbdata) void* cbdata)
{ {
/* setup local callback on receipt of data */ int rc;
/* create a local registration to reflect the callback */
rc = orte_iof_base_callback_create(ORTE_RML_NAME_SELF,src_tag,cbfunc,cbdata);
if(rc != OMPI_SUCCESS)
return rc;
/* setup local subscription */ /* setup local subscription */
return OMPI_ERROR; rc = orte_iof_svc_sub_create(
src_name,
src_mask,
src_tag,
ORTE_RML_NAME_SELF,
ORTE_NS_CMP_ALL,
src_tag);
return rc;
} }
int orte_iof_svc_unsubscribe( int orte_iof_svc_unsubscribe(

Просмотреть файл

@ -72,9 +72,26 @@ int orte_iof_svc_sub_create(
orte_ns_cmp_bitmask_t dst_mask, orte_ns_cmp_bitmask_t dst_mask,
orte_iof_base_tag_t dst_tag) orte_iof_base_tag_t dst_tag)
{ {
orte_iof_svc_sub_t* sub;
opal_list_item_t* item; opal_list_item_t* item;
orte_iof_svc_sub_t* sub = OBJ_NEW(orte_iof_svc_sub_t);
OPAL_THREAD_LOCK(&mca_iof_svc_component.svc_lock);
for(item = opal_list_get_first(&mca_iof_svc_component.svc_subscribed);
item != opal_list_get_end(&mca_iof_svc_component.svc_subscribed);
item = opal_list_get_next(item)) {
sub = (orte_iof_svc_sub_t*)item;
if (sub->src_mask == src_mask &&
orte_ns.compare(sub->src_mask,&sub->src_name,src_name) == 0 &&
sub->src_tag == src_tag &&
sub->dst_mask == dst_mask &&
orte_ns.compare(sub->dst_mask,&sub->dst_name,dst_name) == 0 &&
sub->dst_tag == dst_tag) {
OPAL_THREAD_UNLOCK(&mca_iof_svc_component.svc_lock);
return ORTE_SUCCESS;
}
}
sub = OBJ_NEW(orte_iof_svc_sub_t);
sub->src_name = *src_name; sub->src_name = *src_name;
sub->src_mask = src_mask; sub->src_mask = src_mask;
sub->src_tag = src_tag; sub->src_tag = src_tag;