1
1

* Set all appropriate flags for portal table entries

* split eq into send and receive eqs so that we can control the number
  of outstanding events in send eq and ensure we never lose an ack
* Shouldn't ever truncate on short unexpected receive blocks, so don't set
  the truncate bit
* Track active vs. waiting-for-free short unexpected receive blocks to
  ensure that an active short unexpected receive block is posted when coming
  out of flow control.  Also allow creation of "temporary" blocks, which should
  be released once the FREE event is received.
* Slight reorganization of some code in preparation for more flow control
  work.

This commit was SVN r26174.
Этот коммит содержится в:
Brian Barrett 2012-03-21 22:20:55 +00:00
родитель 0fb6f1c7ac
Коммит 1c6b5a1358
7 изменённых файлов: 203 добавлений и 120 удалений

Просмотреть файл

@ -134,19 +134,21 @@ ompi_mtl_portals4_finalize(struct mca_mtl_base_module_t *mtl)
opal_progress_unregister(ompi_mtl_portals4_progress);
while (0 != ompi_mtl_portals4_progress()) { }
ompi_mtl_portals4_recv_short_fini(&ompi_mtl_portals4);
ompi_mtl_portals4_recv_short_fini();
PtlMEUnlink(ompi_mtl_portals4.long_overflow_me_h);
PtlMDRelease(ompi_mtl_portals4.zero_md_h);
PtlPTFree(ompi_mtl_portals4.ni_h, ompi_mtl_portals4.read_idx);
PtlPTFree(ompi_mtl_portals4.ni_h, ompi_mtl_portals4.send_idx);
PtlEQFree(ompi_mtl_portals4.eq_h);
PtlEQFree(ompi_mtl_portals4.send_eq_h);
PtlEQFree(ompi_mtl_portals4.recv_eq_h);
PtlNIFini(ompi_mtl_portals4.ni_h);
PtlFini();
return OMPI_SUCCESS;
}
int
ompi_mtl_portals4_cancel(struct mca_mtl_base_module_t* mtl,
mca_mtl_request_t *mtl_request,
@ -154,65 +156,3 @@ ompi_mtl_portals4_cancel(struct mca_mtl_base_module_t* mtl,
{
return OMPI_SUCCESS;
}
/*
 * Progress function for the Portals4 MTL: repeatedly pulls events from the
 * single event queue ompi_mtl_portals4.eq_h until it reports PTL_EQ_EMPTY.
 *
 * Events that carry a non-NULL user_ptr are handed to the request's
 * event_callback; a callback failure, a PTL_EVENT_PT_DISABLED event, or any
 * PtlEQGet error other than PTL_EQ_EMPTY terminates the process via abort().
 *
 * Returns count, which is initialized to 0 and never modified in this
 * function, so the return value is always 0.
 */
int
ompi_mtl_portals4_progress(void)
{
int count = 0, ret;
ptl_event_t ev;
ompi_mtl_portals4_base_request_t *ptl_request;
/* keep polling until the queue is empty (or a fatal error aborts) */
while (true) {
ret = PtlEQGet(ompi_mtl_portals4.eq_h, &ev);
if (PTL_OK == ret) {
OPAL_OUTPUT_VERBOSE((60, ompi_mtl_base_output,
"Found event of type %d\n", ev.type));
switch (ev.type) {
/* event types that are dispatched to the owning request's callback */
case PTL_EVENT_GET:
case PTL_EVENT_PUT:
case PTL_EVENT_PUT_OVERFLOW:
case PTL_EVENT_ATOMIC:
case PTL_EVENT_ATOMIC_OVERFLOW:
case PTL_EVENT_REPLY:
case PTL_EVENT_SEND:
case PTL_EVENT_ACK:
case PTL_EVENT_AUTO_FREE:
case PTL_EVENT_SEARCH:
/* events without a user pointer are silently ignored */
if (NULL != ev.user_ptr) {
ptl_request = ev.user_ptr;
ret = ptl_request->event_callback(&ev, ptl_request);
if (OMPI_SUCCESS != ret) {
opal_output(ompi_mtl_base_output,
"Error returned from target event callback: %d", ret);
abort();
}
}
break;
case PTL_EVENT_PT_DISABLED:
/* BWB: FIX ME: do stuff - flow control */
opal_output(ompi_mtl_base_output, "Unhandled send flow control event.");
abort();
/* not reached: abort() does not return */
break;
case PTL_EVENT_AUTO_UNLINK:
/* logged only; no callback is invoked for auto-unlink here */
opal_output_verbose(1, ompi_mtl_base_output,
"Unexpected auto unlink event");
break;
/* remaining event types are only reported at verbosity level 1 */
case PTL_EVENT_LINK:
case PTL_EVENT_GET_OVERFLOW:
case PTL_EVENT_FETCH_ATOMIC:
case PTL_EVENT_FETCH_ATOMIC_OVERFLOW:
opal_output_verbose(1, ompi_mtl_base_output,
"Unexpected event of type %d", ev.type);
}
} else if (PTL_EQ_EMPTY == ret) {
/* nothing left to process */
break;
} else {
opal_output(ompi_mtl_base_output,
"Error returned from PtlEQGet: %d", ret);
abort();
}
}
return count;
}

Просмотреть файл

@ -33,6 +33,8 @@
BEGIN_C_DECLS
struct mca_mtl_portals4_send_request_t;
struct mca_mtl_portals4_module_t {
mca_mtl_base_module_t base;
@ -47,15 +49,15 @@ struct mca_mtl_portals4_module_t {
/* global handles */
ptl_handle_ni_t ni_h;
ptl_handle_eq_t eq_h;
ptl_handle_eq_t eqs_h[2];
/* for zero-length sends and acks */
ptl_handle_md_t zero_md_h;
/* long message receive overflow */
ptl_handle_me_t long_overflow_me_h;
ompi_mtl_portals4_request_t long_overflow_request;
opal_list_t recv_short_blocks;
opal_list_t active_recv_short_blocks;
opal_list_t waiting_recv_short_blocks;
/* number of operations started */
uint32_t opcount;
@ -67,10 +69,14 @@ struct mca_mtl_portals4_module_t {
};
typedef struct mca_mtl_portals4_module_t mca_mtl_portals4_module_t;
#define send_eq_h eqs_h[0]
#define recv_eq_h eqs_h[1]
extern mca_mtl_portals4_module_t ompi_mtl_portals4;
#define REQ_SEND_TABLE_ID 2
#define REQ_READ_TABLE_ID 3
#define REQ_SEND_TABLE_ID 2
#define REQ_READ_TABLE_ID 3
#define REQ_FLOWCTL_TABLE_ID 4
/* match/ignore bit manipulation

Просмотреть файл

@ -35,6 +35,7 @@ static mca_mtl_base_module_t*
ompi_mtl_portals4_component_init(bool enable_progress_threads,
bool enable_mpi_threads);
OMPI_MODULE_DECLSPEC extern mca_mtl_base_component_2_0_0_t mca_mtl_portals4_component;
mca_mtl_base_component_2_0_0_t mca_mtl_portals4_component = {
@ -135,7 +136,8 @@ ompi_mtl_portals4_component_open(void)
"Other");
ompi_mtl_portals4.ni_h = PTL_INVALID_HANDLE;
ompi_mtl_portals4.eq_h = PTL_INVALID_HANDLE;
ompi_mtl_portals4.send_eq_h = PTL_INVALID_HANDLE;
ompi_mtl_portals4.recv_eq_h = PTL_INVALID_HANDLE;
ompi_mtl_portals4.zero_md_h = PTL_INVALID_HANDLE;
ompi_mtl_portals4.long_overflow_me_h = PTL_INVALID_HANDLE;
ompi_mtl_portals4.send_idx = (ptl_pt_index_t) ~0UL;
@ -208,7 +210,16 @@ ompi_mtl_portals4_component_init(bool enable_progress_threads,
/* create event queue */
ret = PtlEQAlloc(ompi_mtl_portals4.ni_h,
ompi_mtl_portals4.queue_size,
&ompi_mtl_portals4.eq_h);
&ompi_mtl_portals4.send_eq_h);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlEQAlloc failed: %d\n",
__FILE__, __LINE__, ret);
goto error;
}
ret = PtlEQAlloc(ompi_mtl_portals4.ni_h,
ompi_mtl_portals4.queue_size,
&ompi_mtl_portals4.recv_eq_h);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: PtlEQAlloc failed: %d\n",
@ -218,8 +229,10 @@ ompi_mtl_portals4_component_init(bool enable_progress_threads,
/* Create portal table entries */
ret = PtlPTAlloc(ompi_mtl_portals4.ni_h,
PTL_PT_ONLY_USE_ONCE | PTL_PT_FLOWCTRL,
ompi_mtl_portals4.eq_h,
PTL_PT_ONLY_USE_ONCE |
PTL_PT_ONLY_TRUNCATE |
PTL_PT_FLOWCTRL,
ompi_mtl_portals4.recv_eq_h,
REQ_SEND_TABLE_ID,
&ompi_mtl_portals4.send_idx);
if (PTL_OK != ret) {
@ -229,8 +242,9 @@ ompi_mtl_portals4_component_init(bool enable_progress_threads,
goto error;
}
ret = PtlPTAlloc(ompi_mtl_portals4.ni_h,
PTL_PT_ONLY_USE_ONCE,
ompi_mtl_portals4.eq_h,
PTL_PT_ONLY_USE_ONCE |
PTL_PT_ONLY_TRUNCATE,
ompi_mtl_portals4.send_eq_h,
REQ_READ_TABLE_ID,
&ompi_mtl_portals4.read_idx);
if (PTL_OK != ret) {
@ -288,7 +302,7 @@ ompi_mtl_portals4_component_init(bool enable_progress_threads,
}
/* attach short unex recv blocks */
ret = ompi_mtl_portals4_recv_short_init(&ompi_mtl_portals4);
ret = ompi_mtl_portals4_recv_short_init();
if (OMPI_SUCCESS != ret) {
opal_output_verbose(1, ompi_mtl_base_output,
"%s:%d: short receive block initialization failed: %d\n",
@ -325,8 +339,11 @@ ompi_mtl_portals4_component_init(bool enable_progress_threads,
if (ompi_mtl_portals4.send_idx != (ptl_pt_index_t) ~0UL) {
PtlPTFree(ompi_mtl_portals4.ni_h, ompi_mtl_portals4.send_idx);
}
if (!PtlHandleIsEqual(ompi_mtl_portals4.eq_h, PTL_INVALID_HANDLE)) {
PtlEQFree(ompi_mtl_portals4.eq_h);
if (!PtlHandleIsEqual(ompi_mtl_portals4.send_eq_h, PTL_INVALID_HANDLE)) {
PtlEQFree(ompi_mtl_portals4.send_eq_h);
}
if (!PtlHandleIsEqual(ompi_mtl_portals4.recv_eq_h, PTL_INVALID_HANDLE)) {
PtlEQFree(ompi_mtl_portals4.recv_eq_h);
}
return NULL;
}
@ -389,3 +406,86 @@ ompi_mtl_portals4_get_error(int ptl_error)
return ret;
}
/*
 * Progress function for the Portals4 MTL.
 *
 * Polls the two event queues in ompi_mtl_portals4.eqs_h (send and receive)
 * with a timeout argument of 0 until PtlEQPoll reports PTL_EQ_EMPTY, and
 * dispatches every event it finds:
 *
 *  - data-movement / unlink / free / search events are forwarded to the
 *    event_callback of the request stored in ev.user_ptr (events without a
 *    user pointer are ignored);
 *  - PTL_EVENT_PT_DISABLED on the send portal table entry drains the
 *    remaining events, makes sure at least one short receive block is
 *    linked, and re-enables the entry;
 *  - all other event types are only logged.
 *
 * Any callback failure, a PT_DISABLED event on another table entry, a failed
 * restart, or a PtlEQPoll error other than PTL_EQ_EMPTY calls abort().
 *
 * Returns the number of events taken off the queues by this call, including
 * those handled by the nested drain during flow-control recovery.
 */
int
ompi_mtl_portals4_progress(void)
{
    int count = 0, ret;
    unsigned int which;
    ptl_event_t ev;
    ompi_mtl_portals4_base_request_t *ptl_request;

    while (true) {
        /* 2 == number of entries in eqs_h (send_eq_h, recv_eq_h) */
        ret = PtlEQPoll(ompi_mtl_portals4.eqs_h, 2, 0, &ev, &which);
        if (PTL_OK == ret) {
            /* report progress to the caller: one more event consumed */
            ++count;

            OPAL_OUTPUT_VERBOSE((60, ompi_mtl_base_output,
                                 "Found event of type %d\n", ev.type));
            switch (ev.type) {
            case PTL_EVENT_GET:
            case PTL_EVENT_PUT:
            case PTL_EVENT_PUT_OVERFLOW:
            case PTL_EVENT_REPLY:
            case PTL_EVENT_SEND:
            case PTL_EVENT_ACK:
            case PTL_EVENT_AUTO_FREE:
            case PTL_EVENT_AUTO_UNLINK:
            case PTL_EVENT_SEARCH:
                if (NULL != ev.user_ptr) {
                    ptl_request = ev.user_ptr;
                    ret = ptl_request->event_callback(&ev, ptl_request);
                    if (OMPI_SUCCESS != ret) {
                        opal_output(ompi_mtl_base_output,
                                    "Error returned from target event callback: %d", ret);
                        abort();
                    }
                }
                break;

            case PTL_EVENT_PT_DISABLED:
                /* catch up by draining rest of the queue */
                count += ompi_mtl_portals4_progress();

                /* get restarted */
                if (ompi_mtl_portals4.send_idx == ev.pt_index) {
                    /* make sure we have at least one active short receive block */
                    ret = ompi_mtl_portals4_recv_short_link(1);
                    if (OMPI_SUCCESS != ret) {
                        opal_output(ompi_mtl_base_output,
                                    "Unable to post short receive block after flow control.");
                        abort();
                    }
                    ret = PtlPTEnable(ompi_mtl_portals4.ni_h,
                                      ompi_mtl_portals4.send_idx);
                    if (PTL_OK != ret) {
                        opal_output_verbose(1, ompi_mtl_base_output,
                                            "%s:%d: PtlPTEnable failed: %d\n",
                                            __FILE__, __LINE__, ret);
                        abort();
                    }
                } else {
                    /* only the send table entry has a recovery path */
                    opal_output(ompi_mtl_base_output, "Unhandled send flow control event.");
                    abort();
                }
                break;

            case PTL_EVENT_LINK:
            case PTL_EVENT_GET_OVERFLOW:
            case PTL_EVENT_FETCH_ATOMIC:
            case PTL_EVENT_FETCH_ATOMIC_OVERFLOW:
            case PTL_EVENT_ATOMIC:
            case PTL_EVENT_ATOMIC_OVERFLOW:
            default:
                /* nothing in this MTL is expected to generate these */
                opal_output_verbose(1, ompi_mtl_base_output,
                                    "Unexpected event of type %d", ev.type);
                break;
            }
        } else if (PTL_EQ_EMPTY == ret) {
            /* both queues drained */
            break;
        } else {
            /* name the call that actually failed */
            opal_output(ompi_mtl_base_output,
                        "Error returned from PtlEQPoll: %d", ret);
            abort();
        }
    }

    return count;
}

Просмотреть файл

@ -33,7 +33,7 @@
#include "mtl_portals4_recv_short.h"
/* called when a receive should be progressed */
int
static int
ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
ompi_mtl_portals4_base_request_t* ptl_base_request)
{
@ -76,7 +76,7 @@ ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
md.length = ((msg_length > ptl_request->delivery_len) ?
ptl_request->delivery_len : msg_length) - ompi_mtl_portals4.eager_limit;
md.options = 0;
md.eq_handle = ompi_mtl_portals4.eq_h;
md.eq_handle = ompi_mtl_portals4.recv_eq_h;
md.ct_handle = PTL_CT_NONE;
ret = PtlMDBind(ompi_mtl_portals4.ni_h,
@ -253,7 +253,7 @@ ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
md.length = ((msg_length > ptl_request->delivery_len) ?
ptl_request->delivery_len : msg_length) - ev->mlength;
md.options = 0;
md.eq_handle = ompi_mtl_portals4.eq_h;
md.eq_handle = ompi_mtl_portals4.recv_eq_h;
md.ct_handle = PTL_CT_NONE;
ret = PtlMDBind(ompi_mtl_portals4.ni_h,

Просмотреть файл

@ -31,18 +31,30 @@ OBJ_CLASS_INSTANCE(ompi_mtl_portals4_recv_short_block_t,
NULL, NULL);
static inline int ompi_mtl_portals4_activate_block(ompi_mtl_portals4_recv_short_block_t *block);
static int ompi_mtl_portals4_recv_short_block_free(ompi_mtl_portals4_recv_short_block_t *block);
static int
ompi_mtl_portals4_recv_block_progress(ptl_event_t *ev,
ompi_mtl_portals4_base_request_t* ptl_base_request)
{
int ret = OMPI_SUCCESS;
ompi_mtl_portals4_recv_short_request_t *ptl_request =
(ompi_mtl_portals4_recv_short_request_t*) ptl_base_request;
ompi_mtl_portals4_recv_short_block_t *block = ptl_request->block;
if (PTL_EVENT_AUTO_FREE == ev->type) {
return ompi_mtl_portals4_activate_block(block);
if (block->release_on_free) {
opal_list_remove_item(&ompi_mtl_portals4.waiting_recv_short_blocks,
&block->base);
ret = ompi_mtl_portals4_recv_short_block_free(block);
} else {
ret = ompi_mtl_portals4_activate_block(block);
}
} else if (PTL_EVENT_AUTO_UNLINK == ev->type) {
opal_list_remove_item(&ompi_mtl_portals4.active_recv_short_blocks,
&block->base);
opal_list_append(&ompi_mtl_portals4.waiting_recv_short_blocks,
&block->base);
} else {
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output,
"OVERFLOW EVENT %d, hdr_data = %lx", ev->type, (long unsigned) ev->hdr_data));
@ -51,26 +63,20 @@ ompi_mtl_portals4_recv_block_progress(ptl_event_t *ev,
return OMPI_SUCCESS;
}
int
ompi_mtl_portals4_recv_short_block_repost(ptl_event_t *ev)
{
return ompi_mtl_portals4_activate_block(ev->user_ptr);
}
static ompi_mtl_portals4_recv_short_block_t*
ompi_mtl_portals4_recv_short_block_init(mca_mtl_portals4_module_t *mtl)
ompi_mtl_portals4_recv_short_block_alloc(bool release_on_free)
{
ompi_mtl_portals4_recv_short_block_t *block;
block = OBJ_NEW(ompi_mtl_portals4_recv_short_block_t);
block->mtl = mtl;
block->start = malloc(mtl->recv_short_size);
block->start = malloc(ompi_mtl_portals4.recv_short_size);
if (block->start == NULL) return NULL;
block->me_h = PTL_INVALID_HANDLE;
block->request.block = block;
block->request.super.event_callback = ompi_mtl_portals4_recv_block_progress;
block->release_on_free = release_on_free;
return block;
}
@ -103,20 +109,20 @@ ompi_mtl_portals4_activate_block(ompi_mtl_portals4_recv_short_block_t *block)
ptl_me_t me;
int ret;
opal_list_remove_item(&ompi_mtl_portals4.waiting_recv_short_blocks, &block->base);
ignore_bits = MTL_PORTALS4_CONTEXT_MASK | MTL_PORTALS4_SOURCE_MASK | MTL_PORTALS4_TAG_MASK;
me.start = block->start;
me.length = block->mtl->recv_short_size;
me.length = ompi_mtl_portals4.recv_short_size;
me.ct_handle = PTL_CT_NONE;
me.min_free = block->mtl->eager_limit;
me.min_free = ompi_mtl_portals4.eager_limit;
me.uid = PTL_UID_ANY;
me.options =
PTL_ME_OP_PUT |
PTL_ME_EVENT_LINK_DISABLE |
PTL_ME_MANAGE_LOCAL |
PTL_ME_NO_TRUNCATE |
PTL_ME_MAY_ALIGN |
PTL_ME_EVENT_UNLINK_DISABLE |
PTL_ME_ACK_DISABLE;
#if !OPAL_ENABLE_DEBUG
me.options |= PTL_ME_EVENT_COMM_DISABLE;
@ -126,32 +132,41 @@ ompi_mtl_portals4_activate_block(ompi_mtl_portals4_recv_short_block_t *block)
me.match_bits = match_bits;
me.ignore_bits = ignore_bits;
ret = PtlMEAppend(block->mtl->ni_h,
ret = PtlMEAppend(ompi_mtl_portals4.ni_h,
ompi_mtl_portals4.send_idx,
&me,
PTL_OVERFLOW_LIST,
&block->request,
&block->me_h);
return (ret == PTL_OK) ? OMPI_SUCCESS : ompi_mtl_portals4_get_error(ret);
if (ret == PTL_OK) {
ret = OMPI_SUCCESS;
opal_list_append(&ompi_mtl_portals4.active_recv_short_blocks,
&block->base);
} else {
ret = ompi_mtl_portals4_get_error(ret);
}
return ret;
}
int
ompi_mtl_portals4_recv_short_init(mca_mtl_portals4_module_t *mtl)
ompi_mtl_portals4_recv_short_init(void)
{
int i;
OBJ_CONSTRUCT(&(mtl->recv_short_blocks), opal_list_t);
OBJ_CONSTRUCT(&(ompi_mtl_portals4.active_recv_short_blocks), opal_list_t);
OBJ_CONSTRUCT(&(ompi_mtl_portals4.waiting_recv_short_blocks), opal_list_t);
/* create the recv blocks */
for (i = 0 ; i < mtl->recv_short_num ; ++i) {
for (i = 0 ; i < ompi_mtl_portals4.recv_short_num ; ++i) {
ompi_mtl_portals4_recv_short_block_t *block =
ompi_mtl_portals4_recv_short_block_init(mtl);
ompi_mtl_portals4_recv_short_block_alloc(false);
if (NULL == block) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
opal_list_append(&(mtl->recv_short_blocks),
(opal_list_item_t*) block);
opal_list_append(&ompi_mtl_portals4.waiting_recv_short_blocks,
&block->base);
ompi_mtl_portals4_activate_block(block);
}
@ -160,11 +175,16 @@ ompi_mtl_portals4_recv_short_init(mca_mtl_portals4_module_t *mtl)
int
ompi_mtl_portals4_recv_short_fini(mca_mtl_portals4_module_t *mtl)
ompi_mtl_portals4_recv_short_fini(void)
{
opal_list_item_t *item;
while (NULL != (item = opal_list_remove_first(&mtl->recv_short_blocks))) {
while (NULL != (item = opal_list_remove_first(&ompi_mtl_portals4.active_recv_short_blocks))) {
ompi_mtl_portals4_recv_short_block_t *block =
(ompi_mtl_portals4_recv_short_block_t*) item;
ompi_mtl_portals4_recv_short_block_free(block);
}
while (NULL != (item = opal_list_remove_first(&ompi_mtl_portals4.waiting_recv_short_blocks))) {
ompi_mtl_portals4_recv_short_block_t *block =
(ompi_mtl_portals4_recv_short_block_t*) item;
ompi_mtl_portals4_recv_short_block_free(block);
@ -174,3 +194,24 @@ ompi_mtl_portals4_recv_short_fini(mca_mtl_portals4_module_t *mtl)
}
/*
 * Ensure that at least `count` short unexpected receive blocks are on the
 * active list, allocating and linking new blocks for any shortfall.
 *
 * Does nothing when the active list already holds `count` or more blocks.
 *
 * Returns OMPI_SUCCESS when enough blocks are active,
 * OMPI_ERR_OUT_OF_RESOURCE when a block cannot be allocated, or the error
 * reported by ompi_mtl_portals4_activate_block() when linking a block fails.
 */
int
ompi_mtl_portals4_recv_short_link(int count)
{
    int active = opal_list_get_size(&ompi_mtl_portals4.active_recv_short_blocks);
    int i, ret;

    for (i = active ; i < count ; ++i) {
        ompi_mtl_portals4_recv_short_block_t *block =
            ompi_mtl_portals4_recv_short_block_alloc(false);
        if (NULL == block) {
            return OMPI_ERR_OUT_OF_RESOURCE;
        }

        /* activate_block() starts by removing the block from the waiting
           list, so the block has to be on that list before the call */
        opal_list_append(&ompi_mtl_portals4.waiting_recv_short_blocks,
                         &block->base);

        ret = ompi_mtl_portals4_activate_block(block);
        if (OMPI_SUCCESS != ret) {
            /* a failed activation leaves the block on neither list; park it
               on the waiting list again so it stays tracked, and report the
               failure instead of claiming a block was linked */
            opal_list_append(&ompi_mtl_portals4.waiting_recv_short_blocks,
                             &block->base);
            return ret;
        }
    }

    return OMPI_SUCCESS;
}

Просмотреть файл

@ -22,25 +22,21 @@
/*
 * One short unexpected receive block: a buffer that is appended to the
 * overflow list of the send portal table entry, plus the bookkeeping needed
 * to track and recycle it.
 */
struct ompi_mtl_portals4_recv_short_block_t {
/* list linkage; blocks are placed on the module's receive block lists
   through &block->base */
opal_list_item_t base;
/* module pointer stored by the block_init() variant of the allocator;
   the code using the global ompi_mtl_portals4 does not read it */
mca_mtl_portals4_module_t *mtl;
/* buffer obtained with malloc(recv_short_size); used as the ME start */
void *start;
/* ME handle: PTL_INVALID_HANDLE after allocation, filled in by PtlMEAppend */
ptl_handle_me_t me_h;
/* request whose address is passed to PtlMEAppend as the user pointer and
   whose event_callback is ompi_mtl_portals4_recv_block_progress */
struct ompi_mtl_portals4_recv_short_request_t request;
/* when true, a PTL_EVENT_AUTO_FREE event frees the block instead of
   re-activating it */
bool release_on_free;
};
typedef struct ompi_mtl_portals4_recv_short_block_t ompi_mtl_portals4_recv_short_block_t;
OBJ_CLASS_DECLARATION(ompi_mtl_portals4_recv_short_block_t);
extern int
ompi_mtl_portals4_recv_short_init(mca_mtl_portals4_module_t *mtl);
/* initialize and post short receive blocks */
extern int ompi_mtl_portals4_recv_short_init(void);
extern int
ompi_mtl_portals4_recv_short_fini(mca_mtl_portals4_module_t *mtl);
/* clean up all short receive blocks */
extern int ompi_mtl_portals4_recv_short_fini(void);
extern int
ompi_mtl_portals4_recv_short_block_repost(ptl_event_t *ev);
extern int
ompi_mtl_portals4_recv_progress(ptl_event_t *ev,
ompi_mtl_portals4_base_request_t* ptl_request);
/* ensure that there's at least N short receive blocks linked */
extern int ompi_mtl_portals4_recv_short_link(int count);
#endif /* OMPI_MTL_PORTALS_RECV_SHORT_H */

Просмотреть файл

@ -108,7 +108,7 @@ ompi_mtl_portals4_short_isend(mca_pml_base_send_mode_t mode,
md.start = start;
md.length = length;
md.options = 0;
md.eq_handle = ompi_mtl_portals4.eq_h;
md.eq_handle = ompi_mtl_portals4.send_eq_h;
md.ct_handle = PTL_CT_NONE;
ret = PtlMDBind(ompi_mtl_portals4.ni_h,
@ -169,7 +169,7 @@ ompi_mtl_portals4_sync_isend(void *start, int length, int contextid, int tag,
md.start = start;
md.length = length;
md.options = 0;
md.eq_handle = ompi_mtl_portals4.eq_h;
md.eq_handle = ompi_mtl_portals4.send_eq_h;
md.ct_handle = PTL_CT_NONE;
ret = PtlMDBind(ompi_mtl_portals4.ni_h,
@ -259,7 +259,7 @@ ompi_mtl_portals4_long_isend(void *start, int length, int contextid, int tag,
md.start = start;
md.length = length;
md.options = 0;
md.eq_handle = ompi_mtl_portals4.eq_h;
md.eq_handle = ompi_mtl_portals4.send_eq_h;
md.ct_handle = PTL_CT_NONE;
ret = PtlMDBind(ompi_mtl_portals4.ni_h,