openmpi/orte/mca/rmcast/base/rmcast_base_fns.c
Shiqing Fan 2697a37363 Use the correct type for IO vector base.
This commit was SVN r23229.
2010-06-01 15:40:11 +00:00

/*
 * Copyright (c) 2009 Cisco Systems, Inc.  All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */
#include "orte_config.h"
#include "orte/constants.h"
#include <stdio.h>
#include "opal/mca/mca.h"
#include "opal/mca/base/base.h"
#include "opal/threads/threads.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/rmcast/base/base.h"
#include "orte/mca/rmcast/base/private.h"

static int extract_hdr(opal_buffer_t *buf,
                       orte_process_name_t *name,
                       orte_rmcast_channel_t *channel,
                       orte_rmcast_tag_t *tag,
                       orte_rmcast_seq_t *seq_num);

static int insert_hdr(opal_buffer_t *buf,
                      orte_rmcast_channel_t channel,
                      orte_rmcast_tag_t tag,
                      orte_rmcast_seq_t seq_num);
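
/* Wire format shared by insert_hdr/orte_rmcast_base_build_msg on the
 * send side and extract_hdr/orte_rmcast_base_process_recv on the recv
 * side (derived from the pack/unpack calls in this file):
 *
 *     ORTE_NAME               sender's process name
 *     ORTE_RMCAST_CHANNEL_T   channel
 *     ORTE_RMCAST_TAG_T       tag
 *     ORTE_RMCAST_SEQ_T       sequence number
 *     OPAL_INT8               payload flag: 0 = iovecs, 1 = buffer
 *
 * An iovec payload continues with an OPAL_INT32 iovec count and, per
 * iovec, an OPAL_INT32 length followed by that many OPAL_UINT8 bytes;
 * a buffer payload is simply the copied buffer contents.
 */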

int orte_rmcast_base_build_msg(rmcast_base_channel_t *ch,
                               opal_buffer_t **buffer,
                               rmcast_base_send_t *snd)
{
    int32_t sz;
    opal_buffer_t *buf;
    int rc;
    int8_t flag;
    int32_t tmp32;

    OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                         "%s rmcast:base:build_msg of %d %s"
                         " for multicast on channel %d tag %d",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         (NULL == snd->iovec_array) ? (int)snd->buf->bytes_used : (int)snd->iovec_count,
                         (NULL == snd->iovec_array) ? "bytes" : "iovecs",
                         (int)ch->channel, snd->tag));

    /* setup a buffer */
    buf = OBJ_NEW(opal_buffer_t);

    /* insert the header */
    if (ORTE_SUCCESS != (rc = insert_hdr(buf, ch->channel, snd->tag, ch->seq_num))) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    /* are we sending a buffer? */
    if (NULL == snd->buf) {
        /* no - flag the message as containing iovecs */
        flag = 0;
        if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &flag, 1, OPAL_INT8))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }
        /* pack the number of iovecs */
        if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &snd->iovec_count, 1, OPAL_INT32))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }
        /* pack each iovec into the buffer in prep for sending
         * so we can recreate the array at the other end
         */
        for (sz=0; sz < snd->iovec_count; sz++) {
            /* pack the size */
            tmp32 = snd->iovec_array[sz].iov_len;
            if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tmp32, 1, OPAL_INT32))) {
                ORTE_ERROR_LOG(rc);
                goto cleanup;
            }
            if (0 < tmp32) {
                /* pack the bytes */
                if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, snd->iovec_array[sz].iov_base, tmp32, OPAL_UINT8))) {
                    ORTE_ERROR_LOG(rc);
                    goto cleanup;
                }
            }
        }
    } else {
        /* flag the message as containing a buffer */
        flag = 1;
        if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &flag, 1, OPAL_INT8))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }
        /* copy the payload */
        if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(buf, snd->buf))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }
    }

    *buffer = buf;
    return ORTE_SUCCESS;

cleanup:
    if (NULL != buf) {
        OBJ_RELEASE(buf);
    }
    return rc;
}
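
/* A minimal caller sketch, for illustration only - my_channel, my_payload
 * and my_tag are hypothetical names, not symbols defined in this file, and
 * the send object is assumed to be properly constructed elsewhere. A
 * transport component fills in an rmcast_base_send_t (here with a buffer
 * payload), asks build_msg for the packed wire message, and is then
 * responsible for transmitting and releasing it:
 *
 *     rmcast_base_send_t snd;          // assume constructed/zeroed
 *     opal_buffer_t *msg = NULL;
 *     snd.iovec_array = NULL;          // not sending iovecs
 *     snd.iovec_count = 0;
 *     snd.buf = my_payload;            // hypothetical opal_buffer_t*
 *     snd.tag = my_tag;                // hypothetical tag value
 *     if (ORTE_SUCCESS == orte_rmcast_base_build_msg(my_channel, &msg, &snd)) {
 *         // hand msg to the transport for multicast, then OBJ_RELEASE(msg)
 *     }
 */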

int orte_rmcast_base_queue_recv(rmcast_base_recv_t **recvptr,
                                orte_rmcast_channel_t channel,
                                orte_rmcast_tag_t tag,
                                orte_rmcast_flag_t flags,
                                orte_rmcast_callback_fn_t cbfunc_iovec,
                                orte_rmcast_callback_buffer_fn_t cbfunc_buffer,
                                void *cbdata, bool blocking)
{
    opal_list_item_t *item;
    rmcast_base_recv_t *rptr;

    OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                         "%s rmcast:base: queue_recv called on multicast channel %d tag %d",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel, tag));

    if (!blocking) {
        /* do we already have a recv for this channel/tag? */
        OPAL_THREAD_LOCK(&orte_rmcast_base.lock);
        for (item = opal_list_get_first(&orte_rmcast_base.recvs);
             item != opal_list_get_end(&orte_rmcast_base.recvs);
             item = opal_list_get_next(item)) {
            rptr = (rmcast_base_recv_t*)item;
            if (channel != rptr->channel) {
                /* different channel */
                continue;
            }
            if (tag != rptr->tag) {
                /* different tag */
                continue;
            }
            if (NULL != cbfunc_iovec) {
                if (NULL != rptr->cbfunc_iovec) {
                    /* already have one in place */
                    OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                                         "%s rmcast:base: matching recv already active on multicast channel %d tag %d",
                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel, tag));
                    OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock);
                    return ORTE_EXISTS;
                }
                rptr->cbfunc_iovec = cbfunc_iovec;
            }
            if (NULL != cbfunc_buffer) {
                if (NULL != rptr->cbfunc_buffer) {
                    /* matching type - recv already in place */
                    OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                                         "%s rmcast:base: matching recv already active on multicast channel %d tag %d",
                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel, tag));
                    OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock);
                    return ORTE_EXISTS;
                }
                rptr->cbfunc_buffer = cbfunc_buffer;
            }
            if (NULL != recvptr) {
                *recvptr = rptr;
            }
            OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock);
            return ORTE_SUCCESS;
        }
        /* no match found - release the lock before we re-acquire it below */
        OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock);
    }

    /* if we get here, then we need to add a new recv */
    OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                         "%s rmcast:base: adding recv on multicast channel %d tag %d",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), channel, tag));

    OPAL_THREAD_LOCK(&orte_rmcast_base.lock);
    rptr = OBJ_NEW(rmcast_base_recv_t);
    rptr->channel = channel;
    rptr->tag = tag;
    rptr->flags = flags;
    rptr->cbfunc_iovec = cbfunc_iovec;
    rptr->cbfunc_buffer = cbfunc_buffer;
    rptr->cbdata = cbdata;
    if (NULL != recvptr) {
        *recvptr = rptr;
    }
    opal_list_append(&orte_rmcast_base.recvs, &rptr->item);
    OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock);

    return ORTE_SUCCESS;
}
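
/* A minimal registration sketch, for illustration only - my_buffer_cb and
 * my_tag are hypothetical. The buffer-callback signature shown here is
 * inferred from the cbfunc_buffer invocation in
 * orte_rmcast_base_process_recv below:
 *
 *     static void my_buffer_cb(int status, orte_rmcast_channel_t channel,
 *                              orte_rmcast_tag_t tag,
 *                              orte_process_name_t *sender,
 *                              opal_buffer_t *buf, void *cbdata)
 *     {
 *         // consume buf; a non-persistent recv would have been removed
 *         // automatically after this delivery
 *     }
 *
 *     orte_rmcast_base_queue_recv(NULL, ORTE_RMCAST_SYS_CHANNEL, my_tag,
 *                                 ORTE_RMCAST_PERSISTENT, NULL,
 *                                 my_buffer_cb, NULL, false);
 */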

/* Deliver an incoming multicast message to the first posted recv that
 * matches its channel and tag (a recv posted with ORTE_RMCAST_TAG_WILDCARD
 * matches any tag). Messages from ourselves, and messages from other job
 * families that do not arrive on the system channel, are dropped.
 */
void orte_rmcast_base_process_recv(orte_mcast_msg_event_t *msg)
{
    orte_rmcast_channel_t channel;
    opal_list_item_t *item;
    rmcast_base_recv_t *ptr;
    orte_process_name_t name;
    orte_rmcast_tag_t tag;
    int8_t flag;
    struct iovec *iovec_array=NULL;
    int32_t iovec_count=0, i, n, isz;
    opal_buffer_t *recvd_buf=NULL;
    int rc;
    orte_rmcast_seq_t recvd_seq_num;

    /* extract the header */
    if (ORTE_SUCCESS != (rc = extract_hdr(msg->buf, &name, &channel, &tag, &recvd_seq_num))) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    /* if this message is from myself, ignore it */
    if (name.jobid == ORTE_PROC_MY_NAME->jobid && name.vpid == ORTE_PROC_MY_NAME->vpid) {
        OPAL_OUTPUT_VERBOSE((10, orte_rmcast_base.rmcast_output,
                             "%s rmcast:base:process_recv sent from myself: %s",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             ORTE_NAME_PRINT(&name)));
        goto cleanup;
    }

    /* if this message is from a different job family, ignore it unless
     * it is on the system channel. We ignore these messages to avoid
     * confusion between different jobs since we all may be sharing
     * multicast channels. The system channel is left open to support
     * cross-job communications via the HNP.
     */
    if (ORTE_JOB_FAMILY(name.jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid) &&
        (ORTE_RMCAST_SYS_CHANNEL != channel)) {
        OPAL_OUTPUT_VERBOSE((10, orte_rmcast_base.rmcast_output,
                             "%s rmcast:base:process_recv from a different job family: %s",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             ORTE_NAME_PRINT(&name)));
        goto cleanup;
    }

    /* unpack the iovec vs buf flag */
    n=1;
    if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg->buf, &flag, &n, OPAL_INT8))) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                         "%s rmcast:base:process_recv sender: %s channel: %d tag: %d %s seq_num: %d",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_NAME_PRINT(&name), channel, (int)tag,
                         (0 == flag) ? "iovecs" : "buffer", recvd_seq_num));

    /* find the recv for this channel, tag, and type */
    for (item = opal_list_get_first(&orte_rmcast_base.recvs);
         item != opal_list_get_end(&orte_rmcast_base.recvs);
         item = opal_list_get_next(item)) {
        ptr = (rmcast_base_recv_t*)item;
        OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                             "%s rmcast:base:recv checking channel %d tag %d",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             (int)ptr->channel, (int)ptr->tag));
        if (channel != ptr->channel) {
            continue;
        }
        if (tag != ptr->tag && ORTE_RMCAST_TAG_WILDCARD != ptr->tag) {
            continue;
        }

        OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                             "%s rmcast:base:recv delivering message to channel %d tag %d",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ptr->channel, (int)tag));

        /* we have a recv - unpack the data */
        if (0 == flag) {
            /* get the number of iovecs in the buffer */
            n=1;
            if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg->buf, &iovec_count, &n, OPAL_INT32))) {
                ORTE_ERROR_LOG(rc);
                goto cleanup;
            }
            /* malloc the required space */
            iovec_array = (struct iovec *)malloc(iovec_count * sizeof(struct iovec));
            /* unpack the iovecs */
            for (i=0; i < iovec_count; i++) {
                /* unpack the number of bytes in this iovec */
                n=1;
                if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg->buf, &isz, &n, OPAL_INT32))) {
                    ORTE_ERROR_LOG(rc);
                    goto cleanup;
                }
                iovec_array[i].iov_base = NULL;
                iovec_array[i].iov_len = isz;
                if (0 < isz) {
                    /* allocate the space */
                    iovec_array[i].iov_base = (IOVBASE_TYPE*)malloc(isz);
                    /* unpack the data */
                    if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg->buf, iovec_array[i].iov_base, &isz, OPAL_UINT8))) {
                        ORTE_ERROR_LOG(rc);
                        goto cleanup;
                    }
                }
            }
            if (NULL != ptr->cbfunc_iovec) {
                OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                                     "%s rmcast:base:recv delivering iovecs to channel %d tag %d",
                                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ptr->channel, (int)tag));
                ptr->cbfunc_iovec(ORTE_SUCCESS, ptr->channel, tag,
                                  &name, iovec_array, iovec_count, ptr->cbdata);
                /* if it isn't persistent, remove it */
                if (!(ORTE_RMCAST_PERSISTENT & ptr->flags)) {
                    OPAL_THREAD_LOCK(&orte_rmcast_base.lock);
                    opal_list_remove_item(&orte_rmcast_base.recvs, &ptr->item);
                    OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock);
                    OBJ_RELEASE(ptr);
                }
            } else {
                /* if something is already present, then we have a problem */
                if (NULL != ptr->iovec_array) {
                    OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                                         "%s rmcast:base:recv blocking recv already fulfilled",
                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
                    goto cleanup;
                }
                /* copy the iovec array across since the incoming array is
                 * freed below - the blocking recv owns its own copy
                 */
                ptr->iovec_array = (struct iovec *)malloc(iovec_count * sizeof(struct iovec));
                ptr->iovec_count = iovec_count;
                for (i=0; i < iovec_count; i++) {
                    ptr->iovec_array[i].iov_base = (IOVBASE_TYPE*)malloc(iovec_array[i].iov_len);
                    ptr->iovec_array[i].iov_len = iovec_array[i].iov_len;
                    memcpy(ptr->iovec_array[i].iov_base, iovec_array[i].iov_base, iovec_array[i].iov_len);
                }
                /* flag it as recvd to release the blocking recv */
                ptr->recvd = true;
            }
        } else {
            /* buffer was included */
            recvd_buf = OBJ_NEW(opal_buffer_t);
            if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(recvd_buf, msg->buf))) {
                ORTE_ERROR_LOG(rc);
                goto cleanup;
            }
            if (NULL != ptr->cbfunc_buffer) {
                OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                                     "%s rmcast:base:recv delivering buffer to channel %d tag %d",
                                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ptr->channel, (int)tag));
                ptr->cbfunc_buffer(ORTE_SUCCESS, ptr->channel, tag,
                                   &name, recvd_buf, ptr->cbdata);
                /* if it isn't persistent, remove it */
                if (!(ORTE_RMCAST_PERSISTENT & ptr->flags)) {
                    OPAL_THREAD_LOCK(&orte_rmcast_base.lock);
                    opal_list_remove_item(&orte_rmcast_base.recvs, &ptr->item);
                    OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock);
                    OBJ_RELEASE(ptr);
                }
            } else {
                /* if something is already present, then we have a problem */
                if (NULL != ptr->buf) {
                    OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                                         "%s rmcast:base:recv blocking recv already fulfilled",
                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
                    goto cleanup;
                }
                OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                                     "%s rmcast:base:recv copying buffer for blocking recv",
                                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
                /* copy the buffer across since the incoming copy is
                 * released below - the blocking recv owns its own copy
                 */
                ptr->buf = OBJ_NEW(opal_buffer_t);
                if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(ptr->buf, recvd_buf))) {
                    ORTE_ERROR_LOG(rc);
                    goto cleanup;
                }
                /* flag it as recvd to release the blocking recv */
                ptr->recvd = true;
            }
        }
        /* we are done - only one recv can match */
        break;
    }

cleanup:
    if (NULL != iovec_array) {
        for (i=0; i < iovec_count; i++) {
            free(iovec_array[i].iov_base);
        }
        free(iovec_array);
    }
    if (NULL != recvd_buf) {
        OBJ_RELEASE(recvd_buf);
    }
    return;
}

void orte_rmcast_base_cancel_recv(orte_rmcast_channel_t channel,
                                  orte_rmcast_tag_t tag)
{
    opal_list_item_t *item, *next;
    rmcast_base_recv_t *ptr;
    orte_rmcast_channel_t ch;

    if (ORTE_RMCAST_GROUP_CHANNEL == channel) {
        ch = orte_rmcast_base.my_group_number;
    } else {
        ch = channel;
    }

    /* find all recvs for this channel and tag */
    item = opal_list_get_first(&orte_rmcast_base.recvs);
    while (item != opal_list_get_end(&orte_rmcast_base.recvs)) {
        next = opal_list_get_next(item);
        ptr = (rmcast_base_recv_t*)item;
        if (ch == ptr->channel &&
            tag == ptr->tag) {
            OPAL_THREAD_LOCK(&orte_rmcast_base.lock);
            opal_list_remove_item(&orte_rmcast_base.recvs, &ptr->item);
            OBJ_RELEASE(ptr);
            OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock);
        }
        item = next;
    }
}

int orte_rmcast_base_close_channel(orte_rmcast_channel_t channel)
{
    opal_list_item_t *item;
    rmcast_base_channel_t *chan;

    OPAL_THREAD_LOCK(&orte_rmcast_base.lock);
    for (item = opal_list_get_first(&orte_rmcast_base.channels);
         item != opal_list_get_end(&orte_rmcast_base.channels);
         item = opal_list_get_next(item)) {
        chan = (rmcast_base_channel_t*)item;
        if (channel == chan->channel) {
            opal_list_remove_item(&orte_rmcast_base.channels, item);
            OBJ_RELEASE(chan);
            OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock);
            return ORTE_SUCCESS;
        }
    }
    OPAL_THREAD_UNLOCK(&orte_rmcast_base.lock);
    return ORTE_ERR_NOT_FOUND;
}

orte_rmcast_channel_t orte_rmcast_base_query(void)
{
    return orte_rmcast_base.my_group_channel->channel;
}

static int extract_hdr(opal_buffer_t *buf,
                       orte_process_name_t *name,
                       orte_rmcast_channel_t *channel,
                       orte_rmcast_tag_t *tag,
                       orte_rmcast_seq_t *seq_num)
{
    int rc;
    int32_t n;

    n=1;
    if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, name, &n, ORTE_NAME))) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    n=1;
    if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, channel, &n, ORTE_RMCAST_CHANNEL_T))) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    n=1;
    if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, tag, &n, ORTE_RMCAST_TAG_T))) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    n=1;
    if (ORTE_SUCCESS != (rc = opal_dss.unpack(buf, seq_num, &n, ORTE_RMCAST_SEQ_T))) {
        ORTE_ERROR_LOG(rc);
    }
    return rc;
}

static int insert_hdr(opal_buffer_t *buf,
                      orte_rmcast_channel_t channel,
                      orte_rmcast_tag_t tag,
                      orte_rmcast_seq_t seq_num)
{
    int rc;

    if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &channel, 1, ORTE_RMCAST_CHANNEL_T))) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_RMCAST_TAG_T))) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &seq_num, 1, ORTE_RMCAST_SEQ_T))) {
        ORTE_ERROR_LOG(rc);
    }
    return rc;
}