diff --git a/orte/mca/ras/base/ras_base_allocate.c b/orte/mca/ras/base/ras_base_allocate.c index 8660aae806..b04a265f26 100644 --- a/orte/mca/ras/base/ras_base_allocate.c +++ b/orte/mca/ras/base/ras_base_allocate.c @@ -139,12 +139,10 @@ int orte_ras_base_allocate(orte_job_t *jdata) if (ORTE_SUCCESS != (rc = orte_ras_base.active_module->allocate(&nodes))) { if (ORTE_ERR_SYSTEM_WILL_BOOTSTRAP == rc) { /* this module indicates that nodes will be discovered - * on a bootstrap basis, so there is nothing more - * for us to do + * on a bootstrap basis, so all we do here is add our + * own node to the list */ - OBJ_DESTRUCT(&nodes); - rc = ORTE_SUCCESS; - goto DISPLAY; + goto addlocal; } ORTE_ERROR_LOG(rc); OBJ_DESTRUCT(&nodes); @@ -365,6 +363,7 @@ int orte_ras_base_allocate(orte_job_t *jdata) "%s ras:base:allocate nothing found in rankfile - inserting current node", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); +addlocal: /* if nothing was found by any of the above methods, then we have no * earthly idea what to do - so just add the local host */ diff --git a/orte/mca/rmcast/base/Makefile.am b/orte/mca/rmcast/base/Makefile.am index 75abbdd0ff..b4d60bc7cc 100644 --- a/orte/mca/rmcast/base/Makefile.am +++ b/orte/mca/rmcast/base/Makefile.am @@ -10,7 +10,8 @@ dist_pkgdata_DATA += base/help-rmcast-base.txt headers += \ - base/base.h + base/base.h \ + base/private.h libmca_rmcast_la_SOURCES += \ base/rmcast_base_open.c diff --git a/orte/mca/rmcast/base/base.h b/orte/mca/rmcast/base/base.h index caef895e6c..f005934789 100644 --- a/orte/mca/rmcast/base/base.h +++ b/orte/mca/rmcast/base/base.h @@ -21,6 +21,8 @@ #include #endif +#include "opal/event/event.h" + #include "orte/mca/rmcast/rmcast.h" BEGIN_C_DECLS @@ -29,22 +31,6 @@ ORTE_DECLSPEC int orte_rmcast_base_open(void); #if !ORTE_DISABLE_FULL_SUPPORT -/* - * globals that might be needed - */ -typedef struct { - int rmcast_output; - opal_list_t rmcast_opened; - uint32_t xmit_network; - char *my_group_name; - uint8_t my_group_number; - uint32_t interface; - uint16_t ports[256]; -} orte_rmcast_base_t; - -ORTE_DECLSPEC extern orte_rmcast_base_t orte_rmcast_base; - - /* * function definitions */ diff --git a/orte/mca/rmcast/base/private.h b/orte/mca/rmcast/base/private.h new file mode 100644 index 0000000000..2f5064c051 --- /dev/null +++ b/orte/mca/rmcast/base/private.h @@ -0,0 +1,153 @@ +/* + * Copyright (c) 2009 Cisco Systems, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ +/** @file: + */ + +#ifndef ORTE_MCA_RMCAST_PRIVATE_H +#define ORTE_MCA_RMCAST_PRIVATE_H + +/* + * includes + */ +#include "orte_config.h" + +#ifdef HAVE_NETINET_IN_H +#include +#endif +#ifdef HAVE_ARPA_INET_H +#include +#endif + +#include "opal/event/event.h" +#include "opal/class/opal_list.h" + +#include "orte/mca/rmcast/rmcast.h" + +BEGIN_C_DECLS + +#if !ORTE_DISABLE_FULL_SUPPORT + +#define CLOSE_THE_SOCKET(socket) \ + do { \ + shutdown(socket, 2); \ + close(socket); \ + } while(0) + + + +/* + * globals that might be needed + */ +typedef struct { + int rmcast_output; + opal_list_t rmcast_opened; + uint32_t xmit_network; + char *my_group_name; + uint8_t my_group_number; + uint32_t interface; + uint16_t ports[256]; +} orte_rmcast_base_t; + +ORTE_DECLSPEC extern orte_rmcast_base_t orte_rmcast_base; + + +/**** CLASS DEFINITIONS ****/ +/* + * Data structure for tracking assigned channels + */ +typedef struct { + opal_list_item_t item; + char *name; + orte_rmcast_channel_t channel; + uint32_t network; + uint16_t port; + uint32_t interface; + int xmit; + int recv; + struct sockaddr_in addr; + opal_event_t send_ev; + opal_mutex_t send_lock; + bool sends_in_progress; + opal_list_t pending_sends; + uint8_t *send_data; + opal_event_t recv_ev; +} rmcast_base_channel_t; +ORTE_DECLSPEC OBJ_CLASS_DECLARATION(rmcast_base_channel_t); + + +/* + * Data structure for tracking registered non-blocking recvs + */ +typedef struct { + opal_list_item_t item; + orte_rmcast_channel_t channel; + bool recvd; + opal_buffer_t *data; + orte_rmcast_tag_t tag; + orte_rmcast_flag_t flags; + orte_rmcast_callback_fn_t cbfunc; + void *cbdata; +} rmcast_base_recv_t; +ORTE_DECLSPEC OBJ_CLASS_DECLARATION(rmcast_base_recv_t); + + +/* + * Data structure for tracking pending sends + */ +typedef struct { + opal_list_item_t item; + bool send_complete; + opal_buffer_t *data; + orte_rmcast_tag_t tag; + orte_rmcast_callback_fn_t cbfunc; + void *cbdata; +} rmcast_base_send_t; +ORTE_DECLSPEC OBJ_CLASS_DECLARATION(rmcast_base_send_t); + + +/* Setup an event to process a multicast message + * + * Multicast messages can come at any time and rate. To minimize + * the probability of loss, and to avoid conflict when we send + * data when responding to an input message, we use a timer + * event to break out of the recv and process the message later + */ +typedef struct { + opal_object_t super; + opal_event_t *ev; + uint8_t *data; + ssize_t sz; + rmcast_base_channel_t *channel; +} orte_mcast_msg_event_t; +ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_mcast_msg_event_t); + +#define ORTE_MULTICAST_MESSAGE_EVENT(dat, n, chan, cbfunc) \ + do { \ + orte_mcast_msg_event_t *mev; \ + struct timeval now; \ + OPAL_OUTPUT_VERBOSE((1, orte_debug_output, \ + "defining mcast msg event: %s %d", \ + __FILE__, __LINE__)); \ + mev = OBJ_NEW(orte_mcast_msg_event_t); \ + mev->data = (dat); \ + mev->sz = (n); \ + mev->channel = (chan); \ + opal_evtimer_set(mev->ev, (cbfunc), mev); \ + now.tv_sec = 0; \ + now.tv_usec = 0; \ + opal_evtimer_add(mev->ev, &now); \ + } while(0); + + + +#endif /* ORTE_DISABLE_FULL_SUPPORT */ + +END_C_DECLS + +#endif diff --git a/orte/mca/rmcast/base/rmcast_base_close.c b/orte/mca/rmcast/base/rmcast_base_close.c index 56ec261801..b58a17a102 100644 --- a/orte/mca/rmcast/base/rmcast_base_close.c +++ b/orte/mca/rmcast/base/rmcast_base_close.c @@ -16,6 +16,7 @@ #include "opal/mca/base/base.h" #include "orte/mca/rmcast/base/base.h" +#include "orte/mca/rmcast/base/private.h" int orte_rmcast_base_close(void) { diff --git a/orte/mca/rmcast/base/rmcast_base_open.c b/orte/mca/rmcast/base/rmcast_base_open.c index 04efb086be..2a4c50dd92 100644 --- a/orte/mca/rmcast/base/rmcast_base_open.c +++ b/orte/mca/rmcast/base/rmcast_base_open.c @@ -29,14 +29,14 @@ #include "orte/mca/errmgr/errmgr.h" #include "orte/util/parse_options.h" #include "orte/util/show_help.h" - #include "orte/mca/errmgr/errmgr.h" +#include "orte/mca/rmcast/base/private.h" + #endif #include "orte/mca/rmcast/base/base.h" - /* * The following file was created by configure. It contains extern * statements and the definition of an array of pointers to each @@ -256,4 +256,101 @@ int orte_rmcast_base_open(void) return ORTE_SUCCESS; } + +/**** CLASS INSTANCES ****/ +static void mcast_event_constructor(orte_mcast_msg_event_t *ev) +{ + ev->ev = (opal_event_t*)malloc(sizeof(opal_event_t)); + ev->data = NULL; +} +static void mcast_event_destructor(orte_mcast_msg_event_t *ev) +{ + if (NULL != ev->ev) { + free(ev->ev); + } + if (NULL != ev->data) { + free(ev->data); + } +} +OBJ_CLASS_INSTANCE(orte_mcast_msg_event_t, + opal_object_t, + mcast_event_constructor, + mcast_event_destructor); + +static void send_construct(rmcast_base_send_t *ptr) +{ + ptr->send_complete = false; + ptr->data = NULL; + ptr->tag = ORTE_RMCAST_TAG_INVALID; + ptr->cbfunc = NULL; + ptr->cbdata = NULL; +} +OBJ_CLASS_INSTANCE(rmcast_base_send_t, + opal_list_item_t, + send_construct, + NULL); + +static void recv_construct(rmcast_base_recv_t *ptr) +{ + ptr->channel = ORTE_RMCAST_INVALID_CHANNEL; + ptr->recvd = false; + ptr->data = NULL; + ptr->tag = ORTE_RMCAST_TAG_INVALID; + ptr->flags = ORTE_RMCAST_NON_PERSISTENT; /* default */ + ptr->cbfunc = NULL; + ptr->cbdata = NULL; +} +static void recv_destruct(rmcast_base_recv_t *ptr) +{ + if (NULL != ptr->data) { + OBJ_RELEASE(ptr->data); + } +} +OBJ_CLASS_INSTANCE(rmcast_base_recv_t, + opal_list_item_t, + recv_construct, + recv_destruct); + +static void channel_construct(rmcast_base_channel_t *ptr) +{ + ptr->name = NULL; + ptr->channel = ORTE_RMCAST_INVALID_CHANNEL; + ptr->network = 0; + ptr->port = 0; + ptr->interface = 0; + ptr->xmit = -1; + ptr->recv = -1; + memset(&ptr->addr, 0, sizeof(ptr->addr)); + OBJ_CONSTRUCT(&ptr->send_lock, opal_mutex_t); + ptr->sends_in_progress = false; + OBJ_CONSTRUCT(&ptr->pending_sends, opal_list_t); + ptr->send_data = NULL; +} +static void channel_destruct(rmcast_base_channel_t *ptr) +{ + /* cleanup the recv side */ + opal_event_del(&ptr->recv_ev); + if (0 < ptr->recv) { + CLOSE_THE_SOCKET(ptr->recv); + } + /* attempt to xmit any pending sends */ + /* cleanup the xmit side */ + opal_event_del(&ptr->send_ev); + if (0 < ptr->xmit) { + CLOSE_THE_SOCKET(ptr->xmit); + } + OBJ_DESTRUCT(&ptr->send_lock); + /* release the channel name */ + if (NULL != ptr->name) { + free(ptr->name); + } + if (NULL != ptr->send_data) { + free(ptr->send_data); + } +} +OBJ_CLASS_INSTANCE(rmcast_base_channel_t, + opal_list_item_t, + channel_construct, + channel_destruct); + #endif /* ORTE_DISABLE_FULL_SUPPORT */ diff --git a/orte/mca/rmcast/base/rmcast_base_select.c b/orte/mca/rmcast/base/rmcast_base_select.c index bca84de58c..2830bf630d 100644 --- a/orte/mca/rmcast/base/rmcast_base_select.c +++ b/orte/mca/rmcast/base/rmcast_base_select.c @@ -16,6 +16,7 @@ #include "orte/mca/errmgr/errmgr.h" #include "orte/mca/rmcast/base/base.h" +#include "orte/mca/rmcast/base/private.h" static bool selected = false; diff --git a/orte/mca/rmcast/basic/rmcast_basic.c b/orte/mca/rmcast/basic/rmcast_basic.c index bae685cd15..fcc15f9170 100644 --- a/orte/mca/rmcast/basic/rmcast_basic.c +++ b/orte/mca/rmcast/basic/rmcast_basic.c @@ -33,7 +33,7 @@ #include "orte/util/name_fns.h" #include "orte/util/show_help.h" -#include "orte/mca/rmcast/base/base.h" +#include "orte/mca/rmcast/base/private.h" #include "rmcast_basic.h" /* LOCAL DATA */ @@ -43,156 +43,17 @@ static opal_list_t channels; static bool init_completed = false; static orte_rmcast_channel_t next_channel; -#define CLOSE_THE_SOCKET(socket) \ - do { \ - shutdown(socket, 2); \ - close(socket); \ - } while(0) - - -/* - * Data structure for tracking assigned channels - */ -typedef struct { - opal_list_item_t item; - char *name; - orte_rmcast_channel_t channel; - uint32_t network; - uint16_t port; - uint32_t interface; - int xmit; - int recv; - struct sockaddr_in addr; - opal_event_t send_ev; - opal_mutex_t send_lock; - bool sends_in_progress; - opal_list_t pending_sends; - uint8_t *send_data; - opal_event_t recv_ev; - uint8_t *recvd_data; -} rmcast_basic_channel_t; - -static void channel_construct(rmcast_basic_channel_t *ptr) -{ - ptr->name = NULL; - ptr->channel = ORTE_RMCAST_INVALID_CHANNEL; - ptr->network = 0; - ptr->port = 0; - ptr->interface = 0; - ptr->xmit = -1; - ptr->recv = -1; - memset(&ptr->addr, 0, sizeof(ptr->addr)); - OBJ_CONSTRUCT(&ptr->send_lock, opal_mutex_t); - ptr->sends_in_progress = false; - OBJ_CONSTRUCT(&ptr->pending_sends, opal_list_t); - ptr->send_data = NULL; - ptr->recvd_data = NULL; - OPAL_THREAD_LOCK(&lock); - opal_list_append(&channels, &ptr->item); - OPAL_THREAD_UNLOCK(&lock); -} -static void channel_destruct(rmcast_basic_channel_t *ptr) -{ - /* cleanup the recv side */ - opal_event_del(&ptr->recv_ev); - if (0 < ptr->recv) { - CLOSE_THE_SOCKET(ptr->recv); - } - if (NULL != ptr->recvd_data) { - free(ptr->recvd_data); - } - /* attempt to xmit any pending sends */ - /* cleanup the xmit side */ - opal_event_del(&ptr->send_ev); - if (0 < ptr->xmit) { - CLOSE_THE_SOCKET(ptr->xmit); - } - OBJ_DESTRUCT(&ptr->send_lock); - /* release the channel name */ - if (NULL != ptr->name) { - free(ptr->name); - } - if (NULL != ptr->send_data) { - free(ptr->send_data); - } -} -OBJ_CLASS_INSTANCE(rmcast_basic_channel_t, - opal_list_item_t, - channel_construct, - channel_destruct); - -/* - * Data structure for tracking registered non-blocking recvs - */ -typedef struct { - opal_list_item_t item; - orte_rmcast_channel_t channel; - bool recvd; - opal_buffer_t *data; - orte_rmcast_tag_t tag; - orte_rmcast_flag_t flags; - orte_rmcast_callback_fn_t cbfunc; - void *cbdata; -} rmcast_basic_recv_t; - -static void recv_construct(rmcast_basic_recv_t *ptr) -{ - ptr->channel = ORTE_RMCAST_INVALID_CHANNEL; - ptr->recvd = false; - ptr->data = NULL; - ptr->tag = ORTE_RMCAST_TAG_INVALID; - ptr->flags = ORTE_RMCAST_NON_PERSISTENT; /* default */ - ptr->cbfunc = NULL; - ptr->cbdata = NULL; - OPAL_THREAD_LOCK(&lock); - opal_list_append(&recvs, &ptr->item); - OPAL_THREAD_UNLOCK(&lock); -} -static void recv_destruct(rmcast_basic_recv_t *ptr) -{ - if (NULL != ptr->data) { - OBJ_RELEASE(ptr->data); - } -} -OBJ_CLASS_INSTANCE(rmcast_basic_recv_t, - opal_list_item_t, - recv_construct, - recv_destruct); - -/* - * Data structure for tracking pending sends - */ -typedef struct { - opal_list_item_t item; - bool send_complete; - opal_buffer_t *data; - orte_rmcast_tag_t tag; - orte_rmcast_callback_fn_t cbfunc; - void *cbdata; -} rmcast_basic_send_t; - -static void send_construct(rmcast_basic_send_t *ptr) -{ - ptr->send_complete = false; - ptr->data = NULL; - ptr->tag = ORTE_RMCAST_TAG_INVALID; - ptr->cbfunc = NULL; - ptr->cbdata = NULL; -} -OBJ_CLASS_INSTANCE(rmcast_basic_send_t, - opal_list_item_t, - send_construct, - NULL); - /* LOCAL FUNCTIONS */ static void recv_handler(int sd, short flags, void* user); -static int setup_channel(rmcast_basic_channel_t *chan, uint8_t direction); +static int setup_channel(rmcast_base_channel_t *chan, uint8_t direction); -static int setup_socket(int *sd, rmcast_basic_channel_t *chan, bool recvsocket); +static int setup_socket(int *sd, rmcast_base_channel_t *chan, bool recvsocket); static void xmit_data(int sd, short flags, void* send_req); +/* LOCAL STRUCTURE VALUES */ +static rmcast_base_channel_t *my_group_channel=NULL; /* API FUNCTIONS */ static int init(void); @@ -226,11 +87,7 @@ static int open_channel(orte_rmcast_channel_t *channel, char *name, static int close_channel(orte_rmcast_channel_t channel); -/* The API's in this module are solely used to support LOCAL - * procs - i.e., procs that are co-located to the HNP. Remote - * procs interact with the HNP's IOF via the HNP's receive function, - * which operates independently and is in the rmcast_basic_receive.c file - */ +/* Define the module */ orte_rmcast_module_t orte_rmcast_basic_module = { init, @@ -265,7 +122,7 @@ orte_rmcast_module_t orte_rmcast_basic_module = { */ static int init(void) { - rmcast_basic_channel_t *chan; + rmcast_base_channel_t *chan; int rc; if (init_completed) { @@ -283,7 +140,7 @@ static int init(void) next_channel = ORTE_RMCAST_DYNAMIC_CHANNELS; /* setup the respective public address channel */ - chan = OBJ_NEW(rmcast_basic_channel_t); + chan = OBJ_NEW(rmcast_base_channel_t); if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_DAEMON || ORTE_PROC_IS_TOOL) { chan->name = strdup("system"); chan->channel = ORTE_RMCAST_SYS_CHANNEL; @@ -300,6 +157,9 @@ static int init(void) opal_output(0, "rmcast:basic:init - unknown process type"); return ORTE_ERR_SILENT; } + OPAL_THREAD_LOCK(&lock); + opal_list_append(&channels, &chan->item); + OPAL_THREAD_UNLOCK(&lock); if (ORTE_SUCCESS != (rc = setup_channel(chan, ORTE_RMCAST_BIDIR))) { ORTE_ERROR_LOG(rc); return rc; @@ -307,16 +167,20 @@ static int init(void) /* finally, if we are an app, setup our grp channel, if one was given */ if (ORTE_PROC_IS_APP && NULL != orte_rmcast_base.my_group_name) { - chan = OBJ_NEW(rmcast_basic_channel_t); + chan = OBJ_NEW(rmcast_base_channel_t); chan->name = strdup(orte_rmcast_base.my_group_name); chan->channel = orte_rmcast_base.my_group_number; chan->network = orte_rmcast_base.xmit_network + orte_rmcast_base.my_group_number; chan->port = orte_rmcast_base.ports[orte_rmcast_base.my_group_number]; chan->interface = orte_rmcast_base.interface; + OPAL_THREAD_LOCK(&lock); + opal_list_append(&channels, &chan->item); + OPAL_THREAD_UNLOCK(&lock); if (ORTE_SUCCESS != (rc = setup_channel(chan, ORTE_RMCAST_BIDIR))) { ORTE_ERROR_LOG(rc); return rc; } + my_group_channel = chan; } return ORTE_SUCCESS; @@ -349,7 +213,7 @@ static void finalize(void) /* internal blocking send support */ static void internal_snd_cb(orte_rmcast_channel_t channel, opal_buffer_t *buf, void *cbdata) { - rmcast_basic_send_t *snd = (rmcast_basic_send_t*)cbdata; + rmcast_base_send_t *snd = (rmcast_base_send_t*)cbdata; snd->send_complete = true; } @@ -359,15 +223,26 @@ static int basic_send(orte_rmcast_channel_t channel, opal_buffer_t *buf) { opal_list_item_t *item; - rmcast_basic_channel_t *chptr, *ch; - rmcast_basic_send_t *snd; + rmcast_base_channel_t *chptr, *ch; + rmcast_base_send_t *snd; + + /* if we were asked to send this on our group output + * channel, substitute it + */ + if (ORTE_RMCAST_GROUP_OUTPUT_CHANNEL == channel) { + if (NULL == my_group_channel) { + return ORTE_ERR_NOT_FOUND; + } + ch = my_group_channel; + goto process; + } /* find the channel */ ch = NULL; for (item = opal_list_get_first(&channels); item != opal_list_get_end(&channels); item = opal_list_get_next(item)) { - chptr = (rmcast_basic_channel_t*)item; + chptr = (rmcast_base_channel_t*)item; if (channel == chptr->channel) { ch = chptr; break; @@ -378,6 +253,7 @@ static int basic_send(orte_rmcast_channel_t channel, return ORTE_ERR_NOT_FOUND; } +process: OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, "%s rmcast:basic: send of %lu bytes" " called on multicast channel %03d.%03d.%03d.%03d %0x", @@ -397,7 +273,7 @@ static int basic_send(orte_rmcast_channel_t channel, } /* queue it to be sent - preserves order! */ - snd = OBJ_NEW(rmcast_basic_send_t); + snd = OBJ_NEW(rmcast_base_send_t); snd->data = buf; snd->tag = tag; snd->cbfunc = internal_snd_cb; @@ -427,15 +303,26 @@ static int basic_send_nb(orte_rmcast_channel_t channel, void *cbdata) { opal_list_item_t *item; - rmcast_basic_channel_t *chptr, *ch; - rmcast_basic_send_t *snd; + rmcast_base_channel_t *chptr, *ch; + rmcast_base_send_t *snd; + + /* if we were asked to send this on our group output + * channel, substitute it + */ + if (ORTE_RMCAST_GROUP_OUTPUT_CHANNEL == channel) { + if (NULL == my_group_channel) { + return ORTE_ERR_NOT_FOUND; + } + ch = my_group_channel; + goto process; + } /* find the channel */ ch = NULL; for (item = opal_list_get_first(&channels); item != opal_list_get_end(&channels); item = opal_list_get_next(item)) { - chptr = (rmcast_basic_channel_t*)item; + chptr = (rmcast_base_channel_t*)item; if (channel == chptr->channel) { ch = chptr; break; @@ -446,6 +333,7 @@ static int basic_send_nb(orte_rmcast_channel_t channel, return ORTE_ERR_NOT_FOUND; } +process: OPAL_OUTPUT_VERBOSE((0, orte_rmcast_base.rmcast_output, "%s rmcast:basic: send_nb of %lu bytes" " called on multicast channel %03d.%03d.%03d.%03d %0x", @@ -464,7 +352,7 @@ static int basic_send_nb(orte_rmcast_channel_t channel, } /* queue it to be sent - preserves order! */ - snd = OBJ_NEW(rmcast_basic_send_t); + snd = OBJ_NEW(rmcast_base_send_t); snd->data = buf; snd->tag = tag; snd->cbfunc = cbfunc; @@ -489,15 +377,15 @@ static int basic_recv(orte_rmcast_channel_t channel, opal_buffer_t *buf) { opal_list_item_t *item; - rmcast_basic_recv_t *recvptr; - rmcast_basic_channel_t *ch, *chptr; + rmcast_base_recv_t *recvptr; + rmcast_base_channel_t *ch, *chptr; /* find the channel */ ch = NULL; for (item = opal_list_get_first(&channels); item != opal_list_get_end(&channels); item = opal_list_get_next(item)) { - chptr = (rmcast_basic_channel_t*)item; + chptr = (rmcast_base_channel_t*)item; if (channel == chptr->channel) { ch = chptr; break; @@ -512,9 +400,12 @@ static int basic_recv(orte_rmcast_channel_t channel, "%s rmcast:basic: recv called on multicast channel %03d.%03d.%03d.%03d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(ch->network))); - recvptr = OBJ_NEW(rmcast_basic_recv_t); + recvptr = OBJ_NEW(rmcast_base_recv_t); recvptr->channel = channel; recvptr->tag = tag; + OPAL_THREAD_LOCK(&lock); + opal_list_append(&recvs, &recvptr->item); + OPAL_THREAD_UNLOCK(&lock); ORTE_PROGRESSED_WAIT(recvptr->recvd, 0, 1); @@ -533,15 +424,15 @@ static int basic_recv_nb(orte_rmcast_channel_t channel, orte_rmcast_callback_fn_t cbfunc, void *cbdata) { opal_list_item_t *item; - rmcast_basic_recv_t *recvptr; - rmcast_basic_channel_t *ch, *chptr; + rmcast_base_recv_t *recvptr; + rmcast_base_channel_t *ch, *chptr; /* find the channel */ ch = NULL; for (item = opal_list_get_first(&channels); item != opal_list_get_end(&channels); item = opal_list_get_next(item)) { - chptr = (rmcast_basic_channel_t*)item; + chptr = (rmcast_base_channel_t*)item; if (channel == chptr->channel) { ch = chptr; break; @@ -556,12 +447,15 @@ static int basic_recv_nb(orte_rmcast_channel_t channel, "%s rmcast:basic: recv_nb called on multicast channel %03d.%03d.%03d.%03d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), OPAL_IF_FORMAT_ADDR(ch->network))); - recvptr = OBJ_NEW(rmcast_basic_recv_t); + recvptr = OBJ_NEW(rmcast_base_recv_t); recvptr->channel = channel; recvptr->tag = tag; recvptr->flags = flags; recvptr->cbfunc = cbfunc; recvptr->cbdata = cbdata; + OPAL_THREAD_LOCK(&lock); + opal_list_append(&recvs, &recvptr->item); + OPAL_THREAD_UNLOCK(&lock); return ORTE_SUCCESS; } @@ -570,14 +464,14 @@ static void cancel_recv(orte_rmcast_channel_t channel, orte_rmcast_tag_t tag) { opal_list_item_t *item, *next; - rmcast_basic_recv_t *ptr; + rmcast_base_recv_t *ptr; /* find all recv's for this channel and tag */ item = opal_list_get_first(&recvs); while (item != opal_list_get_end(&recvs)) { next = opal_list_get_next(item); - ptr = (rmcast_basic_recv_t*)item; + ptr = (rmcast_base_recv_t*)item; if (channel == ptr->channel && tag == ptr->tag) { OPAL_THREAD_LOCK(&lock); @@ -593,7 +487,7 @@ static int open_channel(orte_rmcast_channel_t *channel, char *name, char *network, int port, char *interface, uint8_t direction) { opal_list_item_t *item; - rmcast_basic_channel_t *nchan, *chan; + rmcast_base_channel_t *nchan, *chan; uint32_t netaddr=0, netmask=0, intr=0; int rc; @@ -618,7 +512,7 @@ static int open_channel(orte_rmcast_channel_t *channel, char *name, for (item = opal_list_get_first(&channels); item != opal_list_get_end(&channels); item = opal_list_get_next(item)) { - nchan = (rmcast_basic_channel_t*)item; + nchan = (rmcast_base_channel_t*)item; if (0 == strcasecmp(nchan->name, name)) { /* check the network, if one was specified */ @@ -643,7 +537,7 @@ static int open_channel(orte_rmcast_channel_t *channel, char *name, } /* we didn't find an existing match, so create a new channel */ - chan = OBJ_NEW(rmcast_basic_channel_t); /* puts it on list */ + chan = OBJ_NEW(rmcast_base_channel_t); /* puts it on list */ chan->name = strdup(name); chan->channel = next_channel++; /* if we were not given a network, use the default */ @@ -664,6 +558,9 @@ static int open_channel(orte_rmcast_channel_t *channel, char *name, } else { chan->port = port; } + OPAL_THREAD_LOCK(&lock); + opal_list_append(&channels, &chan->item); + OPAL_THREAD_UNLOCK(&lock); if (ORTE_SUCCESS != (rc = setup_channel(chan, direction))) { ORTE_ERROR_LOG(rc); @@ -678,13 +575,13 @@ static int open_channel(orte_rmcast_channel_t *channel, char *name, static int close_channel(orte_rmcast_channel_t channel) { opal_list_item_t *item; - rmcast_basic_channel_t *chan; + rmcast_base_channel_t *chan; OPAL_THREAD_LOCK(&lock); for (item = opal_list_get_first(&channels); item != opal_list_get_end(&channels); item = opal_list_get_next(item)) { - chan = (rmcast_basic_channel_t*)item; + chan = (rmcast_base_channel_t*)item; if (channel == chan->channel) { opal_list_remove_item(&channels, item); @@ -698,30 +595,26 @@ static int close_channel(orte_rmcast_channel_t channel) return ORTE_ERR_NOT_FOUND; } -static void recv_handler(int sd, short flags, void* cbdata) +static void process_recv(int fd, short event, void *cbdata) { + orte_mcast_msg_event_t *mev = (orte_mcast_msg_event_t*)cbdata; + rmcast_base_channel_t *chan = mev->channel; opal_list_item_t *item, *next; - rmcast_basic_channel_t *chan = (rmcast_basic_channel_t*)cbdata; - rmcast_basic_recv_t *ptr; - uint8_t *payload; - ssize_t sz; + rmcast_base_recv_t *ptr; + uint8_t *payload, *data; uint32_t tmp; orte_process_name_t name; uint16_t tmp16; orte_rmcast_tag_t tag; - - /* read the data */ - sz = read(sd, chan->recvd_data, mca_rmcast_basic_component.max_msg_size); - - OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, - "%s rmcast:basic recvd %d bytes on channel %d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - (int)sz, (int)chan->channel)); + opal_buffer_t buf; + /* point to the data payload */ + data = (uint8_t*)mev->data; + /* extract the name and convert it to host order */ - memcpy(&tmp, &chan->recvd_data[0], 4); + memcpy(&tmp, &data[0], 4); name.jobid = ntohl(tmp); - memcpy(&tmp, &chan->recvd_data[4], 4); + memcpy(&tmp, &data[4], 4); name.vpid = ntohl(tmp); OPAL_OUTPUT_VERBOSE((4, orte_rmcast_base.rmcast_output, @@ -735,11 +628,11 @@ static void recv_handler(int sd, short flags, void* cbdata) "%s rmcast:basic:recv sent from myself: %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&name))); - return; + goto cleanup; } /* extract the target tag */ - memcpy(&tmp16, &chan->recvd_data[8], 2); + memcpy(&tmp16, &data[8], 2); tag = ntohs(tmp16); OPAL_OUTPUT_VERBOSE((4, orte_rmcast_base.rmcast_output, @@ -751,7 +644,7 @@ static void recv_handler(int sd, short flags, void* cbdata) item = opal_list_get_first(&recvs); while (item != opal_list_get_end(&recvs)) { next = opal_list_get_next(item); - ptr = (rmcast_basic_recv_t*)item; + ptr = (rmcast_base_recv_t*)item; OPAL_OUTPUT_VERBOSE((10, orte_rmcast_base.rmcast_output, "%s rmcast:basic:recv checking channel %d tag %d", @@ -761,17 +654,17 @@ static void recv_handler(int sd, short flags, void* cbdata) if ((chan->channel == ptr->channel || ORTE_RMCAST_WILDCARD_CHANNEL == ptr->channel) && (tag == ptr->tag || ORTE_RMCAST_TAG_WILDCARD == ptr->tag)) { /* data must be placed in malloc'd area for buffer */ - payload = (uint8_t*)malloc(sz-10); - memcpy(payload, &chan->recvd_data[10], sz-10); + payload = (uint8_t*)malloc(mev->sz-10); + memcpy(payload, &data[10], mev->sz-10); /* create a buffer for the data */ - ptr->data = OBJ_NEW(opal_buffer_t); + OBJ_CONSTRUCT(&buf, opal_buffer_t); /* load the data into the buffer */ - opal_dss.load(ptr->data, payload, sz-10); + opal_dss.load(&buf, payload, mev->sz-10); if (NULL != ptr->cbfunc) { - ptr->cbfunc(ptr->channel, ptr->data, ptr->cbdata); + ptr->cbfunc(ptr->channel, &buf, ptr->cbdata); /* if it isn't persistent, remove it */ if (!(ORTE_RMCAST_PERSISTENT & ptr->flags)) { OPAL_THREAD_LOCK(&lock); @@ -783,14 +676,39 @@ static void recv_handler(int sd, short flags, void* cbdata) /* flag it as recvd to release blocking recv */ ptr->recvd = true; } + OBJ_DESTRUCT(&buf); /* will release the data */ } /* move along list */ item = next; } + +cleanup: + OBJ_RELEASE(mev); return; } -static int setup_channel(rmcast_basic_channel_t *chan, uint8_t direction) +static void recv_handler(int sd, short flags, void* cbdata) +{ + uint8_t *data; + ssize_t sz; + rmcast_base_channel_t *chan = (rmcast_base_channel_t*)cbdata; + + /* read the data */ + data = (uint8_t*)malloc(mca_rmcast_basic_component.max_msg_size * sizeof(uint8_t)); + sz = read(sd, data, mca_rmcast_basic_component.max_msg_size); + + OPAL_OUTPUT_VERBOSE((2, orte_rmcast_base.rmcast_output, + "%s rmcast:basic recvd %d bytes from channel %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (int)sz, (int)chan->channel)); + + /* clear the way for the next message */ + ORTE_MULTICAST_MESSAGE_EVENT(data, sz, chan, process_recv); + + return; +} + +static int setup_channel(rmcast_base_channel_t *chan, uint8_t direction) { int rc; int xmitsd, recvsd; @@ -823,7 +741,6 @@ static int setup_channel(rmcast_basic_channel_t *chan, uint8_t direction) return rc; } chan->recv = recvsd; - chan->recvd_data = (uint8_t*)malloc(mca_rmcast_basic_component.max_msg_size); /* setup an event to catch messages */ opal_event_set(&chan->recv_ev, chan->recv, OPAL_EV_READ|OPAL_EV_PERSIST, recv_handler, chan); @@ -833,7 +750,7 @@ static int setup_channel(rmcast_basic_channel_t *chan, uint8_t direction) return ORTE_SUCCESS; } -static int setup_socket(int *sd, rmcast_basic_channel_t *chan, bool recvsocket) +static int setup_socket(int *sd, rmcast_base_channel_t *chan, bool recvsocket) { uint8_t ttl = 1; struct sockaddr_in inaddr; @@ -952,8 +869,8 @@ static int setup_socket(int *sd, rmcast_basic_channel_t *chan, bool recvsocket) static void xmit_data(int sd, short flags, void* send_req) { - rmcast_basic_channel_t *chan = (rmcast_basic_channel_t*)send_req; - rmcast_basic_send_t *snd; + rmcast_base_channel_t *chan = (rmcast_base_channel_t*)send_req; + rmcast_base_send_t *snd; opal_list_item_t *item; char *bytes; int32_t sz; @@ -963,7 +880,7 @@ static void xmit_data(int sd, short flags, void* send_req) OPAL_THREAD_LOCK(&chan->send_lock); while (NULL != (item = opal_list_remove_first(&chan->pending_sends))) { - snd = (rmcast_basic_send_t*)item; + snd = (rmcast_base_send_t*)item; /* extract the payload */ opal_dss.unload(snd->data, (void**)&bytes, &sz); diff --git a/orte/mca/rmcast/rmcast_types.h b/orte/mca/rmcast/rmcast_types.h index 865113b812..7fc4f815c6 100644 --- a/orte/mca/rmcast/rmcast_types.h +++ b/orte/mca/rmcast/rmcast_types.h @@ -26,6 +26,7 @@ typedef int32_t orte_rmcast_channel_t; #define ORTE_RMCAST_CHANNEL_T OPAL_INT32 /* ORTE IP multicast channels */ +#define ORTE_RMCAST_GROUP_OUTPUT_CHANNEL -2 #define ORTE_RMCAST_WILDCARD_CHANNEL -1 #define ORTE_RMCAST_INVALID_CHANNEL 0 #define ORTE_RMCAST_SYS_CHANNEL 1 @@ -47,6 +48,8 @@ typedef int32_t orte_rmcast_tag_t; #define ORTE_RMCAST_TAG_WILDCARD -1 #define ORTE_RMCAST_TAG_INVALID 0 #define ORTE_RMCAST_TAG_BOOTSTRAP 1 +#define ORTE_RMCAST_TAG_ANNOUNCE 2 +#define ORTE_RMCAST_TAG_OUTPUT 3 /* starting value for dynamically assignable tags */ #define ORTE_RMCAST_TAG_DYNAMIC 100