1
1

Added send-side optimizations (persistent zero-length md and copy blocks)

and support for Acclerated Portals.

This commit was SVN r16770.
Этот коммит содержится в:
Ron Brightwell 2007-11-21 21:31:37 +00:00
родитель fb5536f11d
Коммит a6d6be1bb9
12 изменённых файлов: 987 добавлений и 362 удалений

Просмотреть файл

@ -39,6 +39,8 @@ local_sources = \
mtl_portals_recv_short.h \
mtl_portals_recv_short.c \
mtl_portals_request.h \
mtl_portals_send_short.h \
mtl_portals_send_short.c \
mtl_portals_send.c \
mtl_portals_probe.c

Просмотреть файл

@ -25,8 +25,16 @@
#include "mtl_portals.h"
#include "mtl_portals_endpoint.h"
#include "mtl_portals_request.h"
#include "mtl_portals_recv.h"
#include "mtl_portals_recv_short.h"
#include "mtl_portals_send_short.h"
static ptl_handle_md_t send_catchall_md_h;
static ptl_handle_md_t ack_catchall_md_h;
static ptl_handle_md_t read_catchall_md_h;
static ptl_handle_md_t unex_long_md_h;
static ompi_mtl_portals_request_t catchall_request;
mca_mtl_portals_module_t ompi_mtl_portals = {
{
@ -39,7 +47,7 @@ mca_mtl_portals_module_t ompi_mtl_portals = {
ompi_mtl_portals_del_procs,
ompi_mtl_portals_finalize,
NULL,
ompi_mtl_portals_send,
ompi_mtl_portals_isend,
ompi_mtl_portals_irecv,
ompi_mtl_portals_iprobe,
@ -54,6 +62,19 @@ OBJ_CLASS_INSTANCE(ompi_mtl_portals_event_t, opal_list_item_t,
NULL, NULL);
/* catchall callback */
static int
ompi_mtl_portals_catchall_callback(ptl_event_t *ev, ompi_mtl_portals_request_t *ptl_request)
{
opal_output(fileno(stderr),"ERROR - received catchall event\n");
abort();
return OMPI_ERROR;
}
int
ompi_mtl_portals_add_procs(struct mca_mtl_base_module_t *mtl,
@ -63,7 +84,9 @@ ompi_mtl_portals_add_procs(struct mca_mtl_base_module_t *mtl,
{
int ret = OMPI_SUCCESS;
ptl_process_id_t *portals_procs = NULL;
ptl_md_t md;
size_t i;
bool accel;
assert(mtl == &ompi_mtl_portals.base);
@ -73,72 +96,139 @@ ompi_mtl_portals_add_procs(struct mca_mtl_base_module_t *mtl,
implementation, we can't do it until modex information can be
received. */
if (PTL_INVALID_HANDLE == ompi_mtl_portals.ptl_ni_h) {
ptl_md_t md;
ptl_handle_md_t md_h;
ptl_process_id_t ptlproc;
uint64_t match_bits = 0;
ptl_match_bits_t match_bits;
ptl_match_bits_t ignore_bits;
ptl_process_id_t anyid = { PTL_NID_ANY, PTL_PID_ANY };
ret = ompi_common_portals_ni_initialize(&(ompi_mtl_portals.ptl_ni_h));
ret = ompi_common_portals_ni_initialize(&(ompi_mtl_portals.ptl_ni_h), &accel);
if (OMPI_SUCCESS != ret) goto cleanup;
/* initialize the event queues */
/* event queue for expected events */
ret = PtlEQAlloc(ompi_mtl_portals.ptl_ni_h,
ompi_mtl_portals.ptl_expected_queue_size,
PTL_EQ_HANDLER_NONE,
&(ompi_mtl_portals.ptl_eq_h));
assert(ret == PTL_OK);
/* event queue for unexpected receives */
ret = PtlEQAlloc(ompi_mtl_portals.ptl_ni_h,
ompi_mtl_portals.ptl_unexpected_queue_size,
PTL_EQ_HANDLER_NONE,
&(ompi_mtl_portals.ptl_unexpected_recv_eq_h));
&(ompi_mtl_portals.ptl_unex_eq_h));
assert(ret == PTL_OK);
/* create insertion point for matched receives */
ptlproc.nid = 0;
ptlproc.pid = 0;
/* empty event queue for PtlMEMDPost() */
ret = PtlEQAlloc(ompi_mtl_portals.ptl_ni_h,
1,
PTL_EQ_HANDLER_NONE,
&(ompi_mtl_portals.ptl_empty_eq_h));
assert(ret == PTL_OK);
/* attach the long unex msg buffer */
match_bits = PTL_LONG_MSG;
ignore_bits = ~(PTL_LONG_MSG);
ret = PtlMEAttach(ompi_mtl_portals.ptl_ni_h,
OMPI_MTL_PORTALS_SEND_TABLE_ID,
ptlproc,
~match_bits,
anyid,
match_bits,
ignore_bits,
PTL_RETAIN,
PTL_INS_AFTER,
&(ompi_mtl_portals.ptl_match_ins_me_h));
&(ompi_mtl_portals.ptl_unex_long_me_h));
assert(ret == PTL_OK);
/* create unexpected message match entry */
ptlproc.nid = PTL_NID_ANY;
ptlproc.pid = PTL_PID_ANY;
/* unexpected message match entry should receive from anyone,
so ignore bits are all 1 */
ret = PtlMEInsert(ompi_mtl_portals.ptl_match_ins_me_h,
ptlproc,
match_bits,
~match_bits,
PTL_RETAIN,
PTL_INS_AFTER,
&(ompi_mtl_portals.ptl_unexpected_me_h));
assert(ret == PTL_OK);
md.start = NULL;
md.length = 0;
md.start = NULL;
md.length = 0;
md.threshold = PTL_MD_THRESH_INF;
md.max_size = 0;
md.options = (PTL_MD_OP_PUT | PTL_MD_TRUNCATE | PTL_MD_ACK_DISABLE);
md.eq_handle = ompi_mtl_portals.ptl_unexpected_recv_eq_h;
md.max_size = 0;
md.options = PTL_MD_OP_PUT | PTL_MD_TRUNCATE | PTL_MD_ACK_DISABLE;
md.eq_handle = ompi_mtl_portals.ptl_unex_eq_h;
md.user_ptr = NULL;
ret = PtlMDAttach(ompi_mtl_portals.ptl_unexpected_me_h,
md,
PTL_RETAIN,
&md_h);
ret = PtlMDAttach(ompi_mtl_portals.ptl_unex_long_me_h,
md,
PTL_RETAIN,
&unex_long_md_h);
assert(ret == PTL_OK);
/* attach catchalls to the send, ack, and read portals */
catchall_request.event_callback = ompi_mtl_portals_catchall_callback;
md.eq_handle = ompi_mtl_portals.ptl_eq_h;
md.user_ptr = &catchall_request;
/* catchall for the send portal */
ret = PtlMEMDPost(ompi_mtl_portals.ptl_ni_h,
ompi_mtl_portals.ptl_unex_long_me_h,
anyid,
0,
~0,
PTL_RETAIN,
PTL_INS_AFTER,
md,
PTL_UNLINK,
&(ompi_mtl_portals.ptl_send_catchall_me_h),
&send_catchall_md_h,
ompi_mtl_portals.ptl_empty_eq_h);
assert(ret == PTL_OK);
/* catchall for ack portal */
ret = PtlMEAttach(ompi_mtl_portals.ptl_ni_h,
OMPI_MTL_PORTALS_ACK_TABLE_ID,
anyid,
0,
~0,
PTL_RETAIN,
PTL_INS_AFTER,
&(ompi_mtl_portals.ptl_ack_catchall_me_h));
assert(ret == PTL_OK);
ret = PtlMDAttach(ompi_mtl_portals.ptl_ack_catchall_me_h,
md,
PTL_UNLINK,
&ack_catchall_md_h);
assert(ret == PTL_OK);
/* catchall for read portal */
ret = PtlMEAttach(ompi_mtl_portals.ptl_ni_h,
OMPI_MTL_PORTALS_READ_TABLE_ID,
anyid,
0,
~0,
PTL_RETAIN,
PTL_INS_AFTER,
&(ompi_mtl_portals.ptl_read_catchall_me_h));
assert(ret == PTL_OK);
ret = PtlMDAttach(ompi_mtl_portals.ptl_read_catchall_me_h,
md,
PTL_RETAIN,
&read_catchall_md_h);
assert(ret == PTL_OK);
/* attach short unex recv blocks */
ret = ompi_mtl_portals_recv_short_enable((mca_mtl_portals_module_t*) mtl);
opal_progress_register(ompi_mtl_portals_progress);
/* bind zero-length md for sending zero-length msgs and acks */
md.start = NULL;
md.length = 0;
md.threshold = PTL_MD_THRESH_INF;
md.max_size = 0;
md.options = PTL_MD_EVENT_START_DISABLE | PTL_MD_EVENT_END_DISABLE;
md.user_ptr = NULL;
md.eq_handle = PTL_EQ_NONE;
ret = PtlMDBind(ompi_mtl_portals.ptl_ni_h,
md,
PTL_RETAIN,
&ompi_mtl_portals.ptl_zero_md_h );
assert(ret == PTL_OK);
/* set up the short copy blocks */
ompi_mtl_portals_short_setup();
}
/* get the list of ptl_process_id_t structures for the given proc
@ -198,6 +288,26 @@ ompi_mtl_portals_finalize(struct mca_mtl_base_module_t *mtl)
while (0 != ompi_mtl_portals_progress()) { }
}
(void)PtlMDUnlink(ompi_mtl_portals.ptl_zero_md_h);
(void)PtlMDUnlink(send_catchall_md_h);
(void)PtlMDUnlink(ack_catchall_md_h);
(void)PtlMDUnlink(read_catchall_md_h);
(void)PtlMDUnlink(unex_long_md_h);
(void)PtlMEUnlink(ompi_mtl_portals.ptl_unex_long_me_h);
(void)PtlMEUnlink(ompi_mtl_portals.ptl_send_catchall_me_h);
(void)PtlMEUnlink(ompi_mtl_portals.ptl_ack_catchall_me_h);
(void)PtlMEUnlink(ompi_mtl_portals.ptl_read_catchall_me_h);
ompi_mtl_portals_short_cleanup();
ompi_common_portals_ni_finalize();
ompi_common_portals_finalize();
@ -211,9 +321,13 @@ ompi_mtl_portals_progress(void)
int count = 0, ret;
ptl_event_t ev;
ompi_mtl_portals_request_t *ptl_request;
int which;
ompi_mtl_portals_event_t *unex_recv;
while (true) {
ret = PtlEQGet(ompi_mtl_portals.ptl_eq_h, &ev);
ret = PtlEQGet(ompi_mtl_portals.ptl_eq_h, &ev);
if (PTL_OK == ret) {
if (ev.type == PTL_EVENT_UNLINK) continue;
@ -234,5 +348,10 @@ ompi_mtl_portals_progress(void)
}
}
/* clean out the unexpected event queue too */
if (ompi_mtl_portals.ptl_aggressive_polling == true) {
while ( NULL != ompi_mtl_portals_search_unex_events(0, ~0, true) );
}
return count;
}

Просмотреть файл

@ -44,12 +44,23 @@ struct mca_mtl_portals_module_t {
size_t eager_limit;
ptl_handle_eq_t ptl_eq_h;
ptl_handle_eq_t ptl_unexpected_recv_eq_h;
/* insert all posted receives before this handle */
ptl_handle_me_t ptl_match_ins_me_h;
/* last handle in the SEND table entry */
ptl_handle_me_t ptl_unexpected_me_h;
ptl_handle_eq_t ptl_unex_eq_h;
/* long unex msgs - insert posted recvs before this */
ptl_handle_me_t ptl_unex_long_me_h;
/* send catchall - insert short unex buffers before this */
ptl_handle_me_t ptl_send_catchall_me_h;
/* catchall for ack portal */
ptl_handle_me_t ptl_ack_catchall_me_h;
/* catchall for read portal */
ptl_handle_me_t ptl_read_catchall_me_h;
/* for zero-length sends and acks */
ptl_handle_md_t ptl_zero_md_h;
ompi_free_list_t event_fl;
@ -61,6 +72,23 @@ struct mca_mtl_portals_module_t {
int ptl_expected_queue_size;
int ptl_unexpected_queue_size;
/* for send-side copy blocks */
ptl_md_t ptl_short_md;
ptl_handle_md_t ptl_short_md_h;
int ptl_num_copy_blocks;
int ptl_copy_block_len;
int *ptl_copy_block_free_list;
int ptl_copy_block_first_free;
/* empty event queue for PtlMEMDPost() */
ptl_handle_eq_t ptl_empty_eq_h;
/* turn off aggressive polling of the unex msg event queue */
bool ptl_aggressive_polling;
};
typedef struct mca_mtl_portals_module_t mca_mtl_portals_module_t;

Просмотреть файл

@ -77,7 +77,7 @@ ompi_mtl_portals_component_open(void)
"Cross-over point from eager to rendezvous sends",
false,
false,
1024,
128 * 1024,
&tmp);
ompi_mtl_portals.eager_limit = tmp;
@ -116,6 +116,32 @@ ompi_mtl_portals_component_open(void)
1024,
&ompi_mtl_portals.ptl_unexpected_queue_size);
mca_base_param_reg_int(&mca_mtl_portals_component.mtl_version,
"num_copy_blocks",
"Number of short message copy blocks",
false,
false,
256,
&ompi_mtl_portals.ptl_num_copy_blocks);
mca_base_param_reg_int(&mca_mtl_portals_component.mtl_version,
"copy_block_len",
"Length (in bytes) of each short message copy block",
false,
false,
8192,
&ompi_mtl_portals.ptl_copy_block_len);
mca_base_param_reg_int(&mca_mtl_portals_component.mtl_version,
"aggressive_polling",
"Turn off aggressive polling of unexpected messages",
false,
false,
0,
&tmp);
ompi_mtl_portals.ptl_aggressive_polling = (tmp == 0) ? true : false;
return OMPI_SUCCESS;
}

Просмотреть файл

@ -17,9 +17,11 @@
*/
#include "ompi_config.h"
#include "ompi/communicator/communicator.h"
#include "ompi/request/request.h"
#include "mtl_portals.h"
#include "mtl_portals_request.h"
#include "mtl_portals_recv.h"
int
ompi_mtl_portals_iprobe(struct mca_mtl_base_module_t* mtl,
@ -29,5 +31,29 @@ ompi_mtl_portals_iprobe(struct mca_mtl_base_module_t* mtl,
int *flag,
struct ompi_status_public_t *status)
{
return OMPI_ERR_NOT_IMPLEMENTED;
ptl_match_bits_t match_bits;
ptl_match_bits_t ignore_bits;
ompi_mtl_portals_event_t *recv_event = NULL;
PTL_SET_RECV_BITS(match_bits, ignore_bits, comm->c_contextid, src, tag);
/* first, check the queue of processed unexpected messages */
recv_event = ompi_mtl_portals_search_unex_q(match_bits, ignore_bits, true);
if (NULL == recv_event) {
/* check for new events */
recv_event = ompi_mtl_portals_search_unex_events(match_bits, ignore_bits, true);
}
if ( NULL != recv_event ) {
/* found it */
*flag = 1;
status->MPI_SOURCE = PTL_GET_SOURCE(recv_event->ev.match_bits);
status->MPI_TAG = PTL_GET_TAG(recv_event->ev.match_bits);
status->_count = recv_event->ev.rlength;
status->MPI_ERROR = OMPI_SUCCESS;
} else {
*flag = 0;
}
return OMPI_SUCCESS;
}

Просмотреть файл

@ -30,6 +30,7 @@
#include "mtl_portals.h"
#include "mtl_portals_endpoint.h"
#include "mtl_portals_request.h"
#include "mtl_portals_recv.h"
#include "mtl_portals_recv_short.h"
#define CHECK_MATCH(incoming_bits, match_bits, ignore_bits) \
@ -266,7 +267,7 @@ ompi_mtl_portals_wait_for_put_end(ptl_seq_t link)
/* wait for a PUT_END event that matches the message we're looking for */
while (true) {
ret = PtlEQWait(ompi_mtl_portals.ptl_unexpected_recv_eq_h,&ev);
ret = PtlEQWait(ompi_mtl_portals.ptl_unex_eq_h,&ev);
if (PTL_OK == ret) {
if (PTL_EVENT_PUT_START == ev.type) {
ompi_free_list_item_t *item;
@ -304,16 +305,17 @@ ompi_mtl_portals_wait_for_put_end(ptl_seq_t link)
}
static ompi_mtl_portals_event_t*
ompi_mtl_portals_event_t*
ompi_mtl_portals_search_unex_events(ptl_match_bits_t match_bits,
ptl_match_bits_t ignore_bits)
ptl_match_bits_t ignore_bits,
bool probe)
{
ptl_event_t ev;
int ret;
/* check to see if there are any events in the unexpected event queue */
while (true) {
ret = PtlEQGet(ompi_mtl_portals.ptl_unexpected_recv_eq_h,&ev);
ret = PtlEQGet(ompi_mtl_portals.ptl_unex_eq_h,&ev);
if (PTL_OK == ret) {
if (PTL_EVENT_PUT_START == ev.type) {
ompi_free_list_item_t *item;
@ -331,7 +333,13 @@ ompi_mtl_portals_search_unex_events(ptl_match_bits_t match_bits,
}
if (CHECK_MATCH(recv_event->ev.match_bits, match_bits, ignore_bits)) {
/* the one we want */
ompi_mtl_portals_wait_for_put_end(recv_event->ev.link);
if (probe == false) {
ompi_mtl_portals_wait_for_put_end(recv_event->ev.link);
} else {
/* just probing, so add it to the unex list */
opal_list_append(&(ompi_mtl_portals.unexpected_messages),
(opal_list_item_t*) recv_event);
}
return recv_event;
} else {
/* not the one we want, so add it to the unex list */
@ -357,9 +365,10 @@ ompi_mtl_portals_search_unex_events(ptl_match_bits_t match_bits,
}
static ompi_mtl_portals_event_t*
ompi_mtl_portals_search_unex_q( ptl_match_bits_t match_bits,
ptl_match_bits_t ignore_bits )
ompi_mtl_portals_event_t*
ompi_mtl_portals_search_unex_q(ptl_match_bits_t match_bits,
ptl_match_bits_t ignore_bits,
bool probe)
{
opal_list_item_t *list_item;
ompi_mtl_portals_event_t *recv_event = NULL;
@ -372,12 +381,14 @@ ompi_mtl_portals_search_unex_q( ptl_match_bits_t match_bits,
recv_event = (ompi_mtl_portals_event_t*) list_item;
if (CHECK_MATCH(recv_event->ev.match_bits, match_bits, ignore_bits)) {
/* we have a match... */
if ( false == recv_event->is_complete) {
/* wait for put end event */
ompi_mtl_portals_wait_for_put_end(recv_event->ev.link);
if ( probe == false ) {
if ( false == recv_event->is_complete) {
/* wait for put end event */
ompi_mtl_portals_wait_for_put_end(recv_event->ev.link);
}
opal_list_remove_item(&(ompi_mtl_portals.unexpected_messages),
list_item);
}
opal_list_remove_item(&(ompi_mtl_portals.unexpected_messages),
list_item);
return recv_event;
}
list_item = next_item;
@ -407,6 +418,7 @@ ompi_mtl_portals_irecv(struct mca_mtl_base_module_t* mtl,
(ompi_mtl_portals_request_t*) mtl_request;
ompi_mtl_portals_event_t *recv_event = NULL;
size_t buflen;
bool did_once = false;
ptl_request->convertor = convertor;
@ -427,7 +439,7 @@ ompi_mtl_portals_irecv(struct mca_mtl_base_module_t* mtl,
match_bits, ignore_bits));
/* first, check the queue of processed unexpected messages */
recv_event = ompi_mtl_portals_search_unex_q(match_bits, ignore_bits);
recv_event = ompi_mtl_portals_search_unex_q(match_bits, ignore_bits, false);
if (NULL != recv_event) {
/* found it */
ompi_mtl_portals_get_data(recv_event, convertor, ptl_request);
@ -437,7 +449,7 @@ ompi_mtl_portals_irecv(struct mca_mtl_base_module_t* mtl,
} else {
restart_search:
/* check unexpected events */
recv_event = ompi_mtl_portals_search_unex_events(match_bits, ignore_bits);
recv_event = ompi_mtl_portals_search_unex_events(match_bits, ignore_bits, false);
if (NULL != recv_event) {
/* found it */
ompi_mtl_portals_get_data(recv_event, convertor, ptl_request);
@ -448,46 +460,32 @@ restart_search:
}
/* didn't find it, now post the receive */
ret = ompi_mtl_datatype_recv_buf(convertor, &md.start, &buflen,
&ptl_request->free_after);
md.length = buflen;
/* create ME entry */
ret = PtlMEInsert(ompi_mtl_portals.ptl_match_ins_me_h,
remote_proc,
match_bits,
ignore_bits,
PTL_UNLINK,
PTL_INS_BEFORE,
&me_h);
if( ret !=PTL_OK) {
return ompi_common_portals_error_ptl_to_ompi(ret);
if ( false == did_once ) {
ret = ompi_mtl_datatype_recv_buf(convertor, &md.start, &buflen,
&ptl_request->free_after);
did_once = true;
}
/* associate a memory descriptor with the Match list Entry */
md.threshold = 0;
/*
md.options = PTL_MD_OP_PUT | PTL_MD_TRUNCATE | PTL_MD_EVENT_START_DISABLE;
*/
md.options = PTL_MD_OP_PUT | PTL_MD_TRUNCATE;
md.user_ptr = ptl_request;
md.eq_handle = ompi_mtl_portals.ptl_eq_h;
ret=PtlMDAttach(me_h, md, PTL_UNLINK, &md_h);
if( ret !=PTL_OK) {
return ompi_common_portals_error_ptl_to_ompi(ret);
}
/* now try to make active */
md.length = buflen;
md.threshold = 1;
md.options = PTL_MD_OP_PUT | PTL_MD_TRUNCATE | PTL_MD_EVENT_START_DISABLE;
md.user_ptr = ptl_request;
md.eq_handle = ompi_mtl_portals.ptl_eq_h;
/* enable the memory descritor, if the ptl_unexpected_recv_eq_h
* queue is empty */
ret = PtlMDUpdate(md_h, NULL, &md,
ompi_mtl_portals.ptl_unexpected_recv_eq_h);
ret = PtlMEMDPost(ompi_mtl_portals.ptl_ni_h,
ompi_mtl_portals.ptl_unex_long_me_h,
remote_proc,
match_bits,
ignore_bits,
PTL_UNLINK,
PTL_INS_BEFORE,
md,
PTL_UNLINK,
&me_h,
&md_h,
ompi_mtl_portals.ptl_unex_eq_h);
if (ret == PTL_MD_NO_UPDATE) {
/* a message has arrived since we searched - look again */
PtlMDUnlink(md_h);
if (ptl_request->free_after) { free(md.start); }
goto restart_search;
} else if( PTL_OK != ret ) {
return ompi_common_portals_error_ptl_to_ompi(ret);
@ -495,8 +493,14 @@ restart_search:
ptl_request->event_callback = ompi_mtl_portals_recv_progress;
return OMPI_SUCCESS;
cleanup:
if ((did_once == true) && (ptl_request->free_after)) {
free(md.start);
}
return OMPI_SUCCESS;
}

32
ompi/mca/mtl/portals/mtl_portals_recv.h Обычный файл
Просмотреть файл

@ -0,0 +1,32 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef OMPI_MTL_PORTALS_RECV_H
#define OMPI_MTL_PORTALS_RECV_H
extern ompi_mtl_portals_event_t*
ompi_mtl_portals_search_unex_events(ptl_match_bits_t match_bits,
ptl_match_bits_t ignore_bits,
bool probe);
extern ompi_mtl_portals_event_t*
ompi_mtl_portals_search_unex_q(ptl_match_bits_t match_bits,
ptl_match_bits_t ignore_bits,
bool probe);
#endif /* OMPI_MTL_PORTALS_RECV_SHORT_H */

Просмотреть файл

@ -35,9 +35,11 @@ struct ompi_mtl_portals_recv_short_block_t {
typedef struct ompi_mtl_portals_recv_short_block_t ompi_mtl_portals_recv_short_block_t;
OBJ_CLASS_DECLARATION(ompi_mtl_portals_recv_short_block_t);
int ompi_mtl_portals_recv_short_enable(mca_mtl_portals_module_t *mtl);
extern int
ompi_mtl_portals_recv_short_enable(mca_mtl_portals_module_t *mtl);
int ompi_mtl_portals_recv_short_disable(mca_mtl_portals_module_t *mtl);
extern int
ompi_mtl_portals_recv_short_disable(mca_mtl_portals_module_t *mtl);
/**
* Create a block of memory for receiving send messages. Must call
@ -46,7 +48,7 @@ int ompi_mtl_portals_recv_short_disable(mca_mtl_portals_module_t *mtl);
*
* Module lock must be held before calling this function
*/
ompi_mtl_portals_recv_short_block_t*
extern ompi_mtl_portals_recv_short_block_t*
ompi_mtl_portals_recv_short_block_init(mca_mtl_portals_module_t *mtl);
@ -57,8 +59,8 @@ ompi_mtl_portals_recv_short_block_init(mca_mtl_portals_module_t *mtl);
*
* Module lock must be held before calling this function
*/
int ompi_mtl_portals_recv_short_block_free(ompi_mtl_portals_recv_short_block_t *block);
extern int
ompi_mtl_portals_recv_short_block_free(ompi_mtl_portals_recv_short_block_t *block);
/**
* activate a block. Blocks that are full (have gone inactive) can be
@ -80,40 +82,33 @@ ompi_mtl_portals_activate_block(ompi_mtl_portals_recv_short_block_t *block)
if (NULL == block->start) return OMPI_ERROR;
/* create match entry */
ret = PtlMEInsert(block->mtl->ptl_unexpected_me_h,
any_proc,
match_bits,
ignore_bits,
PTL_UNLINK,
PTL_INS_BEFORE,
&(block->me_h));
if (PTL_OK != ret) return OMPI_ERROR;
/* and the memory descriptor */
md.start = block->start;
md.length = block->length;
/* try to throttle incoming sends so that we don't overrun the incoming
queue size */
md.threshold = PTL_MD_THRESH_INF;
md.max_size = block->mtl->eager_limit;
md.options = PTL_MD_OP_PUT | PTL_MD_MAX_SIZE | PTL_MD_ACK_DISABLE;
md.user_ptr = block;
md.eq_handle = block->mtl->ptl_unexpected_recv_eq_h;
md.eq_handle = block->mtl->ptl_unex_eq_h;
block->pending = 0;
block->full = false;
/* make sure that everyone sees the update on full value */
opal_atomic_mb();
ret = PtlMDAttach(block->me_h,
md,
PTL_UNLINK,
&(block->md_h));
if (PTL_OK != ret) {
PtlMEUnlink(block->me_h);
return OMPI_ERROR;
}
ret = PtlMEMDPost(ompi_mtl_portals.ptl_ni_h,
ompi_mtl_portals.ptl_send_catchall_me_h,
any_proc,
match_bits,
ignore_bits,
PTL_UNLINK,
PTL_INS_BEFORE,
md,
PTL_UNLINK,
&(block->me_h),
&(block->md_h),
ompi_mtl_portals.ptl_empty_eq_h);
if (PTL_OK != ret) return OMPI_ERROR;
return OMPI_SUCCESS;
}

Просмотреть файл

@ -23,6 +23,8 @@ struct ompi_mtl_portals_request_t {
struct mca_mtl_request_t super;
bool free_after;
struct ompi_convertor_t *convertor;
volatile bool is_complete;
int event_count;
int (*event_callback)(ptl_event_t *ev, struct ompi_mtl_portals_request_t*);
};

Просмотреть файл

@ -27,70 +27,452 @@
#include "mtl_portals.h"
#include "mtl_portals_request.h"
#include "mtl_portals_endpoint.h"
#include "mtl_portals_send_short.h"
/* send event callback functions */
/* called when no ack is necessary */
static int
ompi_mtl_portals_send_progress_no_ack(ptl_event_t *ev,
struct ompi_mtl_portals_request_t *ptl_request)
ompi_mtl_portals_medium_callback(ptl_event_t *ev, ompi_mtl_portals_request_t *ptl_request)
{
switch (ev->type) {
case PTL_EVENT_SEND_END:
{
/* the get finished, so we're done. */
if (ptl_request->free_after) {
free(ev->md.start);
}
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output,
"send complete: 0x%016llx\n",
ev->match_bits));
case PTL_EVENT_SEND_END:
ptl_request->super.ompi_req->req_status.MPI_ERROR = OMPI_SUCCESS;
ptl_request->super.completion_callback(&ptl_request->super);
}
if (ptl_request->free_after) {
free(ev->md.start);
}
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output,
"send complete: 0x%016llx\n",
ev->match_bits));
ptl_request->is_complete = true;
if ( NULL != ptl_request->super.ompi_req ) {
ptl_request->super.ompi_req->req_status.MPI_ERROR = OMPI_SUCCESS;
ptl_request->super.completion_callback(&ptl_request->super);
}
default:
break;
break;
default:
opal_output(fileno(stderr)," Unexpected event type %d in ompi_mtl_portals_medium_callback()\n",ev->type);
/* abort(); */
}
return OMPI_SUCCESS;
}
/* called when a send that should wait for an ack or longer shold be progressed */
/* called when send should wait for an ack or get */
static int
ompi_mtl_portals_send_progress(ptl_event_t *ev,
struct ompi_mtl_portals_request_t* ptl_request)
ompi_mtl_portals_long_callback(ptl_event_t *ev, struct ompi_mtl_portals_request_t* ptl_request)
{
switch (ev->type) {
case PTL_EVENT_ACK:
case PTL_EVENT_GET_END:
case PTL_EVENT_PUT_END:
case PTL_EVENT_SEND_END:
case PTL_EVENT_ACK:
case PTL_EVENT_GET_END:
/* we only receive an ack if the message was received into an
expected message. Otherwise, we don't get an ack, but mark
completion when the message was pulled (long message) or
acked via an explicit put (short synchronous message). */
{
if (ptl_request->free_after) {
free(ev->md.start);
}
completion when the message was pulled (long message). */
if ( ++(ptl_request->event_count) == 2 ) {
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output,
"send complete: 0x%016llx\n",
ev->match_bits));
if (ptl_request->free_after) {
free(ev->md.start);
}
ptl_request->super.ompi_req->req_status.MPI_ERROR = OMPI_SUCCESS;
ptl_request->super.completion_callback(&ptl_request->super);
}
break;
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output,
"send complete: 0x%016llx\n",
ev->match_bits));
default:
break;
ptl_request->is_complete = true;
if ( NULL != ptl_request->super.ompi_req ) {
ptl_request->super.ompi_req->req_status.MPI_ERROR = OMPI_SUCCESS;
ptl_request->super.completion_callback(&ptl_request->super);
}
}
break;
default:
opal_output(fileno(stderr)," Unexpected event type %d in ompi_mtl_portals_long_callback()\n",ev->type);
abort();
}
return OMPI_SUCCESS;
}
/* called when sync send should wait for an ack or put */
static int
ompi_mtl_portals_sync_callback(ptl_event_t *ev, struct ompi_mtl_portals_request_t* ptl_request)
{
switch (ev->type) {
case PTL_EVENT_SEND_END:
case PTL_EVENT_ACK:
case PTL_EVENT_PUT_END:
/* we only receive an ack if the message was received into an
expected message. Otherwise, we don't get an ack, but mark
completion when a zero-length put arrrives. */
if ( ++(ptl_request->event_count) == 2 ) {
if (ptl_request->free_after) {
free(ev->md.start);
}
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output,
"send complete: 0x%016llx\n",
ev->match_bits));
ptl_request->is_complete = true;
if ( NULL != ptl_request->super.ompi_req ) {
ptl_request->super.ompi_req->req_status.MPI_ERROR = OMPI_SUCCESS;
ptl_request->super.completion_callback(&ptl_request->super);
}
}
break;
default:
opal_output(fileno(stderr)," Unexpected event type %d in ompi_mtl_portals_sync_callback()\n",ev->type);
abort();
}
return OMPI_SUCCESS;
}
/* internal send functions */
static int
ompi_mtl_portals_zero_send(mca_pml_base_send_mode_t mode, int contextid, int localrank,
int tag, ptl_process_id_t dest)
{
int ret;
ptl_match_bits_t match_bits;
ptl_match_bits_t mode_bits;
mode_bits = (MCA_PML_BASE_SEND_STANDARD == mode) ? PTL_SHORT_MSG : PTL_READY_MSG;
PTL_SET_SEND_BITS(match_bits, contextid, localrank, tag, mode_bits);
ret = PtlPut(ompi_mtl_portals.ptl_zero_md_h,
PTL_NO_ACK_REQ,
dest,
OMPI_MTL_PORTALS_SEND_TABLE_ID,
0,
match_bits,
0,
0 );
if (PTL_OK != ret) {
return ompi_common_portals_error_ptl_to_ompi(ret);
}
return OMPI_SUCCESS;
}
static int
ompi_mtl_portals_short_send(mca_pml_base_send_mode_t mode, void *start, int length,
int contextid, int localrank, int tag, ptl_process_id_t dest)
{
int ret;
ptl_match_bits_t match_bits;
ptl_size_t offset;
void *copyblock;
ptl_match_bits_t mode_bits;
mode_bits = (MCA_PML_BASE_SEND_STANDARD == mode) ? PTL_SHORT_MSG : PTL_READY_MSG;
PTL_SET_SEND_BITS(match_bits, contextid, localrank, tag, mode_bits);
offset = ompi_mtl_portals_alloc_short_buf() * ompi_mtl_portals.ptl_copy_block_len;
copyblock = (char *)ompi_mtl_portals.ptl_short_md.start + offset;
memcpy(copyblock, start, length);
ret = PtlPutRegion(ompi_mtl_portals.ptl_short_md_h,
offset,
length,
PTL_NO_ACK_REQ,
dest,
OMPI_MTL_PORTALS_SEND_TABLE_ID,
0,
match_bits,
0,
0);
if (PTL_OK != ret ) {
return ompi_common_portals_error_ptl_to_ompi(ret);
}
return OMPI_SUCCESS;
}
static int
ompi_mtl_portals_medium_isend( mca_pml_base_send_mode_t mode, void *start, int length,
int contextid, int localrank, int tag, ptl_process_id_t dest,
ompi_mtl_portals_request_t *ptl_request )
{
int ret;
ptl_match_bits_t match_bits;
ptl_md_t md;
ptl_handle_md_t md_h;
ptl_match_bits_t mode_bits;
mode_bits = (MCA_PML_BASE_SEND_STANDARD == mode) ? PTL_SHORT_MSG : PTL_READY_MSG;
PTL_SET_SEND_BITS(match_bits, contextid, localrank, tag, mode_bits);
md.start = start;
md.length = length;
md.threshold = 1; /* send end */
md.options = PTL_MD_EVENT_START_DISABLE;
md.user_ptr = ptl_request;
md.eq_handle = ompi_mtl_portals.ptl_eq_h;
ret = PtlMDBind(ompi_mtl_portals.ptl_ni_h,
md,
PTL_UNLINK,
&md_h);
if (PTL_OK != ret) {
if (ptl_request->free_after) free(start);
return ompi_common_portals_error_ptl_to_ompi(ret);
}
ret = PtlPut(md_h,
PTL_NO_ACK_REQ,
dest,
OMPI_MTL_PORTALS_SEND_TABLE_ID,
0,
match_bits,
0,
0);
if (PTL_OK != ret) {
PtlMDUnlink(md_h);
if (ptl_request->free_after) free(start);
return ompi_common_portals_error_ptl_to_ompi(ret);
}
ptl_request->is_complete = false;
ptl_request->event_callback = ompi_mtl_portals_medium_callback;
ptl_request->event_count = 0;
return OMPI_SUCCESS;
}
static int
ompi_mtl_portals_long_isend( void *start, int length, int contextid, int localrank, int tag,
ptl_process_id_t dest, ompi_mtl_portals_request_t *ptl_request )
{
int ret;
ptl_match_bits_t match_bits;
ptl_md_t md;
ptl_handle_md_t md_h;
ptl_handle_me_t me_h;
PTL_SET_SEND_BITS(match_bits, contextid, localrank, tag, PTL_LONG_MSG);
md.start = start;
md.length = length;
md.threshold = 2; /* send, {ack, get} */
md.options = PTL_MD_OP_GET | PTL_MD_EVENT_START_DISABLE;
md.user_ptr = ptl_request;
md.eq_handle = ompi_mtl_portals.ptl_eq_h;
ret = PtlMEMDPost(ompi_mtl_portals.ptl_ni_h,
ompi_mtl_portals.ptl_read_catchall_me_h,
dest,
(ptl_match_bits_t)(uintptr_t)ptl_request,
0,
PTL_UNLINK,
PTL_INS_BEFORE,
md,
PTL_UNLINK,
&me_h,
&md_h,
ompi_mtl_portals.ptl_empty_eq_h);
if (PTL_OK != ret) {
if (ptl_request->free_after) free(start);
return ompi_common_portals_error_ptl_to_ompi(ret);
}
ret = PtlPut(md_h,
PTL_ACK_REQ,
dest,
OMPI_MTL_PORTALS_SEND_TABLE_ID,
0,
match_bits,
0,
(ptl_hdr_data_t)(uintptr_t)ptl_request);
if (PTL_OK != ret) {
PtlMEUnlink(me_h);
if (ptl_request->free_after) free(start);
return ompi_common_portals_error_ptl_to_ompi(ret);
}
ptl_request->is_complete = false;
ptl_request->event_callback = ompi_mtl_portals_long_callback;
ptl_request->event_count = 0;
return OMPI_SUCCESS;
}
static int
ompi_mtl_portals_sync_isend( void *start, int length, int contextid, int localrank, int tag,
ptl_process_id_t dest, ompi_mtl_portals_request_t *ptl_request )
{
int ret;
ptl_match_bits_t match_bits;
ptl_md_t md;
ptl_handle_md_t md_h;
ptl_handle_me_t me_h;
PTL_SET_SEND_BITS(match_bits, contextid, localrank, tag, PTL_SHORT_MSG);
md.start = start;
md.length = length;
md.threshold = 2; /* send, {ack, get} */
md.options = PTL_MD_OP_PUT | PTL_MD_EVENT_START_DISABLE;
md.user_ptr = ptl_request;
md.eq_handle = ompi_mtl_portals.ptl_eq_h;
ret = PtlMEMDPost(ompi_mtl_portals.ptl_ni_h,
ompi_mtl_portals.ptl_ack_catchall_me_h,
dest,
(ptl_match_bits_t)(uintptr_t)ptl_request,
0,
PTL_UNLINK,
PTL_INS_BEFORE,
md,
PTL_UNLINK,
&me_h,
&md_h,
ompi_mtl_portals.ptl_empty_eq_h);
if (PTL_OK != ret) {
if (ptl_request->free_after) free(start);
return ompi_common_portals_error_ptl_to_ompi(ret);
}
ret = PtlPut(md_h,
PTL_ACK_REQ,
dest,
OMPI_MTL_PORTALS_SEND_TABLE_ID,
0,
match_bits,
0,
(ptl_hdr_data_t)(uintptr_t)ptl_request);
if (PTL_OK != ret) {
PtlMEUnlink(me_h);
if (ptl_request->free_after) free(start);
return ompi_common_portals_error_ptl_to_ompi(ret);
}
ptl_request->is_complete = false;
ptl_request->event_callback = ompi_mtl_portals_sync_callback;
ptl_request->event_count = 0;
return OMPI_SUCCESS;
}
/* We use a macro for this since the send code is identical for blocking and non-blocking
sends, but we want both to be as fast as possible. We define it here since it doesn't
get used anywhere else. */
#define PTL_SEND_CODE \
{ \
switch (mode) { \
\
case MCA_PML_BASE_SEND_STANDARD: \
case MCA_PML_BASE_SEND_READY: \
\
if (0 == length) { \
\
ret = ompi_mtl_portals_zero_send(mode, \
comm->c_contextid, \
comm->c_my_rank, \
tag, \
endpoint->ptl_proc); \
if (OMPI_SUCCESS != ret) return ret; \
\
ptl_request->is_complete = true; \
\
break; \
\
} else if (length <= ompi_mtl_portals.ptl_copy_block_len) { \
\
ret = ompi_mtl_portals_short_send(mode, \
start, \
length, \
comm->c_contextid, \
comm->c_my_rank, \
tag, \
endpoint->ptl_proc); \
if (OMPI_SUCCESS != ret) return ret; \
\
ptl_request->is_complete = true; \
\
break; \
\
} else if ((length <= ompi_mtl_portals.eager_limit) || \
(MCA_PML_BASE_SEND_READY == mode)) { \
\
ret = ompi_mtl_portals_medium_isend(mode, \
start, \
length, \
comm->c_contextid, \
comm->c_my_rank, \
tag, \
endpoint->ptl_proc, \
ptl_request ); \
if (OMPI_SUCCESS != ret) return ret; \
\
break; \
\
} \
\
/* long standard send case falls through */ \
\
case MCA_PML_BASE_SEND_SYNCHRONOUS: \
\
if (length <= ompi_mtl_portals.eager_limit) { \
\
ret = ompi_mtl_portals_sync_isend(start, \
length, \
comm->c_contextid, \
comm->c_my_rank, \
tag, \
endpoint->ptl_proc, \
ptl_request ); \
if (OMPI_SUCCESS != ret) return ret; \
\
} else { \
/* if we got this far, we're either a standard or synchronous long send */ \
ret = ompi_mtl_portals_long_isend(start, \
length, \
comm->c_contextid, \
comm->c_my_rank, \
tag, \
endpoint->ptl_proc, \
ptl_request ); \
if (OMPI_SUCCESS != ret) return ret; \
\
} \
\
break; \
\
default: \
opal_output(fileno(stderr),"Unexpected msg type\n"); \
\
} \
}
/* external send functions */
int
ompi_mtl_portals_send(struct mca_mtl_base_module_t* mtl,
struct ompi_communicator_t* comm,
@ -99,9 +481,33 @@ ompi_mtl_portals_send(struct mca_mtl_base_module_t* mtl,
struct ompi_convertor_t *convertor,
mca_pml_base_send_mode_t mode)
{
return OMPI_ERR_NOT_IMPLEMENTED;
}
int ret;
ompi_proc_t *ompi_proc = ompi_comm_peer_lookup( comm, dest );
mca_mtl_base_endpoint_t *endpoint = (mca_mtl_base_endpoint_t*) ompi_proc->proc_pml;
void *start;
size_t length;
mca_mtl_request_t mtl_request;
ompi_mtl_portals_request_t *ptl_request = (ompi_mtl_portals_request_t*)&mtl_request;
assert(mtl == &ompi_mtl_portals.base);
ret = ompi_mtl_datatype_pack(convertor, &start, &length,
&(ptl_request->free_after));
if (OMPI_SUCCESS != ret) return ret;
ptl_request->is_complete = false;
ptl_request->super.ompi_req = NULL;
PTL_SEND_CODE;
/* wait for send to complete */
while (ptl_request->is_complete == false) {
ompi_mtl_portals_progress();
}
return OMPI_SUCCESS;
}
int
ompi_mtl_portals_isend(struct mca_mtl_base_module_t* mtl,
@ -114,215 +520,28 @@ ompi_mtl_portals_isend(struct mca_mtl_base_module_t* mtl,
mca_mtl_request_t *mtl_request)
{
int ret;
ptl_match_bits_t match_bits;
ptl_md_t md;
ptl_handle_md_t md_h;
ptl_handle_me_t me_h;
ompi_proc_t* ompi_proc = ompi_comm_peer_lookup( comm, dest );
mca_mtl_base_endpoint_t *endpoint = (mca_mtl_base_endpoint_t*) ompi_proc->proc_pml;
ompi_mtl_portals_request_t *ptl_request =
(ompi_mtl_portals_request_t*) mtl_request;
size_t buflen;
ompi_proc_t *ompi_proc = ompi_comm_peer_lookup( comm, dest );
mca_mtl_base_endpoint_t *endpoint = (mca_mtl_base_endpoint_t*)ompi_proc->proc_pml;
ompi_mtl_portals_request_t *ptl_request = (ompi_mtl_portals_request_t*)mtl_request;
void *start;
size_t length;
assert(mtl == &ompi_mtl_portals.base);
ret = ompi_mtl_datatype_pack(convertor, &md.start, &buflen,
&(ptl_request->free_after));
ret = ompi_mtl_datatype_pack(convertor, &start, &length,
&(ptl_request->free_after));
if (OMPI_SUCCESS != ret) return ret;
md.length = buflen;
ptl_request->event_callback = ompi_mtl_portals_send_progress;
ptl_request->is_complete = false;
if ((MCA_PML_BASE_SEND_READY == mode)) {
/* ready send (length doesn't matter) or short non-sync send.
Eagerly send data and don't wait for completion */
PTL_SET_SEND_BITS(match_bits, comm->c_contextid,
comm->c_my_rank,
tag, PTL_READY_MSG);
PTL_SEND_CODE;
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output,
"ready send bits: 0x%016llx\n",
match_bits));
md.threshold = 1;
md.options = PTL_MD_EVENT_START_DISABLE;
md.user_ptr = ptl_request;
md.eq_handle = ompi_mtl_portals.ptl_eq_h;
ret = PtlMDBind(ompi_mtl_portals.ptl_ni_h,
md,
PTL_UNLINK,
&(md_h));
if (OMPI_SUCCESS != ret) {
if (ptl_request->free_after) free(md.start);
return ompi_common_portals_error_ptl_to_ompi(ret);
}
ptl_request->event_callback = ompi_mtl_portals_send_progress_no_ack;
ret = PtlPut(md_h,
PTL_NO_ACK_REQ,
endpoint->ptl_proc,
OMPI_MTL_PORTALS_SEND_TABLE_ID,
0,
match_bits,
0,
0);
if (OMPI_SUCCESS != ret) {
PtlMDUnlink(md_h);
if (ptl_request->free_after) free(md.start);
return ompi_common_portals_error_ptl_to_ompi(ret);
}
} else if (md.length > ompi_mtl_portals.eager_limit) {
/* it's a long message - same protocol for all send modes
other than ready */
PTL_SET_SEND_BITS(match_bits, comm->c_contextid,
comm->c_my_rank,
tag, PTL_LONG_MSG);
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output,
"long send bits: 0x%016llx (%d)\n",
match_bits, dest));
md.threshold = 2; /* send, {ack, get} */
md.options = PTL_MD_OP_GET | PTL_MD_EVENT_START_DISABLE;
md.user_ptr = ptl_request;
md.eq_handle = ompi_mtl_portals.ptl_eq_h;
ret = PtlMEAttach(ompi_mtl_portals.ptl_ni_h,
OMPI_MTL_PORTALS_READ_TABLE_ID,
endpoint->ptl_proc,
(ptl_match_bits_t)(uintptr_t) ptl_request,
0,
PTL_UNLINK,
PTL_INS_AFTER,
&me_h);
if (OMPI_SUCCESS != ret) {
if (ptl_request->free_after) free(md.start);
return ompi_common_portals_error_ptl_to_ompi(ret);
}
ret = PtlMDAttach(me_h,
md,
PTL_UNLINK,
&(md_h));
if (OMPI_SUCCESS != ret) {
PtlMEUnlink(me_h);
if (ptl_request->free_after) free(md.start);
return ompi_common_portals_error_ptl_to_ompi(ret);
}
ret = PtlPut(md_h,
PTL_ACK_REQ,
endpoint->ptl_proc,
OMPI_MTL_PORTALS_SEND_TABLE_ID,
0,
match_bits,
0,
(ptl_hdr_data_t)(uintptr_t) ptl_request);
if (OMPI_SUCCESS != ret) {
PtlMDUnlink(md_h);
if (ptl_request->free_after) free(md.start);
return ompi_common_portals_error_ptl_to_ompi(ret);
}
} else if (MCA_PML_BASE_SEND_SYNCHRONOUS == mode) {
/* short synchronous message */
PTL_SET_SEND_BITS(match_bits, comm->c_contextid,
comm->c_my_rank,
tag, PTL_SHORT_MSG);
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output,
"short ssend bits: 0x%016llx (%d)\n",
match_bits, dest));
md.threshold = 2; /* send, {ack, put} */
md.options = PTL_MD_OP_PUT | PTL_MD_EVENT_START_DISABLE;
md.user_ptr = ptl_request;
md.eq_handle = ompi_mtl_portals.ptl_eq_h;
ret = PtlMEAttach(ompi_mtl_portals.ptl_ni_h,
OMPI_MTL_PORTALS_ACK_TABLE_ID,
endpoint->ptl_proc,
(ptl_match_bits_t)(uintptr_t) ptl_request,
0,
PTL_UNLINK,
PTL_INS_AFTER,
&me_h);
if (OMPI_SUCCESS != ret) {
if (ptl_request->free_after) free(md.start);
return ompi_common_portals_error_ptl_to_ompi(ret);
}
ret = PtlMDAttach(me_h,
md,
PTL_UNLINK,
&(md_h));
if (OMPI_SUCCESS != ret) {
PtlMEUnlink(me_h);
if (ptl_request->free_after) free(md.start);
return ompi_common_portals_error_ptl_to_ompi(ret);
}
ret = PtlPut(md_h,
PTL_ACK_REQ,
endpoint->ptl_proc,
OMPI_MTL_PORTALS_SEND_TABLE_ID,
0,
match_bits,
0,
(ptl_hdr_data_t)(uintptr_t) ptl_request);
if (OMPI_SUCCESS != ret) {
PtlMDUnlink(md_h);
if (ptl_request->free_after) free(md.start);
return ompi_common_portals_error_ptl_to_ompi(ret);
}
} else {
/* short send message */
PTL_SET_SEND_BITS(match_bits, comm->c_contextid,
comm->c_my_rank,
tag, PTL_SHORT_MSG);
OPAL_OUTPUT_VERBOSE((50, ompi_mtl_base_output,
"short send bits: 0x%016llx\n",
match_bits));
md.threshold = 1;
md.options = PTL_MD_EVENT_START_DISABLE;
md.user_ptr = ptl_request;
md.eq_handle = ompi_mtl_portals.ptl_eq_h;
ret = PtlMDBind(ompi_mtl_portals.ptl_ni_h,
md,
PTL_UNLINK,
&(md_h));
if (OMPI_SUCCESS != ret) {
if (ptl_request->free_after) free(md.start);
return ompi_common_portals_error_ptl_to_ompi(ret);
}
ptl_request->event_callback = ompi_mtl_portals_send_progress_no_ack;
ret = PtlPut(md_h,
PTL_NO_ACK_REQ,
endpoint->ptl_proc,
OMPI_MTL_PORTALS_SEND_TABLE_ID,
0,
match_bits,
0,
0);
if (OMPI_SUCCESS != ret) {
PtlMDUnlink(md_h);
if (ptl_request->free_after) free(md.start);
return ompi_common_portals_error_ptl_to_ompi(ret);
}
if (ptl_request->is_complete == true ) {
ptl_request->super.ompi_req->req_status.MPI_ERROR = OMPI_SUCCESS;
ptl_request->super.completion_callback(&ptl_request->super);
}
return OMPI_SUCCESS;
}

113
ompi/mca/mtl/portals/mtl_portals_send_short.c Обычный файл
Просмотреть файл

@ -0,0 +1,113 @@
/*
* Copyright (c) 2004-2006 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "mtl_portals.h"
#include "mtl_portals_request.h"
#include "mtl_portals_send_short.h"
static ompi_mtl_portals_request_t ptl_short_request;
/* short send callback */
static int
ompi_mtl_portals_short_callback(ptl_event_t *ev, ompi_mtl_portals_request_t *ptl_request)
{
switch (ev->type) {
case PTL_EVENT_SEND_END:
ompi_mtl_portals_free_short_buf(ev->offset);
break;
default:
opal_output(fileno(stderr)," Unexpected event type %d in ompi_mtl_portals_short_callback()\n",ev->type);
abort();
}
return OMPI_SUCCESS;
}
/* initialize short copy blocks */
void
ompi_mtl_portals_short_setup()
{
int ret;
int i;
if ((ompi_mtl_portals.ptl_num_copy_blocks > 0) && (ompi_mtl_portals.ptl_copy_block_len > 0)) {
ompi_mtl_portals.ptl_short_md.length = ompi_mtl_portals.ptl_num_copy_blocks *
ompi_mtl_portals.ptl_copy_block_len;
ompi_mtl_portals.ptl_short_md.start = malloc(ompi_mtl_portals.ptl_short_md.length);
if (NULL == ompi_mtl_portals.ptl_short_md.start ) {
ompi_mtl_portals.ptl_num_copy_blocks = 0;
return;
}
ompi_mtl_portals.ptl_short_md.threshold = PTL_MD_THRESH_INF;
ompi_mtl_portals.ptl_short_md.max_size = 0;
ompi_mtl_portals.ptl_short_md.options = PTL_MD_EVENT_START_DISABLE;
ompi_mtl_portals.ptl_short_md.user_ptr = &ptl_short_request;
ompi_mtl_portals.ptl_short_md.eq_handle = ompi_mtl_portals.ptl_eq_h;
ret = PtlMDBind(ompi_mtl_portals.ptl_ni_h,
ompi_mtl_portals.ptl_short_md,
PTL_RETAIN,
&ompi_mtl_portals.ptl_short_md_h);
if (PTL_OK != ret) {
free(ompi_mtl_portals.ptl_short_md.start);
ompi_mtl_portals.ptl_num_copy_blocks = 0;
return;
}
ptl_short_request.event_callback = ompi_mtl_portals_short_callback;
ompi_mtl_portals.ptl_copy_block_free_list = malloc(ompi_mtl_portals.ptl_num_copy_blocks * sizeof(int));
if (NULL == ompi_mtl_portals.ptl_copy_block_free_list) {
free(ompi_mtl_portals.ptl_short_md.start);
ompi_mtl_portals.ptl_num_copy_blocks = 0;
return;
}
for (i=0; i<ompi_mtl_portals.ptl_num_copy_blocks; i++) {
ompi_mtl_portals.ptl_copy_block_free_list[i] = i;
}
ompi_mtl_portals.ptl_copy_block_first_free = 0;
}
}
/* free short resources */
void
ompi_mtl_portals_short_cleanup()
{
if (ompi_mtl_portals.ptl_num_copy_blocks > 0) {
free(ompi_mtl_portals.ptl_short_md.start);
free(ompi_mtl_portals.ptl_copy_block_free_list);
ompi_mtl_portals.ptl_num_copy_blocks = 0;
}
}

Просмотреть файл

@ -0,0 +1,59 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2006 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef OMPI_MTL_PORTALS_SEND_SHORT_H
#define OMPI_MTL_PORTALS_SEND_SHORT_H
extern void ompi_mtl_portals_short_setup(void);
extern void ompi_mtl_portals_short_cleanup(void);
static inline int
ompi_mtl_portals_alloc_short_buf()
{
int buf_num;
while ( ompi_mtl_portals.ptl_copy_block_first_free == ompi_mtl_portals.ptl_num_copy_blocks ) {
ompi_mtl_portals_progress();
}
buf_num = ompi_mtl_portals.ptl_copy_block_free_list[ompi_mtl_portals.ptl_copy_block_first_free++];
assert((buf_num >= 0) && (buf_num < ompi_mtl_portals.ptl_num_copy_blocks));
return buf_num;
}
static inline void
ompi_mtl_portals_free_short_buf( int offset )
{
int buf_num;
buf_num = offset / ompi_mtl_portals.ptl_copy_block_len;
assert((buf_num >= 0) && (buf_num < ompi_mtl_portals.ptl_num_copy_blocks));
ompi_mtl_portals.ptl_copy_block_first_free--;
assert(ompi_mtl_portals.ptl_copy_block_first_free >= 0);
ompi_mtl_portals.ptl_copy_block_free_list[ompi_mtl_portals.ptl_copy_block_first_free] = buf_num;
}
#endif /* OMPI_MTL_PORTALS_SEND_SHORT_H */