* implement short unexpected message copy optimization
This commit was SVN r10813.
Этот коммит содержится в:
родитель
f6e7e11ee6
Коммит
3b978e3985
@ -36,6 +36,8 @@ local_sources = \
|
||||
mtl_portals.h \
|
||||
mtl_portals_endpoint.h \
|
||||
mtl_portals_recv.c \
|
||||
mtl_portals_recv_short.h \
|
||||
mtl_portals_recv_short.c \
|
||||
mtl_portals_request.h \
|
||||
mtl_portals_send.c \
|
||||
mtl_portals_probe.c
|
||||
|
@ -25,6 +25,7 @@
|
||||
#include "mtl_portals.h"
|
||||
#include "mtl_portals_endpoint.h"
|
||||
#include "mtl_portals_request.h"
|
||||
#include "mtl_portals_recv_short.h"
|
||||
|
||||
|
||||
mca_mtl_portals_module_t ompi_mtl_portals = {
|
||||
@ -53,8 +54,6 @@ OBJ_CLASS_INSTANCE(ompi_mtl_portals_event_t, opal_list_item_t,
|
||||
NULL, NULL);
|
||||
|
||||
|
||||
static int ompi_mtl_portals_progress(void);
|
||||
|
||||
|
||||
int
|
||||
ompi_mtl_portals_add_procs(struct mca_mtl_base_module_t *mtl,
|
||||
@ -76,7 +75,7 @@ ompi_mtl_portals_add_procs(struct mca_mtl_base_module_t *mtl,
|
||||
if (PTL_INVALID_HANDLE == ompi_mtl_portals.ptl_ni_h) {
|
||||
ptl_md_t md;
|
||||
ptl_handle_md_t md_h;
|
||||
ptl_process_id_t anyproc;
|
||||
ptl_process_id_t ptlproc;
|
||||
uint64_t match_bits = 0;
|
||||
|
||||
ret = ompi_common_portals_ni_initialize(&(ompi_mtl_portals.ptl_ni_h));
|
||||
@ -95,15 +94,28 @@ ompi_mtl_portals_add_procs(struct mca_mtl_base_module_t *mtl,
|
||||
&(ompi_mtl_portals.ptl_unexpected_recv_eq_h));
|
||||
assert(ret == PTL_OK);
|
||||
|
||||
/* create insertion point for matched receives */
|
||||
ptlproc.nid = 0;
|
||||
ptlproc.pid = 0;
|
||||
|
||||
ret = PtlMEAttach(ompi_mtl_portals.ptl_ni_h,
|
||||
OMPI_MTL_PORTALS_SEND_TABLE_ID,
|
||||
ptlproc,
|
||||
~match_bits,
|
||||
match_bits,
|
||||
PTL_RETAIN,
|
||||
PTL_INS_AFTER,
|
||||
&(ompi_mtl_portals.ptl_match_ins_me_h));
|
||||
assert(ret == PTL_OK);
|
||||
|
||||
/* create unexpected message match entry */
|
||||
anyproc.nid = PTL_NID_ANY;
|
||||
anyproc.pid = PTL_PID_ANY;
|
||||
ptlproc.nid = PTL_NID_ANY;
|
||||
ptlproc.pid = PTL_PID_ANY;
|
||||
|
||||
/* unexpected message match entry should receive from anyone,
|
||||
so ignore bits are all 1 */
|
||||
ret = PtlMEAttach(ompi_mtl_portals.ptl_ni_h,
|
||||
OMPI_MTL_PORTALS_SEND_TABLE_ID,
|
||||
anyproc,
|
||||
ret = PtlMEInsert(ompi_mtl_portals.ptl_match_ins_me_h,
|
||||
ptlproc,
|
||||
match_bits,
|
||||
~match_bits,
|
||||
PTL_RETAIN,
|
||||
@ -124,6 +136,8 @@ ompi_mtl_portals_add_procs(struct mca_mtl_base_module_t *mtl,
|
||||
&md_h);
|
||||
assert(ret == PTL_OK);
|
||||
|
||||
ret = ompi_mtl_portals_recv_short_enable((mca_mtl_portals_module_t*) mtl);
|
||||
|
||||
opal_progress_register(ompi_mtl_portals_progress);
|
||||
}
|
||||
|
||||
@ -176,6 +190,8 @@ ompi_mtl_portals_finalize(struct mca_mtl_base_module_t *mtl)
|
||||
{
|
||||
assert(mtl == &ompi_mtl_portals.base);
|
||||
|
||||
ompi_mtl_portals_recv_short_disable((mca_mtl_portals_module_t *) mtl);
|
||||
|
||||
opal_progress_unregister(ompi_mtl_portals_progress);
|
||||
|
||||
while (0 != ompi_mtl_portals_progress()) { }
|
||||
@ -187,7 +203,7 @@ ompi_mtl_portals_finalize(struct mca_mtl_base_module_t *mtl)
|
||||
}
|
||||
|
||||
|
||||
static int
|
||||
int
|
||||
ompi_mtl_portals_progress(void)
|
||||
{
|
||||
int count = 0, ret;
|
||||
@ -199,10 +215,13 @@ ompi_mtl_portals_progress(void)
|
||||
if (PTL_OK == ret) {
|
||||
if (ev.type == PTL_EVENT_UNLINK) continue;
|
||||
|
||||
ptl_request = ev.md.user_ptr;
|
||||
ret = ptl_request->event_callback(&ev, ptl_request);
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
abort();
|
||||
if (NULL != ev.md.user_ptr) {
|
||||
ptl_request = ev.md.user_ptr;
|
||||
ret = ptl_request->event_callback(&ev, ptl_request);
|
||||
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
abort();
|
||||
}
|
||||
}
|
||||
} else if (PTL_EQ_EMPTY == ret) {
|
||||
break;
|
||||
|
@ -48,10 +48,17 @@ struct mca_mtl_portals_module_t {
|
||||
ptl_handle_eq_t ptl_eq_h;
|
||||
ptl_handle_eq_t ptl_unexpected_recv_eq_h;
|
||||
|
||||
/* insert all posted receives before this handle */
|
||||
ptl_handle_me_t ptl_match_ins_me_h;
|
||||
/* last handle in the SEND table entry */
|
||||
ptl_handle_me_t ptl_unexpected_me_h;
|
||||
|
||||
ompi_free_list_t event_fl;
|
||||
|
||||
int ptl_recv_short_mds_num;
|
||||
int ptl_recv_short_mds_size;
|
||||
|
||||
opal_list_t ptl_recv_short_blocks;
|
||||
opal_list_t unexpected_messages;
|
||||
};
|
||||
typedef struct mca_mtl_portals_module_t mca_mtl_portals_module_t;
|
||||
@ -88,11 +95,11 @@ OBJ_CLASS_DECLARATION(ompi_mtl_portals_event_t);
|
||||
|
||||
#define PTL_SHORT_MSG 0x1000000000000000ULL
|
||||
#define PTL_LONG_MSG 0x2000000000000000ULL
|
||||
#define PTL_READY_MSG 0x3000000000000000ULL
|
||||
#define PTL_READY_MSG 0x4000000000000000ULL
|
||||
|
||||
/* send posting */
|
||||
#define PTL_SET_SEND_BITS(match_bits, contextid, source, tag, type) \
|
||||
{ \
|
||||
{ \
|
||||
match_bits = contextid; \
|
||||
match_bits = (match_bits << 16); \
|
||||
match_bits |= source; \
|
||||
@ -124,10 +131,12 @@ OBJ_CLASS_DECLARATION(ompi_mtl_portals_event_t);
|
||||
} \
|
||||
}
|
||||
|
||||
#define PTL_IS_SHORT_MSG(match_bits, ret) \
|
||||
{ \
|
||||
ret = (0 != (PTL_SHORT_MSG & match_bits)); \
|
||||
}
|
||||
#define PTL_IS_SHORT_MSG(match_bits) \
|
||||
(0 != (PTL_SHORT_MSG & match_bits))
|
||||
#define PTL_IS_READY_MSG(match_bits) \
|
||||
(0 != (PTL_READY_MSG & match_bits))
|
||||
#define PTL_IS_SYNC_MSG(event) \
|
||||
(0 != event.hdr_data)
|
||||
|
||||
#define PTL_GET_TAG(match_bits) ((int)(match_bits & PTL_TAG_MASK))
|
||||
#define PTL_GET_SOURCE(match_bits) ((int)((match_bits & PTL_SOURCE_MASK) >> 32))
|
||||
@ -179,6 +188,9 @@ extern int ompi_mtl_portals_cancel(struct mca_mtl_base_module_t* mtl,
|
||||
mca_mtl_request_t *mtl_request,
|
||||
int flag);
|
||||
|
||||
extern int ompi_mtl_portals_progress(void);
|
||||
|
||||
|
||||
#if defined(c_plusplus) || defined(__cplusplus)
|
||||
}
|
||||
#endif
|
||||
|
@ -77,11 +77,27 @@ ompi_mtl_portals_component_open(void)
|
||||
"Cross-over point from eager to rendezvous sends",
|
||||
false,
|
||||
false,
|
||||
0,
|
||||
1024,
|
||||
&tmp);
|
||||
|
||||
ompi_mtl_portals.eager_limit = tmp;
|
||||
|
||||
mca_base_param_reg_int(&mca_mtl_portals_component.mtl_version,
|
||||
"short_recv_mds_num",
|
||||
"Number of short message receive blocks",
|
||||
false,
|
||||
false,
|
||||
3,
|
||||
&ompi_mtl_portals.ptl_recv_short_mds_num);
|
||||
|
||||
mca_base_param_reg_int(&mca_mtl_portals_component.mtl_version,
|
||||
"short_recv_mds_size",
|
||||
"Size of short message receive blocks",
|
||||
false,
|
||||
false,
|
||||
15 * 1024 * 1024,
|
||||
&ompi_mtl_portals.ptl_recv_short_mds_size);
|
||||
|
||||
ompi_mtl_portals.ptl_ni_h = PTL_INVALID_HANDLE;
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
@ -112,6 +128,8 @@ ompi_mtl_portals_component_init(bool enable_progress_threads,
|
||||
sizeof(ompi_mtl_portals_event_t),
|
||||
OBJ_CLASS(ompi_mtl_portals_event_t),
|
||||
1, -1, 1, NULL);
|
||||
|
||||
OBJ_CONSTRUCT(&ompi_mtl_portals.ptl_recv_short_blocks, opal_list_t);
|
||||
OBJ_CONSTRUCT(&ompi_mtl_portals.unexpected_messages, opal_list_t);
|
||||
|
||||
return &ompi_mtl_portals.base;
|
||||
|
@ -30,6 +30,7 @@
|
||||
#include "mtl_portals.h"
|
||||
#include "mtl_portals_endpoint.h"
|
||||
#include "mtl_portals_request.h"
|
||||
#include "mtl_portals_recv_short.h"
|
||||
|
||||
#define CHECK_MATCH(incoming_bits, match_bits, ignore_bits) \
|
||||
(((incoming_bits ^ match_bits) & ~ignore_bits) == 0)
|
||||
@ -55,9 +56,14 @@ ompi_mtl_portals_recv_progress(ptl_event_t *ev,
|
||||
MPI_ERR_TRUNCATE : MPI_SUCCESS;
|
||||
ptl_request->super.ompi_req->req_status._count =
|
||||
ev->mlength;
|
||||
|
||||
#if OMPI_MTL_PORTALS_DEBUG
|
||||
printf("recv complete: 0x%016llx\n", ev->match_bits);
|
||||
#endif
|
||||
|
||||
ptl_request->super.completion_callback(&ptl_request->super);
|
||||
break;
|
||||
|
||||
case PTL_EVENT_REPLY_END:
|
||||
/* make sure the data is in the right place */
|
||||
ompi_mtl_datatype_unpack(ptl_request->convertor,
|
||||
@ -68,6 +74,10 @@ ompi_mtl_portals_recv_progress(ptl_event_t *ev,
|
||||
/* set the status */
|
||||
ptl_request->super.ompi_req->req_status._count =
|
||||
ev->mlength;
|
||||
|
||||
#if OMPI_MTL_PORTALS_DEBUG
|
||||
printf("recv complete: 0x%016llx\n", ev->match_bits);
|
||||
#endif
|
||||
|
||||
ptl_request->super.completion_callback(&ptl_request->super);
|
||||
break;
|
||||
@ -88,14 +98,86 @@ ompi_mtl_portals_get_data(ompi_mtl_portals_event_t *recv_event,
|
||||
int ret;
|
||||
ptl_md_t md;
|
||||
ptl_handle_md_t md_h;
|
||||
|
||||
PTL_IS_SHORT_MSG(recv_event->ev.match_bits, ret);
|
||||
if (ret) {
|
||||
size_t buflen;
|
||||
|
||||
if (PTL_IS_SHORT_MSG(recv_event->ev.match_bits)) {
|
||||
/* the buffer is sitting in the short message queue */
|
||||
abort();
|
||||
} else {
|
||||
size_t buflen;
|
||||
|
||||
struct iovec iov;
|
||||
uint32_t iov_count = 1;
|
||||
int32_t free_after;
|
||||
size_t max_data;
|
||||
|
||||
ompi_mtl_portals_recv_short_block_t *block =
|
||||
recv_event->ev.md.user_ptr;
|
||||
|
||||
iov.iov_base = (((char*) recv_event->ev.md.start) + recv_event->ev.offset);
|
||||
iov.iov_len = recv_event->ev.mlength;
|
||||
max_data = iov.iov_len;
|
||||
|
||||
/* see if this message filled the receive block */
|
||||
if (recv_event->ev.md.length - (recv_event->ev.offset +
|
||||
recv_event->ev.mlength) <
|
||||
recv_event->ev.md.max_size) {
|
||||
block->full = true;
|
||||
}
|
||||
|
||||
/* pull out the data */
|
||||
if (iov.iov_len > 0) {
|
||||
ompi_convertor_unpack(convertor, &iov, &iov_count,
|
||||
&max_data, &free_after);
|
||||
}
|
||||
|
||||
/* if synchronous, return an ack */
|
||||
if (PTL_IS_SYNC_MSG(recv_event->ev)) {
|
||||
md.length = 0;
|
||||
md.start = (((char*) recv_event->ev.md.start) + recv_event->ev.offset);
|
||||
md.threshold = 1; /* send */
|
||||
md.options = PTL_MD_EVENT_START_DISABLE;
|
||||
md.user_ptr = NULL;
|
||||
md.eq_handle = ompi_mtl_portals.ptl_eq_h;
|
||||
|
||||
ret = PtlMDBind(ompi_mtl_portals.ptl_ni_h, md,
|
||||
PTL_UNLINK, &md_h);
|
||||
if (PTL_OK != ret) abort();
|
||||
|
||||
#if OMPI_MTL_PORTALS_DEBUG
|
||||
printf("acking recv: 0x%016llx\n", recv_event->ev.match_bits);
|
||||
#endif
|
||||
|
||||
ret = PtlPut(md_h,
|
||||
PTL_NO_ACK_REQ,
|
||||
recv_event->ev.initiator,
|
||||
OMPI_MTL_PORTALS_ACK_TABLE_ID,
|
||||
0,
|
||||
recv_event->ev.hdr_data,
|
||||
0,
|
||||
0);
|
||||
if (PTL_OK != ret) abort();
|
||||
}
|
||||
|
||||
/* finished with our buffer space */
|
||||
ompi_mtl_portals_return_block_part(&ompi_mtl_portals, block);
|
||||
|
||||
ompi_convertor_get_packed_size(convertor, &buflen);
|
||||
|
||||
ptl_request->super.ompi_req->req_status.MPI_SOURCE =
|
||||
PTL_GET_SOURCE(recv_event->ev.match_bits);
|
||||
ptl_request->super.ompi_req->req_status.MPI_TAG =
|
||||
PTL_GET_TAG(recv_event->ev.match_bits);
|
||||
ptl_request->super.ompi_req->req_status.MPI_ERROR =
|
||||
(recv_event->ev.rlength > buflen) ?
|
||||
MPI_ERR_TRUNCATE : MPI_SUCCESS;
|
||||
ptl_request->super.ompi_req->req_status._count =
|
||||
recv_event->ev.mlength;
|
||||
|
||||
#if OMPI_MTL_PORTALS_DEBUG
|
||||
printf("recv complete: 0x%016llx\n", recv_event->ev.match_bits);
|
||||
#endif
|
||||
|
||||
ptl_request->super.completion_callback(&ptl_request->super);
|
||||
|
||||
} else {
|
||||
ret = ompi_mtl_datatype_recv_buf(convertor, &md.start, &buflen,
|
||||
&ptl_request->free_after);
|
||||
if (OMPI_SUCCESS != ret) abort();
|
||||
@ -105,7 +187,9 @@ ompi_mtl_portals_get_data(ompi_mtl_portals_event_t *recv_event,
|
||||
md.user_ptr = ptl_request;
|
||||
md.eq_handle = ompi_mtl_portals.ptl_eq_h;
|
||||
|
||||
/* retain because it's unclear how many events we'll get here ... */
|
||||
/* retain because it's unclear how many events we'll get here.
|
||||
Some implementations give just the REPLY, others give SEND
|
||||
and REPLY */
|
||||
ret = PtlMDBind(ompi_mtl_portals.ptl_ni_h, md,
|
||||
PTL_RETAIN, &md_h);
|
||||
if (PTL_OK != ret) abort();
|
||||
@ -123,7 +207,7 @@ ompi_mtl_portals_get_data(ompi_mtl_portals_event_t *recv_event,
|
||||
ptl_request->super.ompi_req->req_status.MPI_SOURCE =
|
||||
PTL_GET_SOURCE(recv_event->ev.match_bits);
|
||||
ptl_request->super.ompi_req->req_status.MPI_TAG =
|
||||
PTL_GET_TAG(recv_event->ev.match_bits);
|
||||
PTL_GET_TAG(recv_event->ev.match_bits);
|
||||
ptl_request->super.ompi_req->req_status.MPI_ERROR =
|
||||
(recv_event->ev.rlength > buflen) ?
|
||||
MPI_ERR_TRUNCATE : MPI_SUCCESS;
|
||||
@ -198,14 +282,27 @@ ompi_mtl_portals_irecv(struct mca_mtl_base_module_t* mtl,
|
||||
ret = PtlEQGet(ompi_mtl_portals.ptl_unexpected_recv_eq_h,
|
||||
&recv_event->ev);
|
||||
if (PTL_OK == ret) {
|
||||
if (CHECK_MATCH(recv_event->ev.match_bits, match_bits, ignore_bits)) {
|
||||
/* we have a match... */
|
||||
ompi_mtl_portals_get_data(recv_event, convertor, ptl_request);
|
||||
goto cleanup;
|
||||
} else {
|
||||
/* not ours - put in unexpected queue */
|
||||
opal_list_append(&(ompi_mtl_portals.unexpected_messages),
|
||||
(opal_list_item_t*) recv_event);
|
||||
switch (recv_event->ev.type) {
|
||||
case PTL_EVENT_PUT_START:
|
||||
if (PTL_IS_SHORT_MSG(recv_event->ev.match_bits)) {
|
||||
ompi_mtl_portals_recv_short_block_t *block =
|
||||
recv_event->ev.md.user_ptr;
|
||||
OPAL_THREAD_ADD32(&block->pending, 1);
|
||||
}
|
||||
break;
|
||||
case PTL_EVENT_PUT_END:
|
||||
if (CHECK_MATCH(recv_event->ev.match_bits, match_bits, ignore_bits)) {
|
||||
/* we have a match... */
|
||||
ompi_mtl_portals_get_data(recv_event, convertor, ptl_request);
|
||||
goto cleanup;
|
||||
} else {
|
||||
/* not ours - put in unexpected queue */
|
||||
opal_list_append(&(ompi_mtl_portals.unexpected_messages),
|
||||
(opal_list_item_t*) recv_event);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
} else if (PTL_EQ_EMPTY == ret) {
|
||||
break;
|
||||
@ -219,7 +316,7 @@ ompi_mtl_portals_irecv(struct mca_mtl_base_module_t* mtl,
|
||||
&ptl_request->free_after);
|
||||
md.length = buflen;
|
||||
|
||||
PtlMEInsert(ompi_mtl_portals.ptl_unexpected_me_h,
|
||||
PtlMEInsert(ompi_mtl_portals.ptl_match_ins_me_h,
|
||||
remote_proc,
|
||||
match_bits,
|
||||
ignore_bits,
|
||||
|
115
ompi/mca/mtl/portals/mtl_portals_recv_short.c
Обычный файл
115
ompi/mca/mtl/portals/mtl_portals_recv_short.c
Обычный файл
@ -0,0 +1,115 @@
|
||||
/*
|
||||
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
|
||||
* University Research and Technology
|
||||
* Corporation. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
|
||||
* University of Stuttgart. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
|
||||
#include "ompi_config.h"
|
||||
|
||||
#include "ompi/constants.h"
|
||||
#include "opal/util/output.h"
|
||||
|
||||
#include "mtl_portals.h"
|
||||
#include "mtl_portals_recv_short.h"
|
||||
|
||||
|
||||
OBJ_CLASS_INSTANCE(ompi_mtl_portals_recv_short_block_t,
|
||||
opal_list_item_t,
|
||||
NULL, NULL);
|
||||
|
||||
int
|
||||
ompi_mtl_portals_recv_short_enable(mca_mtl_portals_module_t *mtl)
|
||||
{
|
||||
int i;
|
||||
|
||||
/* create the recv blocks */
|
||||
for (i = 0 ; i < mtl->ptl_recv_short_mds_num ; ++i) {
|
||||
ompi_mtl_portals_recv_short_block_t *block =
|
||||
ompi_mtl_portals_recv_short_block_init(mtl);
|
||||
if (NULL == block) {
|
||||
ompi_mtl_portals_recv_short_disable(mtl);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
opal_list_append(&(mtl->ptl_recv_short_blocks),
|
||||
(opal_list_item_t*) block);
|
||||
ompi_mtl_portals_activate_block(block);
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
ompi_mtl_portals_recv_short_disable(mca_mtl_portals_module_t *mtl)
|
||||
{
|
||||
opal_list_item_t *item;
|
||||
|
||||
if (opal_list_get_size(&mtl->ptl_recv_short_blocks) > 0) {
|
||||
while (NULL !=
|
||||
(item = opal_list_remove_first(&mtl->ptl_recv_short_blocks))) {
|
||||
ompi_mtl_portals_recv_short_block_t *block =
|
||||
(ompi_mtl_portals_recv_short_block_t*) item;
|
||||
ompi_mtl_portals_recv_short_block_free(block);
|
||||
}
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
ompi_mtl_portals_recv_short_block_t*
|
||||
ompi_mtl_portals_recv_short_block_init(mca_mtl_portals_module_t *mtl)
|
||||
{
|
||||
ompi_mtl_portals_recv_short_block_t *block;
|
||||
|
||||
block = OBJ_NEW(ompi_mtl_portals_recv_short_block_t);
|
||||
block->mtl = mtl;
|
||||
block->length = mtl->ptl_recv_short_mds_size;
|
||||
block->start = malloc(block->length);
|
||||
if (block->start == NULL) return NULL;
|
||||
|
||||
block->me_h = PTL_INVALID_HANDLE;
|
||||
block->md_h = PTL_INVALID_HANDLE;
|
||||
|
||||
block->full = false;
|
||||
block->pending = 0;
|
||||
|
||||
return block;
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
ompi_mtl_portals_recv_short_block_free(ompi_mtl_portals_recv_short_block_t *block)
|
||||
{
|
||||
/* need to clear out the md */
|
||||
while (block->pending != 0) {
|
||||
ompi_mtl_portals_progress();
|
||||
}
|
||||
|
||||
if (PTL_INVALID_HANDLE != block->md_h) {
|
||||
PtlMDUnlink(block->md_h);
|
||||
block->md_h = PTL_INVALID_HANDLE;
|
||||
}
|
||||
|
||||
if (NULL != block->start) {
|
||||
free(block->start);
|
||||
block->start = NULL;
|
||||
}
|
||||
block->length = 0;
|
||||
block->full = false;
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
140
ompi/mca/mtl/portals/mtl_portals_recv_short.h
Обычный файл
140
ompi/mca/mtl/portals/mtl_portals_recv_short.h
Обычный файл
@ -0,0 +1,140 @@
|
||||
/*
|
||||
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
|
||||
* University Research and Technology
|
||||
* Corporation. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
|
||||
* University of Stuttgart. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#ifndef OMPI_MTL_PORTALS_RECV_SHORT_H
|
||||
#define OMPI_MTL_PORTALS_RECV_SHORT_H
|
||||
|
||||
struct ompi_mtl_portals_recv_short_block_t {
|
||||
opal_list_item_t base;
|
||||
|
||||
mca_mtl_portals_module_t *mtl;
|
||||
|
||||
void *start;
|
||||
size_t length;
|
||||
ptl_handle_me_t me_h;
|
||||
ptl_handle_md_t md_h;
|
||||
|
||||
volatile bool full;
|
||||
volatile int32_t pending;
|
||||
};
|
||||
typedef struct ompi_mtl_portals_recv_short_block_t ompi_mtl_portals_recv_short_block_t;
|
||||
OBJ_CLASS_DECLARATION(ompi_mtl_portals_recv_short_block_t);
|
||||
|
||||
|
||||
int ompi_mtl_portals_recv_short_enable(mca_mtl_portals_module_t *mtl);
|
||||
|
||||
int ompi_mtl_portals_recv_short_disable(mca_mtl_portals_module_t *mtl);
|
||||
|
||||
/**
|
||||
* Create a block of memory for receiving send messages. Must call
|
||||
* activate_block on the returned block of memory before it will be
|
||||
* active with the Portals library
|
||||
*
|
||||
* Module lock must be held before calling this function
|
||||
*/
|
||||
ompi_mtl_portals_recv_short_block_t*
|
||||
ompi_mtl_portals_recv_short_block_init(mca_mtl_portals_module_t *mtl);
|
||||
|
||||
|
||||
/**
|
||||
* Free a block of memory. Will remove the match entry, then progress
|
||||
* Portals until the pending count is returned to 0. Will then free
|
||||
* all resources associated with block.
|
||||
*
|
||||
* Module lock must be held before calling this function
|
||||
*/
|
||||
int ompi_mtl_portals_recv_short_block_free(ompi_mtl_portals_recv_short_block_t *block);
|
||||
|
||||
|
||||
/**
|
||||
* activate a block. Blocks that are full (have gone inactive) can be
|
||||
* re-activated with this call. There is no need to hold the lock
|
||||
* before calling this function
|
||||
*/
|
||||
static inline int
|
||||
ompi_mtl_portals_activate_block(ompi_mtl_portals_recv_short_block_t *block)
|
||||
{
|
||||
int ret;
|
||||
ptl_process_id_t any_proc = { PTL_NID_ANY, PTL_PID_ANY };
|
||||
ptl_md_t md;
|
||||
uint64_t match_bits = PTL_SHORT_MSG;
|
||||
uint64_t ignore_bits = PTL_CONTEXT_MASK | PTL_SOURCE_MASK | PTL_TAG_MASK;
|
||||
|
||||
/* if we have pending operations, something very, very, very bad
|
||||
has happened... */
|
||||
assert(block->pending == 0);
|
||||
|
||||
if (NULL == block->start) return OMPI_ERROR;
|
||||
|
||||
/* create match entry */
|
||||
ret = PtlMEInsert(block->mtl->ptl_unexpected_me_h,
|
||||
any_proc,
|
||||
match_bits,
|
||||
ignore_bits,
|
||||
PTL_UNLINK,
|
||||
PTL_INS_BEFORE,
|
||||
&(block->me_h));
|
||||
if (PTL_OK != ret) return OMPI_ERROR;
|
||||
|
||||
/* and the memory descriptor */
|
||||
md.start = block->start;
|
||||
md.length = block->length;
|
||||
/* try to throttle incoming sends so that we don't overrun the incoming
|
||||
queue size */
|
||||
md.threshold = PTL_MD_THRESH_INF;
|
||||
md.max_size = block->mtl->eager_limit;
|
||||
md.options = PTL_MD_OP_PUT | PTL_MD_MAX_SIZE | PTL_MD_ACK_DISABLE;
|
||||
md.user_ptr = block;
|
||||
md.eq_handle = block->mtl->ptl_unexpected_recv_eq_h;
|
||||
|
||||
block->pending = 0;
|
||||
block->full = false;
|
||||
/* make sure that everyone sees the update on full value */
|
||||
opal_atomic_mb();
|
||||
|
||||
ret = PtlMDAttach(block->me_h,
|
||||
md,
|
||||
PTL_UNLINK,
|
||||
&(block->md_h));
|
||||
if (PTL_OK != ret) {
|
||||
PtlMEUnlink(block->me_h);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
static inline void
|
||||
ompi_mtl_portals_return_block_part(mca_mtl_portals_module_t *mtl,
|
||||
ompi_mtl_portals_recv_short_block_t *block)
|
||||
{
|
||||
int ret;
|
||||
|
||||
OPAL_THREAD_ADD32(&(block->pending), -1);
|
||||
if (block->full == true) {
|
||||
if (block->pending == 0) {
|
||||
ret = ompi_mtl_portals_activate_block(block);
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
/* BWB - now what? */
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#endif /* OMPI_MTL_PORTALS_RECV_SHORT_H */
|
@ -40,6 +40,9 @@ ompi_mtl_portals_send_progress_no_ack(ptl_event_t *ev,
|
||||
if (ptl_request->free_after) {
|
||||
free(ev->md.start);
|
||||
}
|
||||
#if OMPI_MTL_PORTALS_DEBUG
|
||||
printf("send complete: 0x%016llx\n", ev->match_bits);
|
||||
#endif
|
||||
ptl_request->super.completion_callback(&ptl_request->super);
|
||||
}
|
||||
|
||||
@ -67,6 +70,9 @@ ompi_mtl_portals_send_progress(ptl_event_t *ev,
|
||||
if (ptl_request->free_after) {
|
||||
free(ev->md.start);
|
||||
}
|
||||
#if OMPI_MTL_PORTALS_DEBUG
|
||||
printf("send complete: 0x%016llx\n", ev->match_bits);
|
||||
#endif
|
||||
ptl_request->super.completion_callback(&ptl_request->super);
|
||||
}
|
||||
break;
|
||||
@ -161,7 +167,8 @@ ompi_mtl_portals_isend(struct mca_mtl_base_module_t* mtl,
|
||||
if (ptl_request->free_after) free(md.start);
|
||||
return ompi_common_portals_error_ptl_to_ompi(ret);
|
||||
}
|
||||
} else {
|
||||
|
||||
} else if (md.length > ompi_mtl_portals.eager_limit) {
|
||||
/* it's a long message - same protocol for all send modes
|
||||
other than ready */
|
||||
PTL_SET_SEND_BITS(match_bits, comm->c_contextid,
|
||||
@ -170,7 +177,7 @@ ompi_mtl_portals_isend(struct mca_mtl_base_module_t* mtl,
|
||||
#if OMPI_MTL_PORTALS_DEBUG
|
||||
printf("long send bits: 0x%016llx (%d)\n", match_bits, dest);
|
||||
#endif
|
||||
|
||||
|
||||
md.threshold = 2; /* send, {ack, get} */
|
||||
md.options = PTL_MD_OP_GET | PTL_MD_EVENT_START_DISABLE;
|
||||
md.user_ptr = ptl_request;
|
||||
@ -214,6 +221,100 @@ ompi_mtl_portals_isend(struct mca_mtl_base_module_t* mtl,
|
||||
if (ptl_request->free_after) free(md.start);
|
||||
return ompi_common_portals_error_ptl_to_ompi(ret);
|
||||
}
|
||||
|
||||
} else if (MCA_PML_BASE_SEND_SYNCHRONOUS == mode) {
|
||||
/* short synchronous message */
|
||||
PTL_SET_SEND_BITS(match_bits, comm->c_contextid,
|
||||
comm->c_my_rank,
|
||||
tag, PTL_SHORT_MSG);
|
||||
#if OMPI_MTL_PORTALS_DEBUG
|
||||
printf("short ssend bits: 0x%016llx (%d)\n", match_bits, dest);
|
||||
#endif
|
||||
|
||||
md.threshold = 2; /* send, {ack, put} */
|
||||
md.options = PTL_MD_OP_PUT | PTL_MD_EVENT_START_DISABLE;
|
||||
md.user_ptr = ptl_request;
|
||||
md.eq_handle = ompi_mtl_portals.ptl_eq_h;
|
||||
|
||||
ptr.pval = ptl_request;
|
||||
ret = PtlMEAttach(ompi_mtl_portals.ptl_ni_h,
|
||||
OMPI_MTL_PORTALS_ACK_TABLE_ID,
|
||||
endpoint->ptl_proc,
|
||||
(ptl_match_bits_t) ptr.lval,
|
||||
0,
|
||||
PTL_UNLINK,
|
||||
PTL_INS_AFTER,
|
||||
&me_h);
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
if (ptl_request->free_after) free(md.start);
|
||||
return ompi_common_portals_error_ptl_to_ompi(ret);
|
||||
}
|
||||
|
||||
ret = PtlMDAttach(me_h,
|
||||
md,
|
||||
PTL_UNLINK,
|
||||
&(md_h));
|
||||
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
PtlMEUnlink(me_h);
|
||||
if (ptl_request->free_after) free(md.start);
|
||||
return ompi_common_portals_error_ptl_to_ompi(ret);
|
||||
}
|
||||
|
||||
ret = PtlPut(md_h,
|
||||
PTL_ACK_REQ,
|
||||
endpoint->ptl_proc,
|
||||
OMPI_MTL_PORTALS_SEND_TABLE_ID,
|
||||
0,
|
||||
match_bits,
|
||||
0,
|
||||
(ptl_hdr_data_t) ptr.lval);
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
PtlMDUnlink(md_h);
|
||||
if (ptl_request->free_after) free(md.start);
|
||||
return ompi_common_portals_error_ptl_to_ompi(ret);
|
||||
}
|
||||
|
||||
} else {
|
||||
/* short send message */
|
||||
|
||||
PTL_SET_SEND_BITS(match_bits, comm->c_contextid,
|
||||
comm->c_my_rank,
|
||||
tag, PTL_SHORT_MSG);
|
||||
#if OMPI_MTL_PORTALS_DEBUG
|
||||
printf("short send bits: 0x%016llx\n", match_bits);
|
||||
#endif
|
||||
|
||||
md.threshold = 1;
|
||||
md.options = PTL_MD_EVENT_START_DISABLE;
|
||||
md.user_ptr = ptl_request;
|
||||
md.eq_handle = ompi_mtl_portals.ptl_eq_h;
|
||||
|
||||
ret = PtlMDBind(ompi_mtl_portals.ptl_ni_h,
|
||||
md,
|
||||
PTL_UNLINK,
|
||||
&(md_h));
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
if (ptl_request->free_after) free(md.start);
|
||||
return ompi_common_portals_error_ptl_to_ompi(ret);
|
||||
}
|
||||
|
||||
ptl_request->event_callback = ompi_mtl_portals_send_progress_no_ack;
|
||||
|
||||
ret = PtlPut(md_h,
|
||||
PTL_NO_ACK_REQ,
|
||||
endpoint->ptl_proc,
|
||||
OMPI_MTL_PORTALS_SEND_TABLE_ID,
|
||||
0,
|
||||
match_bits,
|
||||
0,
|
||||
0);
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
PtlMDUnlink(md_h);
|
||||
if (ptl_request->free_after) free(md.start);
|
||||
return ompi_common_portals_error_ptl_to_ompi(ret);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user