1
1

Update the rmcast APIs to include tag params and reorder them to look like their rml cousins

This commit was SVN r22218.
Этот коммит содержится в:
Ralph Castain 2009-11-17 15:58:59 +00:00
родитель 766d56dc0a
Коммит 840766a894
5 изменённых файлов: 77 добавлений и 30 удалений

Просмотреть файл

@ -356,7 +356,10 @@ static int update_nidmap(opal_byte_object_t *bo)
static bool arrived = false;
static bool name_success = false;
static void cbfunc(int channel, orte_process_name_t *sender, opal_buffer_t *buf, void *cbdata)
static void cbfunc(int status,
int channel, orte_rmcast_tag_t tag,
orte_process_name_t *sender,
opal_buffer_t *buf, void *cbdata)
{
int32_t n;
orte_daemon_cmd_flag_t cmd;
@ -453,8 +456,8 @@ static int cm_set_name(void)
/* set the recv to get the answer */
if (ORTE_SUCCESS != (rc = orte_rmcast.recv_buffer_nb(ORTE_RMCAST_SYS_CHANNEL,
ORTE_RMCAST_PERSISTENT,
ORTE_RMCAST_TAG_BOOTSTRAP,
ORTE_RMCAST_PERSISTENT,
cbfunc, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);

Просмотреть файл

@ -144,7 +144,7 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_mcast_msg_event_t);
nm = htonl(ORTE_PROC_MY_NAME->vpid); \
memcpy((bfr)+4, &nm, 4); \
/* add the tag data, also converted */ \
tmp = htons(snd->tag); \
tmp = htons((tg)); \
memcpy((bfr)+8, &tmp, 2); \
} while(0);

Просмотреть файл

@ -87,8 +87,8 @@ static int basic_recv_buffer(orte_process_name_t *sender,
opal_buffer_t *buf);
static int basic_recv_buffer_nb(orte_rmcast_channel_t channel,
orte_rmcast_flag_t flags,
orte_rmcast_tag_t tag,
orte_rmcast_flag_t flags,
orte_rmcast_callback_buffer_fn_t cbfunc,
void *cbdata);
@ -98,8 +98,8 @@ static int basic_recv(orte_process_name_t *sender,
struct iovec **msg, int *count);
static int basic_recv_nb(orte_rmcast_channel_t channel,
orte_rmcast_flag_t flags,
orte_rmcast_tag_t tag,
orte_rmcast_flag_t flags,
orte_rmcast_callback_fn_t cbfunc,
void *cbdata);
@ -244,14 +244,18 @@ static void finalize(void)
/* internal blocking send support */
static bool send_complete, send_buf_complete;
static void internal_snd_cb(orte_rmcast_channel_t channel,
static void internal_snd_cb(int status,
orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
struct iovec *msg, int count, void *cbdata)
{
send_complete = true;
}
static void internal_snd_buf_cb(orte_rmcast_channel_t channel,
static void internal_snd_buf_cb(int status,
orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
opal_buffer_t *buf, void *cbdata)
{
@ -464,9 +468,11 @@ static int queue_recv(rmcast_base_recv_t *recvptr,
OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output,
"%s rmcast:basic: matching recv_nb already active on multicast channel %03d.%03d.%03d.%03d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(ch->network)));
return ORTE_SUCCESS;
OPAL_THREAD_UNLOCK(&lock);
return ORTE_EXISTS;
}
}
OPAL_THREAD_UNLOCK(&lock);
}
OPAL_THREAD_LOCK(&lock);
@ -515,8 +521,8 @@ static int basic_recv(orte_process_name_t *name,
}
static int basic_recv_nb(orte_rmcast_channel_t channel,
orte_rmcast_flag_t flags,
orte_rmcast_tag_t tag,
orte_rmcast_flag_t flags,
orte_rmcast_callback_fn_t cbfunc, void *cbdata)
{
rmcast_base_recv_t *recvptr;
@ -533,6 +539,11 @@ static int basic_recv_nb(orte_rmcast_channel_t channel,
recvptr->cbdata = cbdata;
if (ORTE_SUCCESS != (ret = queue_recv(recvptr, channel, tag, cbfunc, NULL, true))) {
if (ORTE_EXISTS == ret) {
/* this recv already exists - just release the copy */
OBJ_RELEASE(recvptr);
return ORTE_SUCCESS;
}
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(recvptr);
return ret;
@ -586,8 +597,8 @@ cleanup:
}
static int basic_recv_buffer_nb(orte_rmcast_channel_t channel,
orte_rmcast_flag_t flags,
orte_rmcast_tag_t tag,
orte_rmcast_flag_t flags,
orte_rmcast_callback_buffer_fn_t cbfunc, void *cbdata)
{
rmcast_base_recv_t *recvptr;
@ -605,10 +616,12 @@ static int basic_recv_buffer_nb(orte_rmcast_channel_t channel,
recvptr->cbdata = cbdata;
if (ORTE_SUCCESS != (ret = queue_recv(recvptr, channel, tag, NULL, cbfunc, false))) {
if (ORTE_EXISTS == ret) {
/* this recv already exists - just release the copy */
OBJ_RELEASE(recvptr);
return ORTE_SUCCESS;
}
ORTE_ERROR_LOG(ret);
OPAL_THREAD_LOCK(&lock);
opal_list_remove_item(&recvs, &recvptr->item);
OPAL_THREAD_UNLOCK(&lock);
OBJ_RELEASE(recvptr);
return ret;
}
@ -869,7 +882,8 @@ static void process_recv(int fd, short event, void *cbdata)
if (0 == flag) {
/* dealing with iovecs */
if (NULL != ptr->cbfunc_iovec) {
ptr->cbfunc_iovec(ptr->channel, &name, iovec_array, iovec_count, ptr->cbdata);
ptr->cbfunc_iovec(ORTE_SUCCESS, ptr->channel, tag,
&name, iovec_array, iovec_count, ptr->cbdata);
/* if it isn't persistent, remove it */
if (!(ORTE_RMCAST_PERSISTENT & ptr->flags)) {
OPAL_THREAD_LOCK(&lock);
@ -893,7 +907,7 @@ static void process_recv(int fd, short event, void *cbdata)
}
} else {
if (NULL != ptr->cbfunc_buffer) {
ptr->cbfunc_buffer(ptr->channel, &name, recvd_buf, ptr->cbdata);
ptr->cbfunc_buffer(ORTE_SUCCESS, ptr->channel, tag, &name, recvd_buf, ptr->cbdata);
/* if it isn't persistent, remove it */
if (!(ORTE_RMCAST_PERSISTENT & ptr->flags)) {
OPAL_THREAD_LOCK(&lock);
@ -1130,7 +1144,7 @@ static void xmit_data(int sd, short flags, void* send_req)
snd = (rmcast_base_send_t*)item;
/* start the send data area with our header */
ORTE_MULTICAST_MESSAGE_HDR_HTON(chan->send_data, tag);
ORTE_MULTICAST_MESSAGE_HDR_HTON(chan->send_data, snd->tag);
/* are we sending a buffer? */
if (NULL == snd->buf) {
@ -1226,12 +1240,12 @@ static void xmit_data(int sd, short flags, void* send_req)
/* call the cbfunc if required */
if (NULL != snd->cbfunc_buffer) {
snd->cbfunc_buffer(chan->channel, ORTE_PROC_MY_NAME, snd->buf, snd->cbdata);
snd->cbfunc_buffer(rc, chan->channel, snd->tag, ORTE_PROC_MY_NAME, snd->buf, snd->cbdata);
}
} else {
/* call the cbfunc if required */
if (NULL != snd->cbfunc_iovec) {
snd->cbfunc_iovec(chan->channel, ORTE_PROC_MY_NAME,
snd->cbfunc_iovec(rc, chan->channel, snd->tag, ORTE_PROC_MY_NAME,
snd->iovec_array, snd->iovec_count, snd->cbdata);
}
}

Просмотреть файл

@ -23,6 +23,12 @@
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif
#include "opal/mca/mca.h"
#include "opal/dss/dss_types.h"
@ -38,11 +44,15 @@ BEGIN_C_DECLS
/**
* Function prototypes for callback from receiving multicast messages
*/
typedef void (*orte_rmcast_callback_buffer_fn_t)(orte_rmcast_channel_t channel,
typedef void (*orte_rmcast_callback_buffer_fn_t)(int status,
orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
opal_buffer_t *buf, void* cbdata);
typedef void (*orte_rmcast_callback_fn_t)(orte_rmcast_channel_t channel,
typedef void (*orte_rmcast_callback_fn_t)(int status,
orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
struct iovec *msg, int count, void* cbdata);
@ -84,8 +94,8 @@ typedef int (*orte_rmcast_base_module_recv_buffer_fn_t)(orte_process_name_t *sen
/* non-blocking receive buffer messages from a multicast channel */
typedef int (*orte_rmcast_base_module_recv_buffer_nb_fn_t)(orte_rmcast_channel_t channel,
orte_rmcast_flag_t flags,
orte_rmcast_tag_t tag,
orte_rmcast_flag_t flags,
orte_rmcast_callback_buffer_fn_t cbfunc,
void *cbdata);
@ -97,8 +107,8 @@ typedef int (*orte_rmcast_base_module_recv_fn_t)(orte_process_name_t *sender,
/* non-blocking receive iovec messages from a multicast channel */
typedef int (*orte_rmcast_base_module_recv_nb_fn_t)(orte_rmcast_channel_t channel,
orte_rmcast_flag_t flags,
orte_rmcast_tag_t tag,
orte_rmcast_flag_t flags,
orte_rmcast_callback_fn_t cbfunc,
void *cbdata);

Просмотреть файл

@ -18,10 +18,20 @@
#include "orte/mca/rmcast/rmcast.h"
#include "orte/mca/grpcomm/grpcomm.h"
static void cbfunc(orte_rmcast_channel_t channel, orte_process_name_t *sender, opal_buffer_t *buf, void *cbdata);
static void cbfunc_buf_snt(orte_rmcast_channel_t channel, orte_process_name_t *sender, opal_buffer_t *buf, void *cbdata);
static void cbfunc(int status,
orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
opal_buffer_t *buf, void *cbdata);
static void cbfunc_buf_snt(int status,
orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
opal_buffer_t *buf, void *cbdata);
static void cbfunc_iovec(orte_rmcast_channel_t channel,
static void cbfunc_iovec(int status,
orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
struct iovec *msg, int count, void* cbdata);
@ -89,14 +99,14 @@ int main(int argc, char* argv[])
return 0;
} else {
if (ORTE_SUCCESS != (rc = orte_rmcast.recv_buffer_nb(ORTE_RMCAST_APP_PUBLIC_CHANNEL,
ORTE_RMCAST_PERSISTENT,
ORTE_RMCAST_TAG_WILDCARD,
ORTE_RMCAST_PERSISTENT,
cbfunc, NULL))) {
ORTE_ERROR_LOG(rc);
}
if (ORTE_SUCCESS != (rc = orte_rmcast.recv_nb(ORTE_RMCAST_APP_PUBLIC_CHANNEL,
ORTE_RMCAST_PERSISTENT,
ORTE_RMCAST_TAG_WILDCARD,
ORTE_RMCAST_PERSISTENT,
cbfunc_iovec, NULL))) {
ORTE_ERROR_LOG(rc);
}
@ -112,7 +122,11 @@ blast:
return 0;
}
static void cbfunc(orte_rmcast_channel_t channel, orte_process_name_t *sender, opal_buffer_t *buffer, void *cbdata)
static void cbfunc(int status,
orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
opal_buffer_t *buffer, void *cbdata)
{
int32_t i32, rc;
@ -150,7 +164,9 @@ static void cbfunc(orte_rmcast_channel_t channel, orte_process_name_t *sender, o
#endif
}
static void cbfunc_iovec(orte_rmcast_channel_t channel,
static void cbfunc_iovec(int status,
orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
struct iovec *msg, int count, void* cbdata)
{
@ -173,7 +189,11 @@ static void cbfunc_iovec(orte_rmcast_channel_t channel,
exit(0);
}
static void cbfunc_buf_snt(orte_rmcast_channel_t channel, orte_process_name_t *sender, opal_buffer_t *buf, void *cbdata)
static void cbfunc_buf_snt(int status,
orte_rmcast_channel_t channel,
orte_rmcast_tag_t tag,
orte_process_name_t *sender,
opal_buffer_t *buf, void *cbdata)
{
fprintf(stderr, "%s BUFFERED_NB SEND COMPLETE\n", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
fflush(stderr);