Allow mcast threads to be disabled
This commit was SVN r24032.
Этот коммит содержится в:
родитель
22e40d92a0
Коммит
cbb758c4fb
@ -55,6 +55,7 @@ typedef struct {
|
||||
opal_list_t channels;
|
||||
rmcast_base_channel_t *my_output_channel;
|
||||
rmcast_base_channel_t *my_input_channel;
|
||||
bool enable_progress_thread;
|
||||
int recv_ctl_pipe[2];
|
||||
int process_ctl_pipe[2];
|
||||
opal_list_t msg_list;
|
||||
|
@ -142,19 +142,23 @@ typedef struct {
|
||||
} rmcast_send_log_t;
|
||||
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(rmcast_send_log_t);
|
||||
|
||||
#define ORTE_MULTICAST_MESSAGE_EVENT(dt, sz) \
|
||||
do { \
|
||||
orte_mcast_msg_event_t *mev; \
|
||||
char byte='a'; \
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_rmcast_base.rmcast_output, \
|
||||
"defining mcast msg event: %s %d", \
|
||||
__FILE__, __LINE__)); \
|
||||
mev = OBJ_NEW(orte_mcast_msg_event_t); \
|
||||
opal_dss.load(mev->buf, (dt), (sz)); \
|
||||
ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl); \
|
||||
opal_list_append(&orte_rmcast_base.msg_list, &mev->super); \
|
||||
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl); \
|
||||
opal_fd_write(orte_rmcast_base.process_ctl_pipe[1], 1, &byte); \
|
||||
/*
 * Hand a freshly received multicast payload to the message processor.
 *
 * dt - data to be loaded into the new event's buffer via opal_dss.load
 * sz - size passed to opal_dss.load together with dt
 *
 * A new orte_mcast_msg_event_t is created and loaded with (dt, sz).
 * When orte_rmcast_base.enable_progress_thread is set, the event is
 * appended to orte_rmcast_base.msg_list while holding recv_process_ctl,
 * and a single byte is written to process_ctl_pipe[1]. Otherwise the
 * event is passed straight to orte_rmcast_base_process_msg() in the
 * caller's context; that function releases the event when it is done.
 *
 * The definition deliberately does NOT end with a semicolon, so the
 * invocation behaves like one ordinary statement: write
 * "ORTE_MULTICAST_MESSAGE_EVENT(data, sz);" and it can be used safely
 * as the body of an unbraced if/else.
 */
#define ORTE_MULTICAST_MESSAGE_EVENT(dt, sz)                                \
    do {                                                                    \
        orte_mcast_msg_event_t *mev;                                        \
        char byte='a';                                                      \
        OPAL_OUTPUT_VERBOSE((1, orte_rmcast_base.rmcast_output,             \
                             "defining mcast msg event: %s %d",             \
                             __FILE__, __LINE__));                          \
        mev = OBJ_NEW(orte_mcast_msg_event_t);                              \
        opal_dss.load(mev->buf, (dt), (sz));                                \
        if (orte_rmcast_base.enable_progress_thread) {                      \
            /* queue the event and signal via the control pipe */           \
            ORTE_ACQUIRE_THREAD(&orte_rmcast_base.recv_process_ctl);        \
            opal_list_append(&orte_rmcast_base.msg_list, &mev->super);      \
            ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl);        \
            opal_fd_write(orte_rmcast_base.process_ctl_pipe[1], 1, &byte);  \
        } else {                                                            \
            /* progress thread disabled - process the message inline */     \
            orte_rmcast_base_process_msg(mev);                              \
        }                                                                   \
    } while(0)
|
||||
|
||||
|
||||
@ -184,6 +188,8 @@ ORTE_DECLSPEC int orte_rmcast_base_queue_xmit(rmcast_base_send_t *snd,
|
||||
ORTE_DECLSPEC int orte_rmcast_base_start_threads(bool rcv_thread, bool processing_thread);
|
||||
ORTE_DECLSPEC void orte_rmcast_base_stop_threads(void);
|
||||
|
||||
ORTE_DECLSPEC int orte_rmcast_base_process_msg(orte_mcast_msg_event_t *msg);
|
||||
|
||||
ORTE_DECLSPEC void orte_rmcast_base_cancel_recv(orte_rmcast_channel_t channel,
|
||||
orte_rmcast_tag_t tag);
|
||||
|
||||
|
@ -130,14 +130,11 @@ int orte_rmcast_base_open(void)
|
||||
orte_rmcast_base.my_output_channel = NULL;
|
||||
orte_rmcast_base.my_input_channel = NULL;
|
||||
|
||||
/* progress rate */
|
||||
mca_base_param_reg_int_name("rmcast", "base_msg_tick_rate",
|
||||
"Number of microsecs between message event loops (default: 10)",
|
||||
false, false, 10, (int *) &(orte_rmcast_base.recv_ctl.rate.tv_usec));
|
||||
|
||||
mca_base_param_reg_int_name("rmcast", "base_msg_process_tick_rate",
|
||||
"Number of microsecs between message event loops (default: 100)",
|
||||
false, false, 100, (int *) &(orte_rmcast_base.recv_process_ctl.rate.tv_usec));
|
||||
/* whether or not to use progress thread */
|
||||
mca_base_param_reg_int_name("rmcast", "enable_progress_thread",
|
||||
"Whether or not to enable progress thread (default: false)",
|
||||
false, false, (int)false, &value);
|
||||
orte_rmcast_base.enable_progress_thread = OPAL_INT_TO_BOOL(value);
|
||||
|
||||
/* public multicast channel for this job */
|
||||
mca_base_param_reg_string_name("rmcast", "base_multicast_network",
|
||||
|
@ -36,6 +36,10 @@ int orte_rmcast_base_start_threads(bool rcv_thread, bool processing_thread)
|
||||
{
|
||||
int rc;
|
||||
|
||||
if (!orte_rmcast_base.enable_progress_thread) {
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
if (rcv_thread) {
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base: starting recv thread",
|
||||
@ -84,7 +88,11 @@ void orte_rmcast_base_stop_threads(void)
|
||||
{
|
||||
char byte='s';
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
|
||||
if (!orte_rmcast_base.enable_progress_thread) {
|
||||
return;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base: stopping recv thread",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
/* if the thread is active, stop it */
|
||||
@ -110,20 +118,221 @@ void orte_rmcast_base_stop_threads(void)
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
}
|
||||
|
||||
static void* rcv_processing_thread(opal_object_t *obj)
|
||||
/*
 * Free an array of iovecs along with every payload buffer it points to.
 * Safe to call with a NULL array; entries with a NULL iov_base are fine
 * because free(NULL) is a no-op.
 */
static void rmcast_base_release_iovecs(struct iovec *array, int32_t count)
{
    int32_t k;

    if (NULL == array) {
        return;
    }
    for (k=0; k < count; k++) {
        free(array[k].iov_base);
    }
    free(array);
}

/*
 * Process one received multicast message and deliver it to the first
 * matching posted recv.
 *
 * The header (sender name, channel, tag, sequence number) is extracted
 * from msg->buf. Messages sent by this process are ignored. Messages
 * from a different job family that are not on the system channel are
 * ignored unless this process is the HNP or a daemon. The payload is
 * then unpacked either as a set of iovecs (flag == 0) or left as a
 * buffer, and handed to the first recv in orte_rmcast_base.recvs whose
 * channel matches and whose tag matches or is the wildcard tag:
 *   - if the recv has a callback, the callback is invoked and the recv
 *     is removed from the list unless it is marked persistent;
 *   - otherwise the data is copied onto the recv and its "recvd" flag
 *     is set so that the blocking receiver can pick it up.
 *
 * The message object is always released before returning, so the caller
 * must not touch msg afterwards.
 *
 * Returns ORTE_SUCCESS when the message was delivered or deliberately
 * ignored, ORTE_ERR_BAD_PARAM for a NULL message or a negative iovec
 * count/length in the packed data, ORTE_ERR_OUT_OF_RESOURCE when memory
 * cannot be allocated, or the error returned by extract_hdr,
 * opal_dss.unpack or opal_dss.copy_payload.
 */
int orte_rmcast_base_process_msg(orte_mcast_msg_event_t *msg)
{
    orte_rmcast_channel_t channel;
    rmcast_base_recv_t *ptr;
    orte_process_name_t name;
    orte_rmcast_tag_t tag;
    int8_t flag;
    struct iovec *iovec_array=NULL;
    struct iovec *copy=NULL;
    int32_t iovec_count=0, i, n, isz;
    int rc=ORTE_SUCCESS;
    orte_rmcast_seq_t recvd_seq_num;
    opal_list_item_t *item;

    /* nothing to process, and nothing to release */
    if (NULL == msg) {
        return ORTE_ERR_BAD_PARAM;
    }

    /* extract the header */
    if (ORTE_SUCCESS != (rc = extract_hdr(msg->buf, &name, &channel, &tag, &recvd_seq_num))) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    /* if this message is from myself, ignore it */
    if (name.jobid == ORTE_PROC_MY_NAME->jobid && name.vpid == ORTE_PROC_MY_NAME->vpid) {
        OPAL_OUTPUT_VERBOSE((10, orte_rmcast_base.rmcast_output,
                             "%s rmcast:base:process_recv sent from myself: %s",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             ORTE_NAME_PRINT(&name)));
        goto cleanup;
    }

    /* if this message is from a different job family, ignore it unless
     * it is on the system channel. We ignore these messages to avoid
     * confusion between different jobs since we all may be sharing
     * multicast channels. The system channel is left open to support
     * cross-job communications for detecting multiple conflicting DVMs.
     */
    if (ORTE_JOB_FAMILY(name.jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid) &&
        (ORTE_RMCAST_SYS_CHANNEL != channel)) {
        /* if we are not the HNP or a daemon, then we ignore this */
        if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_DAEMON) {
            OPAL_OUTPUT_VERBOSE((10, orte_rmcast_base.rmcast_output,
                                 "%s rmcast:base:process_recv from a different job family: %s",
                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                 ORTE_NAME_PRINT(&name)));
        } else {
            goto cleanup;
        }
    }

    /* unpack the iovec vs buf flag */
    n=1;
    if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg->buf, &flag, &n, OPAL_INT8))) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
                         "%s rmcast:base:process_recv sender: %s channel: %d tag: %d %s seq_num: %lu",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_NAME_PRINT(&name), channel, (int)tag,
                         (0 == flag) ? "iovecs" : "buffer", recvd_seq_num));

    /* find the recv for this channel, tag, and type */
    for (item = opal_list_get_first(&orte_rmcast_base.recvs);
         item != opal_list_get_end(&orte_rmcast_base.recvs);
         item = opal_list_get_next(item)) {
        ptr = (rmcast_base_recv_t*)item;

        OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                             "%s rmcast:base:process_recv checking channel %d tag %d",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             (int)ptr->channel, (int)ptr->tag));

        if (channel != ptr->channel) {
            continue;
        }

        if (tag != ptr->tag && ORTE_RMCAST_TAG_WILDCARD != ptr->tag) {
            continue;
        }

        OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                             "%s rmcast:base:process_recv delivering message to channel %d tag %d",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ptr->channel, (int)tag));

        /* we have a recv - unpack the data */
        if (0 == flag) {
            /* get the number of iovecs in the buffer */
            n=1;
            if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg->buf, &iovec_count, &n, OPAL_INT32))) {
                ORTE_ERROR_LOG(rc);
                goto cleanup;
            }
            /* the count came off the wire - never trust a negative value */
            if (iovec_count < 0) {
                iovec_count = 0;
                rc = ORTE_ERR_BAD_PARAM;
                ORTE_ERROR_LOG(rc);
                goto cleanup;
            }
            /* zero-filled so that every iov_base starts out NULL; the
             * cleanup code may then free a partially unpacked array
             * without touching uninitialized pointers
             */
            iovec_array = (struct iovec *)calloc((size_t)iovec_count, sizeof(struct iovec));
            if (NULL == iovec_array && 0 < iovec_count) {
                rc = ORTE_ERR_OUT_OF_RESOURCE;
                ORTE_ERROR_LOG(rc);
                goto cleanup;
            }
            /* unpack the iovecs */
            for (i=0; i < iovec_count; i++) {
                /* unpack the number of bytes in this iovec */
                n=1;
                if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg->buf, &isz, &n, OPAL_INT32))) {
                    ORTE_ERROR_LOG(rc);
                    goto cleanup;
                }
                /* a negative length would turn into a huge size_t */
                if (isz < 0) {
                    rc = ORTE_ERR_BAD_PARAM;
                    ORTE_ERROR_LOG(rc);
                    goto cleanup;
                }
                iovec_array[i].iov_base = NULL;
                iovec_array[i].iov_len = isz;
                if (0 < isz) {
                    /* allocate the space */
                    iovec_array[i].iov_base = (IOVBASE_TYPE*)malloc(isz);
                    if (NULL == iovec_array[i].iov_base) {
                        rc = ORTE_ERR_OUT_OF_RESOURCE;
                        ORTE_ERROR_LOG(rc);
                        goto cleanup;
                    }
                    /* unpack the data */
                    if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg->buf, iovec_array[i].iov_base, &isz, OPAL_UINT8))) {
                        ORTE_ERROR_LOG(rc);
                        goto cleanup;
                    }
                }
            }
            if (NULL != ptr->cbfunc_iovec) {
                OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                                     "%s rmcast:base:process_recv delivering iovecs to channel %d tag %d",
                                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ptr->channel, (int)tag));
                ptr->cbfunc_iovec(ORTE_SUCCESS, ptr->channel, recvd_seq_num, tag,
                                  &name, iovec_array, iovec_count, ptr->cbdata);
                /* if it isn't persistent, remove it */
                if (!(ORTE_RMCAST_PERSISTENT & ptr->flags)) {
                    OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                                         "%s rmcast:base:process_recv removing non-persistent recv",
                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
                    opal_list_remove_item(&orte_rmcast_base.recvs, &ptr->item);
                    OBJ_RELEASE(ptr);
                }
            } else {
                /* if something is already present, then we have a problem */
                if (NULL != ptr->iovec_array) {
                    OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                                         "%s rmcast:base:process_recv blocking recv already fulfilled",
                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
                    goto cleanup;
                }
                /* copy over the iovec array since it will be released by
                 * the blocking recv. The copy is built locally and only
                 * attached to the recv once it is complete, so a failed
                 * allocation never leaves a half-filled array behind.
                 */
                copy = (struct iovec *)calloc((size_t)iovec_count, sizeof(struct iovec));
                if (NULL == copy && 0 < iovec_count) {
                    rc = ORTE_ERR_OUT_OF_RESOURCE;
                    ORTE_ERROR_LOG(rc);
                    goto cleanup;
                }
                for (i=0; i < iovec_count; i++) {
                    copy[i].iov_len = iovec_array[i].iov_len;
                    if (0 == iovec_array[i].iov_len) {
                        /* empty entry - nothing to copy, iov_base stays NULL */
                        continue;
                    }
                    copy[i].iov_base = (IOVBASE_TYPE*)malloc(iovec_array[i].iov_len);
                    if (NULL == copy[i].iov_base) {
                        rmcast_base_release_iovecs(copy, iovec_count);
                        rc = ORTE_ERR_OUT_OF_RESOURCE;
                        ORTE_ERROR_LOG(rc);
                        goto cleanup;
                    }
                    memcpy(copy[i].iov_base, iovec_array[i].iov_base, iovec_array[i].iov_len);
                }
                ptr->seq_num = recvd_seq_num;
                ptr->iovec_array = copy;
                ptr->iovec_count = iovec_count;
                /* flag it as recvd to release blocking recv */
                ptr->recvd = true;
            }
        } else {
            if (NULL != ptr->cbfunc_buffer) {
                OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                                     "%s rmcast:base:process_recv delivering buffer to channel %d tag %d",
                                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ptr->channel, (int)tag));
                ptr->cbfunc_buffer(ORTE_SUCCESS, ptr->channel, recvd_seq_num, tag,
                                   &name, msg->buf, ptr->cbdata);
                /* if it isn't persistent, remove it */
                if (!(ORTE_RMCAST_PERSISTENT & ptr->flags)) {
                    OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                                         "%s rmcast:base:process_recv removing non-persistent recv",
                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
                    opal_list_remove_item(&orte_rmcast_base.recvs, &ptr->item);
                    OBJ_RELEASE(ptr);
                }
            } else {
                /* if something is already present, then we have a problem */
                if (NULL != ptr->buf) {
                    OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                                         "%s rmcast:base:process_recv blocking recv already fulfilled",
                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
                    goto cleanup;
                }
                OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
                                     "%s rmcast:base:process_recv copying buffer for blocking recv",
                                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
                ptr->seq_num = recvd_seq_num;
                /* copy the buffer across since it will be released
                 * by the blocking recv
                 */
                ptr->buf = OBJ_NEW(opal_buffer_t);
                if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(ptr->buf, msg->buf))) {
                    ORTE_ERROR_LOG(rc);
                    goto cleanup;
                }
                /* flag it as recvd to release blocking recv */
                ptr->recvd = true;
            }
        }
        /* we are done - only one recv can match */
        break;
    }

 cleanup:
    /* the locally unpacked iovecs are always ours to free: callbacks
     * only borrow them and blocking recvs get their own copy
     */
    rmcast_base_release_iovecs(iovec_array, iovec_count);
    OBJ_RELEASE(msg);

    return rc;
}
|
||||
|
||||
|
||||
static void* rcv_processing_thread(opal_object_t *obj)
|
||||
{
|
||||
orte_mcast_msg_event_t *msg;
|
||||
char byte;
|
||||
int rc;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base: recv processing thread operational",
|
||||
@ -155,201 +364,13 @@ static void* rcv_processing_thread(opal_object_t *obj)
|
||||
opal_output(0, "%s ERROR PROCESSING MULTICAST MESSAGES",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl);
|
||||
goto cleanup;
|
||||
continue;
|
||||
}
|
||||
ORTE_RELEASE_THREAD(&orte_rmcast_base.recv_process_ctl);
|
||||
|
||||
/* extract the header */
|
||||
if (ORTE_SUCCESS != (rc = extract_hdr(msg->buf, &name, &channel, &tag, &recvd_seq_num))) {
|
||||
/* process it - processing function release the msg */
|
||||
if (ORTE_SUCCESS != (rc = orte_rmcast_base_process_msg(msg))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* if this message is from myself, ignore it */
|
||||
if (name.jobid == ORTE_PROC_MY_NAME->jobid && name.vpid == ORTE_PROC_MY_NAME->vpid) {
|
||||
OPAL_OUTPUT_VERBOSE((10, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:process_recv sent from myself: %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&name)));
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* if this message is from a different job family, ignore it unless
|
||||
* it is on the system channel. We ignore these messages to avoid
|
||||
* confusion between different jobs since we all may be sharing
|
||||
* multicast channels. The system channel is left open to support
|
||||
* cross-job communications for detecting multiple conflicting DVMs.
|
||||
*/
|
||||
if (ORTE_JOB_FAMILY(name.jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid) &&
|
||||
(ORTE_RMCAST_SYS_CHANNEL != channel)) {
|
||||
/* if we are not the HNP or a daemon, then we ignore this */
|
||||
if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_DAEMON) {
|
||||
OPAL_OUTPUT_VERBOSE((10, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:process_recv from a different job family: %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&name)));
|
||||
} else {
|
||||
goto cleanup;
|
||||
}
|
||||
}
|
||||
|
||||
/* unpack the iovec vs buf flag */
|
||||
n=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg->buf, &flag, &n, OPAL_INT8))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((5, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:process_recv sender: %s channel: %d tag: %d %s seq_num: %lu",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
ORTE_NAME_PRINT(&name), channel, (int)tag,
|
||||
(0 == flag) ? "iovecs" : "buffer", recvd_seq_num));
|
||||
|
||||
|
||||
/* find the recv for this channel, tag, and type */
|
||||
for (item = opal_list_get_first(&orte_rmcast_base.recvs);
|
||||
item != opal_list_get_end(&orte_rmcast_base.recvs);
|
||||
item = opal_list_get_next(item)) {
|
||||
ptr = (rmcast_base_recv_t*)item;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:process_recv checking channel %d tag %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(int)ptr->channel, (int)ptr->tag));
|
||||
|
||||
if (channel != ptr->channel) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (tag != ptr->tag && ORTE_RMCAST_TAG_WILDCARD != ptr->tag) {
|
||||
continue;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:process_recv delivering message to channel %d tag %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ptr->channel, (int)tag));
|
||||
|
||||
/* we have a recv - unpack the data */
|
||||
if (0 == flag) {
|
||||
/* get the number of iovecs in the buffer */
|
||||
n=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg->buf, &iovec_count, &n, OPAL_INT32))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
/* malloc the required space */
|
||||
iovec_array = (struct iovec *)malloc(iovec_count * sizeof(struct iovec));
|
||||
/* unpack the iovecs */
|
||||
for (i=0; i < iovec_count; i++) {
|
||||
/* unpack the number of bytes in this iovec */
|
||||
n=1;
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg->buf, &isz, &n, OPAL_INT32))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
iovec_array[i].iov_base = NULL;
|
||||
iovec_array[i].iov_len = isz;
|
||||
if (0 < isz) {
|
||||
/* allocate the space */
|
||||
iovec_array[i].iov_base = (IOVBASE_TYPE*)malloc(isz);
|
||||
/* unpack the data */
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.unpack(msg->buf, iovec_array[i].iov_base, &isz, OPAL_UINT8))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (NULL != ptr->cbfunc_iovec) {
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:process_recv delivering iovecs to channel %d tag %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ptr->channel, (int)tag));
|
||||
ptr->cbfunc_iovec(ORTE_SUCCESS, ptr->channel, recvd_seq_num, tag,
|
||||
&name, iovec_array, iovec_count, ptr->cbdata);
|
||||
/* if it isn't persistent, remove it */
|
||||
if (!(ORTE_RMCAST_PERSISTENT & ptr->flags)) {
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:process_recv removing non-persistent recv",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
opal_list_remove_item(&orte_rmcast_base.recvs, &ptr->item);
|
||||
OBJ_RELEASE(ptr);
|
||||
}
|
||||
} else {
|
||||
/* if something is already present, then we have a problem */
|
||||
if (NULL != ptr->iovec_array) {
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:process_recv blocking recv already fulfilled",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
goto cleanup;
|
||||
}
|
||||
ptr->seq_num = recvd_seq_num;
|
||||
/* copy over the iovec array since it will be released by
|
||||
* the blocking recv
|
||||
*/
|
||||
ptr->iovec_array = (struct iovec *)malloc(iovec_count * sizeof(struct iovec));
|
||||
ptr->iovec_count = iovec_count;
|
||||
for (i=0; i < iovec_count; i++) {
|
||||
ptr->iovec_array[i].iov_base = (IOVBASE_TYPE*)malloc(iovec_array[i].iov_len);
|
||||
ptr->iovec_array[i].iov_len = iovec_array[i].iov_len;
|
||||
memcpy(ptr->iovec_array[i].iov_base, iovec_array[i].iov_base, iovec_array[i].iov_len);
|
||||
}
|
||||
/* flag it as recvd to release blocking recv */
|
||||
ptr->recvd = true;
|
||||
}
|
||||
} else {
|
||||
if (NULL != ptr->cbfunc_buffer) {
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:process_recv delivering buffer to channel %d tag %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ptr->channel, (int)tag));
|
||||
ptr->cbfunc_buffer(ORTE_SUCCESS, ptr->channel, recvd_seq_num, tag,
|
||||
&name, msg->buf, ptr->cbdata);
|
||||
/* if it isn't persistent, remove it */
|
||||
if (!(ORTE_RMCAST_PERSISTENT & ptr->flags)) {
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:process_recv removing non-persistent recv",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
opal_list_remove_item(&orte_rmcast_base.recvs, &ptr->item);
|
||||
OBJ_RELEASE(ptr);
|
||||
}
|
||||
} else {
|
||||
/* if something is already present, then we have a problem */
|
||||
if (NULL != ptr->buf) {
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:process_recv blocking recv already fulfilled",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
goto cleanup;
|
||||
}
|
||||
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
|
||||
"%s rmcast:base:process_recv copying buffer for blocking recv",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
|
||||
ptr->seq_num = recvd_seq_num;
|
||||
/* copy the buffer across since it will be released
|
||||
* by the blocking recv
|
||||
*/
|
||||
ptr->buf = OBJ_NEW(opal_buffer_t);
|
||||
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(ptr->buf, msg->buf))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto cleanup;
|
||||
}
|
||||
/* flag it as recvd to release blocking recv */
|
||||
ptr->recvd = true;
|
||||
}
|
||||
}
|
||||
/* we are done - only one recv can match */
|
||||
break;
|
||||
}
|
||||
|
||||
cleanup:
|
||||
if (NULL != iovec_array) {
|
||||
for (i=0; i < iovec_count; i++) {
|
||||
free(iovec_array[i].iov_base);
|
||||
}
|
||||
free(iovec_array);
|
||||
iovec_array = NULL;
|
||||
iovec_count = 0;
|
||||
}
|
||||
if (NULL != msg) {
|
||||
OBJ_RELEASE(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user