1
1

Add support for a "persistent" non-blocking receive, which does not require re-registration on every receive.

This commit was SVN r7822.
Этот коммит содержится в:
Tim Woodall 2005-10-20 22:06:11 +00:00
родитель cea599a274
Коммит 88c7fd9f8d
9 изменённых файлов: 119 добавлений и 79 удалений

Просмотреть файл

@ -286,7 +286,7 @@ int orte_gpr_proxy_module_init(void)
{
/* issue the non-blocking receive */
int rc;
rc = orte_rml.recv_buffer_nb(ORTE_RML_NAME_ANY, ORTE_RML_TAG_GPR_NOTIFY, 0, orte_gpr_proxy_notify_recv, NULL);
rc = orte_rml.recv_buffer_nb(ORTE_RML_NAME_ANY, ORTE_RML_TAG_GPR_NOTIFY, ORTE_RML_PERSISTENT, orte_gpr_proxy_notify_recv, NULL);
if(rc < 0) {
ORTE_ERROR_LOG(rc);
return rc;
@ -399,9 +399,6 @@ void orte_gpr_proxy_notify_recv(int status, orte_process_name_t* sender,
RETURN_ERROR:
/* reissue non-blocking receive */
orte_rml.recv_buffer_nb(ORTE_RML_NAME_ANY, ORTE_RML_TAG_GPR_NOTIFY, 0, orte_gpr_proxy_notify_recv, NULL);
return;
}

Просмотреть файл

@ -73,9 +73,5 @@ void orte_gpr_replica_recv(int status, orte_process_name_t* sender,
}
OPAL_THREAD_UNLOCK(&orte_gpr_replica_globals.mutex);
/* reissue the non-blocking receive before returning */
orte_rml.recv_buffer_nb(ORTE_RML_NAME_ANY, ORTE_RML_TAG_GPR, 0, orte_gpr_replica_recv, NULL);
return;
}

Просмотреть файл

@ -305,7 +305,8 @@ int orte_gpr_replica_module_init(void)
/* issue the non-blocking receive */
if (!orte_gpr_replica_globals.isolate) {
int rc = orte_rml.recv_buffer_nb(ORTE_RML_NAME_ANY, ORTE_RML_TAG_GPR, 0, orte_gpr_replica_recv, NULL);
int rc = orte_rml.recv_buffer_nb(
ORTE_RML_NAME_ANY, ORTE_RML_TAG_GPR, ORTE_RML_PERSISTENT, orte_gpr_replica_recv, NULL);
if(rc < 0) {
ORTE_ERROR_LOG(rc);
return rc;

Просмотреть файл

@ -328,7 +328,7 @@ int orte_ns_replica_module_init(void)
}
/* issue non-blocking receive for call_back function */
rc = orte_rml.recv_buffer_nb(ORTE_RML_NAME_ANY, ORTE_RML_TAG_NS, 0, orte_ns_replica_recv, NULL);
rc = orte_rml.recv_buffer_nb(ORTE_RML_NAME_ANY, ORTE_RML_TAG_NS, ORTE_RML_PERSISTENT, orte_ns_replica_recv, NULL);
if(rc < 0) {
ORTE_ERROR_LOG(rc);
return rc;
@ -628,9 +628,6 @@ RETURN_ERROR:
OBJ_DESTRUCT(&error_answer);
CLEANUP:
/* reissue the non-blocking receive */
orte_rml.recv_buffer_nb(ORTE_RML_NAME_ANY, ORTE_RML_TAG_NS, 0, orte_ns_replica_recv, NULL);
/* cleanup */
OBJ_DESTRUCT(&answer);
}

Просмотреть файл

@ -64,6 +64,7 @@ OMPI_DECLSPEC extern orte_process_name_t mca_oob_name_seed;
#define MCA_OOB_ALLOC 0x04 /**< flag to oob_recv to request the oob to allocate a buffer of the appropriate
* size for the receive and return the allocated buffer and size in the first
* element of the iovec array. */
#define MCA_OOB_PERSISTENT 0x08 /* post receive request persistently - don't remove on match */
/**

Просмотреть файл

@ -35,6 +35,7 @@ struct mca_oob_recv_cbdata {
struct iovec cbiov;
mca_oob_callback_packed_fn_t cbfunc;
void* cbdata;
bool persistent;
};
typedef struct mca_oob_recv_cbdata mca_oob_recv_cbdata_t;
@ -111,7 +112,7 @@ int mca_oob_recv_packed_nb(
memset(oob_cbdata, 0, sizeof(mca_oob_recv_cbdata_t));
oob_cbdata->cbfunc = cbfunc;
oob_cbdata->cbdata = cbdata;
oob_cbdata->persistent = (flags & MCA_OOB_PERSISTENT) ? true : false;
rc = mca_oob.oob_recv_nb(peer, &oob_cbdata->cbiov, 1, tag, flags|MCA_OOB_ALLOC, mca_oob_recv_callback, oob_cbdata);
if(rc < 0) {
free(oob_cbdata);
@ -157,6 +158,8 @@ static void mca_oob_recv_callback(
/* cleanup */
OBJ_DESTRUCT(&buffer);
free(oob_cbdata);
if(oob_cbdata->persistent == false) {
free(oob_cbdata);
}
}

Просмотреть файл

@ -383,15 +383,12 @@ static void mca_oob_tcp_msg_data(mca_oob_tcp_msg_t* msg, mca_oob_tcp_peer_t* pee
mca_oob_tcp_msg_t* post;
OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_match_lock);
/* queue of posted receives is stored in network byte order -
* the message header has already been converted back to host -
* so must convert back to network to match.
*/
/* match msg against posted receives */
post = mca_oob_tcp_msg_match_post(&peer->peer_name, msg->msg_hdr.msg_tag);
if(NULL != post) {
if(post->msg_flags & MCA_OOB_ALLOC) {
/* set the users iovec struct to point to pre-allocated buffer */
if(NULL == post->msg_uiov || 0 == post->msg_ucnt) {
post->msg_rc = OMPI_ERR_BAD_PARAM;
@ -401,12 +398,12 @@ static void mca_oob_tcp_msg_data(mca_oob_tcp_msg_t* msg, mca_oob_tcp_peer_t* pee
*/
post->msg_uiov[0].iov_base = (ompi_iov_base_ptr_t)msg->msg_rwbuf;
post->msg_uiov[0].iov_len = msg->msg_hdr.msg_size;
post->msg_rc = msg->msg_hdr.msg_size;
post->msg_rc = msg->msg_hdr.msg_size;
msg->msg_rwbuf = NULL;
}
} else {
/* copy msg data into posted recv */
post->msg_rc = mca_oob_tcp_msg_copy(msg, post->msg_uiov, post->msg_ucnt);
if(post->msg_flags & MCA_OOB_TRUNC) {
@ -416,7 +413,7 @@ static void mca_oob_tcp_msg_data(mca_oob_tcp_msg_t* msg, mca_oob_tcp_peer_t* pee
post->msg_rc = size;
}
}
if(post->msg_flags & MCA_OOB_PEEK) {
/* will need message for actual receive */
opal_list_append(&mca_oob_tcp_component.tcp_msg_recv, &msg->super);
@ -426,7 +423,17 @@ static void mca_oob_tcp_msg_data(mca_oob_tcp_msg_t* msg, mca_oob_tcp_peer_t* pee
mca_oob_tcp_component.tcp_match_count++;
OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
mca_oob_tcp_msg_complete(post, &peer->peer_name);
if(post->msg_flags & MCA_OOB_PERSISTENT) {
post->msg_cbfunc(
post->msg_rc,
&peer->peer_name,
post->msg_uiov,
post->msg_ucnt,
post->msg_hdr.msg_tag,
post->msg_cbdata);
} else {
mca_oob_tcp_msg_complete(post, &peer->peer_name);
}
OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_match_lock);
if(--mca_oob_tcp_component.tcp_match_count == 0)
@ -529,12 +536,10 @@ mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_post(orte_process_name_t* name, int tag
if((0 == cmpval1) || (0 == cmpval2)) {
if (msg->msg_hdr.msg_tag == tag) {
if((msg->msg_flags & MCA_OOB_PEEK) == 0) {
if((msg->msg_flags & MCA_OOB_PERSISTENT) == 0) {
opal_list_remove_item(&mca_oob_tcp_component.tcp_msg_post, &msg->super);
return msg;
} else {
return NULL;
}
return msg;
}
}
}

Просмотреть файл

@ -136,6 +136,67 @@ int mca_oob_tcp_recv(
return rc;
}
/**
 * Process a matched posted receive.
 *
 * Delivers the data held by an inbound message (match) to a posted
 * receive request (msg): either hands over the pre-allocated buffer
 * (MCA_OOB_ALLOC), or copies the data into the user's iovecs, then
 * invokes the posted receive's callback.
 *
 * Note that the match lock must be held prior to the call.  The lock
 * is released around the user callback and re-acquired before
 * returning.
 *
 * @param msg    the posted receive request (user iovecs, flags, callback)
 * @param match  the matched inbound message carrying the received data
 */
static void mca_oob_tcp_msg_matched(mca_oob_tcp_msg_t* msg, mca_oob_tcp_msg_t* match)
{
    int i,rc;
    /* an error status on the inbound message is propagated to the callback */
    if(match->msg_rc < 0) {
        rc = match->msg_rc;
    }
    /* if we are returning an allocated buffer - just take it from the message */
    else if(msg->msg_flags & MCA_OOB_ALLOC) {
        msg->msg_uiov[0].iov_base = (ompi_iov_base_ptr_t)match->msg_rwbuf;
        msg->msg_uiov[0].iov_len = match->msg_hdr.msg_size;
        /* buffer ownership transfers to the user; prevent it being freed with the message */
        match->msg_rwbuf = NULL;
        rc = match->msg_hdr.msg_size;
    } else {
        /* copy the received data into the user's posted iovecs */
        rc = mca_oob_tcp_msg_copy(match, msg->msg_uiov, msg->msg_ucnt);
        /* with MCA_OOB_TRUNC, report the full message size rather than the bytes copied */
        if(rc >= 0 && MCA_OOB_TRUNC & msg->msg_flags) {
            rc = 0;
            /* sum the payload iovecs (entry 0 is the internal header) — TODO confirm layout */
            for(i=1; i<match->msg_rwcnt+1; i++)
                rc += match->msg_rwiov[i].iov_len;
        }
        /* if we are just doing peek, return bytes without dequeing message */
        if(MCA_OOB_PEEK & msg->msg_flags) {
            /* drop the match lock while in the user callback to avoid deadlock on re-entry */
            OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
            msg->msg_cbfunc(rc,
                &match->msg_peer,
                msg->msg_uiov,
                msg->msg_ucnt,
                match->msg_hdr.msg_tag,
                msg->msg_cbdata);
            OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_match_lock);
            /* peeked message stays queued for the actual receive */
            return;
        }
    }
    /* otherwise remove the match */
    opal_list_remove_item(&mca_oob_tcp_component.tcp_msg_recv, (opal_list_item_t *) match);
    /* invoke callback */
    OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
    msg->msg_cbfunc(rc,
        &match->msg_peer,
        msg->msg_uiov,
        msg->msg_ucnt,
        match->msg_hdr.msg_tag,
        msg->msg_cbdata);
    OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_match_lock);
    /* return match to free list */
    MCA_OOB_TCP_MSG_RETURN(match);
}
/*
* Non-blocking version of mca_oob_recv().
*
@ -158,57 +219,15 @@ int mca_oob_tcp_recv_nb(
void* cbdata)
{
mca_oob_tcp_msg_t *msg;
mca_oob_tcp_msg_t *match;
int i, rc, size = 0;
/* lock the tcp struct */
OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_match_lock);
/* check to see if a matching receive is on the list */
msg = mca_oob_tcp_msg_match_recv(peer, tag);
if(NULL != msg) {
if(msg->msg_rc < 0) {
OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
return msg->msg_rc;
}
/* if we are returning an allocated buffer - just take it from the message */
if(flags & MCA_OOB_ALLOC) {
if(NULL == iov || 0 == count) {
OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
return OMPI_ERR_BAD_PARAM;
}
iov[0].iov_base = (ompi_iov_base_ptr_t)msg->msg_rwbuf;
iov[0].iov_len = msg->msg_hdr.msg_size;
msg->msg_rwbuf = NULL;
rc = msg->msg_hdr.msg_size;
} else {
/* if we are just doing peek, return bytes without dequeing message */
rc = mca_oob_tcp_msg_copy(msg, iov, count);
if(rc >= 0 && MCA_OOB_TRUNC & flags) {
rc = 0;
for(i=1; i<msg->msg_rwcnt+1; i++)
rc += msg->msg_rwiov[i].iov_len;
}
if(MCA_OOB_PEEK & flags) {
OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
cbfunc(rc, &msg->msg_peer, iov, count, tag, cbdata);
return 0;
}
}
/* otherwise dequeue the message and return to free list */
opal_list_remove_item(&mca_oob_tcp_component.tcp_msg_recv, (opal_list_item_t *) msg);
OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
cbfunc(rc, &msg->msg_peer, iov, count, msg->msg_hdr.msg_tag, cbdata);
MCA_OOB_TCP_MSG_RETURN(msg);
return rc;
/* validate params */
if(NULL == iov || 0 == count) {
return OMPI_ERR_BAD_PARAM;
}
/* the message has not already been received. So we add it to the receive queue */
/* allocate/initialize the posted receive */
MCA_OOB_TCP_MSG_ALLOC(msg, rc);
if(NULL == msg) {
OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
@ -236,7 +255,27 @@ int mca_oob_tcp_recv_nb(
msg->msg_peer = *peer;
msg->msg_rwbuf = NULL;
msg->msg_rwiov = NULL;
opal_list_append(&mca_oob_tcp_component.tcp_msg_post, (opal_list_item_t *) msg);
/* acquire the match lock */
OPAL_THREAD_LOCK(&mca_oob_tcp_component.tcp_match_lock);
if(flags & MCA_OOB_PERSISTENT) {
opal_list_append(&mca_oob_tcp_component.tcp_msg_post, (opal_list_item_t *) msg);
while(NULL != (match = mca_oob_tcp_msg_match_recv(peer,tag))) {
mca_oob_tcp_msg_matched(msg, match);
}
} else {
/* check to see if a matching receive is on the list */
match = mca_oob_tcp_msg_match_recv(peer, tag);
if(NULL != match) {
mca_oob_tcp_msg_matched(msg, match);
MCA_OOB_TCP_MSG_RETURN(msg);
} else {
opal_list_append(&mca_oob_tcp_component.tcp_msg_post, (opal_list_item_t *) msg);
}
}
OPAL_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
return 0;
}

Просмотреть файл

@ -57,6 +57,7 @@ typedef uint32_t orte_rml_tag_t;
#define ORTE_RML_ALLOC 0x04 /**< flag to oob_recv to request the oob to allocate a buffer of the appropriate
* size for the receive and return the allocated buffer and size in the first
* element of the iovec array. */
#define ORTE_RML_PERSISTENT 0x08 /**< posted non-blocking recv is persistent */
/**
* The wildcard for receives from any peer.