From ac977827b94b7385067202bba316885562db98d9 Mon Sep 17 00:00:00 2001 From: Tim Woodall Date: Fri, 13 Aug 2004 15:15:14 +0000 Subject: [PATCH] implement the MCA_OOB_ALLOC flag for recv - in this case the library allocates a buffer for the receive and returns it to the caller - who is then responsible for freeing it. This commit was SVN r2115. --- src/mca/oob/base/base.h | 2 - src/mca/oob/cofs/src/oob_cofs.c | 14 +++++ src/mca/oob/tcp/oob_tcp_peer.c | 30 +++++++--- src/mca/oob/tcp/oob_tcp_recv.c | 34 ++++++++---- test/mca/oob/cofs/Makefile.am | 13 ----- test/mca/oob/cofs/oob_cofs_test.c | 92 ------------------------------- test/mca/oob/oob_test.c | 88 ++++++++++++++++------------- test/support/support.c | 4 +- 8 files changed, 113 insertions(+), 164 deletions(-) delete mode 100644 test/mca/oob/cofs/Makefile.am delete mode 100644 test/mca/oob/cofs/oob_cofs_test.c diff --git a/src/mca/oob/base/base.h b/src/mca/oob/base/base.h index 710be752f3..d9cd3aa240 100644 --- a/src/mca/oob/base/base.h +++ b/src/mca/oob/base/base.h @@ -79,11 +79,9 @@ typedef enum { * message w/out removing the message from the queue. */ #define MCA_OOB_TRUNC 0x02 /**< flag to oob_recv to return the actual size of the message even if * the receive buffer is smaller than the number of bytes available */ -#if 0 /* NOT YET IMPLEMENTED */ #define MCA_OOB_ALLOC 0x04 /**< flag to oob_recv to request the oob to allocate a buffer of the appropriate * size for the receive and return the allocated buffer and size in the first * element of the iovec array. */ -#endif #if defined(c_plusplus) || defined(__cplusplus) extern "C" { diff --git a/src/mca/oob/cofs/src/oob_cofs.c b/src/mca/oob/cofs/src/oob_cofs.c index 6ce2854937..d505a30602 100644 --- a/src/mca/oob/cofs/src/oob_cofs.c +++ b/src/mca/oob/cofs/src/oob_cofs.c @@ -231,8 +231,22 @@ do_recv(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid, struct iovec* iov, return OMPI_ERROR; } + if(flags & MCA_OOB_ALLOC) { + if(NULL == iov || 0 == count) { + return OMPI_ERR_BAD_PARAM; + } + iov->iov_base = malloc(size); + if(NULL == iov->iov_base) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + iov->iov_len = size; + count = 1; + } + if(iov != NULL && count > 0) { rlen = readv(fd, iov, count); + } else { + rlen = 0; } close(fd); return (flags & MCA_OOB_TRUNC) ? size : rlen; diff --git a/src/mca/oob/tcp/oob_tcp_peer.c b/src/mca/oob/tcp/oob_tcp_peer.c index 2c24612eda..b670e98116 100644 --- a/src/mca/oob/tcp/oob_tcp_peer.c +++ b/src/mca/oob/tcp/oob_tcp_peer.c @@ -461,14 +461,30 @@ static void mca_oob_tcp_peer_recv_progress(mca_oob_tcp_peer_t* peer, mca_oob_tcp post = mca_oob_tcp_msg_match_post(&peer->peer_name, msg->msg_hdr.msg_tag,true); if(NULL != post) { - /* copy msg data into posted recv */ - post->msg_rc = mca_oob_tcp_msg_copy(msg, post->msg_uiov, post->msg_ucnt); - if(post->msg_flags & MCA_OOB_TRUNC) { - int i, size = 0; - for(i=0; imsg_rwcnt; i++) - size += msg->msg_rwiov[i].iov_len; - post->msg_rc = size; + if(post->msg_flags & MCA_OOB_ALLOC) { + + /* set the users iovec struct to point to pre-allocated buffer */ + if(NULL == post->msg_uiov || 0 == post->msg_ucnt) { + post->msg_rc = OMPI_ERR_BAD_PARAM; + } else { + post->msg_uiov[0].iov_base = msg->msg_rwiov->iov_base; + post->msg_uiov[0].iov_len = msg->msg_rwiov->iov_len; + msg->msg_rwbuf = NULL; + post->msg_rc = msg->msg_rwiov->iov_len; + } + + } else { + + /* copy msg data into posted recv */ + post->msg_rc = mca_oob_tcp_msg_copy(msg, post->msg_uiov, post->msg_ucnt); + if(post->msg_flags & MCA_OOB_TRUNC) { + int i, size = 0; + for(i=0; imsg_rwcnt; i++) + size += msg->msg_rwiov[i].iov_len; + post->msg_rc = size; + } } + if(post->msg_flags & MCA_OOB_PEEK) { /* will need message for actual receive */ ompi_list_append(&mca_oob_tcp_component.tcp_msg_recv, &msg->super); diff --git a/src/mca/oob/tcp/oob_tcp_recv.c b/src/mca/oob/tcp/oob_tcp_recv.c index 1ce05db5f3..c26f10977a 100644 --- a/src/mca/oob/tcp/oob_tcp_recv.c +++ b/src/mca/oob/tcp/oob_tcp_recv.c @@ -30,22 +30,36 @@ int mca_oob_tcp_recv( msg = mca_oob_tcp_msg_match_recv(peer, tag); if(NULL != msg) { - /* if we are just doing peek, return bytes without dequeing message */ if(msg->msg_rc < 0) { OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock); return msg->msg_rc; } + + /* if we are returning an allocated buffer - just take it from the message */ + if(flags & MCA_OOB_ALLOC) { - rc = mca_oob_tcp_msg_copy(msg, iov, count); - if(rc >= 0 && MCA_OOB_TRUNC & flags) { - rc = 0; - for(i=0; imsg_rwcnt; i++) - rc += msg->msg_rwiov[i].iov_len; - } - if(MCA_OOB_PEEK & flags) { - OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock); - return rc; + if(NULL == iov || 0 == count) { + return OMPI_ERR_BAD_PARAM; + } + iov[0].iov_base = msg->msg_rwiov->iov_base; + iov[0].iov_len = msg->msg_rwiov->iov_len; + msg->msg_rwbuf = NULL; + + } else { + + /* if we are just doing peek, return bytes without dequeing message */ + rc = mca_oob_tcp_msg_copy(msg, iov, count); + if(rc >= 0 && MCA_OOB_TRUNC & flags) { + rc = 0; + for(i=0; imsg_rwcnt; i++) + rc += msg->msg_rwiov[i].iov_len; + } + if(MCA_OOB_PEEK & flags) { + OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock); + return rc; + } } + if(NULL != tagp) { *tagp = ntohl(msg->msg_hdr.msg_tag); } diff --git a/test/mca/oob/cofs/Makefile.am b/test/mca/oob/cofs/Makefile.am deleted file mode 100644 index 7631130780..0000000000 --- a/test/mca/oob/cofs/Makefile.am +++ /dev/null @@ -1,13 +0,0 @@ -# -*- makefile -*- -# -# $HEADER$ -# - -include $(top_srcdir)/config/Makefile.options - -noinst_PROGRAMS = oob_cofs_test - -oob_cofs_test_SOURCES = oob_cofs_test.c -oob_cofs_test_LDADD = \ - ../../../../src/mca/ompi/oob/cofs/libmca_ompi_oob_cofs.la \ - ../../../../src/libompi.la diff --git a/test/mca/oob/cofs/oob_cofs_test.c b/test/mca/oob/cofs/oob_cofs_test.c deleted file mode 100644 index 30b9d59d8d..0000000000 --- a/test/mca/oob/cofs/oob_cofs_test.c +++ /dev/null @@ -1,92 +0,0 @@ -#include "ompi/runtime/runtime.h" -#include "mca/ompi/oob/oob.h" -#include "mca/ompi/pcm/pcm.h" -#include "mca/ompi/base/base.h" - -#include -#include -#include -#include - -int -main(int argc, char* argv[]) -{ - int ret; - mca_pcm_proc_t *procs; - size_t nprocs; - mca_pcm_proc_t *me; - int left_vpid, right_vpid, me_vpid; - int count = 2; - ompi_job_handle_t job; - int data = 0xDEADBEEF; - int tag = MCA_OOB_ANY_TAG; - bool threads, hidden; - - printf("hello, world!\n"); - ret = ompi_init(argc, argv); - assert(ret == OMPI_SUCCESS); - - ret = mca_base_open(); - assert(ret == OMPI_SUCCESS); - - ret = ompi_rte_init(&threads, &hidden); - assert(ret == OMPI_SUCCESS); - - ret = mca_pcm.pcm_proc_startup(); - assert(ret == OMPI_SUCCESS); - - ret = mca_pcm.pcm_proc_get_peers(&procs, &nprocs); - assert(ret == OMPI_SUCCESS); - - job = mca_pcm.pcm_handle_get(); - assert(job != NULL); - - me = mca_pcm.pcm_proc_get_me(); - assert(me != NULL); - - /* time to play the ring game! */ - me_vpid = me->vpid; - printf("Hello, World. I am vpid %d\n", me_vpid); - - left_vpid = me_vpid == 0 ? nprocs - 1 : me_vpid - 1; - right_vpid = (me_vpid + 1) % nprocs; - - if (me_vpid == 0) { - printf("vpid %d sending to vpid %d\n", me_vpid, right_vpid); - ret = mca_oob.oob_send(job, right_vpid, 0, &data, sizeof(int)); - assert(ret == OMPI_SUCCESS); - count--; - } - - while (count > 0) { - int *data_ptr; - size_t data_ptr_len; - printf("vpid %d recving from vpid %d\n", me_vpid, left_vpid); - ret = mca_oob.oob_recv(job, left_vpid, &tag, &data_ptr, &data_ptr_len); - assert(ret == OMPI_SUCCESS); - assert(data_ptr_len == sizeof(int)); - assert(*data_ptr == data); - - printf("vpid %d sending to vpid %d\n", me_vpid, right_vpid); - ret = mca_oob.oob_send(job, right_vpid, 0, &data, sizeof(int)); - assert(ret == OMPI_SUCCESS); - - count--; - } - - - if (me_vpid == 0) { - int *data_ptr; - size_t data_ptr_len; - printf("vpid %d recving from vpid %d\n", me_vpid, left_vpid); - ret = mca_oob.oob_recv(job, left_vpid, &tag, &data_ptr, &data_ptr_len); - assert(ret == OMPI_SUCCESS); - assert(data_ptr_len == sizeof(int)); - assert(*data_ptr == data); - } - - ret = ompi_rte_finalize(); - assert(ret == OMPI_SUCCESS); - - return 0; -} diff --git a/test/mca/oob/oob_test.c b/test/mca/oob/oob_test.c index bafa8e722a..e321f7c23d 100644 --- a/test/mca/oob/oob_test.c +++ b/test/mca/oob/oob_test.c @@ -21,7 +21,7 @@ #define NUM_TIMES 1 int i; -bool testdone[NUM_TESTS * NUM_TIMES]; +int testdone = 0; void do_sends(ompi_process_name_t * peer); void do_recvs(ompi_process_name_t * peer); @@ -42,23 +42,16 @@ bool compare_iovec(const struct iovec * msg1, const struct iovec * msg2, } -void callback(int status, const ompi_process_name_t * peer, - const struct iovec * msg, int count, int tag, void * cbdata); +void callback(int status, ompi_process_name_t * peer, + struct iovec * msg, int count, int tag, void * cbdata); -void callback(int status, const ompi_process_name_t * peer, - const struct iovec * msg, int count, int tag, void * cbdata) +void callback(int status, ompi_process_name_t * peer, + struct iovec * msg, int count, int tag, void * cbdata) { if(0 != tag) { test_failure("Bad tag."); } - if(((int) cbdata) >= NUM_TESTS * NUM_TIMES) { - test_failure("Bad value in callback function."); - } else if (testdone[(int) cbdata]) { - test_failure("Callback function called on an already completed test."); - } else { - testdone[(int) cbdata] = true; - test_success(); - } + testdone++; } /* data */ @@ -79,12 +72,6 @@ struct iovec send_msg2[3] = {{(void *) &msg_type_2, sizeof(msg_type_2)}, {(void *) &send2, sizeof(send2)}, {(void *) &send3, sizeof(send3)}}; -/* if we want the send/ recieve functions to do the packing for us, - * we have to provide an array that describes our data types - */ -/* mca_oob_base_type_t types[] = {MCA_OOB_BASE_INT32, MCA_OOB_BASE_BYTE, */ -/* MCA_OOB_BASE_INT32, MCA_OOB_BASE_INT16}; */ - /* we'll pass the array an identical iovec */ uint32_t msg_type; char recv1[6]; @@ -107,16 +94,13 @@ int main(int argc, char ** argv) int n; MPI_Init(&argc, &argv); - for(i = 0; i < NUM_TESTS * NUM_TIMES; i++) { - testdone[i] = false; - } /* setup peer address */ peer = mca_oob_name_self; fprintf(stderr, "my vpid %d my jobid %d my cellid %d my pid %d\n", peer.vpid, peer.jobid, peer.cellid, getpid()); if(peer.vpid == 1) { - test_init("oob send then recieve"); + test_init("oob send then receive"); /* local vpid is 1 - peer is 0 */ peer.vpid = 0; for(i = 0; i < NUM_TIMES; i++) { @@ -124,7 +108,7 @@ int main(int argc, char ** argv) do_recvs(&peer); } } else { - test_init("oob recieve then send"); + test_init("oob receive then send"); /* local vpid is 0 - peer is 1 */ peer.vpid = 1; for(i = 0; i < NUM_TIMES; i++) { @@ -137,11 +121,8 @@ int main(int argc, char ** argv) /* done */ n = 0; while(!all_complete && n < 10) { - all_complete = true; - for(i = 0; i < NUM_TESTS * NUM_TIMES; i++) { - if(!testdone[i]) { - all_complete = false; - } + if(testdone == NUM_TIMES*4) { + all_complete = true; } if(!all_complete) { sleep(1); @@ -149,7 +130,8 @@ int main(int argc, char ** argv) n++; } if(!all_complete) { - test_failure("not all sends or recieves were completed"); + test_failure("not all sends or receives were completed"); + fprintf(stderr, "%d != %d\n", testdone, NUM_TIMES); } test_finalize(); /* this is to give the oob time to finish all sends */ @@ -159,8 +141,7 @@ int main(int argc, char ** argv) void do_sends(ompi_process_name_t * peer) { /* non blocking send without doing any packing */ - if( 0 > mca_oob_send_nb(peer, send_msg1, 4, 0, 0, &callback, - (void *) (0 + (NUM_TESTS * i)))) { + if( 0 > mca_oob_send_nb(peer, send_msg1, 4, 0, 0, &callback, (void *) (0 + (NUM_TESTS * i)))) { test_failure("mca_oob_send_nb."); } else { test_success(); @@ -172,7 +153,6 @@ void do_sends(ompi_process_name_t * peer) { test_success(); } - /* blocking send */ if( 0 > mca_oob_send(peer, send_msg2, 3, 0, 0)) { test_failure("mca_oob_send."); @@ -184,12 +164,20 @@ void do_sends(ompi_process_name_t * peer) { } else { test_success(); } - + if( 0 > mca_oob_send(peer, send_msg2, 3, 0, 0)) { + test_failure("mca_oob_send."); + } else { + test_success(); + } } void do_recvs(ompi_process_name_t * peer) { - /*first, we'll recieve the nonpacked send - assuming we know the - * message type */ + struct iovec iov[1]; + int index; + unsigned char* ptr; + + /* first, we'll receive the nonpacked send - assuming we know the + * message type */ if( 0 > mca_oob_recv_nb(peer, recv_msg1, 4, 0, 0, &callback, (void *) (4 + (NUM_TESTS * i)))) { test_failure("mca_oob_recv_nb."); @@ -202,7 +190,7 @@ void do_recvs(ompi_process_name_t * peer) { test_success(); } if(!compare_iovec(recv_msg1, send_msg1, 4)) { - test_failure("compare 1 is wrong"); + test_failure("compare 1 is wrong"); } /* now we'll do a blocking recv - waiting for the 3rd message to arrive @@ -239,11 +227,33 @@ void do_recvs(ompi_process_name_t * peer) { test_failure("Message peek did not return a valid type number."); break; } - if( 0 > mca_oob_recv_nb(peer, recv_msg2, 3, 0, 0, &callback, - (void *) (6 + (NUM_TESTS * i)))) { + + /* now we'll do a blocking recv - and have the buffer allocated and returned */ + if( 0 > mca_oob_recv(peer, iov, 1, NULL, MCA_OOB_ALLOC)) { + test_failure("mca_oob_recv(MCA_OOB_ALLOC)"); + } else { + test_success(); + } + + /* validate the data received with an allocated buffer */ + ptr = iov->iov_base; + for(index=0; index < (sizeof(send_msg2) / sizeof(struct iovec)); index++) { + struct iovec *iov = &send_msg2[index]; + if(memcmp(iov->iov_base, ptr, iov->iov_len) != 0) { + test_failure("mca_oob_recv(MCA_OOB_ALLOC)"); + } else { + test_success(); + } + ptr += iov->iov_len; + } + + if( 0 > mca_oob_recv_nb(peer, recv_msg2, 3, 0, 0, &callback, 0)) { test_failure("mca_oob_recv_nb."); } else { test_success(); } + + /* validate the data */ + } diff --git a/test/support/support.c b/test/support/support.c index da62b4d1a1..a5f6040af4 100644 --- a/test/support/support.c +++ b/test/support/support.c @@ -52,7 +52,9 @@ void test_failure(char *a) ompi_n_tests++; ompi_n_failures++; - fprintf(stderr, " Failure : %s\n", a); + fprintf(stderr, " Failure : "); + fprintf(stderr, a); + fprintf(stderr, "\n"); fflush(stderr); }