From 0b0d7f56c1d2e7f5944ee6e8ef8f40a8298a31ea Mon Sep 17 00:00:00 2001 From: Tim Woodall Date: Thu, 10 Nov 2005 04:49:51 +0000 Subject: [PATCH] added support for callback on receipt of I/O This commit was SVN r8084. --- orte/mca/iof/base/iof_base_endpoint.c | 151 ++++++++++++++++++++++---- orte/mca/iof/base/iof_base_endpoint.h | 33 ++++++ orte/mca/iof/iof.h | 2 +- orte/mca/iof/proxy/iof_proxy.c | 11 +- orte/mca/iof/svc/iof_svc.c | 21 +++- orte/mca/iof/svc/iof_svc_sub.c | 19 +++- 6 files changed, 207 insertions(+), 30 deletions(-) diff --git a/orte/mca/iof/base/iof_base_endpoint.c b/orte/mca/iof/base/iof_base_endpoint.c index 122f88ab8e..21a2fb07de 100644 --- a/orte/mca/iof/base/iof_base_endpoint.c +++ b/orte/mca/iof/base/iof_base_endpoint.c @@ -39,6 +39,7 @@ static void orte_iof_base_endpoint_construct(orte_iof_base_endpoint_t* endpoint) endpoint->ep_fd = -1; memset(&endpoint->ep_event,0,sizeof(endpoint->ep_event)); OBJ_CONSTRUCT(&endpoint->ep_frags, opal_list_t); + OBJ_CONSTRUCT(&endpoint->ep_callbacks, opal_list_t); } static void orte_iof_base_endpoint_destruct(orte_iof_base_endpoint_t* endpoint) @@ -47,6 +48,7 @@ static void orte_iof_base_endpoint_destruct(orte_iof_base_endpoint_t* endpoint) opal_event_del(&endpoint->ep_event); } OBJ_DESTRUCT(&endpoint->ep_frags); + OBJ_DESTRUCT(&endpoint->ep_callbacks); } OBJ_CLASS_INSTANCE( @@ -55,6 +57,27 @@ OBJ_CLASS_INSTANCE( orte_iof_base_endpoint_construct, orte_iof_base_endpoint_destruct); +/** + * Construct/Destructor + */ + +static void orte_iof_base_callback_construct(orte_iof_base_callback_t* cb) +{ + cb->cb_func = 0; + cb->cb_data = NULL; +} + +static void orte_iof_base_callback_destruct(orte_iof_base_callback_t* cb) +{ +} + +OBJ_CLASS_INSTANCE( + orte_iof_base_callback_t, + opal_list_item_t, + orte_iof_base_callback_construct, + orte_iof_base_callback_destruct); + + /* * Callback when non-blocking OOB send completes. @@ -243,14 +266,15 @@ int orte_iof_base_endpoint_create( endpoint->ep_tag = tag; endpoint->ep_fd = fd; - /* set file descriptor to be non-blocking */ + /* set to non-blocking */ if((flags = fcntl(fd, F_GETFL, 0)) < 0) { - opal_output(0, "orte_iof_base_endpoint_create: fcntl(F_GETFL) failed with errno=%d\n", errno); + opal_output(0, "[%s:%d]: fcntl(F_GETFL) failed with errno=%d\n", + __FILE__, __LINE__, errno); } else { flags |= O_NONBLOCK; fcntl(fd, F_SETFL, flags); } - + /* setup event handler */ switch(mode) { case ORTE_IOF_SOURCE: @@ -378,6 +402,7 @@ int orte_iof_base_endpoint_forward( orte_iof_base_msg_header_t* hdr, const unsigned char* data) { + opal_list_item_t* item; orte_iof_base_frag_t* frag; size_t len = hdr->msg_len; int rc = 0; @@ -397,26 +422,45 @@ int orte_iof_base_endpoint_forward( frag->frag_src = *src; frag->frag_hdr.hdr_msg = *hdr; - /* try to write w/out copying data */ - if(opal_list_get_size(&endpoint->ep_frags) == 0) { - rc = write(endpoint->ep_fd,data,len); - if(rc < 0 && (errno != EAGAIN && errno != EINTR)) { - orte_iof_base_endpoint_closed(endpoint); - OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock); - return ORTE_SUCCESS; - } - } + /* call any registered callbacks */ + for(item = opal_list_get_first(&endpoint->ep_callbacks); + item != opal_list_get_end(&endpoint->ep_callbacks); + item = opal_list_get_next(item)) { + orte_iof_base_callback_t* cb = (orte_iof_base_callback_t*)item; + cb->cb_func( + &hdr->msg_src, + hdr->msg_tag, + cb->cb_data, + data, + hdr->msg_len); + } - frag->frag_len = len - rc; - if(frag->frag_len > 0) { - /* handle incomplete write */ - frag->frag_ptr = frag->frag_data; - memcpy(frag->frag_ptr, data+rc, frag->frag_len); - opal_list_append(&endpoint->ep_frags, &frag->super); - if(opal_list_get_size(&endpoint->ep_frags) == 1) { - opal_event_add(&endpoint->ep_event,0); + if(endpoint->ep_fd >= 0) { + /* try to write w/out copying data */ + if(opal_list_get_size(&endpoint->ep_frags) == 0) { + rc = write(endpoint->ep_fd,data,len); + if(rc < 0 && (errno != EAGAIN && errno != EINTR)) { + orte_iof_base_endpoint_closed(endpoint); + OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock); + return ORTE_SUCCESS; + } + } + + frag->frag_len = len - rc; + if(frag->frag_len > 0) { + /* handle incomplete write */ + frag->frag_ptr = frag->frag_data; + memcpy(frag->frag_ptr, data+rc, frag->frag_len); + opal_list_append(&endpoint->ep_frags, &frag->super); + if(opal_list_get_size(&endpoint->ep_frags) == 1) { + opal_event_add(&endpoint->ep_event,0); + } + OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock); + } else { + OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock); + /* acknowledge fragment */ + orte_iof_base_frag_ack(frag); } - OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock); } else { OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock); /* acknowledge fragment */ @@ -426,6 +470,71 @@ int orte_iof_base_endpoint_forward( } +/** + * Register a callback + */ + +int orte_iof_base_callback_create( + const orte_process_name_t* proc, + int tag, + orte_iof_base_callback_fn_t cbfunc, + void *cbdata) +{ + orte_iof_base_callback_t* cb = OBJ_NEW(orte_iof_base_callback_t); + orte_iof_base_endpoint_t* endpoint; + if(NULL == cb) + return ORTE_ERR_OUT_OF_RESOURCE; + + OPAL_THREAD_LOCK(&orte_iof_base.iof_lock); + if((endpoint = orte_iof_base_endpoint_lookup(proc,ORTE_IOF_SINK,tag)) == NULL) { + endpoint = OBJ_NEW(orte_iof_base_endpoint_t); + if(NULL == endpoint) { + OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock); + return OMPI_ERR_OUT_OF_RESOURCE; + } + endpoint->ep_name = *proc; + endpoint->ep_mode = ORTE_IOF_SINK; + endpoint->ep_tag = tag; + endpoint->ep_fd = -1; + opal_list_append(&orte_iof_base.iof_endpoints, &endpoint->super); + } else { + OBJ_RELEASE(endpoint); + } + cb->cb_func = cbfunc; + cb->cb_data = cbdata; + opal_list_append(&endpoint->ep_callbacks, (opal_list_item_t*)cb); + OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock); + return ORTE_SUCCESS; +} + + +/** + * Remove a callback + */ + +int orte_iof_base_callback_delete( + const orte_process_name_t* proc, + int tag) +{ + orte_iof_base_endpoint_t* endpoint; + opal_list_item_t* item; + + OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock); + if(NULL == (endpoint = orte_iof_base_endpoint_lookup(proc,ORTE_IOF_SINK, tag))) { + OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock); + return ORTE_ERR_NOT_FOUND; + } + + while(NULL != (item = opal_list_remove_first(&endpoint->ep_callbacks))) { + OBJ_RELEASE(item); + } + OBJ_RELEASE(endpoint); + OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock); + return ORTE_SUCCESS; +} + + + /** * Update the acknowledged sequence number. If forwarding had * previously been disabled as the window closed, and the window diff --git a/orte/mca/iof/base/iof_base_endpoint.h b/orte/mca/iof/base/iof_base_endpoint.h index 7ec98296fe..059a8bca7b 100644 --- a/orte/mca/iof/base/iof_base_endpoint.h +++ b/orte/mca/iof/base/iof_base_endpoint.h @@ -13,6 +13,19 @@ enum { ORTE_IOF_EP_CLOSED }; +/** + * Structure store callbacks + */ + +struct orte_iof_base_callback_t { + opal_list_item_t super; + orte_iof_base_callback_fn_t cb_func; + void* cb_data; +}; +typedef struct orte_iof_base_callback_t orte_iof_base_callback_t; + +OBJ_CLASS_DECLARATION(orte_iof_base_callback_t); + /** * Structure that represents a published endpoint. */ @@ -28,6 +41,7 @@ struct orte_iof_base_endpoint_t { uint32_t ep_ack; opal_event_t ep_event; opal_list_t ep_frags; + opal_list_t ep_callbacks; }; typedef struct orte_iof_base_endpoint_t orte_iof_base_endpoint_t; @@ -56,6 +70,25 @@ int orte_iof_base_endpoint_create( int tag, int fd); +/** + * Associate a callback on receipt of data. + * + * @param name Process name corresponding to endpoint. + * @param cbfunc Logical tag for matching. + * @aram cbdata Local file descriptor corresponding to endpoint. + */ + +int orte_iof_base_callback_create( + const orte_process_name_t *name, + int tag, + orte_iof_base_callback_fn_t cbfunc, + void* cbdata); + +int orte_iof_base_callback_delete( + const orte_process_name_t *name, + int tag); + + /** * Delete all local endpoints matching the specified * name/mask/tag parameters. diff --git a/orte/mca/iof/iof.h b/orte/mca/iof/iof.h index 163d31f27f..e7fc3b98cc 100644 --- a/orte/mca/iof/iof.h +++ b/orte/mca/iof/iof.h @@ -135,7 +135,7 @@ typedef int (*orte_iof_base_buffer_fn_t)( * from a specified set of peers. */ -typedef int (*orte_iof_base_callback_fn_t)( +typedef void (*orte_iof_base_callback_fn_t)( orte_process_name_t* src_name, orte_iof_base_tag_t src_tag, void *cbdata, diff --git a/orte/mca/iof/proxy/iof_proxy.c b/orte/mca/iof/proxy/iof_proxy.c index f689860b22..7d5e6efa44 100644 --- a/orte/mca/iof/proxy/iof_proxy.c +++ b/orte/mca/iof/proxy/iof_proxy.c @@ -237,12 +237,15 @@ int orte_iof_proxy_subscribe( const orte_process_name_t* src_name, orte_ns_cmp_bitmask_t src_mask, orte_iof_base_tag_t src_tag, - orte_iof_base_callback_fn_t cb, + orte_iof_base_callback_fn_t cbfunc, void* cbdata) { int rc; /* create a local registration to reflect the callback */ + rc = orte_iof_base_callback_create(ORTE_RML_NAME_SELF,src_tag,cbfunc,cbdata); + if(rc != OMPI_SUCCESS) + return rc; /* send a subscription message to the service */ rc = orte_iof_proxy_svc_subscribe( @@ -270,8 +273,10 @@ int orte_iof_proxy_unsubscribe( ORTE_RML_NAME_SELF, ORTE_NS_CMP_ALL, src_tag); - + if(rc != OMPI_SUCCESS) + return rc; + /* remove local callback */ - return ORTE_ERROR; + return orte_iof_base_callback_delete(src_name,src_tag); } diff --git a/orte/mca/iof/svc/iof_svc.c b/orte/mca/iof/svc/iof_svc.c index 699818dd58..ea0160b9c5 100644 --- a/orte/mca/iof/svc/iof_svc.c +++ b/orte/mca/iof/svc/iof_svc.c @@ -216,15 +216,28 @@ int orte_iof_svc_buffer( */ int orte_iof_svc_subscribe( - const orte_process_name_t* src_name, + const orte_process_name_t* src_name, orte_ns_cmp_bitmask_t src_mask, orte_iof_base_tag_t src_tag, - orte_iof_base_callback_fn_t cb, + orte_iof_base_callback_fn_t cbfunc, void* cbdata) { - /* setup local callback on receipt of data */ + int rc; + + /* create a local registration to reflect the callback */ + rc = orte_iof_base_callback_create(ORTE_RML_NAME_SELF,src_tag,cbfunc,cbdata); + if(rc != OMPI_SUCCESS) + return rc; + /* setup local subscription */ - return OMPI_ERROR; + rc = orte_iof_svc_sub_create( + src_name, + src_mask, + src_tag, + ORTE_RML_NAME_SELF, + ORTE_NS_CMP_ALL, + src_tag); + return rc; } int orte_iof_svc_unsubscribe( diff --git a/orte/mca/iof/svc/iof_svc_sub.c b/orte/mca/iof/svc/iof_svc_sub.c index 6173ef3171..a65d0c6994 100644 --- a/orte/mca/iof/svc/iof_svc_sub.c +++ b/orte/mca/iof/svc/iof_svc_sub.c @@ -72,9 +72,26 @@ int orte_iof_svc_sub_create( orte_ns_cmp_bitmask_t dst_mask, orte_iof_base_tag_t dst_tag) { + orte_iof_svc_sub_t* sub; opal_list_item_t* item; - orte_iof_svc_sub_t* sub = OBJ_NEW(orte_iof_svc_sub_t); + OPAL_THREAD_LOCK(&mca_iof_svc_component.svc_lock); + for(item = opal_list_get_first(&mca_iof_svc_component.svc_subscribed); + item != opal_list_get_end(&mca_iof_svc_component.svc_subscribed); + item = opal_list_get_next(item)) { + sub = (orte_iof_svc_sub_t*)item; + if (sub->src_mask == src_mask && + orte_ns.compare(sub->src_mask,&sub->src_name,src_name) == 0 && + sub->src_tag == src_tag && + sub->dst_mask == dst_mask && + orte_ns.compare(sub->dst_mask,&sub->dst_name,dst_name) == 0 && + sub->dst_tag == dst_tag) { + OPAL_THREAD_UNLOCK(&mca_iof_svc_component.svc_lock); + return ORTE_SUCCESS; + } + } + + sub = OBJ_NEW(orte_iof_svc_sub_t); sub->src_name = *src_name; sub->src_mask = src_mask; sub->src_tag = src_tag;