1
1

implement the MCA_OOB_ALLOC flag for recv - in this case the library allocates a buffer

for the receive and returns it to the caller - who is then responsible for freeing it.

This commit was SVN r2115.
Этот коммит содержится в:
Tim Woodall 2004-08-13 15:15:14 +00:00
родитель 4f7febc161
Коммит ac977827b9
8 изменённых файлов: 113 добавлений и 164 удалений

Просмотреть файл

@ -79,11 +79,9 @@ typedef enum {
* message w/out removing the message from the queue. */ * message w/out removing the message from the queue. */
#define MCA_OOB_TRUNC 0x02 /**< flag to oob_recv to return the actual size of the message even if #define MCA_OOB_TRUNC 0x02 /**< flag to oob_recv to return the actual size of the message even if
* the receive buffer is smaller than the number of bytes available */ * the receive buffer is smaller than the number of bytes available */
#if 0 /* NOT YET IMPLEMENTED */
#define MCA_OOB_ALLOC 0x04 /**< flag to oob_recv to request the oob to allocate a buffer of the appropriate #define MCA_OOB_ALLOC 0x04 /**< flag to oob_recv to request the oob to allocate a buffer of the appropriate
* size for the receive and return the allocated buffer and size in the first * size for the receive and return the allocated buffer and size in the first
* element of the iovec array. */ * element of the iovec array. */
#endif
#if defined(c_plusplus) || defined(__cplusplus) #if defined(c_plusplus) || defined(__cplusplus)
extern "C" { extern "C" {

Просмотреть файл

@ -231,8 +231,22 @@ do_recv(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid, struct iovec* iov,
return OMPI_ERROR; return OMPI_ERROR;
} }
if(flags & MCA_OOB_ALLOC) {
if(NULL == iov || 0 == count) {
return OMPI_ERR_BAD_PARAM;
}
iov->iov_base = malloc(size);
if(NULL == iov->iov_base) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
iov->iov_len = size;
count = 1;
}
if(iov != NULL && count > 0) { if(iov != NULL && count > 0) {
rlen = readv(fd, iov, count); rlen = readv(fd, iov, count);
} else {
rlen = 0;
} }
close(fd); close(fd);
return (flags & MCA_OOB_TRUNC) ? size : rlen; return (flags & MCA_OOB_TRUNC) ? size : rlen;

Просмотреть файл

@ -461,14 +461,30 @@ static void mca_oob_tcp_peer_recv_progress(mca_oob_tcp_peer_t* peer, mca_oob_tcp
post = mca_oob_tcp_msg_match_post(&peer->peer_name, msg->msg_hdr.msg_tag,true); post = mca_oob_tcp_msg_match_post(&peer->peer_name, msg->msg_hdr.msg_tag,true);
if(NULL != post) { if(NULL != post) {
/* copy msg data into posted recv */ if(post->msg_flags & MCA_OOB_ALLOC) {
post->msg_rc = mca_oob_tcp_msg_copy(msg, post->msg_uiov, post->msg_ucnt);
if(post->msg_flags & MCA_OOB_TRUNC) { /* set the users iovec struct to point to pre-allocated buffer */
int i, size = 0; if(NULL == post->msg_uiov || 0 == post->msg_ucnt) {
for(i=0; i<msg->msg_rwcnt; i++) post->msg_rc = OMPI_ERR_BAD_PARAM;
size += msg->msg_rwiov[i].iov_len; } else {
post->msg_rc = size; post->msg_uiov[0].iov_base = msg->msg_rwiov->iov_base;
post->msg_uiov[0].iov_len = msg->msg_rwiov->iov_len;
msg->msg_rwbuf = NULL;
post->msg_rc = msg->msg_rwiov->iov_len;
}
} else {
/* copy msg data into posted recv */
post->msg_rc = mca_oob_tcp_msg_copy(msg, post->msg_uiov, post->msg_ucnt);
if(post->msg_flags & MCA_OOB_TRUNC) {
int i, size = 0;
for(i=0; i<msg->msg_rwcnt; i++)
size += msg->msg_rwiov[i].iov_len;
post->msg_rc = size;
}
} }
if(post->msg_flags & MCA_OOB_PEEK) { if(post->msg_flags & MCA_OOB_PEEK) {
/* will need message for actual receive */ /* will need message for actual receive */
ompi_list_append(&mca_oob_tcp_component.tcp_msg_recv, &msg->super); ompi_list_append(&mca_oob_tcp_component.tcp_msg_recv, &msg->super);

Просмотреть файл

@ -30,22 +30,36 @@ int mca_oob_tcp_recv(
msg = mca_oob_tcp_msg_match_recv(peer, tag); msg = mca_oob_tcp_msg_match_recv(peer, tag);
if(NULL != msg) { if(NULL != msg) {
/* if we are just doing peek, return bytes without dequeing message */
if(msg->msg_rc < 0) { if(msg->msg_rc < 0) {
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock); OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
return msg->msg_rc; return msg->msg_rc;
} }
rc = mca_oob_tcp_msg_copy(msg, iov, count); /* if we are returning an allocated buffer - just take it from the message */
if(rc >= 0 && MCA_OOB_TRUNC & flags) { if(flags & MCA_OOB_ALLOC) {
rc = 0;
for(i=0; i<msg->msg_rwcnt; i++) if(NULL == iov || 0 == count) {
rc += msg->msg_rwiov[i].iov_len; return OMPI_ERR_BAD_PARAM;
} }
if(MCA_OOB_PEEK & flags) { iov[0].iov_base = msg->msg_rwiov->iov_base;
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock); iov[0].iov_len = msg->msg_rwiov->iov_len;
return rc; msg->msg_rwbuf = NULL;
} else {
/* if we are just doing peek, return bytes without dequeing message */
rc = mca_oob_tcp_msg_copy(msg, iov, count);
if(rc >= 0 && MCA_OOB_TRUNC & flags) {
rc = 0;
for(i=0; i<msg->msg_rwcnt; i++)
rc += msg->msg_rwiov[i].iov_len;
}
if(MCA_OOB_PEEK & flags) {
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
return rc;
}
} }
if(NULL != tagp) { if(NULL != tagp) {
*tagp = ntohl(msg->msg_hdr.msg_tag); *tagp = ntohl(msg->msg_hdr.msg_tag);
} }

Просмотреть файл

@ -1,13 +0,0 @@
# -*- makefile -*-
#
# $HEADER$
#
include $(top_srcdir)/config/Makefile.options
noinst_PROGRAMS = oob_cofs_test
oob_cofs_test_SOURCES = oob_cofs_test.c
oob_cofs_test_LDADD = \
../../../../src/mca/ompi/oob/cofs/libmca_ompi_oob_cofs.la \
../../../../src/libompi.la

Просмотреть файл

@ -1,92 +0,0 @@
#include "ompi/runtime/runtime.h"
#include "mca/ompi/oob/oob.h"
#include "mca/ompi/pcm/pcm.h"
#include "mca/ompi/base/base.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
int
main(int argc, char* argv[])
{
int ret;
mca_pcm_proc_t *procs;
size_t nprocs;
mca_pcm_proc_t *me;
int left_vpid, right_vpid, me_vpid;
int count = 2;
ompi_job_handle_t job;
int data = 0xDEADBEEF;
int tag = MCA_OOB_ANY_TAG;
bool threads, hidden;
printf("hello, world!\n");
ret = ompi_init(argc, argv);
assert(ret == OMPI_SUCCESS);
ret = mca_base_open();
assert(ret == OMPI_SUCCESS);
ret = ompi_rte_init(&threads, &hidden);
assert(ret == OMPI_SUCCESS);
ret = mca_pcm.pcm_proc_startup();
assert(ret == OMPI_SUCCESS);
ret = mca_pcm.pcm_proc_get_peers(&procs, &nprocs);
assert(ret == OMPI_SUCCESS);
job = mca_pcm.pcm_handle_get();
assert(job != NULL);
me = mca_pcm.pcm_proc_get_me();
assert(me != NULL);
/* time to play the ring game! */
me_vpid = me->vpid;
printf("Hello, World. I am vpid %d\n", me_vpid);
left_vpid = me_vpid == 0 ? nprocs - 1 : me_vpid - 1;
right_vpid = (me_vpid + 1) % nprocs;
if (me_vpid == 0) {
printf("vpid %d sending to vpid %d\n", me_vpid, right_vpid);
ret = mca_oob.oob_send(job, right_vpid, 0, &data, sizeof(int));
assert(ret == OMPI_SUCCESS);
count--;
}
while (count > 0) {
int *data_ptr;
size_t data_ptr_len;
printf("vpid %d recving from vpid %d\n", me_vpid, left_vpid);
ret = mca_oob.oob_recv(job, left_vpid, &tag, &data_ptr, &data_ptr_len);
assert(ret == OMPI_SUCCESS);
assert(data_ptr_len == sizeof(int));
assert(*data_ptr == data);
printf("vpid %d sending to vpid %d\n", me_vpid, right_vpid);
ret = mca_oob.oob_send(job, right_vpid, 0, &data, sizeof(int));
assert(ret == OMPI_SUCCESS);
count--;
}
if (me_vpid == 0) {
int *data_ptr;
size_t data_ptr_len;
printf("vpid %d recving from vpid %d\n", me_vpid, left_vpid);
ret = mca_oob.oob_recv(job, left_vpid, &tag, &data_ptr, &data_ptr_len);
assert(ret == OMPI_SUCCESS);
assert(data_ptr_len == sizeof(int));
assert(*data_ptr == data);
}
ret = ompi_rte_finalize();
assert(ret == OMPI_SUCCESS);
return 0;
}

Просмотреть файл

@ -21,7 +21,7 @@
#define NUM_TIMES 1 #define NUM_TIMES 1
int i; int i;
bool testdone[NUM_TESTS * NUM_TIMES]; int testdone = 0;
void do_sends(ompi_process_name_t * peer); void do_sends(ompi_process_name_t * peer);
void do_recvs(ompi_process_name_t * peer); void do_recvs(ompi_process_name_t * peer);
@ -42,23 +42,16 @@ bool compare_iovec(const struct iovec * msg1, const struct iovec * msg2,
} }
void callback(int status, const ompi_process_name_t * peer, void callback(int status, ompi_process_name_t * peer,
const struct iovec * msg, int count, int tag, void * cbdata); struct iovec * msg, int count, int tag, void * cbdata);
void callback(int status, const ompi_process_name_t * peer, void callback(int status, ompi_process_name_t * peer,
const struct iovec * msg, int count, int tag, void * cbdata) struct iovec * msg, int count, int tag, void * cbdata)
{ {
if(0 != tag) { if(0 != tag) {
test_failure("Bad tag."); test_failure("Bad tag.");
} }
if(((int) cbdata) >= NUM_TESTS * NUM_TIMES) { testdone++;
test_failure("Bad value in callback function.");
} else if (testdone[(int) cbdata]) {
test_failure("Callback function called on an already completed test.");
} else {
testdone[(int) cbdata] = true;
test_success();
}
} }
/* data */ /* data */
@ -79,12 +72,6 @@ struct iovec send_msg2[3] = {{(void *) &msg_type_2, sizeof(msg_type_2)},
{(void *) &send2, sizeof(send2)}, {(void *) &send2, sizeof(send2)},
{(void *) &send3, sizeof(send3)}}; {(void *) &send3, sizeof(send3)}};
/* if we want the send/ recieve functions to do the packing for us,
* we have to provide an array that describes our data types
*/
/* mca_oob_base_type_t types[] = {MCA_OOB_BASE_INT32, MCA_OOB_BASE_BYTE, */
/* MCA_OOB_BASE_INT32, MCA_OOB_BASE_INT16}; */
/* we'll pass the array an identical iovec */ /* we'll pass the array an identical iovec */
uint32_t msg_type; uint32_t msg_type;
char recv1[6]; char recv1[6];
@ -107,16 +94,13 @@ int main(int argc, char ** argv)
int n; int n;
MPI_Init(&argc, &argv); MPI_Init(&argc, &argv);
for(i = 0; i < NUM_TESTS * NUM_TIMES; i++) {
testdone[i] = false;
}
/* setup peer address */ /* setup peer address */
peer = mca_oob_name_self; peer = mca_oob_name_self;
fprintf(stderr, "my vpid %d my jobid %d my cellid %d my pid %d\n", fprintf(stderr, "my vpid %d my jobid %d my cellid %d my pid %d\n",
peer.vpid, peer.jobid, peer.cellid, getpid()); peer.vpid, peer.jobid, peer.cellid, getpid());
if(peer.vpid == 1) { if(peer.vpid == 1) {
test_init("oob send then recieve"); test_init("oob send then receive");
/* local vpid is 1 - peer is 0 */ /* local vpid is 1 - peer is 0 */
peer.vpid = 0; peer.vpid = 0;
for(i = 0; i < NUM_TIMES; i++) { for(i = 0; i < NUM_TIMES; i++) {
@ -124,7 +108,7 @@ int main(int argc, char ** argv)
do_recvs(&peer); do_recvs(&peer);
} }
} else { } else {
test_init("oob recieve then send"); test_init("oob receive then send");
/* local vpid is 0 - peer is 1 */ /* local vpid is 0 - peer is 1 */
peer.vpid = 1; peer.vpid = 1;
for(i = 0; i < NUM_TIMES; i++) { for(i = 0; i < NUM_TIMES; i++) {
@ -137,11 +121,8 @@ int main(int argc, char ** argv)
/* done */ /* done */
n = 0; n = 0;
while(!all_complete && n < 10) { while(!all_complete && n < 10) {
all_complete = true; if(testdone == NUM_TIMES*4) {
for(i = 0; i < NUM_TESTS * NUM_TIMES; i++) { all_complete = true;
if(!testdone[i]) {
all_complete = false;
}
} }
if(!all_complete) { if(!all_complete) {
sleep(1); sleep(1);
@ -149,7 +130,8 @@ int main(int argc, char ** argv)
n++; n++;
} }
if(!all_complete) { if(!all_complete) {
test_failure("not all sends or recieves were completed"); test_failure("not all sends or receives were completed");
fprintf(stderr, "%d != %d\n", testdone, NUM_TIMES);
} }
test_finalize(); test_finalize();
/* this is to give the oob time to finish all sends */ /* this is to give the oob time to finish all sends */
@ -159,8 +141,7 @@ int main(int argc, char ** argv)
void do_sends(ompi_process_name_t * peer) { void do_sends(ompi_process_name_t * peer) {
/* non blocking send without doing any packing */ /* non blocking send without doing any packing */
if( 0 > mca_oob_send_nb(peer, send_msg1, 4, 0, 0, &callback, if( 0 > mca_oob_send_nb(peer, send_msg1, 4, 0, 0, &callback, (void *) (0 + (NUM_TESTS * i)))) {
(void *) (0 + (NUM_TESTS * i)))) {
test_failure("mca_oob_send_nb."); test_failure("mca_oob_send_nb.");
} else { } else {
test_success(); test_success();
@ -172,7 +153,6 @@ void do_sends(ompi_process_name_t * peer) {
test_success(); test_success();
} }
/* blocking send */ /* blocking send */
if( 0 > mca_oob_send(peer, send_msg2, 3, 0, 0)) { if( 0 > mca_oob_send(peer, send_msg2, 3, 0, 0)) {
test_failure("mca_oob_send."); test_failure("mca_oob_send.");
@ -184,12 +164,20 @@ void do_sends(ompi_process_name_t * peer) {
} else { } else {
test_success(); test_success();
} }
if( 0 > mca_oob_send(peer, send_msg2, 3, 0, 0)) {
test_failure("mca_oob_send.");
} else {
test_success();
}
} }
void do_recvs(ompi_process_name_t * peer) { void do_recvs(ompi_process_name_t * peer) {
/*first, we'll recieve the nonpacked send - assuming we know the struct iovec iov[1];
* message type */ int index;
unsigned char* ptr;
/* first, we'll receive the nonpacked send - assuming we know the
* message type */
if( 0 > mca_oob_recv_nb(peer, recv_msg1, 4, 0, 0, &callback, if( 0 > mca_oob_recv_nb(peer, recv_msg1, 4, 0, 0, &callback,
(void *) (4 + (NUM_TESTS * i)))) { (void *) (4 + (NUM_TESTS * i)))) {
test_failure("mca_oob_recv_nb."); test_failure("mca_oob_recv_nb.");
@ -202,7 +190,7 @@ void do_recvs(ompi_process_name_t * peer) {
test_success(); test_success();
} }
if(!compare_iovec(recv_msg1, send_msg1, 4)) { if(!compare_iovec(recv_msg1, send_msg1, 4)) {
test_failure("compare 1 is wrong"); test_failure("compare 1 is wrong");
} }
/* now we'll do a blocking recv - waiting for the 3rd message to arrive /* now we'll do a blocking recv - waiting for the 3rd message to arrive
@ -239,11 +227,33 @@ void do_recvs(ompi_process_name_t * peer) {
test_failure("Message peek did not return a valid type number."); test_failure("Message peek did not return a valid type number.");
break; break;
} }
if( 0 > mca_oob_recv_nb(peer, recv_msg2, 3, 0, 0, &callback,
(void *) (6 + (NUM_TESTS * i)))) { /* now we'll do a blocking recv - and have the buffer allocated and returned */
if( 0 > mca_oob_recv(peer, iov, 1, NULL, MCA_OOB_ALLOC)) {
test_failure("mca_oob_recv(MCA_OOB_ALLOC)");
} else {
test_success();
}
/* validate the data received with an allocated buffer */
ptr = iov->iov_base;
for(index=0; index < (sizeof(send_msg2) / sizeof(struct iovec)); index++) {
struct iovec *iov = &send_msg2[index];
if(memcmp(iov->iov_base, ptr, iov->iov_len) != 0) {
test_failure("mca_oob_recv(MCA_OOB_ALLOC)");
} else {
test_success();
}
ptr += iov->iov_len;
}
if( 0 > mca_oob_recv_nb(peer, recv_msg2, 3, 0, 0, &callback, 0)) {
test_failure("mca_oob_recv_nb."); test_failure("mca_oob_recv_nb.");
} else { } else {
test_success(); test_success();
} }
/* validate the data */
} }

Просмотреть файл

@ -52,7 +52,9 @@ void test_failure(char *a)
ompi_n_tests++; ompi_n_tests++;
ompi_n_failures++; ompi_n_failures++;
fprintf(stderr, " Failure : %s\n", a); fprintf(stderr, " Failure : ");
fprintf(stderr, a);
fprintf(stderr, "\n");
fflush(stderr); fflush(stderr);
} }