diff --git a/orte/mca/oob/tcp/oob_tcp_msg.h b/orte/mca/oob/tcp/oob_tcp_msg.h index 22da23af0c..76a65b7233 100644 --- a/orte/mca/oob/tcp/oob_tcp_msg.h +++ b/orte/mca/oob/tcp/oob_tcp_msg.h @@ -61,7 +61,6 @@ struct mca_oob_tcp_msg_t { orte_process_name_t msg_peer; /**< the name of the peer */ opal_mutex_t msg_lock; /**< lock for the condition variable */ opal_condition_t msg_condition; /**< condition variable for completion */ - opal_event_t msg_event; struct iovec msg_iov[MCA_OOB_TCP_IOV_MAX]; /** preallocate space for iovec array */ }; diff --git a/orte/mca/oob/tcp/oob_tcp_recv.c b/orte/mca/oob/tcp/oob_tcp_recv.c index 0920dc75fc..409415136d 100644 --- a/orte/mca/oob/tcp/oob_tcp_recv.c +++ b/orte/mca/oob/tcp/oob_tcp_recv.c @@ -136,74 +136,6 @@ int mca_oob_tcp_recv( return rc; } - -/** - * Evaluate posted non-blocking receive - */ - -static void mca_oob_tcp_recv_cb(int sd, short events, void* user) -{ - mca_oob_tcp_msg_t *msg = (mca_oob_tcp_msg_t*)user; - mca_oob_tcp_msg_t *match; - int i, rc; - - /* lock the tcp struct */ - OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_match_lock); - - /* check to see if a matching receive is on the list */ - match = mca_oob_tcp_msg_match_recv(&msg->msg_peer, msg->msg_hdr.msg_tag); - if(NULL != match) { - - if(match->msg_rc < 0) { - rc = match->msg_rc; - } - - /* if we are returning an allocated buffer - just take it from the message */ - else if(msg->msg_flags & MCA_OOB_ALLOC) { - msg->msg_uiov[0].iov_base = (ompi_iov_base_ptr_t)match->msg_rwbuf; - msg->msg_uiov[0].iov_len = match->msg_hdr.msg_size; - match->msg_rwbuf = NULL; - rc = match->msg_hdr.msg_size; - - } else { - - /* if we are just doing peek, return bytes without dequeing message */ - rc = mca_oob_tcp_msg_copy(match, msg->msg_uiov, msg->msg_ucnt); - if(rc >= 0 && MCA_OOB_TRUNC & msg->msg_flags) { - rc = 0; - for(i=1; imsg_rwcnt+1; i++) - rc += match->msg_rwiov[i].iov_len; - } - if(MCA_OOB_PEEK & msg->msg_flags) { - OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock); - msg->msg_cbfunc(rc, - &match->msg_peer, - msg->msg_uiov, - msg->msg_ucnt, - match->msg_hdr.msg_tag, - msg->msg_cbdata); - MCA_OOB_TCP_MSG_RETURN(msg); - return; - } - } - - /* otherwise dequeue the message and return to free list */ - opal_list_remove_item(&mca_oob_tcp_component.tcp_msg_recv, (opal_list_item_t *) match); - OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock); - msg->msg_cbfunc(rc, - &match->msg_peer, - msg->msg_uiov, - msg->msg_ucnt, - match->msg_hdr.msg_tag, - msg->msg_cbdata); - MCA_OOB_TCP_MSG_RETURN(match); - MCA_OOB_TCP_MSG_RETURN(msg); - return; - } - opal_list_append(&mca_oob_tcp_component.tcp_msg_post, (opal_list_item_t *) msg); - OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock); -} - /* * Non-blocking version of mca_oob_recv(). * @@ -225,28 +157,73 @@ int mca_oob_tcp_recv_nb( mca_oob_callback_fn_t cbfunc, void* cbdata) { - struct timeval tv; mca_oob_tcp_msg_t *msg; - int i, rc; + int i, rc, size = 0; - if(NULL == iov || 0 == count) { - return ORTE_ERR_BAD_PARAM; + /* lock the tcp struct */ + OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_match_lock); + + /* check to see if a matching receive is on the list */ + msg = mca_oob_tcp_msg_match_recv(peer, tag); + if(NULL != msg) { + + if(msg->msg_rc < 0) { + OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock); + return msg->msg_rc; + } + + /* if we are returning an allocated buffer - just take it from the message */ + if(flags & MCA_OOB_ALLOC) { + + if(NULL == iov || 0 == count) { + OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock); + return OMPI_ERR_BAD_PARAM; + } + iov[0].iov_base = (ompi_iov_base_ptr_t)msg->msg_rwbuf; + iov[0].iov_len = msg->msg_hdr.msg_size; + msg->msg_rwbuf = NULL; + rc = msg->msg_hdr.msg_size; + + } else { + + /* if we are just doing peek, return bytes without dequeing message */ + rc = mca_oob_tcp_msg_copy(msg, iov, count); + if(rc >= 0 && MCA_OOB_TRUNC & flags) { + rc = 0; + for(i=1; imsg_rwcnt+1; i++) + rc += msg->msg_rwiov[i].iov_len; + } + if(MCA_OOB_PEEK & flags) { + OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock); + cbfunc(rc, &msg->msg_peer, iov, count, tag, cbdata); + return 0; + } + } + + /* otherwise dequeue the message and return to free list */ + opal_list_remove_item(&mca_oob_tcp_component.tcp_msg_recv, (opal_list_item_t *) msg); + OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock); + cbfunc(rc, &msg->msg_peer, iov, count, msg->msg_hdr.msg_tag, cbdata); + MCA_OOB_TCP_MSG_RETURN(msg); + return rc; } + /* the message has not already been received. So we add it to the receive queue */ MCA_OOB_TCP_MSG_ALLOC(msg, rc); if(NULL == msg) { + OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock); return rc; } /* determine overall size of user supplied buffer */ - msg->msg_hdr.msg_size = 0; for(i = 0; i < count; i++) { - msg->msg_hdr.msg_size += iov[i].iov_len; + size += iov[i].iov_len; } /* fill in the header */ msg->msg_hdr.msg_src = *orte_process_info.my_name; msg->msg_hdr.msg_dst = *peer; + msg->msg_hdr.msg_size = size; msg->msg_hdr.msg_tag = tag; msg->msg_type = MCA_OOB_TCP_POSTED; msg->msg_rc = 0; @@ -259,12 +236,8 @@ int mca_oob_tcp_recv_nb( msg->msg_peer = *peer; msg->msg_rwbuf = NULL; msg->msg_rwiov = NULL; - - tv.tv_sec = 0; - tv.tv_usec = 0; - memset(&msg->msg_event, 0, sizeof(msg->msg_event)); - opal_evtimer_set(&msg->msg_event, mca_oob_tcp_recv_cb, msg); - opal_evtimer_add(&msg->msg_event, &tv); + opal_list_append(&mca_oob_tcp_component.tcp_msg_post, (opal_list_item_t *) msg); + OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock); return 0; }