diff --git a/src/communicator/comm_cid.c b/src/communicator/comm_cid.c index 88b3c41740..12f4a46406 100644 --- a/src/communicator/comm_cid.c +++ b/src/communicator/comm_cid.c @@ -501,25 +501,25 @@ static int ompi_comm_allreduce_intra_oob (int *inbuf, int *outbuf, } if (local_rank == local_leader ) { - struct iovec smsg, rmsg; - mca_oob_base_type_t otype; - - smsg.iov_base = tmpbuf; - smsg.iov_len = count * sizeof(int); - otype = MCA_OOB_BASE_INT32; + ompi_buffer_t sbuf; + ompi_buffer_t rbuf; - rmsg.iov_base = outbuf; - rmsg.iov_len = count * sizeof(int); + ompi_buffer_init(&sbuf, count * sizeof(int)); + ompi_pack(sbuf, tmpbuf, count, OMPI_INT32); - if ( send_first ) { - rc = mca_oob_send_hton (remote_leader, &smsg, &otype, 1,0,0); - rc = mca_oob_recv_ntoh (remote_leader, &rmsg, &otype, 1,0,0); + if ( send_first ) { + rc = mca_oob_send_packed(remote_leader, sbuf, 0, 0); + rc = mca_oob_recv_packed (remote_leader, &rbuf, NULL); } else { - rc = mca_oob_recv_ntoh (remote_leader, &rmsg, &otype, 1,0,0); - rc = mca_oob_send_hton (remote_leader, &smsg, &otype, 1,0,0); + rc = mca_oob_recv_packed(remote_leader, &rbuf, NULL); + rc = mca_oob_send_packed(remote_leader, sbuf, 0, 0); } + ompi_unpack(rbuf, outbuf, count, OMPI_INT32); + ompi_buffer_free(sbuf); + ompi_buffer_free(rbuf); + if ( &ompi_mpi_op_max == op ) { for ( i = 0 ; i < count; i++ ) { if (tmpbuf[i] > outbuf[i]) outbuf[i] = tmpbuf[i]; diff --git a/src/communicator/comm_dyn.c b/src/communicator/comm_dyn.c index ba1d4a4b39..384d14ee94 100644 --- a/src/communicator/comm_dyn.c +++ b/src/communicator/comm_dyn.c @@ -13,6 +13,7 @@ #include "proc/proc.h" #include "threads/mutex.h" #include "util/bit_ops.h" +#include "util/pack.h" #include "include/constants.h" #include "mca/pcm/pcm.h" #include "mca/pml/pml.h" @@ -29,8 +30,8 @@ int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root, int namebuflen, rnamebuflen; char *namebuf=NULL, *rnamebuf=NULL; - struct iovec smsg[2], rmsg[2]; - mca_oob_base_type_t otype[2]; + ompi_buffer_t sbuf; + ompi_buffer_t rbuf; ompi_communicator_t *newcomp=MPI_COMM_NULL; ompi_proc_t **rprocs=NULL; ompi_group_t *group=comm->c_local_group; @@ -50,30 +51,28 @@ int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root, ompi_proc_get_namebuf_by_proc (group->grp_proc_pointers, size, &namebuf, &namebuflen); - smsg[0].iov_base = &size; - smsg[0].iov_len = sizeof(int); - otype[0] = MCA_OOB_BASE_INT32; - - smsg[1].iov_base = &namebuflen; - smsg[1].iov_len = sizeof(int); - otype[1] = MCA_OOB_BASE_INT32; - - rmsg[0].iov_base = &rsize; - rmsg[0].iov_len = sizeof(int); - otype[0] = MCA_OOB_BASE_INT32; - - rmsg[1].iov_base = &rnamebuflen; - rmsg[1].iov_len = sizeof(int); - otype[1] = MCA_OOB_BASE_INT32; + ompi_buffer_init(&sbuf, 128); + ompi_pack(sbuf, &size, 1, OMPI_INT32); + ompi_pack(sbuf, &namebuflen, 1, OMPI_INT32); + ompi_pack(sbuf, &rsize, 1, OMPI_INT32); + ompi_pack(sbuf, &rnamebuflen, 1, OMPI_INT32); if ( send_first ) { - rc = mca_oob_send_hton (rport, smsg, otype, 2, 0, 0); - rc = mca_oob_recv_ntoh (rport, rmsg, otype, 2, 0, 0); + rc = mca_oob_send_packed(rport, sbuf, 0, 0); + rc = mca_oob_recv_packed (rport, &rbuf, NULL); } else { - rc = mca_oob_recv_ntoh (rport, rmsg, otype, 2, 0, 0); - rc = mca_oob_send_hton (rport, smsg, otype, 2, 0, 0); + rc = mca_oob_recv_packed(rport, &rbuf, NULL); + rc = mca_oob_send_packed(rport, sbuf, 0, 0); } + + ompi_unpack(rbuf, &size, 1, OMPI_INT32); + ompi_unpack(rbuf, &namebuflen, 1, OMPI_INT32); + ompi_unpack(rbuf, &rsize, 1, OMPI_INT32); + ompi_unpack(rbuf, &rnamebuflen, 1, OMPI_INT32); + + ompi_buffer_free(sbuf); + ompi_buffer_free(rbuf); } /* bcast the information to all processes in the local comm */ @@ -93,20 +92,24 @@ int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root, if ( rank == root ) { /* Exchange list of processes in the groups */ - smsg[0].iov_base = namebuf; - smsg[0].iov_len = namebuflen; - rmsg[0].iov_base = rnamebuf; - rmsg[0].iov_len = rnamebuflen; + + ompi_buffer_init(&sbuf, 128); + ompi_pack(sbuf, namebuf, namebuflen, OMPI_BYTE); if ( send_first ) { - rc = mca_oob_send (rport, smsg, 1, 0, 0); - rc = mca_oob_recv (rport, rmsg, 1, 0, 0); + rc = mca_oob_send_packed(rport, sbuf, 0, 0); + rc = mca_oob_recv_packed (rport, &rbuf, NULL); } else { - rc = mca_oob_recv (rport, rmsg, 1, 0, 0); - rc = mca_oob_send (rport, smsg, 1, 0, 0); + rc = mca_oob_recv_packed(rport, &rbuf, NULL); + rc = mca_oob_send_packed(rport, sbuf, 0, 0); } + + ompi_unpack(rbuf, rnamebuf, rnamebuflen, OMPI_BYTE); + + ompi_buffer_free(sbuf); + ompi_buffer_free(rbuf); } /* bcast list of processes to all procs in local group @@ -211,41 +214,41 @@ ompi_process_name_t *ompi_comm_get_rport (ompi_process_name_t *port, int send_fi { int namebuflen, rc; char *namebuf=NULL; - - struct iovec msg; - mca_oob_base_type_t otype; ompi_proc_t **rproc; ompi_process_name_t *rport; if ( send_first ) { + ompi_buffer_t sbuf; ompi_proc_get_namebuf_by_proc(&proc, 1, &namebuf, &namebuflen ); - msg.iov_base = &namebuflen; - msg.iov_len = sizeof(int); - otype = MCA_OOB_BASE_INT32; - rc = mca_oob_send_hton (port, &msg, &otype, 1, 0, 0); - msg.iov_base = namebuf; - msg.iov_len = namebuflen; - rc = mca_oob_send (rport, &msg, 1, 0, 0); + ompi_buffer_init(&sbuf, sizeof(int)); + ompi_pack(sbuf, &namebuflen, 1, OMPI_INT32); + rc = mca_oob_send_packed(port, sbuf, 0, 0); + ompi_buffer_free(sbuf); + + ompi_buffer_init(&sbuf, namebuflen); + ompi_pack(sbuf, namebuf, namebuflen, OMPI_BYTE); + rc = mca_oob_send_packed(port, sbuf, 0, 0); + ompi_buffer_free(sbuf); ompi_proc_namebuf_returnbuf (namebuf); rport = port; } else { - msg.iov_base = &namebuflen; - msg.iov_len = sizeof(int); - otype = MCA_OOB_BASE_INT32; - rc = mca_oob_recv_ntoh(MCA_OOB_NAME_ANY, &msg, &otype, 1, 0, 0); + ompi_buffer_t rbuf; + rc = mca_oob_recv_packed(MCA_OOB_NAME_ANY, &rbuf, NULL); + ompi_unpack(rbuf, &namebuflen, 1, OMPI_INT32); + ompi_buffer_free(rbuf); namebuf = (char *) malloc (namebuflen); if ( NULL != namebuf ) { return NULL; } - msg.iov_base = namebuf; - msg.iov_len = namebuflen; - rc = mca_oob_recv (MCA_OOB_NAME_ANY, &msg, 1, 0, 0); + rc = mca_oob_recv_packed(MCA_OOB_NAME_ANY, &rbuf, NULL); + ompi_unpack(rbuf, namebuf, namebuflen, OMPI_BYTE); + ompi_buffer_free(rbuf); ompi_proc_get_proclist (namebuf, namebuflen, 1, &rproc); rport = &(rproc[0]->proc_name); diff --git a/src/mca/oob/base/base.h b/src/mca/oob/base/base.h index efc8478bf8..5ab0b5b69f 100644 --- a/src/mca/oob/base/base.h +++ b/src/mca/oob/base/base.h @@ -68,8 +68,13 @@ typedef enum { #define MCA_OOB_PEEK 0x01 /**< flag to oob_recv to allow caller to peek a portion of the next available * message w/out removing the message from the queue. */ -#define MCA_OOB_TRUNC 0x02 /**< flag to oob_recv to return the actual size of the message even if the receive - buffer is smaller than the number of bytes available */ +#define MCA_OOB_TRUNC 0x02 /**< flag to oob_recv to return the actual size of the message even if + * the receive buffer is smaller than the number of bytes available */ +#if 0 /* NOT YET IMPLEMENTED */ +#define MCA_OOB_ALLOC 0x04 /**< flag to oob_recv to request the oob to allocate a buffer of the appropriate + * size for the receive and return the allocated buffer and size in the first + * element of the iovec array. */ +#endif #if defined(c_plusplus) || defined(__cplusplus) extern "C" { @@ -107,8 +112,8 @@ char* mca_oob_get_contact_info(void); */ int mca_oob_send( - const ompi_process_name_t* peer, - const struct iovec *msg, + ompi_process_name_t* peer, + struct iovec *msg, int count, int tag, int flags); @@ -122,46 +127,26 @@ int mca_oob_send( * @return OMPI error code (<0) on error or number of bytes actually sent. */ - -int mca_oob_send_packed (const ompi_process_name_t* peer, const ompi_buffer_t buffer, int tag, int flags); - - -/** -* Convert data (if required) to network byte order prior to sending to peer. -* -* @param peer (IN) Opaque name of peer process. -* @param msg (IN) Array of iovecs describing user buffers and lengths. -* @param types (IN) Parallel array to iovecs describing data type of each iovec element. -* @param count (IN) Number of elements in iovec array. -* @param tag (IN) User defined tag for matching send/recv. -* @param flags (IN) Currently unused. -* @return OMPI error code (<0) on error number of bytes actually sent. -* -* This routine is equivalent to mca_oob_send, with the exception that it excepts -* an additional array of type codes describing the data types contained within the -* iovec array. This information is used to convert the data to network byte order -* (if required) prior to transmission over the underlying network transport. -*/ - -int mca_oob_send_hton( - const ompi_process_name_t* peer, - const struct iovec *msg, - const mca_oob_base_type_t *types, - int count, - int tag, +int mca_oob_send_packed( + ompi_process_name_t* peer, + ompi_buffer_t buffer, + int tag, int flags); /** * Similiar to unix readv(2) * -* @param peer (IN) Opaque name of peer process or MCA_OOB_NAME_ANY for wildcard receive. -* @param msg (IN) Array of iovecs describing user buffers and lengths. -* @param count (IN) Number of elements in iovec array. -* @param tag (IN) User defined tag for matching send/recv. -* @param flags (IN) May be MCA_OOB_PEEK to return up to the number of bytes provided in the -* iovec array without removing the message from the queue. -* @return OMPI error code (<0) on error or number of bytes actually received. +* @param peer (IN/OUT) Opaque name of peer process or MCA_OOB_NAME_ANY for wildcard receive. In the +* case of a wildcard receive, will be modified to return the matched peer name. +* @param msg (IN) Array of iovecs describing user buffers and lengths. +* @param count (IN) Number of elements in iovec array. +* @param tag (IN/OUT) User defined tag for matching send/recv. In the case of a wildcard receive, will +* be modified to return the matched tag. May be optionally by NULL to specify a +* wildcard receive with no return value. +* @param flags (IN) May be MCA_OOB_PEEK to return up to the number of bytes provided in the +* iovec array without removing the message from the queue. +* @return OMPI error code (<0) on error or number of bytes actually received. * * The OOB recv call is similar to unix recv/readv in that it requires the caller to manage * memory associated w/ the message. The routine accepts an array of iovecs (msg); however, @@ -180,45 +165,17 @@ int mca_oob_send_hton( int mca_oob_recv( ompi_process_name_t* peer, - const struct iovec *msg, + struct iovec *msg, int count, - int tag, + int* tag, int flags); -/** -* Receive data and convert (if required) to host byte order. -* -* @param peer (IN) Opaque name of peer process or MCA_OOB_NAME_ANY for wildcard receive. -* @param msg (IN) Array of iovecs describing user buffers and lengths. -* @param types (IN) Parallel array to iovecs describing data type of each iovec element. -* @param count (IN) Number of elements in iovec array. -* @param tag (IN) User defined tag for matching send/recv. -* @param flags (IN) May be MCA_OOB_PEEK to return up to the number of bytes provided in the -* iovec array without removing the message from the queue. -* @return OMPI error code (<0) on error or number of bytes actually received. -* -* This routine is equivalent to mca_oob_recv, with the exception that it accepts -* an additional array of type codes describing the data types contained within the -* iovec array. This information is used to convert the data from network byte order -* (if required) to host byte order prior to receiving into the users buffer. -* -*/ - -int mca_oob_recv_ntoh( - ompi_process_name_t* peer, - const struct iovec *msg, - const mca_oob_base_type_t *types, - int count, - int tag, - int flags); - - /** * Similiar to unix read(2) * * @param peer (IN) Opaque name of peer process or MCA_OOB_NAME_ANY for wildcard receive. -* @param buf (OUT) Array of iovecs describing user buffers and lengths. -* @param tag (IN) User defined tag for matching send/recv. +* @param buf (OUT) Array of iovecs describing user buffers and lengths. +* @param tag (IN/OUT) User defined tag for matching send/recv. * @return OMPI error code (<0) on error or number of bytes actually received. * * @@ -233,9 +190,7 @@ int mca_oob_recv_ntoh( int mca_oob_recv_packed ( ompi_process_name_t* peer, ompi_buffer_t *buf, - int tag); - - + int* tag); /* * Non-blocking versions of send/recv. @@ -255,8 +210,8 @@ int mca_oob_recv_packed ( typedef void (*mca_oob_callback_fn_t)( int status, - const ompi_process_name_t* peer, - const struct iovec* msg, + ompi_process_name_t* peer, + struct iovec* msg, int count, int tag, void* cbdata); @@ -275,7 +230,7 @@ typedef void (*mca_oob_callback_fn_t)( typedef void (*mca_oob_callback_packed_fn_t)( int status, - const ompi_process_name_t* peer, + ompi_process_name_t* peer, ompi_buffer_t* buffer, int count, int tag, @@ -300,37 +255,8 @@ typedef void (*mca_oob_callback_packed_fn_t)( */ int mca_oob_send_nb( - const ompi_process_name_t* peer, - const struct iovec* msg, - int count, - int tag, - int flags, - mca_oob_callback_fn_t cbfunc, - void* cbdata); - -/** -* Non-blocking version of mca_oob_send_hton(). -* -* @param peer (IN) Opaque name of peer process. -* @param msg (IN) Array of iovecs describing user buffers and lengths. -* @param types (IN) Parallel array to iovecs describing data type of each iovec element. -* @param count (IN) Number of elements in iovec array. -* @param tag (IN) User defined tag for matching send/recv. -* @param flags (IN) Currently unused. -* @param cbfunc (IN) Callback function on send completion. -* @param cbdata (IN) User data that is passed to callback function. -* @return OMPI error code (<0) on error number of bytes actually sent. -* -* The user supplied callback function is called when the send completes. Note that -* the callback may occur before the call to mca_oob_send returns to the caller, -* if the send completes during the call. -* -*/ - -int mca_oob_send_hton_nb( - const ompi_process_name_t* peer, - const struct iovec* msg, - const mca_oob_base_type_t* types, + ompi_process_name_t* peer, + struct iovec* msg, int count, int tag, int flags, @@ -355,46 +281,13 @@ int mca_oob_send_hton_nb( int mca_oob_recv_nb( ompi_process_name_t* peer, - const struct iovec* msg, + struct iovec* msg, int count, int tag, int flags, mca_oob_callback_fn_t cbfunc, void* cbdata); -/** -* Non-blocking version of mca_oob_recv_ntoh(). -* -* @param peer (IN/OUT) Opaque name of peer process or MCA_OOB_NAME_ANY for wildcard receive. -* @param msg (IN) Array of iovecs describing user buffers and lengths. -* @param types (IN) Parallel array to iovecs describing data type of each iovec element. -* @param count (IN) Number of elements in iovec array. -* @param tag (IN) User defined tag for matching send/recv. -* @param flags (IN) May be MCA_OOB_PEEK to return up to size bytes of msg w/out removing it from the queue, -* @param cbfunc (IN) Callback function on recv completion. -* @param cbdata (IN) User data that is passed to callback function. -* @return OMPI error code (<0) on error or number of bytes actually received. -* -* The user supplied callback function is called asynchronously when a message is received -* that matches the call parameters. -*/ - -int mca_oob_recv_ntoh_nb( - ompi_process_name_t* peer, - const struct iovec* msg, - const mca_oob_base_type_t* types, - int count, - int tag, - int flags, - mca_oob_callback_fn_t cbfunc, - void* cbdata); - - - - - - - /* * functions for pack and unpack routines */ @@ -409,7 +302,8 @@ int mca_oob_recv_ntoh_nb( * @retval OMPI_SUCCESS * @retval OMPI_ERROR */ - int mca_oob_base_pack(void * dest, void * src, size_t n, mca_oob_base_type_t type); + +int mca_oob_base_pack(void * dest, void * src, size_t n, mca_oob_base_type_t type); /** * This function unpacks the passed data according to the type enum. @@ -422,7 +316,8 @@ int mca_oob_recv_ntoh_nb( * @retval OMPI_SUCCESS * @retval OMPI_ERROR */ - int mca_oob_base_unpack(void * dest, void * src, size_t n, mca_oob_base_type_t type); + +int mca_oob_base_unpack(void * dest, void * src, size_t n, mca_oob_base_type_t type); #if defined(c_plusplus) || defined(__cplusplus) diff --git a/src/mca/oob/base/oob_base_recv.c b/src/mca/oob/base/oob_base_recv.c index 7b44a4d1fa..e8af11cd3d 100644 --- a/src/mca/oob/base/oob_base_recv.c +++ b/src/mca/oob/base/oob_base_recv.c @@ -17,88 +17,11 @@ * iovec array without removing the message from the queue. * @return OMPI error code (<0) on error or number of bytes actually received. */ -int mca_oob_recv(ompi_process_name_t* peer, const struct iovec *msg, int count, int tag, int flags) +int mca_oob_recv(ompi_process_name_t* peer, struct iovec *msg, int count, int* tag, int flags) { return(mca_oob.oob_recv(peer, msg, count, tag, flags)); } -/* - * Receive data and convert (if required) to host byte order. - * - * @param peer (IN) Opaque name of peer process or MCA_OOB_NAME_ANY for wildcard receive. - * @param msg (IN) Array of iovecs describing user buffers and lengths. - * @param types (IN) Parallel array to iovecs describing data type of each iovec element. - * @param count (IN) Number of elements in iovec array. - * @param tag (IN) User defined tag for matching send/recv. - * @param flags (IN) May be MCA_OOB_PEEK to return up to the number of bytes provided in the - * iovec array without removing the message from the queue. - * @return OMPI error code (<0) on error or number of bytes actually received. - */ -int mca_oob_recv_ntoh( - ompi_process_name_t* peer, - const struct iovec *msg, - const mca_oob_base_type_t *types, - int count, - int tag, - int flags) -{ - int rc, num, i = 0; - struct iovec * orig; - bool convert = false; - /* see if we actually have to convert anything */ - /* first check to see if we are already in network byte order */ - if(1 != htons(1)) { - /* if we aren't, see if there is any data types that need to be converted */ - while(!convert && (i < count)) { - if((types[i] == MCA_OOB_BASE_INT16) || (types[i] == MCA_OOB_BASE_INT32)) { - convert = true; - } - i++; - } - } - - if(convert) { - /* now if we need to convert anything we neeg to create a new iovec - * to recieve into */ - orig = malloc(sizeof(struct iovec) * count); - /* copy their array into ours */ - memcpy(orig, msg, sizeof(struct iovec) * count); - /* now we need to go through the iovects, and any ints we need to - * allocate our own space to recieve into, so we can convert into - * their space later */ - for(i = 0; i < count; i++) { - if((types[i] == MCA_OOB_BASE_INT16) || (types[i] == MCA_OOB_BASE_INT32)) { - orig[i].iov_base = malloc(orig[i].iov_len); - } - } - /* now the new buffers are ready. do the recieve */ - rc = mca_oob.oob_recv(peer, orig, count, tag, flags); - /* now we have to do the conversions */ - for(i = 0; i < count; i++) { - if(types[i] == MCA_OOB_BASE_INT16) { - /* figure out how many integers we have */ - num = orig[i].iov_len / 2; - /* unpack the data */ - mca_oob_base_unpack(msg[i].iov_base, orig[i].iov_base, num, MCA_OOB_BASE_INT16); - /* free the old buffer */ - free(orig[i].iov_base); - } else if(types[i] == MCA_OOB_BASE_INT32) { - /* figure out how many integers we have */ - num = orig[i].iov_len / 4; - /* unpack the data */ - mca_oob_base_unpack(msg[i].iov_base, orig[i].iov_base, num, MCA_OOB_BASE_INT32); - /* free the old buffer */ - free(orig[i].iov_base); - } - } - /* free the iovecs we allocated */ - free(orig); - } else { - rc = mca_oob.oob_recv(peer, msg, count, tag, flags); - } - return rc; -} - /* * Similiar to unix recv(2) * @@ -108,7 +31,7 @@ int mca_oob_recv_ntoh( * iovec array without removing the message from the queue. * @return OMPI error code (<0) on error or number of bytes actually received. */ -int mca_oob_recv_packed (ompi_process_name_t* peer, ompi_buffer_t *buf, int tag) +int mca_oob_recv_packed (ompi_process_name_t* peer, ompi_buffer_t *buf, int* tag) { /* ok, this routine is a bit of a cow */ /* the oob_recv actually needs the real target buffers in advance */ @@ -124,8 +47,8 @@ int mca_oob_recv_packed (ompi_process_name_t* peer, ompi_buffer_t *buf, int tag) /* Or do locking on all recv posting between the peek and recv! GEF */ uint32_t insize; - int rc; - struct iovec msg[1]; + int rc; + struct iovec msg[1]; ompi_buffer_t tmpbuf; void *targetptr; @@ -142,7 +65,7 @@ int mca_oob_recv_packed (ompi_process_name_t* peer, ompi_buffer_t *buf, int tag) msg[0].iov_base = (char*) targetptr; msg[0].iov_len = insize; - rc = mca_oob.oob_recv(peer, msg, 1, tag, 0); + rc = mca_oob.oob_recv(peer, msg, 1, tag, 0); if (OMPI_ERROR!=rc) *buf = tmpbuf; diff --git a/src/mca/oob/base/oob_base_recv_nb.c b/src/mca/oob/base/oob_base_recv_nb.c index 7cba1c6a65..30f958d0fd 100644 --- a/src/mca/oob/base/oob_base_recv_nb.c +++ b/src/mca/oob/base/oob_base_recv_nb.c @@ -3,53 +3,6 @@ #include #include -struct mca_oob_base_cb_data_t { - mca_oob_callback_fn_t user_callback; - void * user_data; - const mca_oob_base_type_t * types; - const struct iovec * user_iovec; -}; - -/* this is the callback function we will register when we have to do any conversion */ -static void mca_oob_base_recv_cb(int status, const ompi_process_name_t* peer, - const struct iovec* msg, - int count, int tag, void* cbdata); - -static void mca_oob_base_recv_cb(int status, const ompi_process_name_t* peer, - const struct iovec* msg, - int count, int tag, void* cbdata) -{ - int i, num; - struct mca_oob_base_cb_data_t * cb_struct = (struct mca_oob_base_cb_data_t *) cbdata; - const struct iovec * user_iovec = cb_struct->user_iovec; - mca_oob_base_type_t * types = cb_struct->types; - - for(i = 0; i < count; i++) { - if(types[i] == MCA_OOB_BASE_INT16) { - /* figure out how many integers we have */ - num = msg[i].iov_len / 2; - /* unpack the data */ - mca_oob_base_unpack(user_iovec[i].iov_base, msg[i].iov_base, num, MCA_OOB_BASE_INT16); - /* free the old buffer */ - free(msg[i].iov_base); - } else if(types[i] == MCA_OOB_BASE_INT32) { - /* figure out how many integers we have */ - num = msg[i].iov_len / 4; - /* unpack the data */ - mca_oob_base_unpack(user_iovec[i].iov_base, msg[i].iov_base, num, MCA_OOB_BASE_INT32); - /* free the old buffer */ - free(msg[i].iov_base); - } - } - /* free the iovecs we allocated */ - free((void *)msg); - /* call the user callback function */ - cb_struct->user_callback(status, peer, user_iovec, count, tag, cb_struct->user_data); - /* free the cb structure */ - free(cb_struct); - return; -} - /* * Non-blocking version of mca_oob_recv_nb(). * @@ -61,70 +14,9 @@ static void mca_oob_base_recv_cb(int status, const ompi_process_name_t* peer, * @param cbdata (IN) User data that is passed to callback function. * @return OMPI error code (<0) on error or number of bytes actually received. */ -int mca_oob_recv_nb(ompi_process_name_t* peer, const struct iovec* msg, int count, int tag, int flags, +int mca_oob_recv_nb(ompi_process_name_t* peer, struct iovec* msg, int count, int tag, int flags, mca_oob_callback_fn_t cbfunc, void* cbdata) { return(mca_oob.oob_recv_nb(peer, msg, count, tag, flags, cbfunc, cbdata)); } -/* - * Non-blocking version of mca_oob_recv_ntoh(). - * - * @param peer (IN/OUT) Opaque name of peer process or MCA_OOB_NAME_ANY for wildcard receive. - * @param msg (IN) Array of iovecs describing user buffers and lengths. - * @param types (IN) Parallel array to iovecs describing data type of each iovec element. - * @param count (IN) Number of elements in iovec array. - * @param flags (IN) May be MCA_OOB_PEEK to return up to size bytes of msg w/out removing it from the queue, - * @param cbfunc (IN) Callback function on recv completion. - * @param cbdata (IN) User data that is passed to callback function. - * @return OMPI error code (<0) on error or number of bytes actually received. - */ - -int mca_oob_recv_ntoh_nb(ompi_process_name_t* peer, const struct iovec* msg, - const mca_oob_base_type_t* types, int count, int tag, int flags, - mca_oob_callback_fn_t cbfunc, void* cbdata) -{ - int rc, i = 0; - struct iovec * orig; - bool convert = false; - struct mca_oob_base_cb_data_t * cb_struct; - /* see if we actually have to convert anything */ - /* first check to see if we are already in network byte order */ - if(1 != htons(1)) { - /* if we aren't, see if there is any data types that need to be converted */ - while(!convert && (i < count)) { - if((types[i] == MCA_OOB_BASE_INT16) || (types[i] == MCA_OOB_BASE_INT32)) { - convert = true; - } - i++; - } - } - if(convert) { - /* now if we need to convert anything we neeg to create a new iovec - * to recieve into */ - orig = malloc(sizeof(struct iovec) * count); - /* copy their iovecs */ - memcpy(orig, msg, sizeof(struct iovec) * count); - cb_struct = malloc(sizeof(struct mca_oob_base_cb_data_t)); - cb_struct->user_data = cbdata; - cb_struct->user_callback = cbfunc; - cb_struct->user_iovec = msg; - cb_struct->types = types; - /* copy their array into ours */ - memcpy(orig, msg, sizeof(struct iovec) * count); - /* now we need to go through the iovects, and any ints we need to - * allocate our own space to recieve into, so we can convert into - * their space later */ - for(i = 0; i < count; i++) { - if((types[i] == MCA_OOB_BASE_INT16) || (types[i] == MCA_OOB_BASE_INT32)) { - orig[i].iov_base = malloc(orig[i].iov_len); - } - } - /* now the new buffers are ready. do the recieve */ - rc = mca_oob.oob_recv_nb(peer, orig, count, tag, flags, mca_oob_base_recv_cb, cb_struct); - } else { - rc = mca_oob.oob_recv_nb(peer, msg, count, tag, flags, cbfunc, cbdata); - } - return rc; -} - diff --git a/src/mca/oob/base/oob_base_send.c b/src/mca/oob/base/oob_base_send.c index 6bc88c62bf..6fcedbb8d8 100644 --- a/src/mca/oob/base/oob_base_send.c +++ b/src/mca/oob/base/oob_base_send.c @@ -12,7 +12,7 @@ * @return OMPI error code (<0) on error or number of bytes actually sent. */ -int mca_oob_send(const ompi_process_name_t* peer, const struct iovec *msg, int count, int tag, int flags) +int mca_oob_send(ompi_process_name_t* peer, struct iovec *msg, int count, int tag, int flags) { return(mca_oob.oob_send(peer, msg, count, tag, flags)); } @@ -26,7 +26,7 @@ int mca_oob_send(const ompi_process_name_t* peer, const struct iovec *msg, int c * @return OMPI error code (<0) on error or number of bytes actually sent. */ -int mca_oob_send_packed (const ompi_process_name_t* peer, const ompi_buffer_t buffer, int tag, int flags) +int mca_oob_send_packed (ompi_process_name_t* peer, ompi_buffer_t buffer, int tag, int flags) { void *dataptr; size_t datalen; @@ -46,67 +46,3 @@ int rc; return(mca_oob.oob_send(peer, msg, 1, tag, flags)); } -/* - * Convert data (if required) to network byte order prior to sending to peer. - * - * @param peer (IN) Opaque name of peer process. - * @param msg (IN) Array of iovecs describing user buffers and lengths. - * @param types (IN) Parallel array to iovecs describing data type of each iovec element. - * @param count (IN) Number of elements in iovec array. - * @param flags (IN) Currently unused. - * @return OMPI error code (<0) on error number of bytes actually sent. - */ - -int mca_oob_send_hton(const ompi_process_name_t* peer, const struct iovec *msg, - const mca_oob_base_type_t *types, int count, int tag, int flags) -{ - int rc, i = 0; - struct iovec * converted; - bool convert = false; - /* see if we actually have to convert anything */ - /* first check to see if we are already in network byte order */ - if(1 != htons(1)) { - /* if we aren't, see if there is any data types that need to be converted */ - while(!convert && (i < count)) { - if((types[i] == MCA_OOB_BASE_INT16) || (types[i] == MCA_OOB_BASE_INT32)) { - convert = true; - } - i++; - } - } - - if(convert) { - converted = malloc(sizeof(struct iovec) * count); - memcpy(converted, msg, sizeof(struct iovec) * count); - for(i = 0; i < count; i++) { - if(types[i] == MCA_OOB_BASE_INT16) { - /* figure out how many integers we have */ - rc = msg[i].iov_len / 2; - /* allocate a buffer for the converted data */ - converted[i].iov_base = malloc(msg[i].iov_len); - /* pack the data */ - mca_oob_base_pack(converted[i].iov_base, msg[i].iov_base, rc, MCA_OOB_BASE_INT16); - } else if(types[i] == MCA_OOB_BASE_INT32) { - /* figure out how many integers we have */ - rc = msg[i].iov_len / 4; - /* allocate a buffer for the converted data */ - converted[i].iov_base = malloc(msg[i].iov_len); - /* pack the data */ - mca_oob_base_pack(converted[i].iov_base, msg[i].iov_base, rc, MCA_OOB_BASE_INT32); - } - } - rc = mca_oob.oob_send(peer, converted, count, tag, flags); - /* clean up any space we allocated */ - for(i = 0; i < count; i++) { - if((types[i] == MCA_OOB_BASE_INT16) || (types[i] == MCA_OOB_BASE_INT32)) { - free(converted[i].iov_base); - } - } - free(converted); - } else { - rc = mca_oob.oob_send(peer, msg, count, tag, flags); - } - return rc; - -} - diff --git a/src/mca/oob/base/oob_base_send_nb.c b/src/mca/oob/base/oob_base_send_nb.c index 7dfe3d1d80..ad5c1e951d 100644 --- a/src/mca/oob/base/oob_base_send_nb.c +++ b/src/mca/oob/base/oob_base_send_nb.c @@ -2,38 +2,6 @@ #include "mca/oob/base/base.h" #include #include - -struct mca_oob_base_cb_data_t { - mca_oob_callback_fn_t user_callback; - void * user_data; - const mca_oob_base_type_t * types; - const struct iovec * user_iovec; -}; - -/* this is the callback function we will register when we have to do any conversion */ -static void mca_oob_base_send_cb( - int status, - const ompi_process_name_t* peer, - const struct iovec* msg, - int count, - int tag, - void* cbdata) -{ - int i; - struct mca_oob_base_cb_data_t * cb_struct = (struct mca_oob_base_cb_data_t *) cbdata; - mca_oob_base_type_t * types = cb_struct->types; - for(i = 0; i < count; i++) { - if((types[i] == MCA_OOB_BASE_INT16) || (types[i] == MCA_OOB_BASE_INT32)) { - free(msg[i].iov_base); - } - } - free((void *)msg); - /* call the user callback function */ - cb_struct->user_callback(status, peer, cb_struct->user_iovec, count, tag, cb_struct->user_data); - free(cb_struct); - return; -} - /* @@ -49,74 +17,9 @@ static void mca_oob_base_send_cb( * */ -int mca_oob_send_nb(const ompi_process_name_t* peer, const struct iovec* msg, int count, int tag, +int mca_oob_send_nb(ompi_process_name_t* peer, struct iovec* msg, int count, int tag, int flags, mca_oob_callback_fn_t cbfunc, void* cbdata) { return(mca_oob.oob_send_nb(peer, msg, count, tag, flags, cbfunc, cbdata)); } -/* - * Non-blocking version of mca_oob_send_hton(). - * - * @param peer (IN) Opaque name of peer process. - * @param msg (IN) Array of iovecs describing user buffers and lengths. - * @param types (IN) Parallel array to iovecs describing data type of each iovec element. - * @param count (IN) Number of elements in iovec array. - * @param flags (IN) Currently unused. - * @param cbfunc (IN) Callback function on send completion. - * @param cbdata (IN) User data that is passed to callback function. - * @return OMPI error code (<0) on error number of bytes actually sent. - */ - -int mca_oob_send_hton_nb(const ompi_process_name_t* peer, const struct iovec* msg, - const mca_oob_base_type_t* types, int count, int tag, int flags, - mca_oob_callback_fn_t cbfunc, void* cbdata) -{ - int rc, i = 0; - struct iovec * converted; - struct mca_oob_base_cb_data_t * cb_struct; - bool convert = false; - /* see if we actually have to convert anything */ - /* first check to see if we are already in network byte order */ - if(1 != htons(1)) { - /* if we aren't, see if there is any data types that need to be converted */ - while(!convert && (i < count)) { - if((types[i] == MCA_OOB_BASE_INT16) || (types[i] == MCA_OOB_BASE_INT32)) { - convert = true; - } - i++; - } - } - if(convert) { - converted = malloc(sizeof(struct iovec) * count); - /* copy the passed iovect into the new one */ - memcpy(converted, msg, sizeof(struct iovec) * count); - cb_struct = malloc(sizeof(struct mca_oob_base_cb_data_t)); - cb_struct->user_data = cbdata; - cb_struct->user_callback = cbfunc; - cb_struct->user_iovec = msg; - cb_struct->types = types; - for(i = 0; i < count; i++) { - if(types[i] == MCA_OOB_BASE_INT16) { - /* figure out how many integers we have */ - rc = msg[i].iov_len / 2; - /* allocate a buffer for the converted data */ - converted[i].iov_base = malloc(msg[i].iov_len); - /* pack the data */ - mca_oob_base_pack(converted[i].iov_base, msg[i].iov_base, rc, MCA_OOB_BASE_INT16); - } else if(types[i] == MCA_OOB_BASE_INT32) { - /* figure out how many integers we have */ - rc = msg[i].iov_len / 4; - /* allocate a buffer for the converted data */ - converted[i].iov_base = malloc(msg[i].iov_len); - /* pack the data */ - mca_oob_base_pack(converted[i].iov_base, msg[i].iov_base, rc, MCA_OOB_BASE_INT32); - } - } - rc = mca_oob.oob_send_nb(peer, converted, count, tag, flags, mca_oob_base_send_cb, cb_struct); - } else { - rc = mca_oob.oob_send_nb(peer, msg, count, tag, flags, cbfunc, cbdata); - } - return rc; -} - diff --git a/src/mca/oob/cofs/src/oob_cofs.c b/src/mca/oob/cofs/src/oob_cofs.c index a6b7867869..6ce2854937 100644 --- a/src/mca/oob/cofs/src/oob_cofs.c +++ b/src/mca/oob/cofs/src/oob_cofs.c @@ -20,7 +20,14 @@ #include #include -static int do_recv(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid, const struct iovec* iov, int count, int tag, int flags); +static int do_recv( + mca_ns_base_jobid_t jobid, + mca_ns_base_vpid_t procid, + struct iovec* iov, + int count, + int* tag, + int flags); + /* * Similiar to unix send(2). @@ -33,8 +40,8 @@ static int do_recv(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid, const s */ int mca_oob_cofs_send( - const ompi_process_name_t* peer, - const struct iovec *iov, + ompi_process_name_t* peer, + struct iovec *iov, int count, int tag, int flags) @@ -88,8 +95,8 @@ int mca_oob_cofs_send( int mca_oob_cofs_send_nb( - const ompi_process_name_t* peer, - const struct iovec *iov, + ompi_process_name_t* peer, + struct iovec *iov, int count, int tag, int flags, @@ -106,9 +113,9 @@ int mca_oob_cofs_send_nb( int mca_oob_cofs_recv( ompi_process_name_t* peer, - const struct iovec* iov, + struct iovec* iov, int count, - int tag, + int* tag, int flags) { int ret = OMPI_ERR_WOULD_BLOCK; @@ -124,14 +131,14 @@ mca_oob_cofs_recv( int mca_oob_cofs_recv_nb( ompi_process_name_t* peer, - const struct iovec* iov, + struct iovec* iov, int count, int tag, int flags, mca_oob_callback_fn_t cbfunc, void* cbdata) { - int status = mca_oob_cofs_recv(peer, iov, count, tag, flags); + int status = mca_oob_cofs_recv(peer, iov, count, &tag, flags); if(NULL != cbfunc) cbfunc(status, peer, iov, count, tag, cbdata); return status; @@ -139,13 +146,14 @@ mca_oob_cofs_recv_nb( static char* -find_match(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid, int tag) +find_match(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid, int* tagp) { DIR* dir; struct dirent *ent; unsigned long tmp_serial; int tmp_jobid, tmp_procid, tmp_myprocid, tmp_tag; int ret; + int tag = (tagp != NULL) ? *tagp : MCA_OOB_TAG_ANY; bool found = false; char best_name[OMPI_PATH_MAX]; uint64_t best_serial = ((1ULL << 63) - 1); @@ -181,6 +189,7 @@ find_match(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid, int tag) if (tmp_serial < best_serial) { strcpy(best_name, ent->d_name); best_serial = tmp_serial; + if(tagp != NULL) *tagp = tmp_tag; } } @@ -194,7 +203,7 @@ find_match(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid, int tag) static int -do_recv(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid, const struct iovec* iov, int count, int tag, int flags) +do_recv(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid, struct iovec* iov, int count, int* tag, int flags) { char *fname; char full_fname[OMPI_PATH_MAX]; diff --git a/src/mca/oob/cofs/src/oob_cofs.h b/src/mca/oob/cofs/src/oob_cofs.h index f70ef3d653..c67d21bc1e 100644 --- a/src/mca/oob/cofs/src/oob_cofs.h +++ b/src/mca/oob/cofs/src/oob_cofs.h @@ -37,8 +37,8 @@ int mca_oob_cofs_finalize(mca_oob_t*); */ int mca_oob_cofs_send( - const ompi_process_name_t*, - const struct iovec* msg, + ompi_process_name_t*, + struct iovec* msg, int count, int tag, int flags); @@ -58,9 +58,9 @@ int mca_oob_cofs_send( int mca_oob_cofs_recv( ompi_process_name_t* peer, - const struct iovec *msg, + struct iovec *msg, int count, - int tag, + int* tag, int flags); @@ -78,8 +78,8 @@ int mca_oob_cofs_recv( */ int mca_oob_cofs_send_nb( - const ompi_process_name_t* peer, - const struct iovec* msg, + ompi_process_name_t* peer, + struct iovec* msg, int count, int tag, int flags, @@ -101,7 +101,7 @@ int mca_oob_cofs_send_nb( int mca_oob_cofs_recv_nb( ompi_process_name_t* peer, - const struct iovec* msg, + struct iovec* msg, int count, int tag, int flags, diff --git a/src/mca/oob/oob.h b/src/mca/oob/oob.h index b54fccc56f..4cd1bd30d3 100644 --- a/src/mca/oob/oob.h +++ b/src/mca/oob/oob.h @@ -50,8 +50,8 @@ typedef struct mca_oob_1_0_0_t mca_oob_t; */ typedef int (*mca_oob_base_module_send_fn_t)( - const ompi_process_name_t* peer, - const struct iovec *msg, + ompi_process_name_t* peer, + struct iovec *msg, int count, int tag, int flags); @@ -71,9 +71,9 @@ typedef int (*mca_oob_base_module_send_fn_t)( typedef int (*mca_oob_base_module_recv_fn_t)( ompi_process_name_t* peer, - const struct iovec *msg, + struct iovec *msg, int count, - int tag, + int* tag, int flags); /** @@ -91,8 +91,8 @@ typedef int (*mca_oob_base_module_recv_fn_t)( */ typedef int (*mca_oob_base_module_send_nb_fn_t)( - const ompi_process_name_t* peer, - const struct iovec* msg, + ompi_process_name_t* peer, + struct iovec* msg, int count, int tag, int flags, @@ -114,7 +114,7 @@ typedef int (*mca_oob_base_module_send_nb_fn_t)( typedef int (*mca_oob_base_module_recv_nb_fn_t)( ompi_process_name_t* peer, - const struct iovec* msg, + struct iovec* msg, int count, int tag, int flags, diff --git a/src/mca/oob/tcp/oob_tcp.h b/src/mca/oob/tcp/oob_tcp.h index 70751e8e10..6b46dd482e 100644 --- a/src/mca/oob/tcp/oob_tcp.h +++ b/src/mca/oob/tcp/oob_tcp.h @@ -79,8 +79,8 @@ int mca_oob_tcp_process_name_compare(const ompi_process_name_t* n1, const ompi_p */ int mca_oob_tcp_send( - const ompi_process_name_t* peer, - const struct iovec *msg, + ompi_process_name_t* peer, + struct iovec *msg, int count, int tag, int flags); @@ -99,9 +99,9 @@ int mca_oob_tcp_send( int mca_oob_tcp_recv( ompi_process_name_t* peer, - const struct iovec * msg, + struct iovec * msg, int count, - int tag, + int* tag, int flags); @@ -124,8 +124,8 @@ int mca_oob_tcp_recv( */ int mca_oob_tcp_send_nb( - const ompi_process_name_t* peer, - const struct iovec* msg, + ompi_process_name_t* peer, + struct iovec* msg, int count, int tag, int flags, @@ -147,7 +147,7 @@ int mca_oob_tcp_send_nb( int mca_oob_tcp_recv_nb( ompi_process_name_t* peer, - const struct iovec* msg, + struct iovec* msg, int count, int tag, int flags, diff --git a/src/mca/oob/tcp/oob_tcp_msg.c b/src/mca/oob/tcp/oob_tcp_msg.c index 04c2bc814b..0e9903bb67 100644 --- a/src/mca/oob/tcp/oob_tcp_msg.c +++ b/src/mca/oob/tcp/oob_tcp_msg.c @@ -161,11 +161,11 @@ bool mca_oob_tcp_msg_recv_handler(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_pee * @retval count Number of elements in iovec array. */ -int mca_oob_tcp_msg_copy(mca_oob_tcp_msg_t* msg, const struct iovec* iov, int count) +int mca_oob_tcp_msg_copy(mca_oob_tcp_msg_t* msg, struct iovec* iov, int count) { int i; - const struct iovec *src = msg->msg_rwiov; - const struct iovec *dst = iov; + struct iovec *src = msg->msg_rwiov; + struct iovec *dst = iov; unsigned char* src_ptr = (unsigned char*)src->iov_base; size_t src_len = src->iov_len; int src_cnt = 0; @@ -204,7 +204,7 @@ int mca_oob_tcp_msg_copy(mca_oob_tcp_msg_t* msg, const struct iovec* iov, int co * Note - this routine requires the caller to be holding the module lock. */ -mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_recv(const ompi_process_name_t* name, int tag) +mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_recv(ompi_process_name_t* name, int tag) { mca_oob_tcp_msg_t* msg; for(msg = (mca_oob_tcp_msg_t*) ompi_list_get_first(&mca_oob_tcp_component.tcp_msg_recv); @@ -230,7 +230,7 @@ mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_recv(const ompi_process_name_t* name, i * Note - this routine requires the caller to be holding the module lock. */ -mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_post(const ompi_process_name_t* name, int tag, bool peek) +mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_post(ompi_process_name_t* name, int tag, bool peek) { mca_oob_tcp_msg_t* msg; for(msg = (mca_oob_tcp_msg_t*) ompi_list_get_first(&mca_oob_tcp_component.tcp_msg_post); diff --git a/src/mca/oob/tcp/oob_tcp_msg.h b/src/mca/oob/tcp/oob_tcp_msg.h index 3859929320..10d577a700 100644 --- a/src/mca/oob/tcp/oob_tcp_msg.h +++ b/src/mca/oob/tcp/oob_tcp_msg.h @@ -32,7 +32,7 @@ struct mca_oob_tcp_msg_t { int msg_flags; /**< flags to send/recv */ int msg_rc; /**< the return code for the send/recv (amount sent/recvd or errno) */ mca_oob_tcp_hdr_t msg_hdr; /**< header used to convey message properties to peer */ - const struct iovec* msg_uiov; /**< the user supplied iovec array */ + struct iovec* msg_uiov; /**< the user supplied iovec array */ int msg_ucnt; /**< the number of items in the user iovec array */ struct iovec * msg_rwiov; /**< copy of iovec array - not data */ struct iovec * msg_rwptr; /**< current read/write pointer into msg_iov */ @@ -102,7 +102,7 @@ int mca_oob_tcp_msg_complete(mca_oob_tcp_msg_t* msg, ompi_process_name_t * peer) * @retval count Number of elements in iovec array. */ -int mca_oob_tcp_msg_copy(mca_oob_tcp_msg_t* msg, const struct iovec* iov, int count); +int mca_oob_tcp_msg_copy(mca_oob_tcp_msg_t* msg, struct iovec* iov, int count); /** * Called asynchronously to progress sending a message from the event library thread. @@ -131,7 +131,7 @@ bool mca_oob_tcp_msg_recv_handler(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_pee * Note - this routine requires the caller to be holding the module lock. */ -mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_recv(const ompi_process_name_t* name, int tag); +mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_recv(ompi_process_name_t* name, int tag); /** * Match name to a posted recv request. @@ -144,7 +144,7 @@ mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_recv(const ompi_process_name_t* name, i * Note - this routine requires the caller to be holding the module lock. */ -mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_post(const ompi_process_name_t* name, int tag, bool peek); +mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_post(ompi_process_name_t* name, int tag, bool peek); /** * Allocate space for iovec array - if the request number of elements is less than diff --git a/src/mca/oob/tcp/oob_tcp_peer.c b/src/mca/oob/tcp/oob_tcp_peer.c index e79feecc58..2c24612eda 100644 --- a/src/mca/oob/tcp/oob_tcp_peer.c +++ b/src/mca/oob/tcp/oob_tcp_peer.c @@ -131,7 +131,7 @@ int mca_oob_tcp_peer_send(mca_oob_tcp_peer_t* peer, mca_oob_tcp_msg_t* msg) * this should be true unless the caller already owns the lock. * @retval Pointer to the newly created struture or NULL on error. */ -mca_oob_tcp_peer_t * mca_oob_tcp_peer_lookup(const ompi_process_name_t* name, bool get_lock) +mca_oob_tcp_peer_t * mca_oob_tcp_peer_lookup(ompi_process_name_t* name, bool get_lock) { int rc; mca_oob_tcp_peer_t * peer, * old; diff --git a/src/mca/oob/tcp/oob_tcp_peer.h b/src/mca/oob/tcp/oob_tcp_peer.h index c2b68c8d8f..0adf545727 100644 --- a/src/mca/oob/tcp/oob_tcp_peer.h +++ b/src/mca/oob/tcp/oob_tcp_peer.h @@ -89,7 +89,7 @@ extern "C" { * @retval pointer to the peer's (possibly newly created) struture * @retval NULL if there was a problem */ -mca_oob_tcp_peer_t *mca_oob_tcp_peer_lookup(const ompi_process_name_t* peer_name, bool get_lock); +mca_oob_tcp_peer_t *mca_oob_tcp_peer_lookup(ompi_process_name_t* peer_name, bool get_lock); /** * Start sending a message to the specified peer. The routine diff --git a/src/mca/oob/tcp/oob_tcp_recv.c b/src/mca/oob/tcp/oob_tcp_recv.c index f0fe9d27eb..1ce05db5f3 100644 --- a/src/mca/oob/tcp/oob_tcp_recv.c +++ b/src/mca/oob/tcp/oob_tcp_recv.c @@ -14,13 +14,14 @@ */ int mca_oob_tcp_recv( ompi_process_name_t* peer, - const struct iovec *iov, + struct iovec *iov, int count, - int tag, + int* tagp, int flags) { mca_oob_tcp_msg_t *msg; int i, rc, size = 0; + int tag = (tagp != NULL) ? *tagp : MCA_OOB_TAG_ANY; /* lock the tcp struct */ OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_match_lock); @@ -45,6 +46,9 @@ int mca_oob_tcp_recv( OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock); return rc; } + if(NULL != tagp) { + *tagp = ntohl(msg->msg_hdr.msg_tag); + } /* otherwise dequeue the message and return to free list */ ompi_list_remove_item(&mca_oob_tcp_component.tcp_msg_recv, (ompi_list_item_t *) msg); @@ -100,7 +104,7 @@ int mca_oob_tcp_recv( */ int mca_oob_tcp_recv_nb( ompi_process_name_t* peer, - const struct iovec* iov, + struct iovec* iov, int count, int tag, int flags, @@ -136,7 +140,7 @@ int mca_oob_tcp_recv_nb( /* otherwise dequeue the message and return to free list */ ompi_list_remove_item(&mca_oob_tcp_component.tcp_msg_recv, (ompi_list_item_t *) msg); OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock); - cbfunc(rc, &msg->msg_peer, iov, count, tag, cbdata); + cbfunc(rc, &msg->msg_peer, iov, count, ntohl(msg->msg_hdr.msg_tag), cbdata); MCA_OOB_TCP_MSG_RETURN(msg); return 0; } diff --git a/src/mca/oob/tcp/oob_tcp_send.c b/src/mca/oob/tcp/oob_tcp_send.c index 80e9f073a2..6343eb6e07 100644 --- a/src/mca/oob/tcp/oob_tcp_send.c +++ b/src/mca/oob/tcp/oob_tcp_send.c @@ -11,8 +11,8 @@ */ int mca_oob_tcp_send( - const ompi_process_name_t* name, - const struct iovec *iov, + ompi_process_name_t* name, + struct iovec *iov, int count, int tag, int flags) @@ -85,8 +85,8 @@ int mca_oob_tcp_send( */ int mca_oob_tcp_send_nb( - const ompi_process_name_t* name, - const struct iovec* iov, + ompi_process_name_t* name, + struct iovec* iov, int count, int tag, int flags, diff --git a/test/mca/oob/oob_test.c b/test/mca/oob/oob_test.c index 5d0cab1b06..8414d4d9a9 100644 --- a/test/mca/oob/oob_test.c +++ b/test/mca/oob/oob_test.c @@ -172,19 +172,6 @@ void do_sends(ompi_process_name_t * peer) { test_success(); } - /* nonblocking send with packing */ - if( 0 > mca_oob_send_hton_nb(peer, send_msg1, types, 4, 0, 0, &callback, - (void *) (2 + (NUM_TESTS * i)))) { - test_failure("mca_oob_send_hton_nb."); - } else { - test_success(); - } - if( 0 > mca_oob_send_hton_nb(peer, send_msg1, types, 4, 0, 0, &callback, - (void *) (3 + (NUM_TESTS * i)))) { - test_failure("mca_oob_send_hton_nb."); - } else { - test_success(); - } /* blocking send */ if( 0 > mca_oob_send(peer, send_msg2, 3, 0, 0)) { @@ -198,17 +185,6 @@ void do_sends(ompi_process_name_t * peer) { test_success(); } - /* blocking send with packing */ - if( 0 > mca_oob_send_hton(peer, send_msg1, types, 4, 0, 0)) { - test_failure("mca_oob_send_hton."); - } else { - test_success(); - } - if( 0 > mca_oob_send_hton(peer, send_msg1, types, 4, 0, 0)) { - test_failure("mca_oob_send_hton."); - } else { - test_success(); - } } void do_recvs(ompi_process_name_t * peer) { @@ -228,21 +204,6 @@ void do_recvs(ompi_process_name_t * peer) { if(!compare_iovec(recv_msg1, send_msg1, 4)) { test_failure("compare 1 is wrong"); } - /* now we'll recieve the packed send - assuming we know the message type */ - if( 0 > mca_oob_recv_ntoh(peer, recv_msg1, types, 4, 0,0)) { - test_failure("mca_oob_recv_ntoh."); - } else { - test_success(); - } - if(!compare_iovec(recv_msg1, send_msg1, 4)) { - test_failure("compare 2 is wrong"); - } - if( 0 > mca_oob_recv_ntoh_nb(peer, recv_msg1, types, 4, 0, 0, &callback, - (void *) (5 + (NUM_TESTS * i)))) { - test_failure("mca_oob_recv_ntoh_nb."); - } else { - test_success(); - } /* now we'll do a blocking recv - waiting for the 3rd message to arrive * - and peek the first element of the iovec array to determine @@ -284,21 +245,5 @@ void do_recvs(ompi_process_name_t * peer) { } else { test_success(); } - - /* now we will recieve the packed data */ - if( 0 > mca_oob_recv_ntoh(peer, recv_msg1, types, 4, 0, 0)) { - test_failure("mca_oob_recv_ntoh."); - } else { - test_success(); - } - if(!compare_iovec(recv_msg1, send_msg1, 4)) { - test_failure("compare 5 is wrong"); - } - if( 0 > mca_oob_recv_ntoh_nb(peer, recv_msg1, types, 4, 0, 0, &callback, - (void *) (7 + (NUM_TESTS * i)))) { - test_failure("mca_oob_recv_ntoh_nb."); - } else { - test_success(); - } } diff --git a/test/mca/oob/oob_test_self.c b/test/mca/oob/oob_test_self.c index 073afb4f90..bfb4051f2c 100755 --- a/test/mca/oob/oob_test_self.c +++ b/test/mca/oob/oob_test_self.c @@ -71,11 +71,11 @@ bool compare_iovec(const struct iovec * msg1, const struct iovec * msg2, return true; } -void callback(int status, const ompi_process_name_t * peer, - const struct iovec * msg, int count, int tag, void * cbdata); +void callback(int status, ompi_process_name_t * peer, + struct iovec * msg, int count, int tag, void * cbdata); -void callback(int status, const ompi_process_name_t * peer, - const struct iovec * msg, int count, int tag, void * cbdata) +void callback(int status, ompi_process_name_t * peer, + struct iovec * msg, int count, int tag, void * cbdata) { if(0 != tag) { test_failure("Bad tag."); @@ -115,22 +115,6 @@ int main(int argc, char ** argv) test_success(); } - /* Nonblocking send followed by a blocking recieve with packing */ - if( 0 > mca_oob_send_hton_nb(&peer, send_msg1, types, 4, 0, 0, &callback, -(void *) 1)) { - test_failure("mca_oob_send_hton_nb."); - } else { - test_success(); - } - if( 0 > mca_oob_recv_ntoh(&peer, recv_msg1, types, 4, 0, 0)) { - test_failure("mca_oob_recv_ntoh."); - } else { - test_success(); - } - if(!compare_iovec(recv_msg1, send_msg1, 4)) { - test_failure("compare 1 is wrong"); - } - /* non blocking send of message type 2 followed by blocking recieve*/ if( 0 > mca_oob_send_nb(&peer, send_msg2, 3, 0, 0, &callback, (void *) 4)) {