1
1

* Make sure to release all resources on failed send

* Avoid triggered ops until we get everything debugged
* Simplify flowctl interface a bit

This commit was SVN r26356.
Этот коммит содержится в:
Brian Barrett 2012-04-27 21:11:01 +00:00
родитель fff1612c04
Коммит e6a0a1cf8a
5 изменённых файлов: 331 добавлений и 329 удалений

Просмотреть файл

@ -343,14 +343,6 @@ ompi_mtl_portals4_component_init(bool enable_progress_threads,
__FILE__, __LINE__, ret);
goto error;
}
ret = ompi_mtl_portals4_flowctl_setup_comm();
if (OMPI_SUCCESS != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: ompi_mtl_portals4_flowctl_setup_trees failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
#endif
/* activate progress callback */
@ -459,6 +451,7 @@ ompi_mtl_portals4_progress(void)
if (PTL_OK == ret) {
OPAL_OUTPUT_VERBOSE((60, ompi_mtl_base_output,
"Found event of type %d\n", ev.type));
count++;
switch (ev.type) {
case PTL_EVENT_GET:
case PTL_EVENT_PUT:
@ -483,7 +476,10 @@ ompi_mtl_portals4_progress(void)
case PTL_EVENT_PT_DISABLED:
#if OMPI_MTL_PORTALS4_FLOW_CONTROL
ret = ompi_mtl_portals4_flowctl_start_recover();
OPAL_OUTPUT_VERBOSE((10, ompi_mtl_base_output,
"Received PT_DISABLED event on pt %d\n",
(int) ev.pt_index));
ret = ompi_mtl_portals4_flowctl_trigger();
if (OMPI_SUCCESS != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: flowctl_start_recover failed: %d\n",
@ -519,5 +515,11 @@ ompi_mtl_portals4_progress(void)
}
}
#if OMPI_MTL_PORTALS4_FLOW_CONTROL
if (0 == count) {
ompi_mtl_portals4_pending_list_progress();
}
#endif
return count;
}

Просмотреть файл

@ -14,28 +14,19 @@
#include "mtl_portals4_endpoint.h"
#include "mtl_portals4_recv_short.h"
OBJ_CLASS_INSTANCE(ompi_mtl_portals4_pending_request_t, opal_free_list_item_t,
NULL, NULL);
static int
flowctl_alert_callback(ptl_event_t *ev,
ompi_mtl_portals4_base_request_t *ptl_base_request)
{
return ompi_mtl_portals4_flowctl_start_recover();
}
static int flowctl_trigger_callback(ptl_event_t *ev,
ompi_mtl_portals4_base_request_t *ptl_base_request);
static int flowctl_alert_callback(ptl_event_t *ev,
ompi_mtl_portals4_base_request_t *ptl_base_request);
static int flowctl_fanin_callback(ptl_event_t *ev,
ompi_mtl_portals4_base_request_t *ptl_base_request);
static int flowctl_fanout_callback(ptl_event_t *ev,
ompi_mtl_portals4_base_request_t *ptl_base_request);
static int
flowctl_fanout_callback(ptl_event_t *ev,
ompi_mtl_portals4_base_request_t *ptl_base_request)
{
/* woo, we're recovered! */
ompi_mtl_portals4.flowctl.flowctl_active = false;
ompi_mtl_portals4_pending_list_progress();
return OMPI_SUCCESS;
}
static int ompi_mtl_portals4_flowctl_start_recover(void);
int
@ -45,6 +36,7 @@ ompi_mtl_portals4_flowctl_init(void)
int ret;
ompi_mtl_portals4.flowctl.flowctl_active = false;
ompi_mtl_portals4.flowctl.send_alert = true;
OBJ_CONSTRUCT(&ompi_mtl_portals4.flowctl.active_sends, opal_list_t);
@ -60,12 +52,18 @@ ompi_mtl_portals4_flowctl_init(void)
ompi_mtl_portals4.flowctl.slots = (ompi_mtl_portals4.queue_size - 3) / 3;
ompi_mtl_portals4.flowctl.trigger_req.type = portals4_req_flowctl;
ompi_mtl_portals4.flowctl.trigger_req.event_callback = flowctl_trigger_callback;
ompi_mtl_portals4.flowctl.alert_req.type = portals4_req_flowctl;
ompi_mtl_portals4.flowctl.alert_req.event_callback = flowctl_alert_callback;
ompi_mtl_portals4.flowctl.fanout_req.type = portals4_req_flowctl;
ompi_mtl_portals4.flowctl.fanout_req.event_callback = flowctl_fanout_callback;
ompi_mtl_portals4.flowctl.fanin_req.type = portals4_req_flowctl;
ompi_mtl_portals4.flowctl.fanin_req.event_callback = flowctl_fanin_callback;
ompi_mtl_portals4.flowctl.epoch_counter = 0;
ret = PtlPTAlloc(ompi_mtl_portals4.ni_h,
@ -89,6 +87,8 @@ ompi_mtl_portals4_flowctl_init(void)
goto error;
}
/* everyone creates the trigger ME, even if the root may be the
only to use it */
me.start = NULL;
me.length = 0;
me.min_free = 0;
@ -101,7 +101,6 @@ ompi_mtl_portals4_flowctl_init(void)
PTL_ME_ACK_DISABLE |
PTL_ME_EVENT_LINK_DISABLE |
PTL_ME_EVENT_UNLINK_DISABLE |
PTL_ME_EVENT_COMM_DISABLE |
PTL_ME_EVENT_CT_COMM;
me.ct_handle = ompi_mtl_portals4.flowctl.trigger_ct_h;
me.match_bits = MTL_PORTALS4_FLOWCTL_TRIGGER;
@ -109,7 +108,7 @@ ompi_mtl_portals4_flowctl_init(void)
ompi_mtl_portals4.flowctl_idx,
&me,
PTL_PRIORITY_LIST,
NULL,
&ompi_mtl_portals4.flowctl.trigger_req,
&ompi_mtl_portals4.flowctl.trigger_me_h);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
@ -118,6 +117,9 @@ ompi_mtl_portals4_flowctl_init(void)
goto error;
}
/* Alert CT/ME for broadcasting out alert when root receives a
trigger */
ret = PtlCTAlloc(ompi_mtl_portals4.ni_h,
&ompi_mtl_portals4.flowctl.alert_ct_h);
if (PTL_OK != ret) {
@ -126,12 +128,10 @@ ompi_mtl_portals4_flowctl_init(void)
__FILE__, __LINE__, ret);
goto error;
}
me.options = PTL_ME_OP_PUT |
PTL_ME_ACK_DISABLE |
PTL_ME_EVENT_LINK_DISABLE |
PTL_ME_EVENT_UNLINK_DISABLE |
PTL_ME_EVENT_COMM_DISABLE |
PTL_ME_EVENT_CT_COMM;
me.ct_handle = ompi_mtl_portals4.flowctl.alert_ct_h;
me.match_bits = MTL_PORTALS4_FLOWCTL_ALERT;
@ -139,7 +139,7 @@ ompi_mtl_portals4_flowctl_init(void)
ompi_mtl_portals4.flowctl_idx,
&me,
PTL_PRIORITY_LIST,
NULL,
&ompi_mtl_portals4.flowctl.alert_req,
&ompi_mtl_portals4.flowctl.alert_me_h);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
@ -148,25 +148,7 @@ ompi_mtl_portals4_flowctl_init(void)
goto error;
}
me.options = PTL_ME_OP_PUT |
PTL_ME_ACK_DISABLE |
PTL_ME_EVENT_LINK_DISABLE |
PTL_ME_EVENT_UNLINK_DISABLE;
me.ct_handle = PTL_CT_NONE;
me.match_bits = MTL_PORTALS4_FLOWCTL_ALERT;
ret = PtlMEAppend(ompi_mtl_portals4.ni_h,
ompi_mtl_portals4.flowctl_idx,
&me,
PTL_PRIORITY_LIST,
NULL,
&ompi_mtl_portals4.flowctl.alert_event_me_h);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlMEAppend failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
/* Fanin CT/ME for receiving fan-in for restart */
ret = PtlCTAlloc(ompi_mtl_portals4.ni_h,
&ompi_mtl_portals4.flowctl.fanin_ct_h);
if (PTL_OK != ret) {
@ -175,12 +157,10 @@ ompi_mtl_portals4_flowctl_init(void)
__FILE__, __LINE__, ret);
goto error;
}
me.options = PTL_ME_OP_PUT |
PTL_ME_ACK_DISABLE |
PTL_ME_EVENT_LINK_DISABLE |
PTL_ME_EVENT_UNLINK_DISABLE |
PTL_ME_EVENT_COMM_DISABLE |
PTL_ME_EVENT_CT_COMM;
me.ct_handle = ompi_mtl_portals4.flowctl.fanin_ct_h;
me.match_bits = MTL_PORTALS4_FLOWCTL_FANIN;
@ -188,7 +168,7 @@ ompi_mtl_portals4_flowctl_init(void)
ompi_mtl_portals4.flowctl_idx,
&me,
PTL_PRIORITY_LIST,
NULL,
&ompi_mtl_portals4.flowctl.fanin_req,
&ompi_mtl_portals4.flowctl.fanin_me_h);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
@ -197,6 +177,7 @@ ompi_mtl_portals4_flowctl_init(void)
goto error;
}
/* Fan-out CT/ME for sending restart messages after fan-in */
ret = PtlCTAlloc(ompi_mtl_portals4.ni_h,
&ompi_mtl_portals4.flowctl.fanout_ct_h);
if (PTL_OK != ret) {
@ -205,12 +186,10 @@ ompi_mtl_portals4_flowctl_init(void)
__FILE__, __LINE__, ret);
goto error;
}
me.options = PTL_ME_OP_PUT |
PTL_ME_ACK_DISABLE |
PTL_ME_EVENT_LINK_DISABLE |
PTL_ME_EVENT_UNLINK_DISABLE |
PTL_ME_EVENT_COMM_DISABLE |
PTL_ME_EVENT_CT_COMM;
me.ct_handle = ompi_mtl_portals4.flowctl.fanout_ct_h;
me.match_bits = MTL_PORTALS4_FLOWCTL_FANOUT;
@ -218,7 +197,7 @@ ompi_mtl_portals4_flowctl_init(void)
ompi_mtl_portals4.flowctl_idx,
&me,
PTL_PRIORITY_LIST,
NULL,
&ompi_mtl_portals4.flowctl.fanout_req,
&ompi_mtl_portals4.flowctl.fanout_me_h);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
@ -227,25 +206,6 @@ ompi_mtl_portals4_flowctl_init(void)
goto error;
}
me.options = PTL_ME_OP_PUT |
PTL_ME_ACK_DISABLE |
PTL_ME_EVENT_LINK_DISABLE |
PTL_ME_EVENT_UNLINK_DISABLE;
me.ct_handle = PTL_CT_NONE;
me.match_bits = MTL_PORTALS4_FLOWCTL_FANOUT;
ret = PtlMEAppend(ompi_mtl_portals4.ni_h,
ompi_mtl_portals4.flowctl_idx,
&me,
PTL_PRIORITY_LIST,
NULL,
&ompi_mtl_portals4.flowctl.fanout_event_me_h);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlMEAppend failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
ompi_mtl_portals4.flowctl.num_children = 0;
ret = OMPI_SUCCESS;
@ -258,6 +218,7 @@ ompi_mtl_portals4_flowctl_init(void)
/**
 * Finalize flow control code.
 *
 * Currently a stub: it performs no teardown of its own and reports
 * success unconditionally.
 *
 * @return OMPI_SUCCESS always.
 */
int
ompi_mtl_portals4_flowctl_fini(void)
{
    /* BWB: FIX ME -- teardown is not implemented yet */
    int rc = OMPI_SUCCESS;

    return rc;
}
@ -267,19 +228,21 @@ ompi_mtl_portals4_flowctl_add_procs(size_t me,
size_t npeers,
struct mca_mtl_base_endpoint_t **peers)
{
int i;
int i, ret;
/* if epoch isn't 0, that means setup trees has been called, which
means that this add_procs is a dynamic process, which we don't
support */
if (ompi_mtl_portals4.flowctl.epoch_counter != 1) {
if (ompi_mtl_portals4.flowctl.epoch_counter != 0) {
return OMPI_ERR_NOT_SUPPORTED;
}
ompi_mtl_portals4.flowctl.num_procs = npeers;
ompi_mtl_portals4.flowctl.root = peers[0]->ptl_proc;
if (0 == me) {
ompi_mtl_portals4.flowctl.i_am_root = 1;
ompi_mtl_portals4.flowctl.i_am_root = true;
} else {
ompi_mtl_portals4.flowctl.i_am_root = false;
ompi_mtl_portals4.flowctl.parent =
peers[(me - 1) / 2]->ptl_proc;
}
@ -293,264 +256,92 @@ ompi_mtl_portals4_flowctl_add_procs(size_t me,
}
}
return OMPI_SUCCESS;
}
int
ompi_mtl_portals4_flowctl_setup_comm(void)
{
int ret;
size_t i;
/* increase the flow control epoch count */
ompi_mtl_portals4.flowctl.epoch_counter++;
ret = PtlStartBundle(ompi_mtl_portals4.ni_h);
if (PTL_OK != ret) {
return ret;
}
/* setup the trigger to start the alert broadcast */
if (0 != ompi_mtl_portals4.flowctl.i_am_root) {
int counter =
ompi_mtl_portals4.flowctl.epoch_counter *
(ompi_mtl_portals4.flowctl.num_procs + 1);
ret = PtlTriggeredPut(ompi_mtl_portals4.zero_md_h,
0,
0,
PTL_NO_ACK_REQ,
ompi_mtl_portals4.flowctl.me,
ompi_mtl_portals4.flowctl_idx,
MTL_PORTALS4_FLOWCTL_ALERT,
0,
NULL,
0,
ompi_mtl_portals4.flowctl.trigger_ct_h,
counter);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlTriggeredPut failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
}
/* setup the alert broadcast tree */
for (i = 0 ; i < ompi_mtl_portals4.flowctl.num_children ; ++i) {
ret = PtlTriggeredPut(ompi_mtl_portals4.zero_md_h,
0,
0,
PTL_NO_ACK_REQ,
ompi_mtl_portals4.flowctl.children[i],
ompi_mtl_portals4.flowctl_idx,
MTL_PORTALS4_FLOWCTL_ALERT,
0,
NULL,
0,
ompi_mtl_portals4.flowctl.trigger_ct_h,
ompi_mtl_portals4.flowctl.epoch_counter);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlTriggeredPut failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
}
/* setup the trigger to generate a full event on alert */
ret = PtlTriggeredPut(ompi_mtl_portals4.zero_md_h,
0,
0,
PTL_NO_ACK_REQ,
ompi_mtl_portals4.flowctl.me,
ompi_mtl_portals4.flowctl_idx,
MTL_PORTALS4_FLOWCTL_ALERT,
0,
NULL,
0,
ompi_mtl_portals4.flowctl.alert_ct_h,
ompi_mtl_portals4.flowctl.epoch_counter);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlTriggeredPut failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
/* setup the restart barrier */
if (0 != ompi_mtl_portals4.flowctl.i_am_root) {
ptl_ct_event_t new_ct;
/* turn to fanout part of restart */
for (i = 0 ; i < ompi_mtl_portals4.flowctl.num_children ; ++i) {
ret = PtlTriggeredPut(ompi_mtl_portals4.zero_md_h,
0,
0,
PTL_NO_ACK_REQ,
ompi_mtl_portals4.flowctl.children[i],
ompi_mtl_portals4.flowctl_idx,
MTL_PORTALS4_FLOWCTL_FANOUT,
0,
NULL,
0,
ompi_mtl_portals4.flowctl.fanin_ct_h,
ompi_mtl_portals4.flowctl.epoch_counter *
(ompi_mtl_portals4.flowctl.num_children + 1));
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlTriggeredPut failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
}
/* reset trigger ME */
new_ct.success = ompi_mtl_portals4.flowctl.epoch_counter *
(ompi_mtl_portals4.flowctl.num_children + 1);
ret = PtlTriggeredCTSet(ompi_mtl_portals4.flowctl.trigger_ct_h,
new_ct,
ompi_mtl_portals4.flowctl.fanin_ct_h,
ompi_mtl_portals4.flowctl.epoch_counter *
(ompi_mtl_portals4.flowctl.num_children + 1));
/* setup the trigger to generate a full event on fan-out */
ret = PtlTriggeredPut(ompi_mtl_portals4.zero_md_h,
0,
0,
PTL_NO_ACK_REQ,
ompi_mtl_portals4.flowctl.me,
ompi_mtl_portals4.flowctl_idx,
MTL_PORTALS4_FLOWCTL_FANOUT,
0,
NULL,
0,
ompi_mtl_portals4.flowctl.fanin_ct_h,
ompi_mtl_portals4.flowctl.epoch_counter *
(ompi_mtl_portals4.flowctl.num_children + 1));
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlTriggeredPut failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
} else {
/* fan-in */
ret = PtlTriggeredPut(ompi_mtl_portals4.zero_md_h,
0,
0,
PTL_NO_ACK_REQ,
ompi_mtl_portals4.flowctl.parent,
ompi_mtl_portals4.flowctl_idx,
MTL_PORTALS4_FLOWCTL_FANIN,
0,
NULL,
0,
ompi_mtl_portals4.flowctl.fanin_ct_h,
ompi_mtl_portals4.flowctl.epoch_counter *
(ompi_mtl_portals4.flowctl.num_children + 1));
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlTriggeredPut failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
/* fan-out */
for (i = 0 ; i < ompi_mtl_portals4.flowctl.num_children ; ++i) {
ret = PtlTriggeredPut(ompi_mtl_portals4.zero_md_h,
0,
0,
PTL_NO_ACK_REQ,
ompi_mtl_portals4.flowctl.children[i],
ompi_mtl_portals4.flowctl_idx,
MTL_PORTALS4_FLOWCTL_FANOUT,
0,
NULL,
0,
ompi_mtl_portals4.flowctl.fanout_ct_h,
ompi_mtl_portals4.flowctl.epoch_counter);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlTriggeredPut failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
}
/* setup the trigger to generate a full event on fan-out */
ret = PtlTriggeredPut(ompi_mtl_portals4.zero_md_h,
0,
0,
PTL_NO_ACK_REQ,
ompi_mtl_portals4.flowctl.me,
ompi_mtl_portals4.flowctl_idx,
MTL_PORTALS4_FLOWCTL_FANOUT,
0,
NULL,
0,
ompi_mtl_portals4.flowctl.fanout_ct_h,
ompi_mtl_portals4.flowctl.epoch_counter);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlTriggeredPut failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
}
error:
ret = PtlEndBundle(ompi_mtl_portals4.ni_h);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlEndBundle failed: %d\n",
__FILE__, __LINE__, ret);
}
return ret;
}
/**
 * Tell the flow control root that this process hit flow control.
 *
 * If flow control is not already marked active locally, marks it active
 * and sends a zero-length put with MTL_PORTALS4_FLOWCTL_TRIGGER match bits
 * to the root on the flow control portal table index.  Calls made while
 * flow control is already active do nothing.
 *
 * @return OMPI_SUCCESS if the trigger was sent or was not needed; the
 *         PtlPut return code if the put failed.
 */
int
ompi_mtl_portals4_flowctl_trigger(void)
{
    int ret;

    OPAL_OUTPUT_VERBOSE((10, ompi_mtl_base_output,
                         "Enter flowctl_trigger"));

    if (ompi_mtl_portals4.flowctl.flowctl_active) {
        /* a trigger for this round has already gone out */
        return OMPI_SUCCESS;
    }
    ompi_mtl_portals4.flowctl.flowctl_active = true;

    /* send trigger to root */
    OPAL_OUTPUT_VERBOSE((10, ompi_mtl_base_output,
                         "Sending flow control trigger"));
    ret = PtlPut(ompi_mtl_portals4.zero_md_h,
                 0,
                 0,
                 PTL_NO_ACK_REQ,
                 ompi_mtl_portals4.flowctl.root,
                 ompi_mtl_portals4.flowctl_idx,
                 MTL_PORTALS4_FLOWCTL_TRIGGER,
                 0,
                 NULL,
                 0);
    if (PTL_OK != ret) {
        opal_output_verbose(1, ompi_mtl_base_output,
                            "%s:%d: PtlPut failed: %d\n",
                            __FILE__, __LINE__, ret);
        /* The trigger never left this process.  Clear the flag again so
           that a later call retries instead of becoming a silent no-op. */
        ompi_mtl_portals4.flowctl.flowctl_active = false;
        return ret;
    }

    return OMPI_SUCCESS;
}
static int
ompi_mtl_portals4_flowctl_start_recover(void)
{
int ret;
if (ompi_mtl_portals4.flowctl.flowctl_active) {
return OMPI_SUCCESS;
} else {
ompi_mtl_portals4.flowctl.flowctl_active = true;
}
ompi_mtl_portals4.flowctl.flowctl_active = true;
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output,
"entering flowctl_start_recover"));
OPAL_OUTPUT_VERBOSE((10, ompi_mtl_base_output,
"Entering flowctl_start_recover %d",
ompi_mtl_portals4.flowctl.epoch_counter));
ret = ompi_mtl_portals4_flowctl_setup_comm();
if (OMPI_SUCCESS != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: flowctl_setup_comm failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
OPAL_OUTPUT_VERBOSE((10, ompi_mtl_base_output,
"flowctl_start_recover %d: draining active sends",
ompi_mtl_portals4.flowctl.epoch_counter));
/* drain all pending sends */
while (0 != opal_list_get_size(&ompi_mtl_portals4.flowctl.active_sends)) {
opal_progress();
}
OPAL_OUTPUT_VERBOSE((10, ompi_mtl_base_output,
"flowctl_start_recover %d: draining event queue",
ompi_mtl_portals4.flowctl.epoch_counter));
/* drain event queue */
while (0 != ompi_mtl_portals4_progress()) { ; }
OPAL_OUTPUT_VERBOSE((10, ompi_mtl_base_output,
"flowctl_start_recover %d: checking short blocks",
ompi_mtl_portals4.flowctl.epoch_counter));
/* check short block active count */
ret = ompi_mtl_portals4_recv_short_link(1);
if (OMPI_SUCCESS != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: recv_short_link failed: %d\n",
"%s:%d: recv_short_link failed: %d",
__FILE__, __LINE__, ret);
}
/* drain event queue */
while (0 != ompi_mtl_portals4_progress()) { ; }
OPAL_OUTPUT_VERBOSE((10, ompi_mtl_base_output,
"flowctl_start_recover %d: starting barrier. Async time!",
ompi_mtl_portals4.flowctl.epoch_counter));
/* send barrier entry message */
ret = PtlPut(ompi_mtl_portals4.zero_md_h,
0,
@ -571,8 +362,205 @@ ompi_mtl_portals4_flowctl_start_recover(void)
/* recovery complete when fan-out event arrives, async event, so
we're done now */
ret = PtlPTEnable(ompi_mtl_portals4.ni_h, ompi_mtl_portals4.recv_idx);
if (PTL_OK != ret) abort();
ret = OMPI_SUCCESS;
error:
OPAL_OUTPUT_VERBOSE((10, ompi_mtl_base_output,
"Exiting flowctl_start_recover %d",
ompi_mtl_portals4.flowctl.epoch_counter));
return ret;
}
/**
 * Event callback installed on the flow control trigger request.
 *
 * While send_alert is set, sends one zero-length put with
 * MTL_PORTALS4_FLOWCTL_ALERT match bits to flowctl.me and then clears
 * send_alert, so further trigger events are ignored until the flag is set
 * again.  A trigger arriving at a non-root process is logged, but the
 * alert is still sent.
 *
 * @param ev                Event that caused the callback; only the
 *                          initiator is read, for debug output.
 * @param ptl_base_request  Request associated with the event (unused).
 *
 * @return OMPI_SUCCESS, or the PtlPut return code if the alert could not
 *         be sent (send_alert is left set in that case).
 */
static int
flowctl_trigger_callback(ptl_event_t *ev,
                         ompi_mtl_portals4_base_request_t *ptl_base_request)
{
    int ret;

    /* the ids are cast to unsigned long, so print them with %lu */
    OPAL_OUTPUT_VERBOSE((10, ompi_mtl_base_output,
                         "-----> trigger_alert_callback (%d) [%lu %lu] <-----",
                         ompi_mtl_portals4.flowctl.send_alert ? 1 : 0,
                         (unsigned long) ev->initiator.phys.nid,
                         (unsigned long) ev->initiator.phys.pid));

    if (!ompi_mtl_portals4.flowctl.send_alert) {
        return OMPI_SUCCESS;
    }

    if (!ompi_mtl_portals4.flowctl.i_am_root) {
        opal_output(ompi_mtl_base_output, "***** NON-ROOT GOT TRIGGER *****");
    }

    /* start the alert broadcast */
    ret = PtlPut(ompi_mtl_portals4.zero_md_h,
                 0,
                 0,
                 PTL_NO_ACK_REQ,
                 ompi_mtl_portals4.flowctl.me,
                 ompi_mtl_portals4.flowctl_idx,
                 MTL_PORTALS4_FLOWCTL_ALERT,
                 0,
                 NULL,
                 0);
    if (PTL_OK != ret) {
        opal_output_verbose(1, ompi_mtl_base_output,
                            "%s:%d: PtlPut failed: %d\n",
                            __FILE__, __LINE__, ret);
        return ret;
    }

    ompi_mtl_portals4.flowctl.send_alert = false;

    return OMPI_SUCCESS;
}
/**
 * Event callback installed on the flow control alert request.
 *
 * Forwards the alert to each entry of flowctl.children with a zero-length
 * put, then starts local recovery through
 * ompi_mtl_portals4_flowctl_start_recover().  If forwarding to a child
 * fails, the remaining children are skipped and recovery is not started.
 *
 * @param ev                Event that caused the callback (unused).
 * @param ptl_base_request  Request associated with the event (unused).
 *
 * @return The result of ompi_mtl_portals4_flowctl_start_recover(), or the
 *         PtlPut return code of the first forward that failed.
 */
static int
flowctl_alert_callback(ptl_event_t *ev,
                       ompi_mtl_portals4_base_request_t *ptl_base_request)
{
    int ret = OMPI_SUCCESS;
    size_t i;

    OPAL_OUTPUT_VERBOSE((10, ompi_mtl_base_output,
                         "-----> flowctl_alert_callback <-----"));

    /* pass the alert on to our children */
    for (i = 0 ; i < ompi_mtl_portals4.flowctl.num_children ; ++i) {
        ret = PtlPut(ompi_mtl_portals4.zero_md_h,
                     0,
                     0,
                     PTL_NO_ACK_REQ,
                     ompi_mtl_portals4.flowctl.children[i],
                     ompi_mtl_portals4.flowctl_idx,
                     MTL_PORTALS4_FLOWCTL_ALERT,
                     0,
                     NULL,
                     0);
        if (PTL_OK != ret) {
            opal_output_verbose(1, ompi_mtl_base_output,
                                "%s:%d: PtlPut failed: %d\n",
                                __FILE__, __LINE__, ret);
            return ret;
        }
    }

    return ompi_mtl_portals4_flowctl_start_recover();
}
/**
 * Event callback installed on the flow control fan-in request.
 *
 * Counts fan-in events.  Once num_children + 1 have been counted:
 *  - the root drains the event queue, re-arms send_alert, re-enables the
 *    receive portal table entry and sends a fan-out message to flowctl.me;
 *  - any other process sends a fan-in message to its parent.
 * The counter is then reset for the next round, whether or not the put
 * succeeded, so that a failed send cannot leave the counter stuck at the
 * threshold.
 *
 * @param ev                Event that caused the callback (unused).
 * @param ptl_base_request  Request associated with the event (unused).
 *
 * @return OMPI_SUCCESS, or the PtlPut return code if the message could
 *         not be sent.  Aborts the process if PtlPTEnable fails.
 */
static int
flowctl_fanin_callback(ptl_event_t *ev,
                       ompi_mtl_portals4_base_request_t *ptl_base_request)
{
    int ret = OMPI_SUCCESS;

    ompi_mtl_portals4.flowctl.fanin_count++;

    OPAL_OUTPUT_VERBOSE((10, ompi_mtl_base_output,
                         "flowctl_fanin_callback: %d (%d of %d)",
                         ompi_mtl_portals4.flowctl.epoch_counter,
                         (int) ompi_mtl_portals4.flowctl.fanin_count,
                         (int) ompi_mtl_portals4.flowctl.num_children + 1));

    if (ompi_mtl_portals4.flowctl.fanin_count !=
        ompi_mtl_portals4.flowctl.num_children + 1) {
        /* still waiting for more fan-in messages */
        return ret;
    }

    if (ompi_mtl_portals4.flowctl.i_am_root) {
        /* drain event queue */
        while (0 != ompi_mtl_portals4_progress()) { ; }

        ompi_mtl_portals4.flowctl.send_alert = true;

        ret = PtlPTEnable(ompi_mtl_portals4.ni_h, ompi_mtl_portals4.recv_idx);
        if (PTL_OK != ret) abort();

        /* fan-in is complete at the root: start the fan-out */
        ret = PtlPut(ompi_mtl_portals4.zero_md_h,
                     0,
                     0,
                     PTL_NO_ACK_REQ,
                     ompi_mtl_portals4.flowctl.me,
                     ompi_mtl_portals4.flowctl_idx,
                     MTL_PORTALS4_FLOWCTL_FANOUT,
                     0,
                     NULL,
                     0);
    } else {
        /* report completion of our subtree to the parent */
        ret = PtlPut(ompi_mtl_portals4.zero_md_h,
                     0,
                     0,
                     PTL_NO_ACK_REQ,
                     ompi_mtl_portals4.flowctl.parent,
                     ompi_mtl_portals4.flowctl_idx,
                     MTL_PORTALS4_FLOWCTL_FANIN,
                     0,
                     NULL,
                     0);
    }
    if (PTL_OK != ret) {
        opal_output_verbose(1, ompi_mtl_base_output,
                            "%s:%d: PtlPut failed: %d\n",
                            __FILE__, __LINE__, ret);
    }

    /* reset even when the put failed; otherwise the equality test above
       could never match again */
    ompi_mtl_portals4.flowctl.fanin_count = 0;

    return ret;
}
/**
 * Event callback installed on the flow control fan-out request.
 *
 * Forwards the fan-out message to each entry of flowctl.children, then
 * marks flow control inactive, re-enables the receive portal table entry
 * and restarts progress on the pending send list.  If forwarding to a
 * child fails, the callback returns immediately and flow control stays
 * marked active.
 *
 * @param ev                Event that caused the callback (unused).
 * @param ptl_base_request  Request associated with the event (unused).
 *
 * @return OMPI_SUCCESS, or the PtlPut return code of the first forward
 *         that failed.  Aborts the process if PtlPTEnable fails.
 */
static int
flowctl_fanout_callback(ptl_event_t *ev,
                        ompi_mtl_portals4_base_request_t *ptl_base_request)
{
    int ret;
    /* captured on entry so the enter and exit traces print the same value */
    int epoch = ompi_mtl_portals4.flowctl.epoch_counter;
    size_t i;

    OPAL_OUTPUT_VERBOSE((10, ompi_mtl_base_output,
                         "Enter flowctl_fanout_callback: %d", epoch));

    /* pass the fan-out on to our children */
    for (i = 0 ; i < ompi_mtl_portals4.flowctl.num_children ; ++i) {
        ret = PtlPut(ompi_mtl_portals4.zero_md_h,
                     0,
                     0,
                     PTL_NO_ACK_REQ,
                     ompi_mtl_portals4.flowctl.children[i],
                     ompi_mtl_portals4.flowctl_idx,
                     MTL_PORTALS4_FLOWCTL_FANOUT,
                     0,
                     NULL,
                     0);
        if (PTL_OK != ret) {
            opal_output_verbose(1, ompi_mtl_base_output,
                                "%s:%d: PtlPut failed: %d\n",
                                __FILE__, __LINE__, ret);
            return ret;
        }
    }

    /* woo, we're recovered! */
    ompi_mtl_portals4.flowctl.flowctl_active = false;

    ret = PtlPTEnable(ompi_mtl_portals4.ni_h, ompi_mtl_portals4.recv_idx);
    if (PTL_OK != ret) abort();

    ompi_mtl_portals4_pending_list_progress();

    OPAL_OUTPUT_VERBOSE((10, ompi_mtl_base_output,
                         "Exit flowctl_fanout_callback: %d", epoch));

    return OMPI_SUCCESS;
}

Просмотреть файл

@ -34,6 +34,8 @@ OBJ_CLASS_DECLARATION(ompi_mtl_portals4_pending_request_t);
struct ompi_mtl_portals4_flowctl_t {
bool flowctl_active;
bool send_trigger;
bool send_alert;
opal_list_t active_sends;
opal_list_t pending_sends;
@ -41,7 +43,11 @@ struct ompi_mtl_portals4_flowctl_t {
opal_mutex_t mutex;
int32_t slots;
size_t fanin_count;
ompi_mtl_portals4_base_request_t trigger_req;
ompi_mtl_portals4_base_request_t alert_req;
ompi_mtl_portals4_base_request_t fanin_req;
ompi_mtl_portals4_base_request_t fanout_req;
/** Flow control epoch counter. Triggered events should be
@ -59,9 +65,6 @@ struct ompi_mtl_portals4_flowctl_t {
ptl_handle_ct_t alert_ct_h;
/** Flow control alert tree broadcast ME. */
ptl_handle_me_t alert_me_h;
/** Flow control alert tree broadcast ME for a local put to
generate an event */
ptl_handle_me_t alert_event_me_h;
/** Flow control restart fan-in CT. */
ptl_handle_ct_t fanin_ct_h;
@ -72,25 +75,18 @@ struct ompi_mtl_portals4_flowctl_t {
ptl_handle_ct_t fanout_ct_h;
/** Flow control restart fan-out ME. */
ptl_handle_me_t fanout_me_h;
/** Flow control restart fan-out ME for a local put to generate an
event */
ptl_handle_me_t fanout_event_me_h;
size_t num_procs;
size_t num_children;
ptl_process_t children[2];
ptl_process_t parent;
ptl_process_t me;
int i_am_root;
ptl_process_t root;
bool i_am_root;
};
typedef struct ompi_mtl_portals4_flowctl_t ompi_mtl_portals4_flowctl_t;
/**
* Initialize flow control code
*
* Initialize flow control code. This includes initializing epoch
* counter and creating necessary MEs and CTs.
*/
int ompi_mtl_portals4_flowctl_init(void);
int ompi_mtl_portals4_flowctl_fini(void);
@ -98,9 +94,8 @@ int ompi_mtl_portals4_flowctl_fini(void);
int ompi_mtl_portals4_flowctl_add_procs(size_t me,
size_t npeers,
struct mca_mtl_base_endpoint_t **peers);
int ompi_mtl_portals4_flowctl_setup_comm(void);
int ompi_mtl_portals4_flowctl_start_recover(void);
int ompi_mtl_portals4_flowctl_trigger(void);
void ompi_mtl_portals4_pending_list_progress(void);

Просмотреть файл

@ -124,8 +124,10 @@ ompi_mtl_portals4_activate_block(ompi_mtl_portals4_recv_short_block_t *block)
PTL_ME_EVENT_LINK_DISABLE |
PTL_ME_MANAGE_LOCAL |
PTL_ME_MAY_ALIGN;
#if 0
#if !OPAL_ENABLE_DEBUG
me.options |= PTL_ME_EVENT_COMM_DISABLE;
#endif
#endif
me.match_id.phys.nid = PTL_NID_ANY;
me.match_id.phys.pid = PTL_PID_ANY;

Просмотреть файл

@ -44,16 +44,28 @@ ompi_mtl_portals4_callback(ptl_event_t *ev,
ompi_mtl_portals4_pending_request_t *pending =
ptl_request->pending;
if (ev->ni_fail_type == PTL_NI_FLOW_CTRL) {
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output,
if (ev->ni_fail_type == PTL_NI_PT_DISABLED) {
OPAL_OUTPUT_VERBOSE((10, ompi_mtl_base_output,
"send %lu hit flow control",
ptl_request->opcount));
ompi_mtl_portals4_flowctl_start_recover();
PtlMDRelease(ptl_request->md_h);
if (PTL_OK != PtlHandleIsEqual(ptl_request->me_h, PTL_INVALID_HANDLE)) {
ret = PtlMEUnlink(ptl_request->me_h);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: send callback PtlMDUnlink returned %d",
__FILE__, __LINE__, ret);
}
}
opal_list_remove_item(&ompi_mtl_portals4.flowctl.active_sends,
&pending->super.super);
opal_list_append(&ompi_mtl_portals4.flowctl.pending_sends,
&pending->super.super);
opal_atomic_add_32(&ompi_mtl_portals4.flowctl.slots, 1);
ompi_mtl_portals4_flowctl_trigger();
return OMPI_SUCCESS;
}
#endif
@ -359,7 +371,8 @@ ompi_mtl_portals4_pending_list_progress()
opal_list_item_t *item;
ompi_mtl_portals4_pending_request_t *pending;
while (!ompi_mtl_portals4.flowctl.flowctl_active) {
while ((!ompi_mtl_portals4.flowctl.flowctl_active) &&
(0 != opal_list_get_size(&ompi_mtl_portals4.flowctl.pending_sends))) {
val = opal_atomic_add_32(&ompi_mtl_portals4.flowctl.slots, -1);
if (val <= 0) {
opal_atomic_add_32(&ompi_mtl_portals4.flowctl.slots, 1);
@ -394,6 +407,8 @@ ompi_mtl_portals4_pending_list_progress()
pending->ptl_request);
}
if (OMPI_SUCCESS != ret) {
opal_list_remove_item(&ompi_mtl_portals4.flowctl.active_sends,
&pending->super.super);
opal_list_prepend(&ompi_mtl_portals4.flowctl.pending_sends,
&pending->super.super);
}