1
1
openmpi/orte/mca/rml/oob/rml_oob_recv.c
Brian Barrett 39a6057fc6 A number of improvements / changes to the RML/OOB layers:
* General TCP cleanup for OPAL / ORTE
  * Simplifying the OOB by moving much of the logic into the RML
  * Allowing the OOB RML component to do routing of messages
  * Adding a component framework for handling routing tables
  * Moving the xcast functionality from the OOB base to its own framework

Includes merge from tmp/bwb-oob-rml-merge revisions:

    r15506, r15507, r15508, r15510, r15511, r15512, r15513

This commit was SVN r15528.

The following SVN revisions from the original message are invalid or
inconsistent and therefore were not cross-referenced:
  r15506
  r15507
  r15508
  r15510
  r15511
  r15512
  r15513
2007-07-20 01:34:02 +00:00

230 строки
6.9 KiB
C

/*
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include "rml_oob.h"
#include "orte/mca/oob/oob.h"
#include "orte/mca/oob/base/base.h"
#include "orte/dss/dss.h"
static void
orte_rml_recv_msg_callback(int status,
struct orte_process_name_t* peer,
struct iovec* iov,
int count,
orte_rml_tag_t tag,
void* cbdata)
{
orte_rml_oob_msg_t *msg = (orte_rml_oob_msg_t*) cbdata;
orte_rml_oob_msg_header_t *hdr =
(orte_rml_oob_msg_header_t*) iov[0].iov_base;
if (msg->msg_type == ORTE_RML_BLOCKING_RECV) {
/* blocking send */
msg->msg_status = status;
msg->msg_complete = true;
opal_condition_broadcast(&msg->msg_cond);
} else if (msg->msg_type == ORTE_RML_NONBLOCKING_IOV_RECV) {
/* non-blocking iovec send */
if (status > 0) {
status -= sizeof(orte_rml_oob_msg_header_t);
}
msg->msg_cbfunc.iov(status, &hdr->origin, iov + 1, count - 1,
hdr->tag, msg->msg_cbdata);
if (!msg->msg_persistent) OBJ_RELEASE(msg);
} else if (msg->msg_type == ORTE_RML_NONBLOCKING_BUFFER_RECV) {
/* non-blocking buffer send */
status = orte_dss.load(&msg->msg_recv_buffer,
iov[1].iov_base,
iov[1].iov_len);
msg->msg_cbfunc.buffer(status, &hdr->origin, &msg->msg_recv_buffer,
hdr->tag, msg->msg_cbdata);
if (!msg->msg_persistent) OBJ_RELEASE(msg);
} else {
abort();
}
}
int
orte_rml_oob_recv(orte_process_name_t* peer,
struct iovec *iov,
int count,
orte_rml_tag_t tag,
int flags)
{
orte_rml_oob_msg_t *msg = OBJ_NEW(orte_rml_oob_msg_t);
int ret;
int i;
msg->msg_type = ORTE_RML_BLOCKING_RECV;
flags |= ORTE_RML_FLAG_RECURSIVE_CALLBACK;
msg->msg_data = malloc(sizeof(struct iovec) * (count + 1));
msg->msg_data[0].iov_base = &msg->msg_header;
msg->msg_data[0].iov_len = sizeof(orte_rml_oob_msg_header_t);
for (i = 0 ; i < count ; ++i) {
msg->msg_data[i + 1].iov_base = iov[i].iov_base;
msg->msg_data[i + 1].iov_len = iov[i].iov_len;
}
ret = orte_rml_oob_module.active_oob->oob_recv_nb(peer, msg->msg_data,
count + 1, tag, flags,
orte_rml_recv_msg_callback,
msg);
if (0 < ret) goto cleanup;
OPAL_THREAD_LOCK(&msg->msg_lock);
while (!msg->msg_complete) {
opal_condition_wait(&msg->msg_cond, &msg->msg_lock);
}
ret = msg->msg_status;
OPAL_THREAD_UNLOCK(&msg->msg_lock);
cleanup:
OBJ_RELEASE(msg);
if (ret > 0) {
ret -= sizeof(struct orte_rml_oob_msg_header_t);
}
return ret;
}
int
orte_rml_oob_recv_nb(orte_process_name_t* peer,
struct iovec* iov,
int count,
orte_rml_tag_t tag,
int flags,
orte_rml_callback_fn_t cbfunc,
void* cbdata)
{
orte_rml_oob_msg_t *msg = OBJ_NEW(orte_rml_oob_msg_t);
int ret;
int i;
msg->msg_type = ORTE_RML_NONBLOCKING_IOV_RECV;
msg->msg_persistent = (flags & ORTE_RML_PERSISTENT) ? true : false;
msg->msg_cbfunc.iov = cbfunc;
msg->msg_cbdata = cbdata;
msg->msg_data = malloc(sizeof(struct iovec) * (count + 1));
msg->msg_data[0].iov_base = &msg->msg_header;
msg->msg_data[0].iov_len = sizeof(orte_rml_oob_msg_header_t);
for (i = 0 ; i < count ; ++i) {
msg->msg_data[i + 1].iov_base = iov[i].iov_base;
msg->msg_data[i + 1].iov_len = iov[i].iov_len;
}
ret = orte_rml_oob_module.active_oob->oob_recv_nb(peer, msg->msg_data,
count + 1, tag, flags,
orte_rml_recv_msg_callback,
msg);
if (0 < ret) OBJ_RELEASE(msg);
return ret;
}
int
orte_rml_oob_recv_buffer(orte_process_name_t* peer,
orte_buffer_t *buf,
orte_rml_tag_t tag,
int flags)
{
orte_rml_oob_msg_t *msg = OBJ_NEW(orte_rml_oob_msg_t);
int ret;
msg->msg_type = ORTE_RML_BLOCKING_RECV;
flags |= (ORTE_RML_FLAG_RECURSIVE_CALLBACK | ORTE_RML_ALLOC);
msg->msg_data = malloc(sizeof(struct iovec) * 2);
msg->msg_data[0].iov_base = &msg->msg_header;
msg->msg_data[0].iov_len = sizeof(orte_rml_oob_msg_header_t);
msg->msg_data[1].iov_base = NULL;
msg->msg_data[1].iov_len = 0;
ret = orte_rml_oob_module.active_oob->oob_recv_nb(peer, msg->msg_data,
2, tag, flags,
orte_rml_recv_msg_callback,
msg);
if (0 < ret) goto cleanup;
OPAL_THREAD_LOCK(&msg->msg_lock);
while (!msg->msg_complete) {
opal_condition_wait(&msg->msg_cond, &msg->msg_lock);
}
ret = msg->msg_status;
OPAL_THREAD_UNLOCK(&msg->msg_lock);
if (ret > 0) {
ret = orte_dss.load(buf,
msg->msg_data[1].iov_base,
msg->msg_data[1].iov_len);
}
cleanup:
OBJ_RELEASE(msg);
return ret;
}
int
orte_rml_oob_recv_buffer_nb(orte_process_name_t* peer,
orte_rml_tag_t tag,
int flags,
orte_rml_buffer_callback_fn_t cbfunc,
void* cbdata)
{
orte_rml_oob_msg_t *msg = OBJ_NEW(orte_rml_oob_msg_t);
int ret;
msg->msg_data = malloc(sizeof(struct iovec) * 2);
msg->msg_data[0].iov_base = &msg->msg_header;
msg->msg_data[0].iov_len = sizeof(orte_rml_oob_msg_header_t);
msg->msg_data[1].iov_base = NULL;
msg->msg_data[1].iov_len = 0;
msg->msg_type = ORTE_RML_NONBLOCKING_BUFFER_RECV;
msg->msg_persistent = (flags & ORTE_RML_PERSISTENT) ? true : false;
msg->msg_cbfunc.buffer = cbfunc;
msg->msg_cbdata = cbdata;
flags |= ORTE_RML_ALLOC;
ret = orte_rml_oob_module.active_oob->oob_recv_nb(peer,
msg->msg_data,
2,
tag, flags,
orte_rml_recv_msg_callback,
msg);
if (0 < ret) OBJ_RELEASE(msg);
return ret;
}
int
orte_rml_oob_recv_cancel(orte_process_name_t* peer,
orte_rml_tag_t tag)
{
return orte_rml_oob_module.active_oob->oob_recv_cancel(peer, tag);
}