
Start of the flow control implementation. #defined out for now.

This commit was SVN r26192.
Brian Barrett 2012-03-26 01:31:58 +00:00
parent ca3ff58c76
commit 27c8f71773
7 changed files with 702 additions and 45 deletions

ompi/mca/mtl/portals4/Makefile.am

@@ -46,6 +46,12 @@ local_sources = \
mtl_portals4_request.h \
mtl_portals4_send.c
if OMPI_MTL_PORTALS4_FLOW_CONTROL
local_sources += \
mtl_portals4_flowctl.h \
mtl_portals4_flowctl.c
endif
mcacomponentdir = $(pkglibdir)
mcacomponent_LTLIBRARIES = $(component_install)
mca_mtl_portals4_la_SOURCES = $(local_sources)

ompi/mca/mtl/portals4/configure.m4

@@ -39,6 +39,23 @@ AC_DEFUN([MCA_ompi_mtl_portals4_CONFIG],[
AS_IF([test "$DIRECT_mtl" = "portals4"],
[CPPFLAGS="$CPPFLAGS $mtl_portals4_CPPFLAGS"])
AC_ARG_ENABLE([mtl-portals4-flow-control],
[AC_HELP_STRING([--enable-mtl-portals4-flow-control],
[enable flow control for Portals 4 MTL (default: disabled)])])
AC_MSG_CHECKING([whether to enable flow control])
if test "$enable_mtl_portals4_flow_control" = "yes"; then
AC_MSG_RESULT([yes])
mtl_portals4_flow_control_enabled=1
else
AC_MSG_RESULT([no])
mtl_portals4_flow_control_enabled=0
fi
AC_DEFINE_UNQUOTED([OMPI_MTL_PORTALS4_FLOW_CONTROL],
[$mtl_portals4_flow_control_enabled],
[Enable flow control for Portals4 MTL])
AM_CONDITIONAL([OMPI_MTL_PORTALS4_FLOW_CONTROL],
[test "$mtl_portals4_flow_control_enabled" = "1"])
# substitute in the things needed to build portals4
AC_SUBST([mtl_portals4_CPPFLAGS])
AC_SUBST([mtl_portals4_LDFLAGS])
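Note that AC_DEFINE_UNQUOTED always defines OMPI_MTL_PORTALS4_FLOW_CONTROL, to either 0 or 1, so the C sources can gate the new code paths with a plain #if (as the hunks below do), while AM_CONDITIONAL gates the extra sources in Makefile.am. A minimal standalone sketch of that guard pattern, using a stand-in macro name since it builds outside the OMPI tree:

/* guard_sketch.c: EXAMPLE_FLOW_CONTROL stands in for the
   OMPI_MTL_PORTALS4_FLOW_CONTROL symbol that configure emits. */
#include <stdio.h>

#ifndef EXAMPLE_FLOW_CONTROL
#define EXAMPLE_FLOW_CONTROL 0   /* AC_DEFINE_UNQUOTED emits 0 when disabled */
#endif

int main(void)
{
#if EXAMPLE_FLOW_CONTROL
    puts("flow control compiled in");
#else
    puts("flow control compiled out");
#endif
    return 0;
}

Building with -DEXAMPLE_FLOW_CONTROL=1 flips the branch, just as passing --enable-mtl-portals4-flow-control to configure does for the real symbol.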

ompi/mca/mtl/portals4/mtl_portals4.c

@@ -65,7 +65,7 @@ ompi_mtl_portals4_add_procs(struct mca_mtl_base_module_t *mtl,
struct ompi_proc_t** procs,
struct mca_mtl_base_endpoint_t **mtl_peer_data)
{
int ret;
int ret, me;
size_t i;
/* Get the list of ptl_process_id_t from the runtime and copy into structure */
@@ -73,6 +73,10 @@ ompi_mtl_portals4_add_procs(struct mca_mtl_base_module_t *mtl,
ptl_process_t *id;
size_t size;
if( procs[i] == ompi_proc_local_proc ) {
me = i;
}
if (procs[i]->proc_arch != ompi_proc_local()->proc_arch) {
opal_output_verbose(1, ompi_mtl_base_output,
"Portals 4 MTL does not support heterogeneous operations.");
@@ -104,10 +108,20 @@ ompi_mtl_portals4_add_procs(struct mca_mtl_base_module_t *mtl,
__FILE__, __LINE__, ret);
return OMPI_ERR_BAD_PARAM;
}
mtl_peer_data[i]->ptl_proc = *id;
}
#if OMPI_MTL_PORTALS4_FLOW_CONTROL
ret = ompi_mtl_portals4_flowctl_add_procs(me, nprocs, mtl_peer_data);
if (OMPI_SUCCESS != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: flowctl_add_procs failed: %d\n",
__FILE__, __LINE__, ret);
return ret;
}
#endif
return OMPI_SUCCESS;
}
@@ -136,6 +150,9 @@ ompi_mtl_portals4_finalize(struct mca_mtl_base_module_t *mtl)
opal_progress_unregister(ompi_mtl_portals4_progress);
while (0 != ompi_mtl_portals4_progress()) { }
#if OMPI_MTL_PORTALS4_FLOW_CONTROL
ompi_mtl_portals4_flowctl_fini();
#endif
ompi_mtl_portals4_recv_short_fini();
PtlMEUnlink(ompi_mtl_portals4.long_overflow_me_h);

ompi/mca/mtl/portals4/mtl_portals4.h

@@ -22,15 +22,16 @@
#include <portals4.h>
#include "ompi_config.h"
#include "opal/class/opal_free_list.h"
#include "opal/class/opal_list.h"
#include "ompi/class/ompi_free_list.h"
#include "opal/datatype/opal_convertor.h"
#include "ompi/mca/mtl/mtl.h"
#include "ompi/mca/mtl/base/base.h"
#include "opal/datatype/opal_convertor.h"
#include "opal/class/opal_free_list.h"
#include "mtl_portals4_request.h"
#if OMPI_MTL_PORTALS4_FLOW_CONTROL
#include "mtl_portals4_flowctl.h"
#endif
BEGIN_C_DECLS
@@ -39,36 +40,80 @@ struct mca_mtl_portals4_send_request_t;
struct mca_mtl_portals4_module_t {
mca_mtl_base_module_t base;
/* configuration */
/** Eager limit; messages greater than this use a rendezvous protocol */
size_t eager_limit;
/** Size of short message blocks */
size_t recv_short_size;
/** Number of short message blocks which should be created during startup */
int recv_short_num;
/** Length of both the receive and send event queues */
int queue_size;
/** Protocol for long message transfer */
enum { eager, rndv } protocol;
/* free list of message for matched probe */
opal_free_list_t fl_message;
ptl_pt_index_t send_idx;
ptl_pt_index_t read_idx;
/* global handles */
/** Network interface handle for matched interface */
ptl_handle_ni_t ni_h;
/** portals index for message matching */
ptl_pt_index_t send_idx;
/** portals index for long message rendezvous */
ptl_pt_index_t read_idx;
/** portals index for flow control recovery */
ptl_pt_index_t flowctl_idx;
/** portals index for flow control recovery operations which
generate full events */
ptl_pt_index_t flowctl_event_idx;
/** Event queue handles. See send_eq_h and recv_eq_h defines for
usage. Array for PtlEQPoll */
ptl_handle_eq_t eqs_h[2];
/* for zero-length sends and acks */
/** MD for zero-length sends and acks. Optimization, can be
reused anywhere a 0-byte ping is necessary */
ptl_handle_md_t zero_md_h;
/* long message receive overflow */
/** long message receive overflow ME. Persistent ME, first in
overflow list on the send_idx portal table. */
ptl_handle_me_t long_overflow_me_h;
/** List of active short receive blocks. Active means that the ME
was posted to the overflow list and the UNLINK event has not
yet been received. */
opal_list_t active_recv_short_blocks;
/** List of short receive blocks waiting for FREE event. Blocks
are added to this list when the UNLINK event has been
received and removed when the FREE event is received. */
opal_list_t waiting_recv_short_blocks;
/* number of operations started */
/** number of send-side operations started */
uint32_t opcount;
#if OPAL_ENABLE_DEBUG
uint32_t recv_opcount;
#if OMPI_MTL_PORTALS4_FLOW_CONTROL
/** Number of event slots available for send side operations.
Note that this is slightly smaller than the event queue size
to allow space for flow control related events. */
uint32_t send_queue_slots;
/** free list of send items */
opal_free_list_t send_fl;
/** list of sends which are pending (either due to needing to
retransmit or because we're in flow control recovery) */
opal_list_t pending_sends;
/** list of sends which are currently active */
opal_list_t active_sends;
ompi_mtl_portals4_flowctl_t flowctl;
#endif
enum { eager, rndv } protocol;
#if OPAL_ENABLE_DEBUG
/** number of receive-side operations started. Used only for
debugging */
uint32_t recv_opcount;
#endif
};
typedef struct mca_mtl_portals4_module_t mca_mtl_portals4_module_t;
@@ -81,6 +126,12 @@ extern mca_mtl_portals4_module_t ompi_mtl_portals4;
#define REQ_READ_TABLE_ID 3
#define REQ_FLOWCTL_TABLE_ID 4
#if OMPI_MTL_PORTALS4_FLOW_CONTROL
#define MTL_PORTALS4_FLOWCTL_TRIGGER 0x01
#define MTL_PORTALS4_FLOWCTL_ALERT 0x02
#define MTL_PORTALS4_FLOWCTL_FANIN 0x03
#define MTL_PORTALS4_FLOWCTL_FANOUT 0x04
#endif
/* match/ignore bit manipulation
*
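The eager_limit and protocol fields above encode the send-side protocol decision: messages no larger than the eager limit are sent eagerly, larger ones go through a rendezvous. A standalone sketch of that decision, with illustrative names and an arbitrary limit, since the MTL's actual send path is not shown in this commit:

#include <stddef.h>
#include <stdio.h>

enum proto { PROTO_EAGER, PROTO_RNDV };

/* Mirror of the eager_limit rule: anything larger than the limit
   uses the rendezvous protocol. */
static enum proto choose_protocol(size_t length, size_t eager_limit)
{
    return (length > eager_limit) ? PROTO_RNDV : PROTO_EAGER;
}

int main(void)
{
    const size_t eager_limit = 2048;   /* illustrative value only */

    printf("1 KiB  -> %s\n",
           choose_protocol(1024, eager_limit) == PROTO_EAGER ? "eager" : "rndv");
    printf("64 KiB -> %s\n",
           choose_protocol(65536, eager_limit) == PROTO_EAGER ? "eager" : "rndv");
    return 0;
}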

ompi/mca/mtl/portals4/mtl_portals4_component.c

@@ -87,7 +87,8 @@ ompi_mtl_portals4_component_open(void)
false,
false,
32,
&ompi_mtl_portals4.recv_short_num);
&tmp);
ompi_mtl_portals4.recv_short_num = tmp;
mca_base_param_reg_int(&mca_mtl_portals4_component.mtl_version,
"short_recv_size",
@@ -99,15 +100,16 @@ ompi_mtl_portals4_component_open(void)
ompi_mtl_portals4.recv_short_size = tmp;
mca_base_param_reg_int(&mca_mtl_portals4_component.mtl_version,
"queue_size",
"event_queue_size",
"Size of the event queue in entries",
false,
false,
1024,
&ompi_mtl_portals4.queue_size);
&tmp);
ompi_mtl_portals4.queue_size = tmp;
mca_base_param_reg_string(&mca_mtl_portals4_component.mtl_version,
"long_proto",
"long_protocol",
"Protocol to use for long messages. Valid entries are eager and rndv",
false,
false,
@@ -124,9 +126,11 @@ ompi_mtl_portals4_component_open(void)
}
opal_output_verbose(1, ompi_mtl_base_output,
"Eager limit: %d", (int) ompi_mtl_portals4.eager_limit);
"Eager limit: %d", (int)
ompi_mtl_portals4.eager_limit);
opal_output_verbose(1, ompi_mtl_base_output,
"Short receive blocks: %d", ompi_mtl_portals4.recv_short_num);
"Short receive blocks: %d",
ompi_mtl_portals4.recv_short_num);
opal_output_verbose(1, ompi_mtl_base_output,
"Queue size: %d", ompi_mtl_portals4.queue_size);
opal_output_verbose(1, ompi_mtl_base_output,
@@ -240,7 +244,11 @@ ompi_mtl_portals4_component_init(bool enable_progress_threads,
ret = PtlPTAlloc(ompi_mtl_portals4.ni_h,
PTL_PT_ONLY_USE_ONCE |
PTL_PT_ONLY_TRUNCATE |
#if OMPI_MTL_PORTALS4_FLOW_CONTROL
PTL_PT_FLOWCTRL,
#else
0,
#endif
ompi_mtl_portals4.recv_eq_h,
REQ_SEND_TABLE_ID,
&ompi_mtl_portals4.send_idx);
@@ -324,6 +332,29 @@ ompi_mtl_portals4_component_init(bool enable_progress_threads,
ompi_mtl_portals4.recv_opcount = 0;
#endif
#if OMPI_MTL_PORTALS4_FLOW_CONTROL
ompi_mtl_portals4.send_queue_slots = ompi_mtl_portals4.queue_size - 4;
OBJ_CONSTRUCT(&ompi_mtl_portals4.active_sends, opal_list_t);
OBJ_CONSTRUCT(&ompi_mtl_portals4.pending_sends, opal_list_t);
ret = ompi_mtl_portals4_flowctl_init();
if (OMPI_SUCCESS != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: ompi_mtl_portals4_flowctl_init failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
ret = ompi_mtl_portals4_flowctl_setup_comm();
if (OMPI_SUCCESS != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: ompi_mtl_portals4_flowctl_setup_comm failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
#endif
/* activate progress callback */
ret = opal_progress_register(ompi_mtl_portals4_progress);
if (OMPI_SUCCESS != ret) {
@@ -452,30 +483,18 @@ ompi_mtl_portals4_progress(void)
break;
case PTL_EVENT_PT_DISABLED:
/* catch up by draining rest of the queue */
ompi_mtl_portals4_progress();
/* get restarted */
if (ompi_mtl_portals4.send_idx == ev.pt_index) {
/* make sure we have at least one active short receive block */
ret = ompi_mtl_portals4_recv_short_link(1);
if (OMPI_SUCCESS != ret) {
opal_output(ompi_mtl_base_output,
"Unable to post short receive block after flow control.");
abort();
}
ret = PtlPTEnable(ompi_mtl_portals4.ni_h,
ompi_mtl_portals4.send_idx);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlPTEnable failed: %d\n",
__FILE__, __LINE__, ret);
abort();
}
} else {
opal_output(ompi_mtl_base_output, "Unhandled send flow control event.");
#if OMPI_MTL_PORTALS4_FLOW_CONTROL
ret = ompi_mtl_portals4_flowctl_start_recover();
if (OMPI_SUCCESS != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: ompi_mtl_portals4_flowctl_start_recover failed: %d\n",
__FILE__, __LINE__, ret);
abort();
}
#else
opal_output(ompi_mtl_base_output, "Unhandled flow control event.");
abort();
#endif
break;
case PTL_EVENT_LINK:
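The send_queue_slots field initialized above (queue_size - 4) reserves a handful of event-queue entries for flow-control events, per the comment in mtl_portals4.h. How the counter is consumed is not part of this commit, but a plausible model is simple slot accounting: decrement per outstanding send, park sends on pending_sends when no slot is free. A hedged standalone sketch of that model, with hypothetical helper names:

#include <stdbool.h>
#include <stdio.h>

static unsigned int send_queue_slots;

/* Hypothetical helpers; the commit only adds the counter and the
   pending_sends list, not this logic. */
static bool send_slot_acquire(void)
{
    if (0 == send_queue_slots) {
        return false;               /* caller should queue on pending_sends */
    }
    send_queue_slots--;
    return true;
}

static void send_slot_release(void)
{
    send_queue_slots++;             /* called once the send's events retire */
}

int main(void)
{
    const unsigned int queue_size = 8;
    send_queue_slots = queue_size - 4;  /* reserve entries for flow control */

    for (int i = 0; i < 6; ++i) {
        printf("send %d: %s\n", i,
               send_slot_acquire() ? "started" : "pending");
    }
    send_slot_release();
    printf("after one completion: %s\n",
           send_slot_acquire() ? "started" : "pending");
    return 0;
}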

ompi/mca/mtl/portals4/mtl_portals4_flowctl.c (new file, 480 additions)

@@ -0,0 +1,480 @@
/*
* Copyright (c) 2012 Sandia National Laboratories. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "mtl_portals4.h"
#include "mtl_portals4_flowctl.h"
#include "mtl_portals4_endpoint.h"
#include "mtl_portals4_recv_short.h"
int
ompi_mtl_portals4_flowctl_init(void)
{
ptl_me_t me;
int ret;
ompi_mtl_portals4.flowctl.epoch_counter = 0;
ret = PtlCTAlloc(ompi_mtl_portals4.ni_h,
&ompi_mtl_portals4.flowctl.trigger_ct_h);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlCTAlloc failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
me.start = NULL;
me.length = 0;
me.ct_handle = ompi_mtl_portals4.flowctl.trigger_ct_h;
me.min_free = 0;
me.uid = PTL_UID_ANY;
me.options = PTL_ME_OP_PUT |
PTL_ME_ACK_DISABLE |
PTL_ME_EVENT_LINK_DISABLE |
PTL_ME_EVENT_UNLINK_DISABLE;
me.match_id.phys.nid = PTL_NID_ANY;
me.match_id.phys.pid = PTL_PID_ANY;
me.match_bits = MTL_PORTALS4_FLOWCTL_TRIGGER;
me.ignore_bits = 0;
ret = PtlMEAppend(ompi_mtl_portals4.ni_h,
ompi_mtl_portals4.flowctl_idx,
&me,
PTL_PRIORITY_LIST,
NULL,
&ompi_mtl_portals4.flowctl.trigger_me_h);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlMEAppend failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
ret = PtlCTAlloc(ompi_mtl_portals4.ni_h,
&ompi_mtl_portals4.flowctl.alert_ct_h);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlCTAlloc failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
me.ct_handle = ompi_mtl_portals4.flowctl.alert_ct_h;
me.match_bits = MTL_PORTALS4_FLOWCTL_ALERT;
ret = PtlMEAppend(ompi_mtl_portals4.ni_h,
ompi_mtl_portals4.flowctl_idx,
&me,
PTL_PRIORITY_LIST,
NULL,
&ompi_mtl_portals4.flowctl.alert_me_h);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlMEAppend failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
me.ct_handle = PTL_CT_NONE;
me.match_bits = MTL_PORTALS4_FLOWCTL_ALERT;
ret = PtlMEAppend(ompi_mtl_portals4.ni_h,
ompi_mtl_portals4.flowctl_event_idx,
&me,
PTL_PRIORITY_LIST,
NULL,
&ompi_mtl_portals4.flowctl.alert_event_me_h);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlMEAppend failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
ret = PtlCTAlloc(ompi_mtl_portals4.ni_h,
&ompi_mtl_portals4.flowctl.fanin_ct_h);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlCTAlloc failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
me.ct_handle = ompi_mtl_portals4.flowctl.fanin_ct_h;
me.match_bits = MTL_PORTALS4_FLOWCTL_FANIN;
ret = PtlMEAppend(ompi_mtl_portals4.ni_h,
ompi_mtl_portals4.flowctl_idx,
&me,
PTL_PRIORITY_LIST,
NULL,
&ompi_mtl_portals4.flowctl.fanin_me_h);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlMEAppend failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
ret = PtlCTAlloc(ompi_mtl_portals4.ni_h,
&ompi_mtl_portals4.flowctl.fanout_ct_h);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlCTAlloc failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
me.ct_handle = ompi_mtl_portals4.flowctl.fanout_ct_h;
me.match_bits = MTL_PORTALS4_FLOWCTL_FANOUT;
ret = PtlMEAppend(ompi_mtl_portals4.ni_h,
ompi_mtl_portals4.flowctl_idx,
&me,
PTL_PRIORITY_LIST,
NULL,
&ompi_mtl_portals4.flowctl.fanout_me_h);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlMEAppend failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
me.ct_handle = PTL_CT_NONE;
me.match_bits = MTL_PORTALS4_FLOWCTL_FANOUT;
ret = PtlMEAppend(ompi_mtl_portals4.ni_h,
ompi_mtl_portals4.flowctl_event_idx,
&me,
PTL_PRIORITY_LIST,
NULL,
&ompi_mtl_portals4.flowctl.fanout_event_me_h);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlMEAppend failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
ompi_mtl_portals4.flowctl.num_children = 0;
ret = OMPI_SUCCESS;
error:
return ret;
}
int
ompi_mtl_portals4_flowctl_fini(void)
{
return OMPI_SUCCESS;
}
int
ompi_mtl_portals4_flowctl_add_procs(size_t me,
size_t npeers,
struct mca_mtl_base_endpoint_t **peers)
{
int i;
/* If the epoch counter isn't 0, setup_comm has already been
called, which means this call is adding dynamically spawned
processes; we don't support that. */
if (ompi_mtl_portals4.flowctl.epoch_counter != 0) {
return OMPI_ERR_NOT_SUPPORTED;
}
ompi_mtl_portals4.flowctl.num_procs = npeers;
if (0 != me) {
ompi_mtl_portals4.flowctl.parent =
peers[(me - 1) / 2]->ptl_proc;
}
ompi_mtl_portals4.flowctl.me = peers[me]->ptl_proc;
for (i = 0 ; i < 2 ; ++i) {
size_t tmp = (2 * me) + i + 1;
if (tmp < npeers) {
ompi_mtl_portals4.flowctl.num_children++;
ompi_mtl_portals4.flowctl.children[i] = peers[tmp]->ptl_proc;
}
}
return OMPI_SUCCESS;
}
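flowctl_add_procs lays the processes out as an implicit binary tree over the rank order: rank 0 is the root, rank r's parent is (r - 1) / 2, and its children are 2r + 1 and 2r + 2 when those fall below nprocs, which is why num_children can never exceed 2. A standalone sketch of that indexing:

#include <stdio.h>

int main(void)
{
    const unsigned int nprocs = 7;

    for (unsigned int r = 0; r < nprocs; ++r) {
        if (0 == r) {
            printf("rank %u: root", r);
        } else {
            printf("rank %u: parent %u", r, (r - 1) / 2);
        }
        printf(", children:");
        for (unsigned int i = 0; i < 2; ++i) {
            unsigned int child = 2 * r + i + 1;  /* same formula as above */
            if (child < nprocs) {
                printf(" %u", child);
            }
        }
        printf("\n");
    }
    return 0;
}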
int
ompi_mtl_portals4_flowctl_setup_comm(void)
{
int ret;
size_t i;
/* increase the flow control epoch count */
ompi_mtl_portals4.flowctl.epoch_counter++;
ret = PtlStartBundle(ompi_mtl_portals4.ni_h);
if (PTL_OK != ret) {
return ret;
}
/* setup the trigger to start the alert broadcast */
if (0 != ompi_mtl_portals4.flowctl.i_am_root) {
int counter =
ompi_mtl_portals4.flowctl.epoch_counter *
(ompi_mtl_portals4.flowctl.num_procs + 1);
ret = PtlTriggeredPut(ompi_mtl_portals4.zero_md_h,
0,
0,
PTL_NO_ACK_REQ,
ompi_mtl_portals4.flowctl.me,
ompi_mtl_portals4.flowctl_idx,
MTL_PORTALS4_FLOWCTL_ALERT,
0,
NULL,
0,
ompi_mtl_portals4.flowctl.trigger_ct_h,
counter);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlTriggeredPut failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
}
/* setup the alert broadcast tree */
for (i = 0 ; i < ompi_mtl_portals4.flowctl.num_children ; ++i) {
ret = PtlTriggeredPut(ompi_mtl_portals4.zero_md_h,
0,
0,
PTL_NO_ACK_REQ,
ompi_mtl_portals4.flowctl.children[i],
ompi_mtl_portals4.flowctl_idx,
MTL_PORTALS4_FLOWCTL_ALERT,
0,
NULL,
0,
ompi_mtl_portals4.flowctl.trigger_ct_h,
ompi_mtl_portals4.flowctl.epoch_counter);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlTriggeredPut failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
}
/* setup the trigger to generate a full event on alert */
ret = PtlTriggeredPut(ompi_mtl_portals4.zero_md_h,
0,
0,
PTL_NO_ACK_REQ,
ompi_mtl_portals4.flowctl.me,
ompi_mtl_portals4.flowctl_event_idx,
MTL_PORTALS4_FLOWCTL_ALERT,
0,
NULL,
0,
ompi_mtl_portals4.flowctl.alert_ct_h,
ompi_mtl_portals4.flowctl.epoch_counter);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlTriggeredPut failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
/* setup the restart barrier */
if (0 != ompi_mtl_portals4.flowctl.i_am_root) {
ptl_ct_event_t new_ct;
/* turn to fanout part of restart */
for (i = 0 ; i < ompi_mtl_portals4.flowctl.num_children ; ++i) {
ret = PtlTriggeredPut(ompi_mtl_portals4.zero_md_h,
0,
0,
PTL_NO_ACK_REQ,
ompi_mtl_portals4.flowctl.children[i],
ompi_mtl_portals4.flowctl_idx,
MTL_PORTALS4_FLOWCTL_FANOUT,
0,
NULL,
0,
ompi_mtl_portals4.flowctl.fanin_ct_h,
ompi_mtl_portals4.flowctl.epoch_counter *
(ompi_mtl_portals4.flowctl.num_children + 1));
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlTriggeredPut failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
}
/* reset trigger ME */
new_ct.success = ompi_mtl_portals4.flowctl.epoch_counter *
(ompi_mtl_portals4.flowctl.num_children + 1);
new_ct.failure = 0;
ret = PtlTriggeredCTSet(ompi_mtl_portals4.flowctl.trigger_ct_h,
new_ct,
ompi_mtl_portals4.flowctl.fanin_ct_h,
ompi_mtl_portals4.flowctl.epoch_counter *
(ompi_mtl_portals4.flowctl.num_children + 1));
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlTriggeredCTSet failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
/* setup the trigger to generate a full event on fan-out */
ret = PtlTriggeredPut(ompi_mtl_portals4.zero_md_h,
0,
0,
PTL_NO_ACK_REQ,
ompi_mtl_portals4.flowctl.me,
ompi_mtl_portals4.flowctl_event_idx,
MTL_PORTALS4_FLOWCTL_FANOUT,
0,
NULL,
0,
ompi_mtl_portals4.flowctl.fanin_ct_h,
ompi_mtl_portals4.flowctl.epoch_counter *
(ompi_mtl_portals4.flowctl.num_children + 1));
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlTriggeredPut failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
} else {
/* fan-in */
ret = PtlTriggeredPut(ompi_mtl_portals4.zero_md_h,
0,
0,
PTL_NO_ACK_REQ,
ompi_mtl_portals4.flowctl.parent,
ompi_mtl_portals4.flowctl_idx,
MTL_PORTALS4_FLOWCTL_FANIN,
0,
NULL,
0,
ompi_mtl_portals4.flowctl.fanin_ct_h,
ompi_mtl_portals4.flowctl.epoch_counter *
(ompi_mtl_portals4.flowctl.num_children + 1));
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlTriggeredPut failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
/* fan-out */
for (i = 0 ; i < ompi_mtl_portals4.flowctl.num_children ; ++i) {
ret = PtlTriggeredPut(ompi_mtl_portals4.zero_md_h,
0,
0,
PTL_NO_ACK_REQ,
ompi_mtl_portals4.flowctl.children[i],
ompi_mtl_portals4.flowctl_idx,
MTL_PORTALS4_FLOWCTL_FANOUT,
0,
NULL,
0,
ompi_mtl_portals4.flowctl.fanout_ct_h,
ompi_mtl_portals4.flowctl.epoch_counter);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlTriggeredPut failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
}
/* setup the trigger to generate a full event on fan-out */
ret = PtlTriggeredPut(ompi_mtl_portals4.zero_md_h,
0,
0,
PTL_NO_ACK_REQ,
ompi_mtl_portals4.flowctl.me,
ompi_mtl_portals4.flowctl_event_idx,
MTL_PORTALS4_FLOWCTL_FANOUT,
0,
NULL,
0,
ompi_mtl_portals4.flowctl.fanout_ct_h,
ompi_mtl_portals4.flowctl.epoch_counter);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlTriggeredPut failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
}
error:
ret = PtlEndBundle(ompi_mtl_portals4.ni_h);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlEndBundle failed: %d\n",
__FILE__, __LINE__, ret);
}
return ret;
}
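The counted-event thresholds in setup_comm all scale with epoch_counter so that triggered operations armed for earlier recovery epochs can never fire late. Reading the fan-in side: each epoch contributes one put per child plus the node's own barrier-entry put from flowctl_start_recover, hence the (num_children + 1) factor. A small standalone sketch of that arithmetic:

#include <stdio.h>

/* Fan-in threshold for a given epoch: num_children puts from the
   children plus one local barrier-entry put. */
static unsigned int fanin_threshold(unsigned int epoch,
                                    unsigned int num_children)
{
    return epoch * (num_children + 1);
}

int main(void)
{
    const unsigned int num_children = 2;   /* interior node of a binary tree */

    for (unsigned int epoch = 1; epoch <= 3; ++epoch) {
        printf("epoch %u: fan-in fires at CT count %u\n",
               epoch, fanin_threshold(epoch, num_children));
    }
    return 0;
}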
int
ompi_mtl_portals4_flowctl_start_recover(void)
{
int ret;
ret = ompi_mtl_portals4_flowctl_setup_comm();
if (OMPI_SUCCESS != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: flowctl_setup_comm failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
/* drain all pending sends */
while (0 != opal_list_get_size(&ompi_mtl_portals4.active_sends)) {
opal_progress();
}
/* drain event queue */
while (0 != ompi_mtl_portals4_progress()) { ; }
/* check short block active count */
ret = ompi_mtl_portals4_recv_short_link(1);
if (OMPI_SUCCESS != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: recv_short_link failed: %d\n",
__FILE__, __LINE__, ret);
}
/* send barrier entry message */
ret = PtlPut(ompi_mtl_portals4.zero_md_h,
0,
0,
PTL_NO_ACK_REQ,
ompi_mtl_portals4.flowctl.me,
ompi_mtl_portals4.flowctl_idx,
MTL_PORTALS4_FLOWCTL_FANIN,
0,
NULL,
0);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlPut failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
/* Recovery is complete when the fan-out event arrives; that
event is delivered asynchronously, so there's nothing more to
do here */
ret = OMPI_SUCCESS;
error:
return ret;
}

ompi/mca/mtl/portals4/mtl_portals4_flowctl.h (new file)

@@ -0,0 +1,67 @@
/*
* Copyright (c) 2012 Sandia National Laboratories. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef MTL_PORTALS_FLOWCTL_H
#define MTL_PORTALS_FLOWCTL_H
struct ompi_mtl_portals4_flowctl_t {
/** Flow control epoch counter. Triggered events should be
based on epoch counter. */
uint32_t epoch_counter;
/** Flow control trigger ME. Only has meaning at root. When an
event is received on this ME, it triggers the flow control
alert broadcast.*/
ptl_handle_me_t trigger_me_h;
/** Flow control trigger CT. Only has meaning at root. */
ptl_handle_ct_t trigger_ct_h;
/** Flow control alert tree broadcast ME. */
ptl_handle_me_t alert_me_h;
/** Flow control alert tree broadcast CT. */
ptl_handle_ct_t alert_ct_h;
/** Flow control alert tree broadcast ME for a local put to
generate an event */
ptl_handle_me_t alert_event_me_h;
/** Flow control restart fan-in ME. */
ptl_handle_me_t fanin_me_h;
/** Flow control restart fan-in CT. */
ptl_handle_ct_t fanin_ct_h;
/** Flow control restart fan-out ME. */
ptl_handle_me_t fanout_me_h;
/** Flow control restart fan-out CT. */
ptl_handle_ct_t fanout_ct_h;
/** Flow control restart fan-out ME for a local put to generate an
event */
ptl_handle_me_t fanout_event_me_h;
size_t num_procs;
size_t num_children;
ptl_process_t children[2];
ptl_process_t parent;
ptl_process_t me;
int i_am_root;
};
typedef struct ompi_mtl_portals4_flowctl_t ompi_mtl_portals4_flowctl_t;
/**
* Initialize flow control code
*
* Initialize flow control code. This includes initializing epoch
* counter and creating necessary MEs and CTs.
*/
int ompi_mtl_portals4_flowctl_init(void);
int ompi_mtl_portals4_flowctl_fini(void);
int ompi_mtl_portals4_flowctl_add_procs(size_t me,
size_t npeers,
struct mca_mtl_base_endpoint_t **peers);
int ompi_mtl_portals4_flowctl_setup_comm(void);
int ompi_mtl_portals4_flowctl_start_recover(void);
#endif