1
1

First take at flow control protocol

This commit was SVN r26274.
This commit is contained in:
Brian Barrett 2012-04-17 21:46:21 +00:00
parent 213cf6d43f
commit fe0dfc8e26
7 changed files: 388 additions and 207 deletions

Просмотреть файл

@ -28,9 +28,7 @@
#include "ompi/mca/mtl/mtl.h"
#include "ompi/mca/mtl/base/base.h"
#if OMPI_MTL_PORTALS4_FLOW_CONTROL
#include "mtl_portals4_flowctl.h"
#endif
BEGIN_C_DECLS
@ -86,29 +84,16 @@ struct mca_mtl_portals4_module_t {
opal_list_t waiting_recv_short_blocks;
/** number of send-side operations started */
uint32_t opcount;
#if OMPI_MTL_PORTALS4_FLOW_CONTROL
/** Number of event slots available for send side operations.
Note that this is slightly smaller than the event queue size
to allow space for flow control related events. */
uint32_t send_queue_slots;
/** free list of send items */
opal_free_list_t send_fl;
/** list of sends which are pending (either due to needing to
retransmit or because we're in flow control recovery */
opal_list_t pending_sends;
/** list of sends which are currently active */
opal_list_t active_sends;
ompi_mtl_portals4_flowctl_t flowctl;
#endif
uint64_t opcount;
#if OPAL_ENABLE_DEBUG
/** number of receive-side operations started. Used only for
debugging */
uint32_t recv_opcount;
uint64_t recv_opcount;
#endif
#if OMPI_MTL_PORTALS4_FLOW_CONTROL
ompi_mtl_portals4_flowctl_t flowctl;
#endif
};
typedef struct mca_mtl_portals4_module_t mca_mtl_portals4_module_t;
@ -122,12 +107,10 @@ extern mca_mtl_portals4_module_t ompi_mtl_portals4;
#define REQ_READ_TABLE_ID 3
#define REQ_FLOWCTL_TABLE_ID 4
#if OMPI_MTL_PORTALS4_FLOW_CONTROL
#define MTL_PORTALS4_FLOWCTL_TRIGGER 0x01
#define MTL_PORTALS4_FLOWCTL_ALERT 0x02
#define MTL_PORTALS4_FLOWCTL_FANIN 0x03
#define MTL_PORTALS4_FLOWCTL_FANOUT 0x04
#endif
/* match/ignore bit manipulation
*

Просмотреть файл

@ -125,6 +125,14 @@ ompi_mtl_portals4_component_open(void)
return OMPI_ERR_NOT_SUPPORTED;
}
opal_output_verbose(1, ompi_mtl_base_output,
"Flow control: "
#if OMPI_MTL_PORTALS4_FLOW_CONTROL
"yes"
#else
"no"
#endif
);
opal_output_verbose(1, ompi_mtl_base_output,
"Eager limit: %d", (int)
ompi_mtl_portals4.eager_limit);
@ -220,7 +228,7 @@ ompi_mtl_portals4_component_init(bool enable_progress_threads,
"My nid,pid = %x,%x",
id.phys.nid, id.phys.pid));
/* create event queue */
/* create event queues */
ret = PtlEQAlloc(ompi_mtl_portals4.ni_h,
ompi_mtl_portals4.queue_size,
&ompi_mtl_portals4.send_eq_h);
@ -240,7 +248,7 @@ ompi_mtl_portals4_component_init(bool enable_progress_threads,
goto error;
}
/* Create portal table entries */
/* Create send and long message (read) portal table entries */
ret = PtlPTAlloc(ompi_mtl_portals4.ni_h,
PTL_PT_ONLY_USE_ONCE |
PTL_PT_ONLY_TRUNCATE |
@ -328,11 +336,6 @@ ompi_mtl_portals4_component_init(bool enable_progress_threads,
#endif
#if OMPI_MTL_PORTALS4_FLOW_CONTROL
ompi_mtl_portals4.send_queue_slots = ompi_mtl_portals4.queue_size - 4;
OBJ_CONSTRUCT(&ompi_mtl_portals4.active_sends, opal_list_t);
OBJ_CONSTRUCT(&ompi_mtl_portals4.pending_sends, opal_list_t);
ret = ompi_mtl_portals4_flowctl_init();
if (OMPI_SUCCESS != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
@ -349,7 +352,7 @@ ompi_mtl_portals4_component_init(bool enable_progress_threads,
goto error;
}
#endif
/* activate progress callback */
ret = opal_progress_register(ompi_mtl_portals4_progress);
if (OMPI_SUCCESS != ret) {
@ -483,12 +486,13 @@ ompi_mtl_portals4_progress(void)
ret = ompi_mtl_portals4_flowctl_start_recover();
if (OMPI_SUCCESS != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlPTEnable failed: %d\n",
"%s:%d: flowctl_start_recover failed: %d\n",
__FILE__, __LINE__, ret);
abort();
}
#else
opal_output(ompi_mtl_base_output, "Unhandled flow control event.");
opal_output(ompi_mtl_base_output,
"Flow control situation without recovery");
abort();
#endif
break;
@ -503,10 +507,14 @@ ompi_mtl_portals4_progress(void)
}
} else if (PTL_EQ_EMPTY == ret) {
break;
} else if (PTL_EQ_DROPPED == ret) {
opal_output(ompi_mtl_base_output,
"Flow control situation without recovery");
abort();
} else {
opal_output(ompi_mtl_base_output,
"Error returned from PtlEQGet: %d", ret);
abort();
break;
}
}

Просмотреть файл

@ -14,14 +14,72 @@
#include "mtl_portals4_endpoint.h"
#include "mtl_portals4_recv_short.h"
OBJ_CLASS_INSTANCE(ompi_mtl_portals4_pending_request_t, opal_free_list_item_t,
NULL, NULL);
/* Event callback installed on the flow-control ALERT request.
 * Fired when an alert put lands on this process; both arguments are
 * unused — the arrival of the event itself is the signal.  Simply
 * enters the flow-control recovery protocol.
 * Returns the status of ompi_mtl_portals4_flowctl_start_recover(). */
static int
flowctl_alert_callback(ptl_event_t *ev,
ompi_mtl_portals4_base_request_t *ptl_base_request)
{
return ompi_mtl_portals4_flowctl_start_recover();
}
/* Event callback installed on the flow-control FANOUT request.
 * Fired when the restart fan-out put reaches this process, which means
 * the whole tree has completed recovery.  Clears the flowctl_active
 * flag and then restarts any sends queued on the pending list.
 * Both arguments are unused.  Always returns OMPI_SUCCESS. */
static int
flowctl_fanout_callback(ptl_event_t *ev,
ompi_mtl_portals4_base_request_t *ptl_base_request)
{
/* woo, we're recovered! */
ompi_mtl_portals4.flowctl.flowctl_active = false;
ompi_mtl_portals4_pending_list_progress();
return OMPI_SUCCESS;
}
int
ompi_mtl_portals4_flowctl_init(void)
{
ptl_me_t me;
int ret;
ompi_mtl_portals4.flowctl.flowctl_active = false;
OBJ_CONSTRUCT(&ompi_mtl_portals4.flowctl.active_sends, opal_list_t);
OBJ_CONSTRUCT(&ompi_mtl_portals4.flowctl.pending_sends, opal_list_t);
OBJ_CONSTRUCT(&ompi_mtl_portals4.flowctl.pending_fl, opal_free_list_t);
opal_free_list_init(&ompi_mtl_portals4.flowctl.pending_fl,
sizeof(ompi_mtl_portals4_pending_request_t),
OBJ_CLASS(ompi_mtl_portals4_pending_request_t),
1, -1, 1);
OBJ_CONSTRUCT(&ompi_mtl_portals4.flowctl.mutex, opal_mutex_t);
ompi_mtl_portals4.flowctl.slots = (ompi_mtl_portals4.queue_size - 3) / 3;
ompi_mtl_portals4.flowctl.alert_req.type = portals4_req_flowctl;
ompi_mtl_portals4.flowctl.alert_req.event_callback = flowctl_alert_callback;
ompi_mtl_portals4.flowctl.fanout_req.type = portals4_req_flowctl;
ompi_mtl_portals4.flowctl.fanout_req.event_callback = flowctl_fanout_callback;
ompi_mtl_portals4.flowctl.epoch_counter = 0;
ret = PtlPTAlloc(ompi_mtl_portals4.ni_h,
PTL_PT_ONLY_TRUNCATE,
ompi_mtl_portals4.send_eq_h,
REQ_FLOWCTL_TABLE_ID,
&ompi_mtl_portals4.flowctl_idx);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlPTAlloc failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
ret = PtlCTAlloc(ompi_mtl_portals4.ni_h,
&ompi_mtl_portals4.flowctl.trigger_ct_h);
if (PTL_OK != ret) {
@ -33,17 +91,20 @@ ompi_mtl_portals4_flowctl_init(void)
me.start = NULL;
me.length = 0;
me.ct_handle = ompi_mtl_portals4.flowctl.trigger_ct_h;
me.min_free = 0;
me.uid = PTL_UID_ANY;
me.match_id.phys.nid = PTL_NID_ANY;
me.match_id.phys.pid = PTL_PID_ANY;
me.ignore_bits = 0;
me.options = PTL_ME_OP_PUT |
PTL_ME_ACK_DISABLE |
PTL_ME_EVENT_LINK_DISABLE |
PTL_ME_EVENT_UNLINK_DISABLE;
me.match_id.phys.nid = PTL_NID_ANY;
me.match_id.phys.pid = PTL_PID_ANY;
PTL_ME_EVENT_UNLINK_DISABLE |
PTL_ME_EVENT_COMM_DISABLE |
PTL_ME_EVENT_CT_COMM;
me.ct_handle = ompi_mtl_portals4.flowctl.trigger_ct_h;
me.match_bits = MTL_PORTALS4_FLOWCTL_TRIGGER;
me.ignore_bits = 0;
ret = PtlMEAppend(ompi_mtl_portals4.ni_h,
ompi_mtl_portals4.flowctl_idx,
&me,
@ -66,6 +127,12 @@ ompi_mtl_portals4_flowctl_init(void)
goto error;
}
me.options = PTL_ME_OP_PUT |
PTL_ME_ACK_DISABLE |
PTL_ME_EVENT_LINK_DISABLE |
PTL_ME_EVENT_UNLINK_DISABLE |
PTL_ME_EVENT_COMM_DISABLE |
PTL_ME_EVENT_CT_COMM;
me.ct_handle = ompi_mtl_portals4.flowctl.alert_ct_h;
me.match_bits = MTL_PORTALS4_FLOWCTL_ALERT;
ret = PtlMEAppend(ompi_mtl_portals4.ni_h,
@ -81,10 +148,14 @@ ompi_mtl_portals4_flowctl_init(void)
goto error;
}
me.options = PTL_ME_OP_PUT |
PTL_ME_ACK_DISABLE |
PTL_ME_EVENT_LINK_DISABLE |
PTL_ME_EVENT_UNLINK_DISABLE;
me.ct_handle = PTL_CT_NONE;
me.match_bits = MTL_PORTALS4_FLOWCTL_ALERT;
ret = PtlMEAppend(ompi_mtl_portals4.ni_h,
ompi_mtl_portals4.flowctl_event_idx,
ompi_mtl_portals4.flowctl_idx,
&me,
PTL_PRIORITY_LIST,
NULL,
@ -105,6 +176,12 @@ ompi_mtl_portals4_flowctl_init(void)
goto error;
}
me.options = PTL_ME_OP_PUT |
PTL_ME_ACK_DISABLE |
PTL_ME_EVENT_LINK_DISABLE |
PTL_ME_EVENT_UNLINK_DISABLE |
PTL_ME_EVENT_COMM_DISABLE |
PTL_ME_EVENT_CT_COMM;
me.ct_handle = ompi_mtl_portals4.flowctl.fanin_ct_h;
me.match_bits = MTL_PORTALS4_FLOWCTL_FANIN;
ret = PtlMEAppend(ompi_mtl_portals4.ni_h,
@ -129,6 +206,12 @@ ompi_mtl_portals4_flowctl_init(void)
goto error;
}
me.options = PTL_ME_OP_PUT |
PTL_ME_ACK_DISABLE |
PTL_ME_EVENT_LINK_DISABLE |
PTL_ME_EVENT_UNLINK_DISABLE |
PTL_ME_EVENT_COMM_DISABLE |
PTL_ME_EVENT_CT_COMM;
me.ct_handle = ompi_mtl_portals4.flowctl.fanout_ct_h;
me.match_bits = MTL_PORTALS4_FLOWCTL_FANOUT;
ret = PtlMEAppend(ompi_mtl_portals4.ni_h,
@ -144,10 +227,14 @@ ompi_mtl_portals4_flowctl_init(void)
goto error;
}
me.options = PTL_ME_OP_PUT |
PTL_ME_ACK_DISABLE |
PTL_ME_EVENT_LINK_DISABLE |
PTL_ME_EVENT_UNLINK_DISABLE;
me.ct_handle = PTL_CT_NONE;
me.match_bits = MTL_PORTALS4_FLOWCTL_FANOUT;
ret = PtlMEAppend(ompi_mtl_portals4.ni_h,
ompi_mtl_portals4.flowctl_event_idx,
ompi_mtl_portals4.flowctl_idx,
&me,
PTL_PRIORITY_LIST,
NULL,
@ -185,12 +272,14 @@ ompi_mtl_portals4_flowctl_add_procs(size_t me,
/* if epoch isn't 0, that means setup trees has been called, which
means that this add_procs is a dynamic process, which we don't
support */
if (ompi_mtl_portals4.flowctl.epoch_counter != 0) {
if (ompi_mtl_portals4.flowctl.epoch_counter != 1) {
return OMPI_ERR_NOT_SUPPORTED;
}
ompi_mtl_portals4.flowctl.num_procs = npeers;
if (0 != me) {
if (0 == me) {
ompi_mtl_portals4.flowctl.i_am_root = 1;
} else {
ompi_mtl_portals4.flowctl.parent =
peers[(me - 1) / 2]->ptl_proc;
}
@ -276,7 +365,7 @@ ompi_mtl_portals4_flowctl_setup_comm(void)
0,
PTL_NO_ACK_REQ,
ompi_mtl_portals4.flowctl.me,
ompi_mtl_portals4.flowctl_event_idx,
ompi_mtl_portals4.flowctl_idx,
MTL_PORTALS4_FLOWCTL_ALERT,
0,
NULL,
@ -332,7 +421,7 @@ ompi_mtl_portals4_flowctl_setup_comm(void)
0,
PTL_NO_ACK_REQ,
ompi_mtl_portals4.flowctl.me,
ompi_mtl_portals4.flowctl_event_idx,
ompi_mtl_portals4.flowctl_idx,
MTL_PORTALS4_FLOWCTL_FANOUT,
0,
NULL,
@ -397,7 +486,7 @@ ompi_mtl_portals4_flowctl_setup_comm(void)
0,
PTL_NO_ACK_REQ,
ompi_mtl_portals4.flowctl.me,
ompi_mtl_portals4.flowctl_event_idx,
ompi_mtl_portals4.flowctl_idx,
MTL_PORTALS4_FLOWCTL_FANOUT,
0,
NULL,
@ -429,6 +518,12 @@ ompi_mtl_portals4_flowctl_start_recover(void)
{
int ret;
if (ompi_mtl_portals4.flowctl.flowctl_active) {
return OMPI_SUCCESS;
} else {
ompi_mtl_portals4.flowctl.flowctl_active = true;
}
ret = ompi_mtl_portals4_flowctl_setup_comm();
if (OMPI_SUCCESS != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
@ -438,7 +533,7 @@ ompi_mtl_portals4_flowctl_start_recover(void)
}
/* drain all pending sends */
while (0 != opal_list_get_size(&ompi_mtl_portals4.active_sends)) {
while (0 != opal_list_get_size(&ompi_mtl_portals4.flowctl.active_sends)) {
opal_progress();
}

Просмотреть файл

@ -10,31 +10,68 @@
#ifndef MTL_PORTALS_FLOWCTL_H
#define MTL_PORTALS_FLOWCTL_H
#include "opal/class/opal_free_list.h"
#include "mtl_portals4_request.h"
struct mca_mtl_base_endpoint_t;
struct ompi_mtl_portals4_isend_request_t;
/* A send that has been queued for flow-control accounting.  One of
 * these is allocated (from flowctl.pending_fl) for every send started
 * while flow control is compiled in; it records enough of the original
 * call's arguments to re-issue the send after a flow-control recovery. */
struct ompi_mtl_portals4_pending_request_t {
/* free-list bookkeeping; also serves as the list item */
opal_free_list_item_t super;
/* PML send mode (e.g. synchronous) of the original send */
mca_pml_base_send_mode_t mode;
/* user buffer start address */
void *start;
/* user buffer length in bytes */
size_t length;
/* communicator context id for the match bits */
int contextid;
/* MPI tag for the match bits */
int tag;
/* sender's rank in the communicator */
int my_rank;
/* destination endpoint */
struct mca_mtl_base_endpoint_t *endpoint;
/* the isend request this pending entry belongs to */
struct ompi_mtl_portals4_isend_request_t *ptl_request;
};
typedef struct ompi_mtl_portals4_pending_request_t ompi_mtl_portals4_pending_request_t;
OBJ_CLASS_DECLARATION(ompi_mtl_portals4_pending_request_t);
struct ompi_mtl_portals4_flowctl_t {
bool flowctl_active;
opal_list_t active_sends;
opal_list_t pending_sends;
opal_free_list_t pending_fl;
opal_mutex_t mutex;
int32_t slots;
ompi_mtl_portals4_base_request_t alert_req;
ompi_mtl_portals4_base_request_t fanout_req;
/** Flow control epoch counter. Triggered events should be
based on epoch counter. */
uint32_t epoch_counter;
/** Flow control trigger CT. Only has meaning at root. */
ptl_handle_ct_t trigger_ct_h;
/** Flow control trigger ME. Only has meaning at root. When an
event is received on this ME, it triggers the flow control
alert broadcast.*/
ptl_handle_me_t trigger_me_h;
/** Flow control trigger CT. Only has meaning at root. */
ptl_handle_ct_t trigger_ct_h;
/** Flow control alert tree broadcast ME. */
ptl_handle_me_t alert_me_h;
/** Flow control alert tree broadcast CT. */
ptl_handle_ct_t alert_ct_h;
/** Flow control alert tree broadcast ME. */
ptl_handle_me_t alert_me_h;
/** Flow control alert tree broadcast ME for a local put to
generate an event */
ptl_handle_me_t alert_event_me_h;
/** Flow control restart fan-in ME. */
ptl_handle_me_t fanin_me_h;
/** Flow control restart fan-in CT. */
ptl_handle_ct_t fanin_ct_h;
/** Flow control restart fan-out ME. */
ptl_handle_me_t fanout_me_h;
/** Flow control restart fan-in ME. */
ptl_handle_me_t fanin_me_h;
/** Flow control restart fan-out CT. */
ptl_handle_ct_t fanout_ct_h;
/** Flow control restart fan-out ME. */
ptl_handle_me_t fanout_me_h;
/** Flow control restart fan-out ME for a local put to generate an
event */
ptl_handle_me_t fanout_event_me_h;
@ -64,4 +101,7 @@ int ompi_mtl_portals4_flowctl_add_procs(size_t me,
int ompi_mtl_portals4_flowctl_setup_comm(void);
int ompi_mtl_portals4_flowctl_start_recover(void);
void ompi_mtl_portals4_pending_list_progress(void);
#endif

Просмотреть файл

@ -50,7 +50,7 @@ ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
switch (ev->type) {
case PTL_EVENT_PUT:
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output, "Recv %d (0x%lx) got put event",
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output, "Recv %lu (0x%lx) got put event",
ptl_request->opcount, ev->hdr_data));
if (ev->ni_fail_type != PTL_NI_OK) {
@ -128,14 +128,14 @@ ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
}
ptl_request->super.super.ompi_req->req_status._ucount = ev->mlength;
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output, "Recv %d (0x%lx) completed, expected",
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output, "Recv %lu (0x%lx) completed, expected",
ptl_request->opcount, ptl_request->hdr_data));
ptl_request->super.super.completion_callback(&ptl_request->super.super);
}
break;
case PTL_EVENT_REPLY:
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output, "Recv %d (0x%lx) got reply event",
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output, "Recv %lu (0x%lx) got reply event",
ptl_request->opcount, ptl_request->hdr_data));
if (ev->ni_fail_type != PTL_NI_OK) {
@ -169,13 +169,13 @@ ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
}
PtlMDRelease(ptl_request->md_h);
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output, "Recv %d (0x%lx) completed, reply",
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output, "Recv %lu (0x%lx) completed, reply",
ptl_request->opcount, ptl_request->hdr_data));
ptl_request->super.super.completion_callback(&ptl_request->super.super);
break;
case PTL_EVENT_PUT_OVERFLOW:
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output, "Recv %d (0x%lx) got put_overflow event",
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output, "Recv %lu (0x%lx) got put_overflow event",
ptl_request->opcount, ev->hdr_data));
if (ev->ni_fail_type != PTL_NI_OK) {
@ -227,7 +227,8 @@ ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
}
/* if it's a sync, send the ack */
if (MTL_PORTALS4_IS_SYNC_MSG(ev->hdr_data)) {
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output, "Recv %d (0x%lx) sending sync ack",
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output,
"Recv %lu (0x%lx) sending sync ack",
ptl_request->opcount, ptl_request->hdr_data));
ret = PtlPut(ompi_mtl_portals4.zero_md_h,
0,
@ -247,7 +248,8 @@ ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
}
}
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output, "Recv %d (0x%lx) completed, unexpected short (0x%lx)",
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output,
"Recv %lu (0x%lx) completed, unexpected short (0x%lx)",
ptl_request->opcount, ptl_request->hdr_data, (long) ev->start));
ptl_request->super.super.completion_callback(&ptl_request->super.super);
@ -277,7 +279,7 @@ ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
goto callback_error;
}
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output, "Recv %d (0x%lx) getting long data",
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output, "Recv %lu (0x%lx) getting long data",
ptl_request->opcount, ptl_request->hdr_data));
ret = PtlGet(ptl_request->md_h,
0,
@ -369,7 +371,7 @@ ompi_mtl_portals4_irecv(struct mca_mtl_base_module_t* mtl,
ptl_request->super.super.ompi_req->req_status.MPI_ERROR = OMPI_SUCCESS;
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output,
"Recv %d from %x,%x of length %d (0x%lx, 0x%lx, 0x%lx)\n",
"Recv %lu from %x,%x of length %d (0x%lx, 0x%lx, 0x%lx)\n",
ptl_request->opcount,
remote_proc.phys.nid, remote_proc.phys.pid,
(int)length, match_bits, ignore_bits, (unsigned long) ptl_request));
@ -439,7 +441,7 @@ ompi_mtl_portals4_imrecv(struct mca_mtl_base_module_t* mtl,
}
#if OPAL_ENABLE_DEBUG
ptl_request->opcount = ++ompi_mtl_portals4.recv_opcount;
ptl_request->opcount = opal_atomic_add_64((int64_t*) &ompi_mtl_portals4.recv_opcount, 1);
ptl_request->hdr_data = 0;
#endif
ptl_request->super.type = portals4_req_recv;
@ -451,7 +453,7 @@ ompi_mtl_portals4_imrecv(struct mca_mtl_base_module_t* mtl,
ptl_request->super.super.ompi_req->req_status.MPI_ERROR = OMPI_SUCCESS;
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output,
"Mrecv %d of length %d (0x%lx)\n",
"Mrecv %lu of length %d (0x%lx)\n",
ptl_request->opcount,
(int)length, (unsigned long) ptl_request));

Просмотреть файл

@ -24,12 +24,15 @@
#include "ompi/mca/mtl/mtl.h"
struct ompi_mtl_portals4_message_t;
struct ompi_mtl_portals4_pending_request_t;
typedef enum { portals4_req_isend,
portals4_req_send,
portals4_req_recv,
portals4_req_probe,
portals4_req_recv_short
portals4_req_recv_short,
portals4_req_flowctl
} ompi_mtl_portals4_request_type_t;
@ -46,8 +49,11 @@ struct ompi_mtl_portals4_isend_request_t {
void *buffer_ptr;
ptl_handle_md_t md_h;
ptl_handle_me_t me_h;
int32_t event_count;
int32_t opcount;
uint64_t opcount;
#if OMPI_MTL_PORTALS4_FLOW_CONTROL
struct ompi_mtl_portals4_pending_request_t *pending;
#endif
uint32_t event_count;
};
typedef struct ompi_mtl_portals4_isend_request_t ompi_mtl_portals4_isend_request_t;
@ -70,7 +76,7 @@ struct ompi_mtl_portals4_recv_request_t {
size_t delivery_len;
volatile bool req_started;
#if OPAL_ENABLE_DEBUG
int32_t opcount;
uint64_t opcount;
ptl_hdr_data_t hdr_data;
#endif
};

Просмотреть файл

@ -27,6 +27,9 @@
#include "mtl_portals4.h"
#include "mtl_portals4_request.h"
#include "mtl_portals4_endpoint.h"
#if OMPI_MTL_PORTALS4_FLOW_CONTROL
#include "mtl_portals4_flowctl.h"
#endif
static inline int
@ -37,35 +40,58 @@ ompi_mtl_portals4_callback(ptl_event_t *ev,
int retval = OMPI_SUCCESS, ret, val, add = 1;
ompi_mtl_portals4_isend_request_t* ptl_request =
(ompi_mtl_portals4_isend_request_t*) ptl_base_request;
#if OMPI_MTL_PORTALS4_FLOW_CONTROL
ompi_mtl_portals4_pending_request_t *pending =
ptl_request->pending;
if (ev->ni_fail_type == PTL_NI_FLOW_CTRL) {
ompi_mtl_portals4_flowctl_start_recover();
opal_list_remove_item(&ompi_mtl_portals4.flowctl.active_sends,
&pending->super.super);
opal_list_append(&ompi_mtl_portals4.flowctl.pending_sends,
&pending->super.super);
return OMPI_SUCCESS;
}
#endif
if (ev->ni_fail_type != PTL_NI_OK) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: isend callback ni_fail_type: %d",
"%s:%d: send callback ni_fail_type: %d",
__FILE__, __LINE__, ev->ni_fail_type);
*complete = true;
return OMPI_ERROR;
}
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output,
"isend %d got event of type %d",
"send %lu got event of type %d",
ptl_request->opcount, ev->type));
/* If we received an ack in the priority list, that's worth two
messages. If it hit the overflow list, that's only one. Since
we start eager messages at one count, have to compare >=
instead of just == */
if (ev->type == PTL_EVENT_ACK &&
ev->ptl_list == PTL_PRIORITY_LIST) {
/* ack on the priority list, so will never see an event on the me */
ret = PtlMEUnlink(ptl_request->me_h);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: isend callback PtlMDUnlink returned %d",
__FILE__, __LINE__, ret);
if (ev->type == PTL_EVENT_ACK) {
if (ev->ptl_list == PTL_PRIORITY_LIST) {
/* ack on the priority list, so will never see an event on the me */
if (PTL_OK != PtlHandleIsEqual(ptl_request->me_h, PTL_INVALID_HANDLE)) {
ret = PtlMEUnlink(ptl_request->me_h);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: send callback PtlMDUnlink returned %d",
__FILE__, __LINE__, ret);
}
}
add++;
}
add++;
#if OMPI_MTL_PORTALS4_FLOW_CONTROL
/* once the ack is received, we're out of flow control problem
regions, so we can remove this list entry */
opal_list_remove_item(&ompi_mtl_portals4.flowctl.active_sends,
&pending->super.super);
OPAL_FREE_LIST_RETURN(&ompi_mtl_portals4.flowctl.pending_fl,
&pending->super);
#endif
}
val = opal_atomic_add_32(&ptl_request->event_count, add);
val = opal_atomic_add_32((int32_t*)&ptl_request->event_count, add);
if (val >= 3) {
if (NULL != ptl_request->buffer_ptr) {
@ -74,13 +100,17 @@ ompi_mtl_portals4_callback(ptl_event_t *ev,
ret = PtlMDRelease(ptl_request->md_h);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: isend callback PtlMDRelease returned %d",
"%s:%d: send callback PtlMDRelease returned %d",
__FILE__, __LINE__, ret);
retval = OMPI_ERROR;
}
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output, "isend %d completed",
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output, "send %lu completed",
ptl_request->opcount));
*complete = true;
#if OMPI_MTL_PORTALS4_FLOW_CONTROL
opal_atomic_add_32(&ompi_mtl_portals4.flowctl.slots, 1);
ompi_mtl_portals4_pending_list_progress();
#endif
}
return retval;
@ -126,7 +156,7 @@ ompi_mtl_portals4_isend_callback(ptl_event_t *ev,
}
static int
static inline int
ompi_mtl_portals4_short_isend(mca_pml_base_send_mode_t mode,
void *start, int length, int contextid, int tag,
int localrank,
@ -135,15 +165,15 @@ ompi_mtl_portals4_short_isend(mca_pml_base_send_mode_t mode,
{
int ret;
ptl_match_bits_t match_bits;
ptl_hdr_data_t hdr_data;
ptl_md_t md;
ptl_request->event_count = 1;
ptl_me_t me;
ptl_hdr_data_t hdr_data;
MTL_PORTALS4_SET_SEND_BITS(match_bits, contextid, localrank, tag,
MTL_PORTALS4_SHORT_MSG);
MTL_PORTALS4_SET_HDR_DATA(hdr_data, ptl_request->opcount, length, 0);
MTL_PORTALS4_SET_HDR_DATA(hdr_data, ptl_request->opcount, length,
(MCA_PML_BASE_SEND_SYNCHRONOUS == mode) ? 1 : 0);
md.start = start;
md.length = length;
@ -161,9 +191,46 @@ ompi_mtl_portals4_short_isend(mca_pml_base_send_mode_t mode,
return ompi_mtl_portals4_get_error(ret);
}
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output,
"Send %d short send with hdr_data 0x%lx (0x%lx)",
ptl_request->opcount, hdr_data, match_bits));
if (MCA_PML_BASE_SEND_SYNCHRONOUS == mode) {
me.start = NULL;
me.length = 0;
me.ct_handle = PTL_CT_NONE;
me.min_free = 0;
me.uid = PTL_UID_ANY;
me.options =
PTL_ME_OP_PUT |
PTL_ME_USE_ONCE |
PTL_ME_EVENT_LINK_DISABLE |
PTL_ME_EVENT_UNLINK_DISABLE;
me.match_id = endpoint->ptl_proc;
me.match_bits = hdr_data;
me.ignore_bits = 0;
ret = PtlMEAppend(ompi_mtl_portals4.ni_h,
ompi_mtl_portals4.read_idx,
&me,
PTL_PRIORITY_LIST,
ptl_request,
&ptl_request->me_h);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlMEAppend failed: %d",
__FILE__, __LINE__, ret);
PtlMDRelease(ptl_request->md_h);
return ompi_mtl_portals4_get_error(ret);
}
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output,
"Send %lu short sync send with hdr_data 0x%lx (0x%lx)",
ptl_request->opcount, hdr_data, match_bits));
} else {
ptl_request->event_count = 1;
ptl_request->me_h = PTL_INVALID_HANDLE;
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output,
"Send %lu short send with hdr_data 0x%lx (0x%lx)",
ptl_request->opcount, hdr_data, match_bits));
}
ret = PtlPut(ptl_request->md_h,
0,
@ -179,6 +246,9 @@ ompi_mtl_portals4_short_isend(mca_pml_base_send_mode_t mode,
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlPut failed: %d",
__FILE__, __LINE__, ret);
if (MCA_PML_BASE_SEND_SYNCHRONOUS == mode) {
PtlMEUnlink(ptl_request->me_h);
}
PtlMDRelease(ptl_request->md_h);
return ompi_mtl_portals4_get_error(ret);
}
@ -186,98 +256,9 @@ ompi_mtl_portals4_short_isend(mca_pml_base_send_mode_t mode,
return OMPI_SUCCESS;
}
/* Start an eager (short) synchronous-mode send.
 *
 * Puts the message with PTL_ACK_REQ and, because the sender must not
 * complete until the receiver has matched the message, also appends a
 * zero-length ME on the read portal table whose match bits are this
 * send's hdr_data: the receiver puts back to it as the sync ack.
 *
 * start/length: user buffer; contextid/tag/localrank: match-bit inputs;
 * endpoint: destination; ptl_request: send request to complete.
 * Returns OMPI_SUCCESS or a translated Portals error; on failure all
 * Portals resources acquired so far are released. */
static int
ompi_mtl_portals4_sync_isend(void *start, int length, int contextid, int tag,
int localrank,
mca_mtl_base_endpoint_t *endpoint,
ompi_mtl_portals4_isend_request_t *ptl_request)
{
int ret;
ptl_match_bits_t match_bits;
ptl_md_t md;
ptl_me_t me;
ptl_hdr_data_t hdr_data;
MTL_PORTALS4_SET_SEND_BITS(match_bits, contextid, localrank, tag,
MTL_PORTALS4_SHORT_MSG);
/* final argument 1 marks this as a synchronous send in the header */
MTL_PORTALS4_SET_HDR_DATA(hdr_data, ptl_request->opcount, length, 1);
md.start = start;
md.length = length;
md.options = 0;
md.eq_handle = ompi_mtl_portals4.send_eq_h;
md.ct_handle = PTL_CT_NONE;
ret = PtlMDBind(ompi_mtl_portals4.ni_h,
&md,
&ptl_request->md_h);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlMDBind failed: %d",
__FILE__, __LINE__, ret);
return ompi_mtl_portals4_get_error(ret);
}
/* zero-length, use-once ME to catch the receiver's sync ack put;
   matched only from this endpoint on exactly our hdr_data */
me.start = NULL;
me.length = 0;
me.ct_handle = PTL_CT_NONE;
me.min_free = 0;
me.uid = PTL_UID_ANY;
me.options =
PTL_ME_OP_PUT |
PTL_ME_USE_ONCE |
PTL_ME_EVENT_LINK_DISABLE |
PTL_ME_EVENT_UNLINK_DISABLE;
me.match_id = endpoint->ptl_proc;
me.match_bits = hdr_data;
me.ignore_bits = 0;
ret = PtlMEAppend(ompi_mtl_portals4.ni_h,
ompi_mtl_portals4.read_idx,
&me,
PTL_PRIORITY_LIST,
ptl_request,
&ptl_request->me_h);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlMEAppend failed: %d",
__FILE__, __LINE__, ret);
/* unwind the MD bound above */
PtlMDRelease(ptl_request->md_h);
return ompi_mtl_portals4_get_error(ret);
}
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output,
"Send %d short sync send with hdr_data 0x%lx (0x%lx)",
ptl_request->opcount, hdr_data, match_bits));
/* PTL_ACK_REQ: the ack plus the sync-ack put drive completion
   accounting in the send callback */
ret = PtlPut(ptl_request->md_h,
0,
length,
PTL_ACK_REQ,
endpoint->ptl_proc,
ompi_mtl_portals4.send_idx,
match_bits,
0,
ptl_request,
hdr_data);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlPut failed: %d",
__FILE__, __LINE__, ret);
/* unwind both the sync-ack ME and the MD */
PtlMEUnlink(ptl_request->me_h);
PtlMDRelease(ptl_request->md_h);
return ompi_mtl_portals4_get_error(ret);
}
return OMPI_SUCCESS;
}
static int
static inline int
ompi_mtl_portals4_long_isend(void *start, int length, int contextid, int tag,
int localrank, int destrank,
int localrank,
mca_mtl_base_endpoint_t *endpoint,
ompi_mtl_portals4_isend_request_t *ptl_request)
{
@ -338,7 +319,7 @@ ompi_mtl_portals4_long_isend(void *start, int length, int contextid, int tag,
}
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output,
"Send %d long send with hdr_data 0x%lx (0x%lx)",
"Send %lu long send with hdr_data 0x%lx (0x%lx)",
ptl_request->opcount, hdr_data, match_bits));
put_length = (rndv == ompi_mtl_portals4.protocol) ?
@ -366,6 +347,57 @@ ompi_mtl_portals4_long_isend(void *start, int length, int contextid, int tag,
}
#if OMPI_MTL_PORTALS4_FLOW_CONTROL
/* Drain the flow-control pending-send list while send-queue slots are
 * available and no flow-control recovery is in progress.
 *
 * For each iteration: atomically reserve a slot (giving it back and
 * stopping if none remain), pop the oldest pending send, move it to the
 * active list, and re-issue it as a short or long send based on the
 * eager limit.  A send that fails to start is pushed back onto the
 * front of the pending list.  Stops as soon as flowctl_active becomes
 * true, the slots run out, or the pending list empties. */
void
ompi_mtl_portals4_pending_list_progress()
{
int ret, val;
opal_list_item_t *item;
ompi_mtl_portals4_pending_request_t *pending;
while (!ompi_mtl_portals4.flowctl.flowctl_active) {
/* reserve a send slot; on underflow, undo the decrement and bail */
val = opal_atomic_add_32(&ompi_mtl_portals4.flowctl.slots, -1);
if (val <= 0) {
opal_atomic_add_32(&ompi_mtl_portals4.flowctl.slots, 1);
return;
}
item = opal_list_remove_first(&ompi_mtl_portals4.flowctl.pending_sends);
if (NULL == item) {
/* nothing queued; return the slot we reserved */
opal_atomic_add_32(&ompi_mtl_portals4.flowctl.slots, 1);
return;
}
pending = (ompi_mtl_portals4_pending_request_t*) item;
opal_list_append(&ompi_mtl_portals4.flowctl.active_sends,
&pending->super.super);
if (pending->length <= ompi_mtl_portals4.eager_limit) {
ret = ompi_mtl_portals4_short_isend(pending->mode,
pending->start,
pending->length,
pending->contextid,
pending->tag,
pending->my_rank,
pending->endpoint,
pending->ptl_request);
} else {
ret = ompi_mtl_portals4_long_isend(pending->start,
pending->length,
pending->contextid,
pending->tag,
pending->my_rank,
pending->endpoint,
pending->ptl_request);
}
if (OMPI_SUCCESS != ret) {
/* failed to start; requeue at the head so ordering is kept */
opal_list_prepend(&ompi_mtl_portals4.flowctl.pending_sends,
&pending->super.super);
}
}
}
#endif
static inline int
ompi_mtl_portals4_start_send(struct mca_mtl_base_module_t* mtl,
struct ompi_communicator_t* comm,
@ -386,48 +418,62 @@ ompi_mtl_portals4_start_send(struct mca_mtl_base_module_t* mtl,
ret = ompi_mtl_datatype_pack(convertor, &start, &length, &free_after);
if (OMPI_SUCCESS != ret) return ret;
ptl_request->opcount = ++ompi_mtl_portals4.opcount;
ptl_request->opcount = opal_atomic_add_64((int64_t*)&ompi_mtl_portals4.opcount, 1);
ptl_request->buffer_ptr = (free_after) ? start : NULL;
ptl_request->event_count = 0;
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output,
"Send %d to %x,%x of length %d\n",
"Send %lu to %x,%x of length %d\n",
ptl_request->opcount,
endpoint->ptl_proc.phys.nid,
endpoint->ptl_proc.phys.pid,
(int)length));
#if OMPI_MTL_PORTALS4_FLOW_CONTROL
{
opal_free_list_item_t *item;
ompi_mtl_portals4_pending_request_t *pending;
OPAL_FREE_LIST_GET(&ompi_mtl_portals4.flowctl.pending_fl, item, ret);
if (NULL == item) return OMPI_ERR_OUT_OF_RESOURCE;
pending = (ompi_mtl_portals4_pending_request_t*) item;
pending->mode = mode;
pending->start = start;
pending->length = length;
pending->contextid = comm->c_contextid;
pending->tag = tag;
pending->my_rank = comm->c_my_rank;
pending->endpoint = endpoint;
pending->ptl_request = ptl_request;
ptl_request->pending = pending;
opal_list_append(&ompi_mtl_portals4.flowctl.pending_sends,
&pending->super.super);
ompi_mtl_portals4_pending_list_progress();
}
#else
if (length <= ompi_mtl_portals4.eager_limit) {
if (MCA_PML_BASE_SEND_SYNCHRONOUS == mode) {
ret = ompi_mtl_portals4_sync_isend(start,
length,
comm->c_contextid,
tag,
comm->c_my_rank,
endpoint,
ptl_request);
} else {
ret = ompi_mtl_portals4_short_isend(mode,
start,
length,
comm->c_contextid,
tag,
comm->c_my_rank,
endpoint,
ptl_request);
}
ret = ompi_mtl_portals4_short_isend(mode,
start,
length,
comm->c_contextid,
tag,
comm->c_my_rank,
endpoint,
ptl_request);
} else {
ret = ompi_mtl_portals4_long_isend(start,
length,
comm->c_contextid,
tag,
comm->c_my_rank,
dest,
endpoint,
ptl_request);
}
#endif
return OMPI_SUCCESS;
return ret;
}
@ -476,7 +522,8 @@ ompi_mtl_portals4_isend(struct mca_mtl_base_module_t* mtl,
mca_mtl_request_t *mtl_request)
{
int ret = OMPI_SUCCESS;
ompi_mtl_portals4_isend_request_t *ptl_request = (ompi_mtl_portals4_isend_request_t*)mtl_request;
ompi_mtl_portals4_isend_request_t *ptl_request =
(ompi_mtl_portals4_isend_request_t*) mtl_request;
ptl_request->super.type = portals4_req_isend;
ptl_request->super.event_callback = ompi_mtl_portals4_isend_callback;