OOB API changes:
- removed send_hton/recv_ntoh routines as we now have send_packed/recv_packed - removed constness from apis - adding flag (in work) to allow recv to allocate and return recv buffer - updated edgars communicator code to use pack routines rather than ntoh routines This commit was SVN r2095.
Этот коммит содержится в:
родитель
478de2a9fa
Коммит
a2bc814a08
@ -501,25 +501,25 @@ static int ompi_comm_allreduce_intra_oob (int *inbuf, int *outbuf,
|
||||
}
|
||||
|
||||
if (local_rank == local_leader ) {
|
||||
struct iovec smsg, rmsg;
|
||||
mca_oob_base_type_t otype;
|
||||
|
||||
smsg.iov_base = tmpbuf;
|
||||
smsg.iov_len = count * sizeof(int);
|
||||
otype = MCA_OOB_BASE_INT32;
|
||||
ompi_buffer_t sbuf;
|
||||
ompi_buffer_t rbuf;
|
||||
|
||||
rmsg.iov_base = outbuf;
|
||||
rmsg.iov_len = count * sizeof(int);
|
||||
ompi_buffer_init(&sbuf, count * sizeof(int));
|
||||
ompi_pack(sbuf, tmpbuf, count, OMPI_INT32);
|
||||
|
||||
if ( send_first ) {
|
||||
rc = mca_oob_send_hton (remote_leader, &smsg, &otype, 1,0,0);
|
||||
rc = mca_oob_recv_ntoh (remote_leader, &rmsg, &otype, 1,0,0);
|
||||
if ( send_first ) {
|
||||
rc = mca_oob_send_packed(remote_leader, sbuf, 0, 0);
|
||||
rc = mca_oob_recv_packed (remote_leader, &rbuf, NULL);
|
||||
}
|
||||
else {
|
||||
rc = mca_oob_recv_ntoh (remote_leader, &rmsg, &otype, 1,0,0);
|
||||
rc = mca_oob_send_hton (remote_leader, &smsg, &otype, 1,0,0);
|
||||
rc = mca_oob_recv_packed(remote_leader, &rbuf, NULL);
|
||||
rc = mca_oob_send_packed(remote_leader, sbuf, 0, 0);
|
||||
}
|
||||
|
||||
ompi_unpack(rbuf, outbuf, count, OMPI_INT32);
|
||||
ompi_buffer_free(sbuf);
|
||||
ompi_buffer_free(rbuf);
|
||||
|
||||
if ( &ompi_mpi_op_max == op ) {
|
||||
for ( i = 0 ; i < count; i++ ) {
|
||||
if (tmpbuf[i] > outbuf[i]) outbuf[i] = tmpbuf[i];
|
||||
|
@ -13,6 +13,7 @@
|
||||
#include "proc/proc.h"
|
||||
#include "threads/mutex.h"
|
||||
#include "util/bit_ops.h"
|
||||
#include "util/pack.h"
|
||||
#include "include/constants.h"
|
||||
#include "mca/pcm/pcm.h"
|
||||
#include "mca/pml/pml.h"
|
||||
@ -29,8 +30,8 @@ int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root,
|
||||
int namebuflen, rnamebuflen;
|
||||
char *namebuf=NULL, *rnamebuf=NULL;
|
||||
|
||||
struct iovec smsg[2], rmsg[2];
|
||||
mca_oob_base_type_t otype[2];
|
||||
ompi_buffer_t sbuf;
|
||||
ompi_buffer_t rbuf;
|
||||
ompi_communicator_t *newcomp=MPI_COMM_NULL;
|
||||
ompi_proc_t **rprocs=NULL;
|
||||
ompi_group_t *group=comm->c_local_group;
|
||||
@ -50,30 +51,28 @@ int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root,
|
||||
ompi_proc_get_namebuf_by_proc (group->grp_proc_pointers,
|
||||
size, &namebuf, &namebuflen);
|
||||
|
||||
smsg[0].iov_base = &size;
|
||||
smsg[0].iov_len = sizeof(int);
|
||||
otype[0] = MCA_OOB_BASE_INT32;
|
||||
|
||||
smsg[1].iov_base = &namebuflen;
|
||||
smsg[1].iov_len = sizeof(int);
|
||||
otype[1] = MCA_OOB_BASE_INT32;
|
||||
|
||||
rmsg[0].iov_base = &rsize;
|
||||
rmsg[0].iov_len = sizeof(int);
|
||||
otype[0] = MCA_OOB_BASE_INT32;
|
||||
|
||||
rmsg[1].iov_base = &rnamebuflen;
|
||||
rmsg[1].iov_len = sizeof(int);
|
||||
otype[1] = MCA_OOB_BASE_INT32;
|
||||
ompi_buffer_init(&sbuf, 128);
|
||||
ompi_pack(sbuf, &size, 1, OMPI_INT32);
|
||||
ompi_pack(sbuf, &namebuflen, 1, OMPI_INT32);
|
||||
ompi_pack(sbuf, &rsize, 1, OMPI_INT32);
|
||||
ompi_pack(sbuf, &rnamebuflen, 1, OMPI_INT32);
|
||||
|
||||
if ( send_first ) {
|
||||
rc = mca_oob_send_hton (rport, smsg, otype, 2, 0, 0);
|
||||
rc = mca_oob_recv_ntoh (rport, rmsg, otype, 2, 0, 0);
|
||||
rc = mca_oob_send_packed(rport, sbuf, 0, 0);
|
||||
rc = mca_oob_recv_packed (rport, &rbuf, NULL);
|
||||
}
|
||||
else {
|
||||
rc = mca_oob_recv_ntoh (rport, rmsg, otype, 2, 0, 0);
|
||||
rc = mca_oob_send_hton (rport, smsg, otype, 2, 0, 0);
|
||||
rc = mca_oob_recv_packed(rport, &rbuf, NULL);
|
||||
rc = mca_oob_send_packed(rport, sbuf, 0, 0);
|
||||
}
|
||||
|
||||
ompi_unpack(rbuf, &size, 1, OMPI_INT32);
|
||||
ompi_unpack(rbuf, &namebuflen, 1, OMPI_INT32);
|
||||
ompi_unpack(rbuf, &rsize, 1, OMPI_INT32);
|
||||
ompi_unpack(rbuf, &rnamebuflen, 1, OMPI_INT32);
|
||||
|
||||
ompi_buffer_free(sbuf);
|
||||
ompi_buffer_free(rbuf);
|
||||
}
|
||||
|
||||
/* bcast the information to all processes in the local comm */
|
||||
@ -93,20 +92,24 @@ int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root,
|
||||
|
||||
if ( rank == root ) {
|
||||
/* Exchange list of processes in the groups */
|
||||
smsg[0].iov_base = namebuf;
|
||||
smsg[0].iov_len = namebuflen;
|
||||
|
||||
rmsg[0].iov_base = rnamebuf;
|
||||
rmsg[0].iov_len = rnamebuflen;
|
||||
|
||||
ompi_buffer_init(&sbuf, 128);
|
||||
ompi_pack(sbuf, namebuf, namebuflen, OMPI_BYTE);
|
||||
|
||||
if ( send_first ) {
|
||||
rc = mca_oob_send (rport, smsg, 1, 0, 0);
|
||||
rc = mca_oob_recv (rport, rmsg, 1, 0, 0);
|
||||
rc = mca_oob_send_packed(rport, sbuf, 0, 0);
|
||||
rc = mca_oob_recv_packed (rport, &rbuf, NULL);
|
||||
}
|
||||
else {
|
||||
rc = mca_oob_recv (rport, rmsg, 1, 0, 0);
|
||||
rc = mca_oob_send (rport, smsg, 1, 0, 0);
|
||||
rc = mca_oob_recv_packed(rport, &rbuf, NULL);
|
||||
rc = mca_oob_send_packed(rport, sbuf, 0, 0);
|
||||
}
|
||||
|
||||
ompi_unpack(rbuf, rnamebuf, rnamebuflen, OMPI_BYTE);
|
||||
|
||||
ompi_buffer_free(sbuf);
|
||||
ompi_buffer_free(rbuf);
|
||||
}
|
||||
|
||||
/* bcast list of processes to all procs in local group
|
||||
@ -211,41 +214,41 @@ ompi_process_name_t *ompi_comm_get_rport (ompi_process_name_t *port, int send_fi
|
||||
{
|
||||
int namebuflen, rc;
|
||||
char *namebuf=NULL;
|
||||
|
||||
struct iovec msg;
|
||||
mca_oob_base_type_t otype;
|
||||
ompi_proc_t **rproc;
|
||||
ompi_process_name_t *rport;
|
||||
|
||||
|
||||
if ( send_first ) {
|
||||
ompi_buffer_t sbuf;
|
||||
ompi_proc_get_namebuf_by_proc(&proc, 1, &namebuf, &namebuflen );
|
||||
msg.iov_base = &namebuflen;
|
||||
msg.iov_len = sizeof(int);
|
||||
otype = MCA_OOB_BASE_INT32;
|
||||
rc = mca_oob_send_hton (port, &msg, &otype, 1, 0, 0);
|
||||
|
||||
msg.iov_base = namebuf;
|
||||
msg.iov_len = namebuflen;
|
||||
rc = mca_oob_send (rport, &msg, 1, 0, 0);
|
||||
ompi_buffer_init(&sbuf, sizeof(int));
|
||||
ompi_pack(sbuf, &namebuflen, 1, OMPI_INT32);
|
||||
rc = mca_oob_send_packed(port, sbuf, 0, 0);
|
||||
ompi_buffer_free(sbuf);
|
||||
|
||||
ompi_buffer_init(&sbuf, namebuflen);
|
||||
ompi_pack(sbuf, namebuf, namebuflen, OMPI_BYTE);
|
||||
rc = mca_oob_send_packed(port, sbuf, 0, 0);
|
||||
ompi_buffer_free(sbuf);
|
||||
|
||||
ompi_proc_namebuf_returnbuf (namebuf);
|
||||
rport = port;
|
||||
}
|
||||
else {
|
||||
msg.iov_base = &namebuflen;
|
||||
msg.iov_len = sizeof(int);
|
||||
otype = MCA_OOB_BASE_INT32;
|
||||
rc = mca_oob_recv_ntoh(MCA_OOB_NAME_ANY, &msg, &otype, 1, 0, 0);
|
||||
ompi_buffer_t rbuf;
|
||||
rc = mca_oob_recv_packed(MCA_OOB_NAME_ANY, &rbuf, NULL);
|
||||
ompi_unpack(rbuf, &namebuflen, 1, OMPI_INT32);
|
||||
ompi_buffer_free(rbuf);
|
||||
|
||||
namebuf = (char *) malloc (namebuflen);
|
||||
if ( NULL != namebuf ) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
msg.iov_base = namebuf;
|
||||
msg.iov_len = namebuflen;
|
||||
rc = mca_oob_recv (MCA_OOB_NAME_ANY, &msg, 1, 0, 0);
|
||||
rc = mca_oob_recv_packed(MCA_OOB_NAME_ANY, &rbuf, NULL);
|
||||
ompi_unpack(rbuf, namebuf, namebuflen, OMPI_BYTE);
|
||||
ompi_buffer_free(rbuf);
|
||||
|
||||
ompi_proc_get_proclist (namebuf, namebuflen, 1, &rproc);
|
||||
rport = &(rproc[0]->proc_name);
|
||||
|
@ -68,8 +68,13 @@ typedef enum {
|
||||
|
||||
#define MCA_OOB_PEEK 0x01 /**< flag to oob_recv to allow caller to peek a portion of the next available
|
||||
* message w/out removing the message from the queue. */
|
||||
#define MCA_OOB_TRUNC 0x02 /**< flag to oob_recv to return the actual size of the message even if the receive
|
||||
buffer is smaller than the number of bytes available */
|
||||
#define MCA_OOB_TRUNC 0x02 /**< flag to oob_recv to return the actual size of the message even if
|
||||
* the receive buffer is smaller than the number of bytes available */
|
||||
#if 0 /* NOT YET IMPLEMENTED */
|
||||
#define MCA_OOB_ALLOC 0x04 /**< flag to oob_recv to request the oob to allocate a buffer of the appropriate
|
||||
* size for the receive and return the allocated buffer and size in the first
|
||||
* element of the iovec array. */
|
||||
#endif
|
||||
|
||||
#if defined(c_plusplus) || defined(__cplusplus)
|
||||
extern "C" {
|
||||
@ -107,8 +112,8 @@ char* mca_oob_get_contact_info(void);
|
||||
*/
|
||||
|
||||
int mca_oob_send(
|
||||
const ompi_process_name_t* peer,
|
||||
const struct iovec *msg,
|
||||
ompi_process_name_t* peer,
|
||||
struct iovec *msg,
|
||||
int count,
|
||||
int tag,
|
||||
int flags);
|
||||
@ -122,46 +127,26 @@ int mca_oob_send(
|
||||
* @return OMPI error code (<0) on error or number of bytes actually sent.
|
||||
*/
|
||||
|
||||
|
||||
int mca_oob_send_packed (const ompi_process_name_t* peer, const ompi_buffer_t buffer, int tag, int flags);
|
||||
|
||||
|
||||
/**
|
||||
* Convert data (if required) to network byte order prior to sending to peer.
|
||||
*
|
||||
* @param peer (IN) Opaque name of peer process.
|
||||
* @param msg (IN) Array of iovecs describing user buffers and lengths.
|
||||
* @param types (IN) Parallel array to iovecs describing data type of each iovec element.
|
||||
* @param count (IN) Number of elements in iovec array.
|
||||
* @param tag (IN) User defined tag for matching send/recv.
|
||||
* @param flags (IN) Currently unused.
|
||||
* @return OMPI error code (<0) on error number of bytes actually sent.
|
||||
*
|
||||
* This routine is equivalent to mca_oob_send, with the exception that it excepts
|
||||
* an additional array of type codes describing the data types contained within the
|
||||
* iovec array. This information is used to convert the data to network byte order
|
||||
* (if required) prior to transmission over the underlying network transport.
|
||||
*/
|
||||
|
||||
int mca_oob_send_hton(
|
||||
const ompi_process_name_t* peer,
|
||||
const struct iovec *msg,
|
||||
const mca_oob_base_type_t *types,
|
||||
int count,
|
||||
int tag,
|
||||
int mca_oob_send_packed(
|
||||
ompi_process_name_t* peer,
|
||||
ompi_buffer_t buffer,
|
||||
int tag,
|
||||
int flags);
|
||||
|
||||
|
||||
/**
|
||||
* Similiar to unix readv(2)
|
||||
*
|
||||
* @param peer (IN) Opaque name of peer process or MCA_OOB_NAME_ANY for wildcard receive.
|
||||
* @param msg (IN) Array of iovecs describing user buffers and lengths.
|
||||
* @param count (IN) Number of elements in iovec array.
|
||||
* @param tag (IN) User defined tag for matching send/recv.
|
||||
* @param flags (IN) May be MCA_OOB_PEEK to return up to the number of bytes provided in the
|
||||
* iovec array without removing the message from the queue.
|
||||
* @return OMPI error code (<0) on error or number of bytes actually received.
|
||||
* @param peer (IN/OUT) Opaque name of peer process or MCA_OOB_NAME_ANY for wildcard receive. In the
|
||||
* case of a wildcard receive, will be modified to return the matched peer name.
|
||||
* @param msg (IN) Array of iovecs describing user buffers and lengths.
|
||||
* @param count (IN) Number of elements in iovec array.
|
||||
* @param tag (IN/OUT) User defined tag for matching send/recv. In the case of a wildcard receive, will
|
||||
* be modified to return the matched tag. May be optionally by NULL to specify a
|
||||
* wildcard receive with no return value.
|
||||
* @param flags (IN) May be MCA_OOB_PEEK to return up to the number of bytes provided in the
|
||||
* iovec array without removing the message from the queue.
|
||||
* @return OMPI error code (<0) on error or number of bytes actually received.
|
||||
*
|
||||
* The OOB recv call is similar to unix recv/readv in that it requires the caller to manage
|
||||
* memory associated w/ the message. The routine accepts an array of iovecs (<i>msg</i>); however,
|
||||
@ -180,45 +165,17 @@ int mca_oob_send_hton(
|
||||
|
||||
int mca_oob_recv(
|
||||
ompi_process_name_t* peer,
|
||||
const struct iovec *msg,
|
||||
struct iovec *msg,
|
||||
int count,
|
||||
int tag,
|
||||
int* tag,
|
||||
int flags);
|
||||
|
||||
/**
|
||||
* Receive data and convert (if required) to host byte order.
|
||||
*
|
||||
* @param peer (IN) Opaque name of peer process or MCA_OOB_NAME_ANY for wildcard receive.
|
||||
* @param msg (IN) Array of iovecs describing user buffers and lengths.
|
||||
* @param types (IN) Parallel array to iovecs describing data type of each iovec element.
|
||||
* @param count (IN) Number of elements in iovec array.
|
||||
* @param tag (IN) User defined tag for matching send/recv.
|
||||
* @param flags (IN) May be MCA_OOB_PEEK to return up to the number of bytes provided in the
|
||||
* iovec array without removing the message from the queue.
|
||||
* @return OMPI error code (<0) on error or number of bytes actually received.
|
||||
*
|
||||
* This routine is equivalent to mca_oob_recv, with the exception that it accepts
|
||||
* an additional array of type codes describing the data types contained within the
|
||||
* iovec array. This information is used to convert the data from network byte order
|
||||
* (if required) to host byte order prior to receiving into the users buffer.
|
||||
*
|
||||
*/
|
||||
|
||||
int mca_oob_recv_ntoh(
|
||||
ompi_process_name_t* peer,
|
||||
const struct iovec *msg,
|
||||
const mca_oob_base_type_t *types,
|
||||
int count,
|
||||
int tag,
|
||||
int flags);
|
||||
|
||||
|
||||
/**
|
||||
* Similiar to unix read(2)
|
||||
*
|
||||
* @param peer (IN) Opaque name of peer process or MCA_OOB_NAME_ANY for wildcard receive.
|
||||
* @param buf (OUT) Array of iovecs describing user buffers and lengths.
|
||||
* @param tag (IN) User defined tag for matching send/recv.
|
||||
* @param buf (OUT) Array of iovecs describing user buffers and lengths.
|
||||
* @param tag (IN/OUT) User defined tag for matching send/recv.
|
||||
* @return OMPI error code (<0) on error or number of bytes actually received.
|
||||
*
|
||||
*
|
||||
@ -233,9 +190,7 @@ int mca_oob_recv_ntoh(
|
||||
int mca_oob_recv_packed (
|
||||
ompi_process_name_t* peer,
|
||||
ompi_buffer_t *buf,
|
||||
int tag);
|
||||
|
||||
|
||||
int* tag);
|
||||
|
||||
/*
|
||||
* Non-blocking versions of send/recv.
|
||||
@ -255,8 +210,8 @@ int mca_oob_recv_packed (
|
||||
|
||||
typedef void (*mca_oob_callback_fn_t)(
|
||||
int status,
|
||||
const ompi_process_name_t* peer,
|
||||
const struct iovec* msg,
|
||||
ompi_process_name_t* peer,
|
||||
struct iovec* msg,
|
||||
int count,
|
||||
int tag,
|
||||
void* cbdata);
|
||||
@ -275,7 +230,7 @@ typedef void (*mca_oob_callback_fn_t)(
|
||||
|
||||
typedef void (*mca_oob_callback_packed_fn_t)(
|
||||
int status,
|
||||
const ompi_process_name_t* peer,
|
||||
ompi_process_name_t* peer,
|
||||
ompi_buffer_t* buffer,
|
||||
int count,
|
||||
int tag,
|
||||
@ -300,37 +255,8 @@ typedef void (*mca_oob_callback_packed_fn_t)(
|
||||
*/
|
||||
|
||||
int mca_oob_send_nb(
|
||||
const ompi_process_name_t* peer,
|
||||
const struct iovec* msg,
|
||||
int count,
|
||||
int tag,
|
||||
int flags,
|
||||
mca_oob_callback_fn_t cbfunc,
|
||||
void* cbdata);
|
||||
|
||||
/**
|
||||
* Non-blocking version of mca_oob_send_hton().
|
||||
*
|
||||
* @param peer (IN) Opaque name of peer process.
|
||||
* @param msg (IN) Array of iovecs describing user buffers and lengths.
|
||||
* @param types (IN) Parallel array to iovecs describing data type of each iovec element.
|
||||
* @param count (IN) Number of elements in iovec array.
|
||||
* @param tag (IN) User defined tag for matching send/recv.
|
||||
* @param flags (IN) Currently unused.
|
||||
* @param cbfunc (IN) Callback function on send completion.
|
||||
* @param cbdata (IN) User data that is passed to callback function.
|
||||
* @return OMPI error code (<0) on error number of bytes actually sent.
|
||||
*
|
||||
* The user supplied callback function is called when the send completes. Note that
|
||||
* the callback may occur before the call to mca_oob_send returns to the caller,
|
||||
* if the send completes during the call.
|
||||
*
|
||||
*/
|
||||
|
||||
int mca_oob_send_hton_nb(
|
||||
const ompi_process_name_t* peer,
|
||||
const struct iovec* msg,
|
||||
const mca_oob_base_type_t* types,
|
||||
ompi_process_name_t* peer,
|
||||
struct iovec* msg,
|
||||
int count,
|
||||
int tag,
|
||||
int flags,
|
||||
@ -355,46 +281,13 @@ int mca_oob_send_hton_nb(
|
||||
|
||||
int mca_oob_recv_nb(
|
||||
ompi_process_name_t* peer,
|
||||
const struct iovec* msg,
|
||||
struct iovec* msg,
|
||||
int count,
|
||||
int tag,
|
||||
int flags,
|
||||
mca_oob_callback_fn_t cbfunc,
|
||||
void* cbdata);
|
||||
|
||||
/**
|
||||
* Non-blocking version of mca_oob_recv_ntoh().
|
||||
*
|
||||
* @param peer (IN/OUT) Opaque name of peer process or MCA_OOB_NAME_ANY for wildcard receive.
|
||||
* @param msg (IN) Array of iovecs describing user buffers and lengths.
|
||||
* @param types (IN) Parallel array to iovecs describing data type of each iovec element.
|
||||
* @param count (IN) Number of elements in iovec array.
|
||||
* @param tag (IN) User defined tag for matching send/recv.
|
||||
* @param flags (IN) May be MCA_OOB_PEEK to return up to size bytes of msg w/out removing it from the queue,
|
||||
* @param cbfunc (IN) Callback function on recv completion.
|
||||
* @param cbdata (IN) User data that is passed to callback function.
|
||||
* @return OMPI error code (<0) on error or number of bytes actually received.
|
||||
*
|
||||
* The user supplied callback function is called asynchronously when a message is received
|
||||
* that matches the call parameters.
|
||||
*/
|
||||
|
||||
int mca_oob_recv_ntoh_nb(
|
||||
ompi_process_name_t* peer,
|
||||
const struct iovec* msg,
|
||||
const mca_oob_base_type_t* types,
|
||||
int count,
|
||||
int tag,
|
||||
int flags,
|
||||
mca_oob_callback_fn_t cbfunc,
|
||||
void* cbdata);
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
/*
|
||||
* functions for pack and unpack routines
|
||||
*/
|
||||
@ -409,7 +302,8 @@ int mca_oob_recv_ntoh_nb(
|
||||
* @retval OMPI_SUCCESS
|
||||
* @retval OMPI_ERROR
|
||||
*/
|
||||
int mca_oob_base_pack(void * dest, void * src, size_t n, mca_oob_base_type_t type);
|
||||
|
||||
int mca_oob_base_pack(void * dest, void * src, size_t n, mca_oob_base_type_t type);
|
||||
|
||||
/**
|
||||
* This function unpacks the passed data according to the type enum.
|
||||
@ -422,7 +316,8 @@ int mca_oob_recv_ntoh_nb(
|
||||
* @retval OMPI_SUCCESS
|
||||
* @retval OMPI_ERROR
|
||||
*/
|
||||
int mca_oob_base_unpack(void * dest, void * src, size_t n, mca_oob_base_type_t type);
|
||||
|
||||
int mca_oob_base_unpack(void * dest, void * src, size_t n, mca_oob_base_type_t type);
|
||||
|
||||
|
||||
#if defined(c_plusplus) || defined(__cplusplus)
|
||||
|
@ -17,88 +17,11 @@
|
||||
* iovec array without removing the message from the queue.
|
||||
* @return OMPI error code (<0) on error or number of bytes actually received.
|
||||
*/
|
||||
int mca_oob_recv(ompi_process_name_t* peer, const struct iovec *msg, int count, int tag, int flags)
|
||||
int mca_oob_recv(ompi_process_name_t* peer, struct iovec *msg, int count, int* tag, int flags)
|
||||
{
|
||||
return(mca_oob.oob_recv(peer, msg, count, tag, flags));
|
||||
}
|
||||
|
||||
/*
|
||||
* Receive data and convert (if required) to host byte order.
|
||||
*
|
||||
* @param peer (IN) Opaque name of peer process or MCA_OOB_NAME_ANY for wildcard receive.
|
||||
* @param msg (IN) Array of iovecs describing user buffers and lengths.
|
||||
* @param types (IN) Parallel array to iovecs describing data type of each iovec element.
|
||||
* @param count (IN) Number of elements in iovec array.
|
||||
* @param tag (IN) User defined tag for matching send/recv.
|
||||
* @param flags (IN) May be MCA_OOB_PEEK to return up to the number of bytes provided in the
|
||||
* iovec array without removing the message from the queue.
|
||||
* @return OMPI error code (<0) on error or number of bytes actually received.
|
||||
*/
|
||||
int mca_oob_recv_ntoh(
|
||||
ompi_process_name_t* peer,
|
||||
const struct iovec *msg,
|
||||
const mca_oob_base_type_t *types,
|
||||
int count,
|
||||
int tag,
|
||||
int flags)
|
||||
{
|
||||
int rc, num, i = 0;
|
||||
struct iovec * orig;
|
||||
bool convert = false;
|
||||
/* see if we actually have to convert anything */
|
||||
/* first check to see if we are already in network byte order */
|
||||
if(1 != htons(1)) {
|
||||
/* if we aren't, see if there is any data types that need to be converted */
|
||||
while(!convert && (i < count)) {
|
||||
if((types[i] == MCA_OOB_BASE_INT16) || (types[i] == MCA_OOB_BASE_INT32)) {
|
||||
convert = true;
|
||||
}
|
||||
i++;
|
||||
}
|
||||
}
|
||||
|
||||
if(convert) {
|
||||
/* now if we need to convert anything we neeg to create a new iovec
|
||||
* to recieve into */
|
||||
orig = malloc(sizeof(struct iovec) * count);
|
||||
/* copy their array into ours */
|
||||
memcpy(orig, msg, sizeof(struct iovec) * count);
|
||||
/* now we need to go through the iovects, and any ints we need to
|
||||
* allocate our own space to recieve into, so we can convert into
|
||||
* their space later */
|
||||
for(i = 0; i < count; i++) {
|
||||
if((types[i] == MCA_OOB_BASE_INT16) || (types[i] == MCA_OOB_BASE_INT32)) {
|
||||
orig[i].iov_base = malloc(orig[i].iov_len);
|
||||
}
|
||||
}
|
||||
/* now the new buffers are ready. do the recieve */
|
||||
rc = mca_oob.oob_recv(peer, orig, count, tag, flags);
|
||||
/* now we have to do the conversions */
|
||||
for(i = 0; i < count; i++) {
|
||||
if(types[i] == MCA_OOB_BASE_INT16) {
|
||||
/* figure out how many integers we have */
|
||||
num = orig[i].iov_len / 2;
|
||||
/* unpack the data */
|
||||
mca_oob_base_unpack(msg[i].iov_base, orig[i].iov_base, num, MCA_OOB_BASE_INT16);
|
||||
/* free the old buffer */
|
||||
free(orig[i].iov_base);
|
||||
} else if(types[i] == MCA_OOB_BASE_INT32) {
|
||||
/* figure out how many integers we have */
|
||||
num = orig[i].iov_len / 4;
|
||||
/* unpack the data */
|
||||
mca_oob_base_unpack(msg[i].iov_base, orig[i].iov_base, num, MCA_OOB_BASE_INT32);
|
||||
/* free the old buffer */
|
||||
free(orig[i].iov_base);
|
||||
}
|
||||
}
|
||||
/* free the iovecs we allocated */
|
||||
free(orig);
|
||||
} else {
|
||||
rc = mca_oob.oob_recv(peer, msg, count, tag, flags);
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
/*
|
||||
* Similiar to unix recv(2)
|
||||
*
|
||||
@ -108,7 +31,7 @@ int mca_oob_recv_ntoh(
|
||||
* iovec array without removing the message from the queue.
|
||||
* @return OMPI error code (<0) on error or number of bytes actually received.
|
||||
*/
|
||||
int mca_oob_recv_packed (ompi_process_name_t* peer, ompi_buffer_t *buf, int tag)
|
||||
int mca_oob_recv_packed (ompi_process_name_t* peer, ompi_buffer_t *buf, int* tag)
|
||||
{
|
||||
/* ok, this routine is a bit of a cow */
|
||||
/* the oob_recv actually needs the real target buffers in advance */
|
||||
@ -124,8 +47,8 @@ int mca_oob_recv_packed (ompi_process_name_t* peer, ompi_buffer_t *buf, int tag)
|
||||
/* Or do locking on all recv posting between the peek and recv! GEF */
|
||||
|
||||
uint32_t insize;
|
||||
int rc;
|
||||
struct iovec msg[1];
|
||||
int rc;
|
||||
struct iovec msg[1];
|
||||
ompi_buffer_t tmpbuf;
|
||||
void *targetptr;
|
||||
|
||||
@ -142,7 +65,7 @@ int mca_oob_recv_packed (ompi_process_name_t* peer, ompi_buffer_t *buf, int tag)
|
||||
msg[0].iov_base = (char*) targetptr;
|
||||
msg[0].iov_len = insize;
|
||||
|
||||
rc = mca_oob.oob_recv(peer, msg, 1, tag, 0);
|
||||
rc = mca_oob.oob_recv(peer, msg, 1, tag, 0);
|
||||
|
||||
if (OMPI_ERROR!=rc) *buf = tmpbuf;
|
||||
|
||||
|
@ -3,53 +3,6 @@
|
||||
#include <string.h>
|
||||
#include <netinet/in.h>
|
||||
|
||||
struct mca_oob_base_cb_data_t {
|
||||
mca_oob_callback_fn_t user_callback;
|
||||
void * user_data;
|
||||
const mca_oob_base_type_t * types;
|
||||
const struct iovec * user_iovec;
|
||||
};
|
||||
|
||||
/* this is the callback function we will register when we have to do any conversion */
|
||||
static void mca_oob_base_recv_cb(int status, const ompi_process_name_t* peer,
|
||||
const struct iovec* msg,
|
||||
int count, int tag, void* cbdata);
|
||||
|
||||
static void mca_oob_base_recv_cb(int status, const ompi_process_name_t* peer,
|
||||
const struct iovec* msg,
|
||||
int count, int tag, void* cbdata)
|
||||
{
|
||||
int i, num;
|
||||
struct mca_oob_base_cb_data_t * cb_struct = (struct mca_oob_base_cb_data_t *) cbdata;
|
||||
const struct iovec * user_iovec = cb_struct->user_iovec;
|
||||
mca_oob_base_type_t * types = cb_struct->types;
|
||||
|
||||
for(i = 0; i < count; i++) {
|
||||
if(types[i] == MCA_OOB_BASE_INT16) {
|
||||
/* figure out how many integers we have */
|
||||
num = msg[i].iov_len / 2;
|
||||
/* unpack the data */
|
||||
mca_oob_base_unpack(user_iovec[i].iov_base, msg[i].iov_base, num, MCA_OOB_BASE_INT16);
|
||||
/* free the old buffer */
|
||||
free(msg[i].iov_base);
|
||||
} else if(types[i] == MCA_OOB_BASE_INT32) {
|
||||
/* figure out how many integers we have */
|
||||
num = msg[i].iov_len / 4;
|
||||
/* unpack the data */
|
||||
mca_oob_base_unpack(user_iovec[i].iov_base, msg[i].iov_base, num, MCA_OOB_BASE_INT32);
|
||||
/* free the old buffer */
|
||||
free(msg[i].iov_base);
|
||||
}
|
||||
}
|
||||
/* free the iovecs we allocated */
|
||||
free((void *)msg);
|
||||
/* call the user callback function */
|
||||
cb_struct->user_callback(status, peer, user_iovec, count, tag, cb_struct->user_data);
|
||||
/* free the cb structure */
|
||||
free(cb_struct);
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* Non-blocking version of mca_oob_recv_nb().
|
||||
*
|
||||
@ -61,70 +14,9 @@ static void mca_oob_base_recv_cb(int status, const ompi_process_name_t* peer,
|
||||
* @param cbdata (IN) User data that is passed to callback function.
|
||||
* @return OMPI error code (<0) on error or number of bytes actually received.
|
||||
*/
|
||||
int mca_oob_recv_nb(ompi_process_name_t* peer, const struct iovec* msg, int count, int tag, int flags,
|
||||
int mca_oob_recv_nb(ompi_process_name_t* peer, struct iovec* msg, int count, int tag, int flags,
|
||||
mca_oob_callback_fn_t cbfunc, void* cbdata)
|
||||
{
|
||||
return(mca_oob.oob_recv_nb(peer, msg, count, tag, flags, cbfunc, cbdata));
|
||||
}
|
||||
|
||||
/*
|
||||
* Non-blocking version of mca_oob_recv_ntoh().
|
||||
*
|
||||
* @param peer (IN/OUT) Opaque name of peer process or MCA_OOB_NAME_ANY for wildcard receive.
|
||||
* @param msg (IN) Array of iovecs describing user buffers and lengths.
|
||||
* @param types (IN) Parallel array to iovecs describing data type of each iovec element.
|
||||
* @param count (IN) Number of elements in iovec array.
|
||||
* @param flags (IN) May be MCA_OOB_PEEK to return up to size bytes of msg w/out removing it from the queue,
|
||||
* @param cbfunc (IN) Callback function on recv completion.
|
||||
* @param cbdata (IN) User data that is passed to callback function.
|
||||
* @return OMPI error code (<0) on error or number of bytes actually received.
|
||||
*/
|
||||
|
||||
int mca_oob_recv_ntoh_nb(ompi_process_name_t* peer, const struct iovec* msg,
|
||||
const mca_oob_base_type_t* types, int count, int tag, int flags,
|
||||
mca_oob_callback_fn_t cbfunc, void* cbdata)
|
||||
{
|
||||
int rc, i = 0;
|
||||
struct iovec * orig;
|
||||
bool convert = false;
|
||||
struct mca_oob_base_cb_data_t * cb_struct;
|
||||
/* see if we actually have to convert anything */
|
||||
/* first check to see if we are already in network byte order */
|
||||
if(1 != htons(1)) {
|
||||
/* if we aren't, see if there is any data types that need to be converted */
|
||||
while(!convert && (i < count)) {
|
||||
if((types[i] == MCA_OOB_BASE_INT16) || (types[i] == MCA_OOB_BASE_INT32)) {
|
||||
convert = true;
|
||||
}
|
||||
i++;
|
||||
}
|
||||
}
|
||||
if(convert) {
|
||||
/* now if we need to convert anything we neeg to create a new iovec
|
||||
* to recieve into */
|
||||
orig = malloc(sizeof(struct iovec) * count);
|
||||
/* copy their iovecs */
|
||||
memcpy(orig, msg, sizeof(struct iovec) * count);
|
||||
cb_struct = malloc(sizeof(struct mca_oob_base_cb_data_t));
|
||||
cb_struct->user_data = cbdata;
|
||||
cb_struct->user_callback = cbfunc;
|
||||
cb_struct->user_iovec = msg;
|
||||
cb_struct->types = types;
|
||||
/* copy their array into ours */
|
||||
memcpy(orig, msg, sizeof(struct iovec) * count);
|
||||
/* now we need to go through the iovects, and any ints we need to
|
||||
* allocate our own space to recieve into, so we can convert into
|
||||
* their space later */
|
||||
for(i = 0; i < count; i++) {
|
||||
if((types[i] == MCA_OOB_BASE_INT16) || (types[i] == MCA_OOB_BASE_INT32)) {
|
||||
orig[i].iov_base = malloc(orig[i].iov_len);
|
||||
}
|
||||
}
|
||||
/* now the new buffers are ready. do the recieve */
|
||||
rc = mca_oob.oob_recv_nb(peer, orig, count, tag, flags, mca_oob_base_recv_cb, cb_struct);
|
||||
} else {
|
||||
rc = mca_oob.oob_recv_nb(peer, msg, count, tag, flags, cbfunc, cbdata);
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
@ -12,7 +12,7 @@
|
||||
* @return OMPI error code (<0) on error or number of bytes actually sent.
|
||||
*/
|
||||
|
||||
int mca_oob_send(const ompi_process_name_t* peer, const struct iovec *msg, int count, int tag, int flags)
|
||||
int mca_oob_send(ompi_process_name_t* peer, struct iovec *msg, int count, int tag, int flags)
|
||||
{
|
||||
return(mca_oob.oob_send(peer, msg, count, tag, flags));
|
||||
}
|
||||
@ -26,7 +26,7 @@ int mca_oob_send(const ompi_process_name_t* peer, const struct iovec *msg, int c
|
||||
* @return OMPI error code (<0) on error or number of bytes actually sent.
|
||||
*/
|
||||
|
||||
int mca_oob_send_packed (const ompi_process_name_t* peer, const ompi_buffer_t buffer, int tag, int flags)
|
||||
int mca_oob_send_packed (ompi_process_name_t* peer, ompi_buffer_t buffer, int tag, int flags)
|
||||
{
|
||||
void *dataptr;
|
||||
size_t datalen;
|
||||
@ -46,67 +46,3 @@ int rc;
|
||||
return(mca_oob.oob_send(peer, msg, 1, tag, flags));
|
||||
}
|
||||
|
||||
/*
|
||||
* Convert data (if required) to network byte order prior to sending to peer.
|
||||
*
|
||||
* @param peer (IN) Opaque name of peer process.
|
||||
* @param msg (IN) Array of iovecs describing user buffers and lengths.
|
||||
* @param types (IN) Parallel array to iovecs describing data type of each iovec element.
|
||||
* @param count (IN) Number of elements in iovec array.
|
||||
* @param flags (IN) Currently unused.
|
||||
* @return OMPI error code (<0) on error number of bytes actually sent.
|
||||
*/
|
||||
|
||||
int mca_oob_send_hton(const ompi_process_name_t* peer, const struct iovec *msg,
|
||||
const mca_oob_base_type_t *types, int count, int tag, int flags)
|
||||
{
|
||||
int rc, i = 0;
|
||||
struct iovec * converted;
|
||||
bool convert = false;
|
||||
/* see if we actually have to convert anything */
|
||||
/* first check to see if we are already in network byte order */
|
||||
if(1 != htons(1)) {
|
||||
/* if we aren't, see if there is any data types that need to be converted */
|
||||
while(!convert && (i < count)) {
|
||||
if((types[i] == MCA_OOB_BASE_INT16) || (types[i] == MCA_OOB_BASE_INT32)) {
|
||||
convert = true;
|
||||
}
|
||||
i++;
|
||||
}
|
||||
}
|
||||
|
||||
if(convert) {
|
||||
converted = malloc(sizeof(struct iovec) * count);
|
||||
memcpy(converted, msg, sizeof(struct iovec) * count);
|
||||
for(i = 0; i < count; i++) {
|
||||
if(types[i] == MCA_OOB_BASE_INT16) {
|
||||
/* figure out how many integers we have */
|
||||
rc = msg[i].iov_len / 2;
|
||||
/* allocate a buffer for the converted data */
|
||||
converted[i].iov_base = malloc(msg[i].iov_len);
|
||||
/* pack the data */
|
||||
mca_oob_base_pack(converted[i].iov_base, msg[i].iov_base, rc, MCA_OOB_BASE_INT16);
|
||||
} else if(types[i] == MCA_OOB_BASE_INT32) {
|
||||
/* figure out how many integers we have */
|
||||
rc = msg[i].iov_len / 4;
|
||||
/* allocate a buffer for the converted data */
|
||||
converted[i].iov_base = malloc(msg[i].iov_len);
|
||||
/* pack the data */
|
||||
mca_oob_base_pack(converted[i].iov_base, msg[i].iov_base, rc, MCA_OOB_BASE_INT32);
|
||||
}
|
||||
}
|
||||
rc = mca_oob.oob_send(peer, converted, count, tag, flags);
|
||||
/* clean up any space we allocated */
|
||||
for(i = 0; i < count; i++) {
|
||||
if((types[i] == MCA_OOB_BASE_INT16) || (types[i] == MCA_OOB_BASE_INT32)) {
|
||||
free(converted[i].iov_base);
|
||||
}
|
||||
}
|
||||
free(converted);
|
||||
} else {
|
||||
rc = mca_oob.oob_send(peer, msg, count, tag, flags);
|
||||
}
|
||||
return rc;
|
||||
|
||||
}
|
||||
|
||||
|
@ -2,38 +2,6 @@
|
||||
#include "mca/oob/base/base.h"
|
||||
#include <string.h>
|
||||
#include <netinet/in.h>
|
||||
|
||||
struct mca_oob_base_cb_data_t {
|
||||
mca_oob_callback_fn_t user_callback;
|
||||
void * user_data;
|
||||
const mca_oob_base_type_t * types;
|
||||
const struct iovec * user_iovec;
|
||||
};
|
||||
|
||||
/* this is the callback function we will register when we have to do any conversion */
|
||||
static void mca_oob_base_send_cb(
|
||||
int status,
|
||||
const ompi_process_name_t* peer,
|
||||
const struct iovec* msg,
|
||||
int count,
|
||||
int tag,
|
||||
void* cbdata)
|
||||
{
|
||||
int i;
|
||||
struct mca_oob_base_cb_data_t * cb_struct = (struct mca_oob_base_cb_data_t *) cbdata;
|
||||
mca_oob_base_type_t * types = cb_struct->types;
|
||||
for(i = 0; i < count; i++) {
|
||||
if((types[i] == MCA_OOB_BASE_INT16) || (types[i] == MCA_OOB_BASE_INT32)) {
|
||||
free(msg[i].iov_base);
|
||||
}
|
||||
}
|
||||
free((void *)msg);
|
||||
/* call the user callback function */
|
||||
cb_struct->user_callback(status, peer, cb_struct->user_iovec, count, tag, cb_struct->user_data);
|
||||
free(cb_struct);
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*
|
||||
@ -49,74 +17,9 @@ static void mca_oob_base_send_cb(
|
||||
*
|
||||
*/
|
||||
|
||||
int mca_oob_send_nb(const ompi_process_name_t* peer, const struct iovec* msg, int count, int tag,
|
||||
int mca_oob_send_nb(ompi_process_name_t* peer, struct iovec* msg, int count, int tag,
|
||||
int flags, mca_oob_callback_fn_t cbfunc, void* cbdata)
|
||||
{
|
||||
return(mca_oob.oob_send_nb(peer, msg, count, tag, flags, cbfunc, cbdata));
|
||||
}
|
||||
|
||||
/*
|
||||
* Non-blocking version of mca_oob_send_hton().
|
||||
*
|
||||
* @param peer (IN) Opaque name of peer process.
|
||||
* @param msg (IN) Array of iovecs describing user buffers and lengths.
|
||||
* @param types (IN) Parallel array to iovecs describing data type of each iovec element.
|
||||
* @param count (IN) Number of elements in iovec array.
|
||||
* @param flags (IN) Currently unused.
|
||||
* @param cbfunc (IN) Callback function on send completion.
|
||||
* @param cbdata (IN) User data that is passed to callback function.
|
||||
* @return OMPI error code (<0) on error number of bytes actually sent.
|
||||
*/
|
||||
|
||||
int mca_oob_send_hton_nb(const ompi_process_name_t* peer, const struct iovec* msg,
|
||||
const mca_oob_base_type_t* types, int count, int tag, int flags,
|
||||
mca_oob_callback_fn_t cbfunc, void* cbdata)
|
||||
{
|
||||
int rc, i = 0;
|
||||
struct iovec * converted;
|
||||
struct mca_oob_base_cb_data_t * cb_struct;
|
||||
bool convert = false;
|
||||
/* see if we actually have to convert anything */
|
||||
/* first check to see if we are already in network byte order */
|
||||
if(1 != htons(1)) {
|
||||
/* if we aren't, see if there is any data types that need to be converted */
|
||||
while(!convert && (i < count)) {
|
||||
if((types[i] == MCA_OOB_BASE_INT16) || (types[i] == MCA_OOB_BASE_INT32)) {
|
||||
convert = true;
|
||||
}
|
||||
i++;
|
||||
}
|
||||
}
|
||||
if(convert) {
|
||||
converted = malloc(sizeof(struct iovec) * count);
|
||||
/* copy the passed iovect into the new one */
|
||||
memcpy(converted, msg, sizeof(struct iovec) * count);
|
||||
cb_struct = malloc(sizeof(struct mca_oob_base_cb_data_t));
|
||||
cb_struct->user_data = cbdata;
|
||||
cb_struct->user_callback = cbfunc;
|
||||
cb_struct->user_iovec = msg;
|
||||
cb_struct->types = types;
|
||||
for(i = 0; i < count; i++) {
|
||||
if(types[i] == MCA_OOB_BASE_INT16) {
|
||||
/* figure out how many integers we have */
|
||||
rc = msg[i].iov_len / 2;
|
||||
/* allocate a buffer for the converted data */
|
||||
converted[i].iov_base = malloc(msg[i].iov_len);
|
||||
/* pack the data */
|
||||
mca_oob_base_pack(converted[i].iov_base, msg[i].iov_base, rc, MCA_OOB_BASE_INT16);
|
||||
} else if(types[i] == MCA_OOB_BASE_INT32) {
|
||||
/* figure out how many integers we have */
|
||||
rc = msg[i].iov_len / 4;
|
||||
/* allocate a buffer for the converted data */
|
||||
converted[i].iov_base = malloc(msg[i].iov_len);
|
||||
/* pack the data */
|
||||
mca_oob_base_pack(converted[i].iov_base, msg[i].iov_base, rc, MCA_OOB_BASE_INT32);
|
||||
}
|
||||
}
|
||||
rc = mca_oob.oob_send_nb(peer, converted, count, tag, flags, mca_oob_base_send_cb, cb_struct);
|
||||
} else {
|
||||
rc = mca_oob.oob_send_nb(peer, msg, count, tag, flags, cbfunc, cbdata);
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
@ -20,7 +20,14 @@
|
||||
#include <string.h>
|
||||
#include <unistd.h>
|
||||
|
||||
static int do_recv(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid, const struct iovec* iov, int count, int tag, int flags);
|
||||
static int do_recv(
|
||||
mca_ns_base_jobid_t jobid,
|
||||
mca_ns_base_vpid_t procid,
|
||||
struct iovec* iov,
|
||||
int count,
|
||||
int* tag,
|
||||
int flags);
|
||||
|
||||
|
||||
/*
|
||||
* Similiar to unix send(2).
|
||||
@ -33,8 +40,8 @@ static int do_recv(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid, const s
|
||||
*/
|
||||
|
||||
int mca_oob_cofs_send(
|
||||
const ompi_process_name_t* peer,
|
||||
const struct iovec *iov,
|
||||
ompi_process_name_t* peer,
|
||||
struct iovec *iov,
|
||||
int count,
|
||||
int tag,
|
||||
int flags)
|
||||
@ -88,8 +95,8 @@ int mca_oob_cofs_send(
|
||||
|
||||
|
||||
int mca_oob_cofs_send_nb(
|
||||
const ompi_process_name_t* peer,
|
||||
const struct iovec *iov,
|
||||
ompi_process_name_t* peer,
|
||||
struct iovec *iov,
|
||||
int count,
|
||||
int tag,
|
||||
int flags,
|
||||
@ -106,9 +113,9 @@ int mca_oob_cofs_send_nb(
|
||||
int
|
||||
mca_oob_cofs_recv(
|
||||
ompi_process_name_t* peer,
|
||||
const struct iovec* iov,
|
||||
struct iovec* iov,
|
||||
int count,
|
||||
int tag,
|
||||
int* tag,
|
||||
int flags)
|
||||
{
|
||||
int ret = OMPI_ERR_WOULD_BLOCK;
|
||||
@ -124,14 +131,14 @@ mca_oob_cofs_recv(
|
||||
int
|
||||
mca_oob_cofs_recv_nb(
|
||||
ompi_process_name_t* peer,
|
||||
const struct iovec* iov,
|
||||
struct iovec* iov,
|
||||
int count,
|
||||
int tag,
|
||||
int flags,
|
||||
mca_oob_callback_fn_t cbfunc,
|
||||
void* cbdata)
|
||||
{
|
||||
int status = mca_oob_cofs_recv(peer, iov, count, tag, flags);
|
||||
int status = mca_oob_cofs_recv(peer, iov, count, &tag, flags);
|
||||
if(NULL != cbfunc)
|
||||
cbfunc(status, peer, iov, count, tag, cbdata);
|
||||
return status;
|
||||
@ -139,13 +146,14 @@ mca_oob_cofs_recv_nb(
|
||||
|
||||
|
||||
static char*
|
||||
find_match(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid, int tag)
|
||||
find_match(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid, int* tagp)
|
||||
{
|
||||
DIR* dir;
|
||||
struct dirent *ent;
|
||||
unsigned long tmp_serial;
|
||||
int tmp_jobid, tmp_procid, tmp_myprocid, tmp_tag;
|
||||
int ret;
|
||||
int tag = (tagp != NULL) ? *tagp : MCA_OOB_TAG_ANY;
|
||||
bool found = false;
|
||||
char best_name[OMPI_PATH_MAX];
|
||||
uint64_t best_serial = ((1ULL << 63) - 1);
|
||||
@ -181,6 +189,7 @@ find_match(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid, int tag)
|
||||
if (tmp_serial < best_serial) {
|
||||
strcpy(best_name, ent->d_name);
|
||||
best_serial = tmp_serial;
|
||||
if(tagp != NULL) *tagp = tmp_tag;
|
||||
}
|
||||
}
|
||||
|
||||
@ -194,7 +203,7 @@ find_match(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid, int tag)
|
||||
|
||||
|
||||
static int
|
||||
do_recv(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid, const struct iovec* iov, int count, int tag, int flags)
|
||||
do_recv(mca_ns_base_jobid_t jobid, mca_ns_base_vpid_t procid, struct iovec* iov, int count, int* tag, int flags)
|
||||
{
|
||||
char *fname;
|
||||
char full_fname[OMPI_PATH_MAX];
|
||||
|
@ -37,8 +37,8 @@ int mca_oob_cofs_finalize(mca_oob_t*);
|
||||
*/
|
||||
|
||||
int mca_oob_cofs_send(
|
||||
const ompi_process_name_t*,
|
||||
const struct iovec* msg,
|
||||
ompi_process_name_t*,
|
||||
struct iovec* msg,
|
||||
int count,
|
||||
int tag,
|
||||
int flags);
|
||||
@ -58,9 +58,9 @@ int mca_oob_cofs_send(
|
||||
|
||||
int mca_oob_cofs_recv(
|
||||
ompi_process_name_t* peer,
|
||||
const struct iovec *msg,
|
||||
struct iovec *msg,
|
||||
int count,
|
||||
int tag,
|
||||
int* tag,
|
||||
int flags);
|
||||
|
||||
|
||||
@ -78,8 +78,8 @@ int mca_oob_cofs_recv(
|
||||
*/
|
||||
|
||||
int mca_oob_cofs_send_nb(
|
||||
const ompi_process_name_t* peer,
|
||||
const struct iovec* msg,
|
||||
ompi_process_name_t* peer,
|
||||
struct iovec* msg,
|
||||
int count,
|
||||
int tag,
|
||||
int flags,
|
||||
@ -101,7 +101,7 @@ int mca_oob_cofs_send_nb(
|
||||
|
||||
int mca_oob_cofs_recv_nb(
|
||||
ompi_process_name_t* peer,
|
||||
const struct iovec* msg,
|
||||
struct iovec* msg,
|
||||
int count,
|
||||
int tag,
|
||||
int flags,
|
||||
|
@ -50,8 +50,8 @@ typedef struct mca_oob_1_0_0_t mca_oob_t;
|
||||
*/
|
||||
|
||||
typedef int (*mca_oob_base_module_send_fn_t)(
|
||||
const ompi_process_name_t* peer,
|
||||
const struct iovec *msg,
|
||||
ompi_process_name_t* peer,
|
||||
struct iovec *msg,
|
||||
int count,
|
||||
int tag,
|
||||
int flags);
|
||||
@ -71,9 +71,9 @@ typedef int (*mca_oob_base_module_send_fn_t)(
|
||||
|
||||
typedef int (*mca_oob_base_module_recv_fn_t)(
|
||||
ompi_process_name_t* peer,
|
||||
const struct iovec *msg,
|
||||
struct iovec *msg,
|
||||
int count,
|
||||
int tag,
|
||||
int* tag,
|
||||
int flags);
|
||||
|
||||
/**
|
||||
@ -91,8 +91,8 @@ typedef int (*mca_oob_base_module_recv_fn_t)(
|
||||
*/
|
||||
|
||||
typedef int (*mca_oob_base_module_send_nb_fn_t)(
|
||||
const ompi_process_name_t* peer,
|
||||
const struct iovec* msg,
|
||||
ompi_process_name_t* peer,
|
||||
struct iovec* msg,
|
||||
int count,
|
||||
int tag,
|
||||
int flags,
|
||||
@ -114,7 +114,7 @@ typedef int (*mca_oob_base_module_send_nb_fn_t)(
|
||||
|
||||
typedef int (*mca_oob_base_module_recv_nb_fn_t)(
|
||||
ompi_process_name_t* peer,
|
||||
const struct iovec* msg,
|
||||
struct iovec* msg,
|
||||
int count,
|
||||
int tag,
|
||||
int flags,
|
||||
|
@ -79,8 +79,8 @@ int mca_oob_tcp_process_name_compare(const ompi_process_name_t* n1, const ompi_p
|
||||
*/
|
||||
|
||||
int mca_oob_tcp_send(
|
||||
const ompi_process_name_t* peer,
|
||||
const struct iovec *msg,
|
||||
ompi_process_name_t* peer,
|
||||
struct iovec *msg,
|
||||
int count,
|
||||
int tag,
|
||||
int flags);
|
||||
@ -99,9 +99,9 @@ int mca_oob_tcp_send(
|
||||
|
||||
int mca_oob_tcp_recv(
|
||||
ompi_process_name_t* peer,
|
||||
const struct iovec * msg,
|
||||
struct iovec * msg,
|
||||
int count,
|
||||
int tag,
|
||||
int* tag,
|
||||
int flags);
|
||||
|
||||
|
||||
@ -124,8 +124,8 @@ int mca_oob_tcp_recv(
|
||||
*/
|
||||
|
||||
int mca_oob_tcp_send_nb(
|
||||
const ompi_process_name_t* peer,
|
||||
const struct iovec* msg,
|
||||
ompi_process_name_t* peer,
|
||||
struct iovec* msg,
|
||||
int count,
|
||||
int tag,
|
||||
int flags,
|
||||
@ -147,7 +147,7 @@ int mca_oob_tcp_send_nb(
|
||||
|
||||
int mca_oob_tcp_recv_nb(
|
||||
ompi_process_name_t* peer,
|
||||
const struct iovec* msg,
|
||||
struct iovec* msg,
|
||||
int count,
|
||||
int tag,
|
||||
int flags,
|
||||
|
@ -161,11 +161,11 @@ bool mca_oob_tcp_msg_recv_handler(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_pee
|
||||
* @retval count Number of elements in iovec array.
|
||||
*/
|
||||
|
||||
int mca_oob_tcp_msg_copy(mca_oob_tcp_msg_t* msg, const struct iovec* iov, int count)
|
||||
int mca_oob_tcp_msg_copy(mca_oob_tcp_msg_t* msg, struct iovec* iov, int count)
|
||||
{
|
||||
int i;
|
||||
const struct iovec *src = msg->msg_rwiov;
|
||||
const struct iovec *dst = iov;
|
||||
struct iovec *src = msg->msg_rwiov;
|
||||
struct iovec *dst = iov;
|
||||
unsigned char* src_ptr = (unsigned char*)src->iov_base;
|
||||
size_t src_len = src->iov_len;
|
||||
int src_cnt = 0;
|
||||
@ -204,7 +204,7 @@ int mca_oob_tcp_msg_copy(mca_oob_tcp_msg_t* msg, const struct iovec* iov, int co
|
||||
* Note - this routine requires the caller to be holding the module lock.
|
||||
*/
|
||||
|
||||
mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_recv(const ompi_process_name_t* name, int tag)
|
||||
mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_recv(ompi_process_name_t* name, int tag)
|
||||
{
|
||||
mca_oob_tcp_msg_t* msg;
|
||||
for(msg = (mca_oob_tcp_msg_t*) ompi_list_get_first(&mca_oob_tcp_component.tcp_msg_recv);
|
||||
@ -230,7 +230,7 @@ mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_recv(const ompi_process_name_t* name, i
|
||||
* Note - this routine requires the caller to be holding the module lock.
|
||||
*/
|
||||
|
||||
mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_post(const ompi_process_name_t* name, int tag, bool peek)
|
||||
mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_post(ompi_process_name_t* name, int tag, bool peek)
|
||||
{
|
||||
mca_oob_tcp_msg_t* msg;
|
||||
for(msg = (mca_oob_tcp_msg_t*) ompi_list_get_first(&mca_oob_tcp_component.tcp_msg_post);
|
||||
|
@ -32,7 +32,7 @@ struct mca_oob_tcp_msg_t {
|
||||
int msg_flags; /**< flags to send/recv */
|
||||
int msg_rc; /**< the return code for the send/recv (amount sent/recvd or errno) */
|
||||
mca_oob_tcp_hdr_t msg_hdr; /**< header used to convey message properties to peer */
|
||||
const struct iovec* msg_uiov; /**< the user supplied iovec array */
|
||||
struct iovec* msg_uiov; /**< the user supplied iovec array */
|
||||
int msg_ucnt; /**< the number of items in the user iovec array */
|
||||
struct iovec * msg_rwiov; /**< copy of iovec array - not data */
|
||||
struct iovec * msg_rwptr; /**< current read/write pointer into msg_iov */
|
||||
@ -102,7 +102,7 @@ int mca_oob_tcp_msg_complete(mca_oob_tcp_msg_t* msg, ompi_process_name_t * peer)
|
||||
* @retval count Number of elements in iovec array.
|
||||
*/
|
||||
|
||||
int mca_oob_tcp_msg_copy(mca_oob_tcp_msg_t* msg, const struct iovec* iov, int count);
|
||||
int mca_oob_tcp_msg_copy(mca_oob_tcp_msg_t* msg, struct iovec* iov, int count);
|
||||
|
||||
/**
|
||||
* Called asynchronously to progress sending a message from the event library thread.
|
||||
@ -131,7 +131,7 @@ bool mca_oob_tcp_msg_recv_handler(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_pee
|
||||
* Note - this routine requires the caller to be holding the module lock.
|
||||
*/
|
||||
|
||||
mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_recv(const ompi_process_name_t* name, int tag);
|
||||
mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_recv(ompi_process_name_t* name, int tag);
|
||||
|
||||
/**
|
||||
* Match name to a posted recv request.
|
||||
@ -144,7 +144,7 @@ mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_recv(const ompi_process_name_t* name, i
|
||||
* Note - this routine requires the caller to be holding the module lock.
|
||||
*/
|
||||
|
||||
mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_post(const ompi_process_name_t* name, int tag, bool peek);
|
||||
mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_post(ompi_process_name_t* name, int tag, bool peek);
|
||||
|
||||
/**
|
||||
* Allocate space for iovec array - if the request number of elements is less than
|
||||
|
@ -131,7 +131,7 @@ int mca_oob_tcp_peer_send(mca_oob_tcp_peer_t* peer, mca_oob_tcp_msg_t* msg)
|
||||
* this should be true unless the caller already owns the lock.
|
||||
* @retval Pointer to the newly created struture or NULL on error.
|
||||
*/
|
||||
mca_oob_tcp_peer_t * mca_oob_tcp_peer_lookup(const ompi_process_name_t* name, bool get_lock)
|
||||
mca_oob_tcp_peer_t * mca_oob_tcp_peer_lookup(ompi_process_name_t* name, bool get_lock)
|
||||
{
|
||||
int rc;
|
||||
mca_oob_tcp_peer_t * peer, * old;
|
||||
|
@ -89,7 +89,7 @@ extern "C" {
|
||||
* @retval pointer to the peer's (possibly newly created) struture
|
||||
* @retval NULL if there was a problem
|
||||
*/
|
||||
mca_oob_tcp_peer_t *mca_oob_tcp_peer_lookup(const ompi_process_name_t* peer_name, bool get_lock);
|
||||
mca_oob_tcp_peer_t *mca_oob_tcp_peer_lookup(ompi_process_name_t* peer_name, bool get_lock);
|
||||
|
||||
/**
|
||||
* Start sending a message to the specified peer. The routine
|
||||
|
@ -14,13 +14,14 @@
|
||||
*/
|
||||
int mca_oob_tcp_recv(
|
||||
ompi_process_name_t* peer,
|
||||
const struct iovec *iov,
|
||||
struct iovec *iov,
|
||||
int count,
|
||||
int tag,
|
||||
int* tagp,
|
||||
int flags)
|
||||
{
|
||||
mca_oob_tcp_msg_t *msg;
|
||||
int i, rc, size = 0;
|
||||
int tag = (tagp != NULL) ? *tagp : MCA_OOB_TAG_ANY;
|
||||
|
||||
/* lock the tcp struct */
|
||||
OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_match_lock);
|
||||
@ -45,6 +46,9 @@ int mca_oob_tcp_recv(
|
||||
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
|
||||
return rc;
|
||||
}
|
||||
if(NULL != tagp) {
|
||||
*tagp = ntohl(msg->msg_hdr.msg_tag);
|
||||
}
|
||||
|
||||
/* otherwise dequeue the message and return to free list */
|
||||
ompi_list_remove_item(&mca_oob_tcp_component.tcp_msg_recv, (ompi_list_item_t *) msg);
|
||||
@ -100,7 +104,7 @@ int mca_oob_tcp_recv(
|
||||
*/
|
||||
int mca_oob_tcp_recv_nb(
|
||||
ompi_process_name_t* peer,
|
||||
const struct iovec* iov,
|
||||
struct iovec* iov,
|
||||
int count,
|
||||
int tag,
|
||||
int flags,
|
||||
@ -136,7 +140,7 @@ int mca_oob_tcp_recv_nb(
|
||||
/* otherwise dequeue the message and return to free list */
|
||||
ompi_list_remove_item(&mca_oob_tcp_component.tcp_msg_recv, (ompi_list_item_t *) msg);
|
||||
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
|
||||
cbfunc(rc, &msg->msg_peer, iov, count, tag, cbdata);
|
||||
cbfunc(rc, &msg->msg_peer, iov, count, ntohl(msg->msg_hdr.msg_tag), cbdata);
|
||||
MCA_OOB_TCP_MSG_RETURN(msg);
|
||||
return 0;
|
||||
}
|
||||
|
@ -11,8 +11,8 @@
|
||||
*/
|
||||
|
||||
int mca_oob_tcp_send(
|
||||
const ompi_process_name_t* name,
|
||||
const struct iovec *iov,
|
||||
ompi_process_name_t* name,
|
||||
struct iovec *iov,
|
||||
int count,
|
||||
int tag,
|
||||
int flags)
|
||||
@ -85,8 +85,8 @@ int mca_oob_tcp_send(
|
||||
*/
|
||||
|
||||
int mca_oob_tcp_send_nb(
|
||||
const ompi_process_name_t* name,
|
||||
const struct iovec* iov,
|
||||
ompi_process_name_t* name,
|
||||
struct iovec* iov,
|
||||
int count,
|
||||
int tag,
|
||||
int flags,
|
||||
|
@ -172,19 +172,6 @@ void do_sends(ompi_process_name_t * peer) {
|
||||
test_success();
|
||||
}
|
||||
|
||||
/* nonblocking send with packing */
|
||||
if( 0 > mca_oob_send_hton_nb(peer, send_msg1, types, 4, 0, 0, &callback,
|
||||
(void *) (2 + (NUM_TESTS * i)))) {
|
||||
test_failure("mca_oob_send_hton_nb.");
|
||||
} else {
|
||||
test_success();
|
||||
}
|
||||
if( 0 > mca_oob_send_hton_nb(peer, send_msg1, types, 4, 0, 0, &callback,
|
||||
(void *) (3 + (NUM_TESTS * i)))) {
|
||||
test_failure("mca_oob_send_hton_nb.");
|
||||
} else {
|
||||
test_success();
|
||||
}
|
||||
|
||||
/* blocking send */
|
||||
if( 0 > mca_oob_send(peer, send_msg2, 3, 0, 0)) {
|
||||
@ -198,17 +185,6 @@ void do_sends(ompi_process_name_t * peer) {
|
||||
test_success();
|
||||
}
|
||||
|
||||
/* blocking send with packing */
|
||||
if( 0 > mca_oob_send_hton(peer, send_msg1, types, 4, 0, 0)) {
|
||||
test_failure("mca_oob_send_hton.");
|
||||
} else {
|
||||
test_success();
|
||||
}
|
||||
if( 0 > mca_oob_send_hton(peer, send_msg1, types, 4, 0, 0)) {
|
||||
test_failure("mca_oob_send_hton.");
|
||||
} else {
|
||||
test_success();
|
||||
}
|
||||
}
|
||||
|
||||
void do_recvs(ompi_process_name_t * peer) {
|
||||
@ -228,21 +204,6 @@ void do_recvs(ompi_process_name_t * peer) {
|
||||
if(!compare_iovec(recv_msg1, send_msg1, 4)) {
|
||||
test_failure("compare 1 is wrong");
|
||||
}
|
||||
/* now we'll recieve the packed send - assuming we know the message type */
|
||||
if( 0 > mca_oob_recv_ntoh(peer, recv_msg1, types, 4, 0,0)) {
|
||||
test_failure("mca_oob_recv_ntoh.");
|
||||
} else {
|
||||
test_success();
|
||||
}
|
||||
if(!compare_iovec(recv_msg1, send_msg1, 4)) {
|
||||
test_failure("compare 2 is wrong");
|
||||
}
|
||||
if( 0 > mca_oob_recv_ntoh_nb(peer, recv_msg1, types, 4, 0, 0, &callback,
|
||||
(void *) (5 + (NUM_TESTS * i)))) {
|
||||
test_failure("mca_oob_recv_ntoh_nb.");
|
||||
} else {
|
||||
test_success();
|
||||
}
|
||||
|
||||
/* now we'll do a blocking recv - waiting for the 3rd message to arrive
|
||||
* - and peek the first element of the iovec array to determine
|
||||
@ -284,21 +245,5 @@ void do_recvs(ompi_process_name_t * peer) {
|
||||
} else {
|
||||
test_success();
|
||||
}
|
||||
|
||||
/* now we will recieve the packed data */
|
||||
if( 0 > mca_oob_recv_ntoh(peer, recv_msg1, types, 4, 0, 0)) {
|
||||
test_failure("mca_oob_recv_ntoh.");
|
||||
} else {
|
||||
test_success();
|
||||
}
|
||||
if(!compare_iovec(recv_msg1, send_msg1, 4)) {
|
||||
test_failure("compare 5 is wrong");
|
||||
}
|
||||
if( 0 > mca_oob_recv_ntoh_nb(peer, recv_msg1, types, 4, 0, 0, &callback,
|
||||
(void *) (7 + (NUM_TESTS * i)))) {
|
||||
test_failure("mca_oob_recv_ntoh_nb.");
|
||||
} else {
|
||||
test_success();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -71,11 +71,11 @@ bool compare_iovec(const struct iovec * msg1, const struct iovec * msg2,
|
||||
return true;
|
||||
}
|
||||
|
||||
void callback(int status, const ompi_process_name_t * peer,
|
||||
const struct iovec * msg, int count, int tag, void * cbdata);
|
||||
void callback(int status, ompi_process_name_t * peer,
|
||||
struct iovec * msg, int count, int tag, void * cbdata);
|
||||
|
||||
void callback(int status, const ompi_process_name_t * peer,
|
||||
const struct iovec * msg, int count, int tag, void * cbdata)
|
||||
void callback(int status, ompi_process_name_t * peer,
|
||||
struct iovec * msg, int count, int tag, void * cbdata)
|
||||
{
|
||||
if(0 != tag) {
|
||||
test_failure("Bad tag.");
|
||||
@ -115,22 +115,6 @@ int main(int argc, char ** argv)
|
||||
test_success();
|
||||
}
|
||||
|
||||
/* Nonblocking send followed by a blocking recieve with packing */
|
||||
if( 0 > mca_oob_send_hton_nb(&peer, send_msg1, types, 4, 0, 0, &callback,
|
||||
(void *) 1)) {
|
||||
test_failure("mca_oob_send_hton_nb.");
|
||||
} else {
|
||||
test_success();
|
||||
}
|
||||
if( 0 > mca_oob_recv_ntoh(&peer, recv_msg1, types, 4, 0, 0)) {
|
||||
test_failure("mca_oob_recv_ntoh.");
|
||||
} else {
|
||||
test_success();
|
||||
}
|
||||
if(!compare_iovec(recv_msg1, send_msg1, 4)) {
|
||||
test_failure("compare 1 is wrong");
|
||||
}
|
||||
|
||||
/* non blocking send of message type 2 followed by blocking recieve*/
|
||||
if( 0 > mca_oob_send_nb(&peer, send_msg2, 3, 0, 0, &callback,
|
||||
(void *) 4)) {
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user