2004-08-25 21:39:08 +04:00
|
|
|
/*
|
|
|
|
* $HEADER$
|
|
|
|
*/
|
2004-07-15 23:08:54 +04:00
|
|
|
#include "mca/oob/tcp/oob_tcp.h"
|
|
|
|
#include "mca/oob/tcp/oob_tcp_msg.h"
|
2004-07-13 02:46:57 +04:00
|
|
|
|
|
|
|
|
|
|
|
static void mca_oob_tcp_msg_construct(mca_oob_tcp_msg_t*);
|
|
|
|
static void mca_oob_tcp_msg_destruct(mca_oob_tcp_msg_t*);
|
|
|
|
|
|
|
|
|
|
|
|
OBJ_CLASS_INSTANCE(
|
|
|
|
mca_oob_tcp_msg_t,
|
|
|
|
ompi_list_item_t,
|
|
|
|
mca_oob_tcp_msg_construct,
|
|
|
|
mca_oob_tcp_msg_destruct);
|
|
|
|
|
|
|
|
|
|
|
|
static void mca_oob_tcp_msg_construct(mca_oob_tcp_msg_t* msg)
|
|
|
|
{
|
2004-07-15 17:51:40 +04:00
|
|
|
OBJ_CONSTRUCT(&msg->msg_lock, ompi_mutex_t);
|
|
|
|
OBJ_CONSTRUCT(&msg->msg_condition, ompi_condition_t);
|
2004-07-13 02:46:57 +04:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
static void mca_oob_tcp_msg_destruct(mca_oob_tcp_msg_t* msg)
|
|
|
|
{
|
2004-07-15 17:51:40 +04:00
|
|
|
OBJ_DESTRUCT(&msg->msg_lock);
|
|
|
|
OBJ_DESTRUCT(&msg->msg_condition);
|
2004-07-13 02:46:57 +04:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2004-08-10 03:07:53 +04:00
|
|
|
/*
|
2004-07-13 02:46:57 +04:00
|
|
|
* Wait for a msg to complete.
|
|
|
|
* @param msg (IN) Message to wait on.
|
2004-08-03 01:24:00 +04:00
|
|
|
* @param rc (OUT) Return code (number of bytes read on success or error code on failure).
|
2004-07-13 02:46:57 +04:00
|
|
|
* @retval OMPI_SUCCESS or error code on failure.
|
|
|
|
*/
|
|
|
|
|
2004-08-03 01:24:00 +04:00
|
|
|
int mca_oob_tcp_msg_wait(mca_oob_tcp_msg_t* msg, int* rc)
|
2004-07-13 02:46:57 +04:00
|
|
|
{
|
2004-08-28 05:34:48 +04:00
|
|
|
#if OMPI_HAVE_THREADS
|
|
|
|
OMPI_THREAD_LOCK(&msg->msg_lock);
|
|
|
|
while(msg->msg_complete == false) {
|
|
|
|
if(ompi_event_progress_thread()) {
|
2004-08-31 06:57:39 +04:00
|
|
|
int rc;
|
2004-08-28 05:34:48 +04:00
|
|
|
OMPI_THREAD_UNLOCK(&msg->msg_lock);
|
2004-08-31 06:57:39 +04:00
|
|
|
rc = ompi_event_loop(OMPI_EVLOOP_ONCE);
|
|
|
|
assert(rc == 0);
|
2004-08-28 05:34:48 +04:00
|
|
|
OMPI_THREAD_LOCK(&msg->msg_lock);
|
|
|
|
} else {
|
|
|
|
ompi_condition_wait(&msg->msg_condition, &msg->msg_lock);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
OMPI_THREAD_UNLOCK(&msg->msg_lock);
|
|
|
|
|
|
|
|
#else
|
2004-08-03 01:24:00 +04:00
|
|
|
/* wait for message to complete */
|
2004-08-28 05:15:19 +04:00
|
|
|
while(msg->msg_complete == false)
|
|
|
|
ompi_event_loop(OMPI_EVLOOP_ONCE);
|
2004-08-28 05:34:48 +04:00
|
|
|
#endif
|
2004-08-03 01:24:00 +04:00
|
|
|
|
|
|
|
/* return status */
|
|
|
|
if(NULL != rc) {
|
|
|
|
*rc = msg->msg_rc;
|
|
|
|
}
|
|
|
|
return OMPI_SUCCESS;
|
2004-07-13 02:46:57 +04:00
|
|
|
}
|
|
|
|
|
2004-08-10 03:07:53 +04:00
|
|
|
/*
|
2004-07-13 02:46:57 +04:00
|
|
|
* Signal that a message has completed.
|
|
|
|
* @param msg (IN) Message to wait on.
|
2004-07-15 17:51:40 +04:00
|
|
|
* @param peer (IN) the peer of the message
|
2004-07-13 02:46:57 +04:00
|
|
|
* @retval OMPI_SUCCESS or error code on failure.
|
|
|
|
*/
|
2004-07-15 23:08:54 +04:00
|
|
|
int mca_oob_tcp_msg_complete(mca_oob_tcp_msg_t* msg, ompi_process_name_t * peer)
|
2004-07-13 02:46:57 +04:00
|
|
|
{
|
2004-07-15 17:51:40 +04:00
|
|
|
ompi_mutex_lock(&msg->msg_lock);
|
|
|
|
msg->msg_complete = true;
|
|
|
|
if(NULL != msg->msg_cbfunc) {
|
2004-08-03 01:24:00 +04:00
|
|
|
msg->msg_cbfunc(msg->msg_rc, peer, msg->msg_uiov, msg->msg_ucnt, ntohl(msg->msg_hdr.msg_tag), msg->msg_cbdata);
|
2004-07-15 17:51:40 +04:00
|
|
|
ompi_mutex_unlock(&msg->msg_lock);
|
|
|
|
MCA_OOB_TCP_MSG_RETURN(msg);
|
|
|
|
} else {
|
|
|
|
ompi_condition_broadcast(&msg->msg_condition);
|
|
|
|
ompi_mutex_unlock(&msg->msg_lock);
|
|
|
|
}
|
2004-07-13 02:46:57 +04:00
|
|
|
return OMPI_SUCCESS;
|
|
|
|
}
|
|
|
|
|
2004-07-15 23:08:54 +04:00
|
|
|
/*
|
2004-07-15 17:51:40 +04:00
|
|
|
* The function that actually sends the data!
|
|
|
|
* @param msg a pointer to the message to send
|
|
|
|
* @param peer the peer we are sending to
|
|
|
|
* @retval true if the entire message has been sent
|
|
|
|
* @retval false if the entire message has not been sent
|
|
|
|
*/
|
2004-07-15 23:08:54 +04:00
|
|
|
bool mca_oob_tcp_msg_send_handler(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_peer_t * peer)
|
2004-07-15 17:51:40 +04:00
|
|
|
{
|
|
|
|
int rc;
|
|
|
|
while(1) {
|
2004-08-03 01:24:00 +04:00
|
|
|
rc = writev(peer->peer_sd, msg->msg_rwptr, msg->msg_rwnum);
|
|
|
|
if(rc < 0) {
|
2004-07-15 17:51:40 +04:00
|
|
|
if(errno == EINTR)
|
|
|
|
continue;
|
|
|
|
else if (errno == EAGAIN)
|
|
|
|
return false;
|
|
|
|
else {
|
2004-08-04 18:33:02 +04:00
|
|
|
ompi_output(0, "mca_oob_tcp_msg_send_handler: bad return from writev. errno=%d", errno);
|
2004-08-03 01:24:00 +04:00
|
|
|
mca_oob_tcp_peer_close(peer);
|
2004-07-15 17:51:40 +04:00
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}
|
2004-08-03 01:24:00 +04:00
|
|
|
|
|
|
|
msg->msg_rc += rc;
|
|
|
|
do {/* while there is still more iovecs to write */
|
2004-07-15 17:51:40 +04:00
|
|
|
if(rc < msg->msg_rwptr->iov_len) {
|
|
|
|
msg->msg_rwptr->iov_len -= rc;
|
|
|
|
msg->msg_rwptr->iov_base = (void *) ((char *) msg->msg_rwptr->iov_base + rc);
|
|
|
|
break;
|
|
|
|
} else {
|
|
|
|
rc -= msg->msg_rwptr->iov_len;
|
2004-08-03 01:24:00 +04:00
|
|
|
(msg->msg_rwnum)--;
|
2004-07-15 17:51:40 +04:00
|
|
|
(msg->msg_rwptr)++;
|
2004-08-03 01:24:00 +04:00
|
|
|
if(0 == msg->msg_rwnum) {
|
2004-07-15 17:51:40 +04:00
|
|
|
return true;
|
|
|
|
}
|
|
|
|
}
|
2004-08-03 01:24:00 +04:00
|
|
|
} while(msg->msg_rwnum);
|
2004-07-15 17:51:40 +04:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2004-08-10 03:07:53 +04:00
|
|
|
/*
|
2004-08-03 01:24:00 +04:00
|
|
|
* Receives message data.
|
2004-07-15 17:51:40 +04:00
|
|
|
* @param msg the message to be recieved into
|
|
|
|
* @param peer the peer to recieve from
|
2004-08-03 01:24:00 +04:00
|
|
|
* @retval true if the whole message was received
|
|
|
|
* @retval false if the whole message was not received
|
2004-07-15 17:51:40 +04:00
|
|
|
*/
|
2004-07-15 23:08:54 +04:00
|
|
|
bool mca_oob_tcp_msg_recv_handler(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_peer_t * peer)
|
2004-07-15 17:51:40 +04:00
|
|
|
{
|
|
|
|
int rc;
|
|
|
|
while(1) {
|
2004-08-03 01:24:00 +04:00
|
|
|
rc = readv(peer->peer_sd, msg->msg_rwptr, msg->msg_rwnum);
|
2004-08-28 05:15:19 +04:00
|
|
|
if(rc < 0) {
|
2004-07-15 17:51:40 +04:00
|
|
|
if(errno == EINTR)
|
|
|
|
continue;
|
|
|
|
else if (errno == EAGAIN)
|
|
|
|
return false;
|
|
|
|
else {
|
2004-08-28 05:15:19 +04:00
|
|
|
ompi_output(0, "mca_oob_tcp_msg_recv_handler: readv failed with errno=%d", errno);
|
2004-08-03 01:24:00 +04:00
|
|
|
mca_oob_tcp_peer_close(peer);
|
2004-07-15 17:51:40 +04:00
|
|
|
return false;
|
|
|
|
}
|
2004-08-28 05:15:19 +04:00
|
|
|
} else if (rc == 0) {
|
|
|
|
ompi_output(0, "mca_oob_tcp_msg_recv_handler: read failedd - peer closed connection");
|
|
|
|
mca_oob_tcp_peer_close(peer);
|
|
|
|
return false;
|
2004-07-15 17:51:40 +04:00
|
|
|
}
|
2004-08-03 01:24:00 +04:00
|
|
|
|
2004-07-15 17:51:40 +04:00
|
|
|
do {
|
|
|
|
if(rc < msg->msg_rwptr->iov_len) {
|
|
|
|
msg->msg_rwptr->iov_len -= rc;
|
|
|
|
msg->msg_rwptr->iov_base = (void *) ((char *) msg->msg_rwptr->iov_base + rc);
|
|
|
|
break;
|
|
|
|
} else {
|
|
|
|
rc -= msg->msg_rwptr->iov_len;
|
2004-08-03 01:24:00 +04:00
|
|
|
(msg->msg_rwnum)--;
|
2004-07-15 17:51:40 +04:00
|
|
|
(msg->msg_rwptr)++;
|
2004-08-03 01:24:00 +04:00
|
|
|
if(0 == msg->msg_rwnum) {
|
2004-07-15 17:51:40 +04:00
|
|
|
return true;
|
|
|
|
}
|
|
|
|
}
|
2004-08-03 01:24:00 +04:00
|
|
|
} while(msg->msg_rwnum);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2004-08-10 03:07:53 +04:00
|
|
|
/*
|
2004-08-03 01:24:00 +04:00
|
|
|
* Called to copy the results of a message into user supplied iovec array.
|
|
|
|
* @param msg (IN) Message send that is in progress.
|
|
|
|
* @param iov (IN) Iovec array of user supplied buffers.
|
|
|
|
* @retval count Number of elements in iovec array.
|
|
|
|
*/
|
|
|
|
|
2004-08-13 02:41:42 +04:00
|
|
|
int mca_oob_tcp_msg_copy(mca_oob_tcp_msg_t* msg, struct iovec* iov, int count)
|
2004-08-03 01:24:00 +04:00
|
|
|
{
|
|
|
|
int i;
|
2004-08-13 02:41:42 +04:00
|
|
|
struct iovec *src = msg->msg_rwiov;
|
|
|
|
struct iovec *dst = iov;
|
2004-08-03 01:24:00 +04:00
|
|
|
unsigned char* src_ptr = (unsigned char*)src->iov_base;
|
|
|
|
size_t src_len = src->iov_len;
|
|
|
|
int src_cnt = 0;
|
|
|
|
int rc = 0;
|
|
|
|
|
|
|
|
for(i=0; i<count; i++) {
|
|
|
|
unsigned char* dst_ptr = (unsigned char*)dst->iov_base;
|
|
|
|
size_t dst_len = dst->iov_len;
|
|
|
|
while(dst_len > 0) {
|
|
|
|
size_t len = (dst_len <= src_len) ? dst_len : src_len;
|
|
|
|
memcpy(dst_ptr, src_ptr, len);
|
|
|
|
rc += len;
|
|
|
|
dst_ptr += len;
|
|
|
|
dst_len -= len;
|
|
|
|
src_ptr += len;
|
|
|
|
src_len -= len;
|
|
|
|
if(src_len == 0) {
|
|
|
|
if(++src_cnt == count)
|
|
|
|
return rc;
|
|
|
|
src++;
|
2004-08-28 06:02:31 +04:00
|
|
|
src_ptr = (unsigned char*)src->iov_base;
|
2004-08-03 01:24:00 +04:00
|
|
|
src_len = src->iov_len;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
dst++;
|
2004-07-15 17:51:40 +04:00
|
|
|
}
|
2004-08-03 01:24:00 +04:00
|
|
|
return rc;
|
2004-07-15 17:51:40 +04:00
|
|
|
}
|
|
|
|
|
2004-08-10 03:07:53 +04:00
|
|
|
/*
|
2004-08-03 01:24:00 +04:00
|
|
|
* Match name to a message that has been received asynchronously (unexpected).
|
|
|
|
*
|
|
|
|
* @param name (IN) Name associated with peer or wildcard to match first posted recv.
|
|
|
|
* @return msg Matched message or NULL.
|
|
|
|
*
|
|
|
|
* Note - this routine requires the caller to be holding the module lock.
|
|
|
|
*/
|
|
|
|
|
2004-08-13 02:41:42 +04:00
|
|
|
mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_recv(ompi_process_name_t* name, int tag)
|
2004-08-03 01:24:00 +04:00
|
|
|
{
|
|
|
|
mca_oob_tcp_msg_t* msg;
|
|
|
|
for(msg = (mca_oob_tcp_msg_t*) ompi_list_get_first(&mca_oob_tcp_component.tcp_msg_recv);
|
|
|
|
msg != (mca_oob_tcp_msg_t*) ompi_list_get_end(&mca_oob_tcp_component.tcp_msg_recv);
|
|
|
|
msg = (mca_oob_tcp_msg_t*) ompi_list_get_next(msg)) {
|
|
|
|
|
2004-08-05 19:30:36 +04:00
|
|
|
if((0 == mca_oob_tcp_process_name_compare(name,MCA_OOB_NAME_ANY) ||
|
|
|
|
(0 == mca_oob_tcp_process_name_compare(name, &msg->msg_peer)))) {
|
2004-08-03 01:24:00 +04:00
|
|
|
if (tag == MCA_OOB_TAG_ANY || tag == msg->msg_hdr.msg_tag) {
|
|
|
|
return msg;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return NULL;
|
|
|
|
}
|
|
|
|
|
2004-08-10 03:07:53 +04:00
|
|
|
/*
|
2004-08-03 01:24:00 +04:00
|
|
|
* Match name to a posted recv request.
|
|
|
|
*
|
|
|
|
* @param name (IN) Name associated with peer or wildcard to match first posted recv.
|
|
|
|
* @return msg Matched message or NULL.
|
|
|
|
*
|
|
|
|
* Note - this routine requires the caller to be holding the module lock.
|
|
|
|
*/
|
|
|
|
|
2004-08-13 02:41:42 +04:00
|
|
|
mca_oob_tcp_msg_t* mca_oob_tcp_msg_match_post(ompi_process_name_t* name, int tag, bool peek)
|
2004-08-03 01:24:00 +04:00
|
|
|
{
|
|
|
|
mca_oob_tcp_msg_t* msg;
|
|
|
|
for(msg = (mca_oob_tcp_msg_t*) ompi_list_get_first(&mca_oob_tcp_component.tcp_msg_post);
|
|
|
|
msg != (mca_oob_tcp_msg_t*) ompi_list_get_end(&mca_oob_tcp_component.tcp_msg_post);
|
|
|
|
msg = (mca_oob_tcp_msg_t*) ompi_list_get_next(msg)) {
|
|
|
|
|
2004-08-05 19:30:36 +04:00
|
|
|
if((0 == mca_oob_tcp_process_name_compare(&msg->msg_peer,MCA_OOB_NAME_ANY) ||
|
|
|
|
(0 == mca_oob_tcp_process_name_compare(&msg->msg_peer,name)))) {
|
2004-08-03 01:24:00 +04:00
|
|
|
if (msg->msg_hdr.msg_tag == MCA_OOB_TAG_ANY || msg->msg_hdr.msg_tag == tag) {
|
|
|
|
if((msg->msg_flags & MCA_OOB_PEEK) == 0 || peek) {
|
|
|
|
ompi_list_remove_item(&mca_oob_tcp_component.tcp_msg_post, &msg->super);
|
|
|
|
return msg;
|
|
|
|
} else {
|
|
|
|
return NULL;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return NULL;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|