1
1

* fix matching logic - since tag might be negative, need to mask the proper bits

or the bit-wise or changes all the high bits, which is bad
* push convertor creation to init to save a bit of time
* make debugging use macros so that it can go bye-bye

This commit was SVN r9810.
Этот коммит содержится в:
Brian Barrett 2006-05-04 13:48:32 +00:00
родитель 93ccbc666a
Коммит d101e91b97
6 изменённых файлов: 169 добавлений и 87 удалений

Просмотреть файл

@ -95,10 +95,11 @@ int
ompi_pml_portals_add_procs(struct ompi_proc_t** procs, size_t nprocs)
{
size_t i;
bool done_init = false;
static bool done_init = false;
int ret;
uint64_t match_bits = 0;
ptl_process_id_t portals_proc;
opal_output_verbose(100, ompi_pml_portals.portals_output,
"pml_add_procs called with %d procs\n", nprocs);
if (0 == nprocs) return OMPI_SUCCESS;
@ -110,53 +111,71 @@ ompi_pml_portals_add_procs(struct ompi_proc_t** procs, size_t nprocs)
ompi_pml_portals_add_procs_compat(procs, nprocs);
if (!done_init) {
opal_output_verbose(100, ompi_pml_portals.portals_output,
"proc list:");
for (i = 0 ; i < nprocs ; ++i) {
ompi_pml_portals_proc_t *ptlproc = (ompi_pml_portals_proc_t*) procs[i]->proc_pml;
opal_output_verbose(100, ompi_pml_portals.portals_output,
" procs[%d] = %u, %u",
i, ptlproc->proc_id.nid, ptlproc->proc_id.pid);
}
if (!done_init) {
ptl_md_t md;
ptl_handle_md_t md_h;
ptl_process_id_t anyproc;
uint64_t match_bits = 0;
opal_output_verbose(100, ompi_pml_portals.portals_output,
"running initialization");
/* setup our event queues */
ret = PtlEQAlloc(ompi_pml_portals.portals_ni_h,
10, /* BWB - fix me */
3, /* BWB - fix me */
PTL_EQ_HANDLER_NONE,
&(ompi_pml_portals.portals_blocking_send_queue));
assert(ret == PTL_OK);
ret = PtlEQAlloc(ompi_pml_portals.portals_ni_h,
10, /* BWB - fix me */
3, /* BWB - fix me */
PTL_EQ_HANDLER_NONE,
&(ompi_pml_portals.portals_blocking_receive_queue));
assert(ret == PTL_OK);
ret = PtlEQAlloc(ompi_pml_portals.portals_ni_h,
1024, /* BWB - fix me */
PTL_EQ_HANDLER_NONE,
&(ompi_pml_portals.portals_unexpected_receive_queue));
ret = PtlEQAlloc(ompi_pml_portals.portals_ni_h,
1024, /* BWB - fix me */
PTL_EQ_HANDLER_NONE,
&(ompi_pml_portals.portals_nonblocking_queue));
assert(ret == PTL_OK);
/* create unexpected message match entry */
portals_proc.nid = PTL_NID_ANY;
portals_proc.pid = PTL_PID_ANY;
anyproc.nid = PTL_NID_ANY;
anyproc.pid = PTL_PID_ANY;
PtlMEAttach(ompi_pml_portals.portals_ni_h,
PML_PTLS_INDEX_RECV,
portals_proc,
match_bits,
~match_bits,
PTL_RETAIN,
PTL_INS_AFTER,
&(ompi_pml_portals.portals_unexpected_me_h));
/* unexpected message match entry should receive from anyone,
so ignore bits are all 1 */
ret = PtlMEAttach(ompi_pml_portals.portals_ni_h,
PML_PTLS_INDEX_RECV,
anyproc,
match_bits,
~match_bits,
PTL_RETAIN,
PTL_INS_AFTER,
&(ompi_pml_portals.portals_unexpected_me_h));
assert(ret == PTL_OK);
md.start = NULL;
md.length = 0;
md.threshold = PTL_MD_THRESH_INF;
md.max_size = 0;
md.options = (PTL_MD_OP_PUT | PTL_MD_TRUNCATE | PTL_MD_ACK_DISABLE | PTL_MD_EVENT_START_DISABLE);
md.eq_handle = ompi_pml_portals.portals_unexpected_receive_queue;
PtlMDAttach(ompi_pml_portals.portals_unexpected_me_h,
md, PTL_RETAIN, &md_h);
ret = PtlMDAttach(ompi_pml_portals.portals_unexpected_me_h,
md,
PTL_RETAIN,
&md_h);
assert(ret == PTL_OK);
done_init = true;
}
@ -170,6 +189,9 @@ ompi_pml_portals_del_procs(struct ompi_proc_t** procs, size_t nprocs)
{
size_t i;
opal_output_verbose(100, ompi_pml_portals.portals_output,
"pml_del_procs called with %d procs\n", nprocs);
if (0 == nprocs) return OMPI_SUCCESS;
/* allocate space for our pml information */

Просмотреть файл

@ -27,6 +27,7 @@
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/pml/base/base.h"
#include "ompi/datatype/datatype.h"
#include "ompi/datatype/convertor.h"
#include "pml_portals_compat.h"
@ -62,24 +63,20 @@ struct ompi_pml_portals_t {
*/
int portals_output;
/* switch over point for eager -> long messages */
int portals_eager_limit;
/* our portals network interface */
ptl_handle_ni_t portals_ni_h;
/* blocking send event queue */
ptl_handle_eq_t portals_blocking_send_queue;
ompi_convertor_t portals_blocking_send_convertor;
/* blocking receive event queue */
ptl_handle_eq_t portals_blocking_receive_queue;
ompi_convertor_t portals_blocking_receive_convertor;
/* unexpected receive event queue */
ptl_handle_eq_t portals_unexpected_receive_queue;
/* nonblocking event queue */
ptl_handle_eq_t portals_nonblocking_queue;
ptl_handle_me_t portals_unexpected_me_h;
opal_list_t portals_unexpected_events;
@ -104,9 +101,9 @@ extern opal_class_t ompi_pml_portals_proc_t_class;
* Portals match info
*/
#define PML_PTLS_READY 0x4000000000000000ULL
#define PML_PTLS_LONG 0x2000000000000000ULL
#define PML_PTLS_SHORT 0x1000000000000000ULL
#define PML_PTLS_READY 0x8000000000000000ULL
#define PML_PTLS_LONG 0x4000000000000000ULL
#define PML_PTLS_SHORT 0x2000000000000000ULL
#define PML_PTLS_PROT_MASK 0xE000000000000000ULL
#define PML_PTLS_CTX_MASK 0x1FFF000000000000ULL
@ -132,18 +129,18 @@ extern opal_class_t ompi_pml_portals_proc_t_class;
if (tag == MPI_ANY_TAG) { \
ignore_bits |= PML_PTLS_TAG_MASK; \
} else { \
match_bits |= tag; \
match_bits |= (PML_PTLS_TAG_MASK & tag); \
} \
} \
#define PML_PTLS_SEND_BITS(match_bits, ctxid, src, tag) \
{ \
match_bits = ctxid; \
match_bits = (match_bits << 16); \
match_bits |= src; \
match_bits = (match_bits << 32); \
match_bits |= tag; \
match_bits |= PML_PTLS_LONG; \
match_bits |= (PML_PTLS_TAG_MASK & tag); \
}
#define PML_PTLS_IS_LONG(match_bits) (match_bits & PML_PTLS_LONG)
@ -164,9 +161,9 @@ extern opal_class_t ompi_pml_portals_proc_t_class;
/* table indexes */
#define PML_PTLS_INDEX_RECV (OMPI_PML_PORTALS_STARTING_TABLE_ID)
#define PML_PTLS_INDEX_READ (OMPI_PML_PORTALS_STARTING_TABLE_ID)
#define PML_PTLS_INDEX_ACK (OMPI_PML_PORTALS_STARTING_TABLE_ID)
#define PML_PTLS_INDEX_RECV (OMPI_PML_PORTALS_STARTING_TABLE_ID + 0)
#define PML_PTLS_INDEX_READ (OMPI_PML_PORTALS_STARTING_TABLE_ID + 1)
#define PML_PTLS_INDEX_ACK (OMPI_PML_PORTALS_STARTING_TABLE_ID + 2)
/*
* PML interface functions.

Просмотреть файл

@ -255,7 +255,10 @@ ompi_pml_portals_add_procs_compat(struct ompi_proc_t **procs, size_t nprocs)
}
#if 0
PtlNIDebug(mca_pml_portals_module.portals_ni_h, PTL_DBG_ALL);
PtlNIDebug(ompi_pml_portals.portals_ni_h, PTL_DBG_API |
PTL_DBG_MOVE | PTL_DBG_DROP | PTL_DBG_REQUEST |
PTL_DBG_DELIVERY | PTL_DBG_MD | PTL_DBG_UNLINK |
PTL_DBG_EQ | PTL_DBG_EVENT | PTL_DBG_MEMORY );
#endif
return OMPI_SUCCESS;

Просмотреть файл

@ -21,6 +21,7 @@
#include "opal/event/event.h"
#include "pml_portals.h"
#include "opal/mca/base/mca_base_param.h"
#include "ompi/datatype/convertor.h"
#if OMPI_BTL_PORTALS_REDSTORM
#include <catamount/cnos_mpi_os.h>
@ -101,15 +102,6 @@ ompi_pml_portals_component_open(void)
&(ompi_pml_portals.portals_ifname));
#endif
/* eager limit */
mca_base_param_reg_int(&mca_pml_portals_component.pmlm_version,
"eager_limit",
"short message eager send limit",
false,
false,
8192,
&(ompi_pml_portals.portals_eager_limit));
return OMPI_SUCCESS;
}
@ -155,6 +147,14 @@ ompi_pml_portals_component_init(int* priority,
OBJ_CONSTRUCT(&(ompi_pml_portals.portals_unexpected_events),
opal_list_t);
OBJ_CONSTRUCT(&ompi_pml_portals.portals_blocking_send_convertor,
ompi_convertor_t);
OBJ_CONSTRUCT(&ompi_pml_portals.portals_blocking_receive_convertor,
ompi_convertor_t);
opal_output_verbose(20, ompi_pml_portals.portals_output,
"successfully initialized portals pml");
return &ompi_pml_portals.super;
}
@ -162,13 +162,15 @@ ompi_pml_portals_component_init(int* priority,
static int
ompi_pml_portals_component_fini(void)
{
PtlEQFree(ompi_pml_portals.portals_nonblocking_queue);
PtlEQFree(ompi_pml_portals.portals_unexpected_receive_queue);
PtlEQFree(ompi_pml_portals.portals_blocking_receive_queue);
PtlEQFree(ompi_pml_portals.portals_blocking_send_queue);
PtlNIFini(ompi_pml_portals.portals_ni_h);
OBJ_DESTRUCT(&ompi_pml_portals.portals_blocking_send_convertor);
OBJ_DESTRUCT(&ompi_pml_portals.portals_blocking_receive_convertor);
opal_output_verbose(20, ompi_pml_portals.portals_output,
"successfully finalized portals pml");

Просмотреть файл

@ -39,19 +39,26 @@ get_data(ptl_event_t ev, ptl_md_t md, ompi_convertor_t *convertor)
{
ptl_handle_md_t md_h;
OPAL_OUTPUT_VERBOSE((100, ompi_pml_portals.portals_output,
"calling get_data for %ld", ev.hdr_data));
/* create the floating md */
md.threshold = 1;
md.options = PTL_MD_EVENT_START_DISABLE;
md.eq_handle = ompi_pml_portals.portals_blocking_receive_queue;
PtlMDBind(ompi_pml_portals.portals_ni_h, md,
PTL_UNLINK, &md_h);
PTL_RETAIN, &md_h);
PtlGet(md_h, ev.initiator, PML_PTLS_INDEX_READ,
0, ev.hdr_data, 0);
PtlEQWait(ompi_pml_portals.portals_blocking_receive_queue, &ev);
assert(ev.type == PTL_EVENT_GET_END);
assert(ev.type == PTL_EVENT_REPLY_END);
OPAL_OUTPUT_VERBOSE((100, ompi_pml_portals.portals_output,
"get: REPLY_END event received"));
PtlMDUnlink(md_h);
return OMPI_SUCCESS;
}
@ -91,11 +98,10 @@ ompi_pml_portals_recv(void *buf,
struct ompi_communicator_t *comm,
ompi_status_public_t * status)
{
ompi_convertor_t convertor;
uint64_t ignore_bits, match_bits;
opal_list_item_t *list_item;
int ret, free_after;
ptl_md_t md, newmd;
ptl_md_t md, new_md;
ptl_handle_md_t md_h;
ptl_handle_me_t me_h;
ptl_process_id_t portals_proc;
@ -103,14 +109,13 @@ ompi_pml_portals_recv(void *buf,
(ompi_pml_portals_proc_t*) comm->c_pml_procs[src];
ptl_event_t ev;
OBJ_CONSTRUCT(&convertor, ompi_convertor_t);
/* BWB - fix me - need some way of finding source in ANY_SOURCE case */
ompi_convertor_copy_and_prepare_for_send(comm->c_pml_procs[comm->c_my_rank]->proc_ompi->proc_convertor,
datatype,
count,
buf,
0,
&convertor);
&ompi_pml_portals.portals_blocking_receive_convertor);
if (MPI_ANY_SOURCE == src) {
portals_proc.nid = PTL_NID_ANY;
@ -118,7 +123,7 @@ ompi_pml_portals_recv(void *buf,
} else {
portals_proc = pml_portals_proc->proc_id;
}
ompi_pml_portals_prepare_md_recv(&convertor, &md, &free_after);
ompi_pml_portals_prepare_md_recv(&ompi_pml_portals.portals_blocking_receive_convertor, &md, &free_after);
PML_PTLS_RECV_BITS(match_bits, ignore_bits, comm->c_contextid, src, tag);
@ -130,11 +135,16 @@ ompi_pml_portals_recv(void *buf,
if ((info->ev.match_bits & ~ignore_bits) == match_bits) {
/* we have a match... */
get_data(ev, md, &convertor);
OPAL_OUTPUT_VERBOSE((100, ompi_pml_portals.portals_output,
"recv: event waiting in queue\n"));
get_data(ev, md, &ompi_pml_portals.portals_blocking_receive_convertor);
opal_list_remove_item(&(ompi_pml_portals.portals_unexpected_events),
list_item);
OBJ_RELEASE(list_item);
goto cleanup;
} else {
OPAL_OUTPUT_VERBOSE((100, ompi_pml_portals.portals_output,
"recv: ignoring data: %lx", info->ev.match_bits));
}
list_item = next;
}
@ -147,41 +157,88 @@ ompi_pml_portals_recv(void *buf,
if (PTL_OK == ret) {
if ((ev.match_bits & ~ignore_bits) == match_bits) {
/* we have a match... */
get_data(ev, md, &convertor);
OPAL_OUTPUT_VERBOSE((100, ompi_pml_portals.portals_output,
"recv: event waiting in portals\n"));
get_data(ev, md, &ompi_pml_portals.portals_blocking_receive_convertor);
goto cleanup;
} else {
pml_portals_recv_info_t *item = OBJ_NEW(pml_portals_recv_info_t);
OPAL_OUTPUT_VERBOSE((100, ompi_pml_portals.portals_output,
"recv: ignoring data: %lx", ev.match_bits));
item->ev = ev;
opal_list_append(&(ompi_pml_portals.portals_unexpected_events),
&(item->super));
}
} else if (PTL_EQ_EMPTY == ret) {
break;
} else {
abort();
}
}
/* now post a receive */
printf("receive from: %d, %d\n", portals_proc.nid, portals_proc.pid);
printf("receive match bits: %lx, %lx\n", match_bits, ignore_bits);
OPAL_OUTPUT_VERBOSE((100, ompi_pml_portals.portals_output,
"receive from: %u, %u\n", portals_proc.nid, portals_proc.pid));
OPAL_OUTPUT_VERBOSE((100, ompi_pml_portals.portals_output,
"receive match bits: %lx, %lx\n", match_bits, ignore_bits));
PtlMEInsert(ompi_pml_portals.portals_unexpected_me_h,
portals_proc,
match_bits,
ignore_bits,
PTL_UNLINK,
PTL_RETAIN,
PTL_INS_BEFORE,
&me_h);
md.threshold = 0;
md.options |= (PTL_MD_OP_PUT | PTL_MD_EVENT_START_DISABLE);
md.eq_handle = ompi_pml_portals.portals_blocking_receive_queue;
PtlMDAttach(me_h, md, PTL_UNLINK, &md_h);
PtlMDAttach(me_h, md, PTL_RETAIN, &md_h);
/* now try to make active */
newmd = md;
newmd.threshold = 1;
ret = PtlMDUpdate(md_h, NULL, &newmd, ompi_pml_portals.portals_unexpected_receive_queue);
new_md = md;
new_md.threshold = 3;
OPAL_OUTPUT_VERBOSE((100, ompi_pml_portals.portals_output,
" calling PtlMDUpdate(\n"));
OPAL_OUTPUT_VERBOSE((100, ompi_pml_portals.portals_output,
" md_handle = %d\n",md_h));
OPAL_OUTPUT_VERBOSE((100, ompi_pml_portals.portals_output,
" old md =>\n"));
OPAL_OUTPUT_VERBOSE((100, ompi_pml_portals.portals_output,
" start = %p\n",md.start));
OPAL_OUTPUT_VERBOSE((100, ompi_pml_portals.portals_output,
" length = %d\n",md.length));
OPAL_OUTPUT_VERBOSE((100, ompi_pml_portals.portals_output,
" threshold = %d\n",md.threshold));
OPAL_OUTPUT_VERBOSE((100, ompi_pml_portals.portals_output,
" options = %d\n",md.options));
OPAL_OUTPUT_VERBOSE((100, ompi_pml_portals.portals_output,
" user_ptr = %p\n",md.user_ptr));
OPAL_OUTPUT_VERBOSE((100, ompi_pml_portals.portals_output,
" eventq = %d\n",md.eq_handle));
OPAL_OUTPUT_VERBOSE((100, ompi_pml_portals.portals_output,
" new md =>\n"));
OPAL_OUTPUT_VERBOSE((100, ompi_pml_portals.portals_output,
" start = %p\n",new_md.start));
OPAL_OUTPUT_VERBOSE((100, ompi_pml_portals.portals_output,
" length = %d\n",new_md.length));
OPAL_OUTPUT_VERBOSE((100, ompi_pml_portals.portals_output,
" threshold = %d\n",new_md.threshold));
OPAL_OUTPUT_VERBOSE((100, ompi_pml_portals.portals_output,
" options = %d\n",new_md.options));
OPAL_OUTPUT_VERBOSE((100, ompi_pml_portals.portals_output,
" user_ptr = %p\n",new_md.user_ptr));
OPAL_OUTPUT_VERBOSE((100, ompi_pml_portals.portals_output,
" eventq = %d\n",new_md.eq_handle));
OPAL_OUTPUT_VERBOSE((100, ompi_pml_portals.portals_output,
" test eventq = %d )\n",ompi_pml_portals.portals_unexpected_receive_queue));
ret = PtlMDUpdate(md_h, &md, &new_md, ompi_pml_portals.portals_unexpected_receive_queue);
OPAL_OUTPUT_VERBOSE((100, ompi_pml_portals.portals_output,
"recv: Update ret: %d\n", ret));
if (ret == PTL_MD_NO_UPDATE) {
OPAL_OUTPUT_VERBOSE((100, ompi_pml_portals.portals_output,
"recv: no update :(\n"));
/* a message has arrived since we searched - look again */
PtlMDUnlink(md_h);
if (free_after) { free(md.start); }
@ -191,11 +248,13 @@ ompi_pml_portals_recv(void *buf,
/* wait for our completion event */
PtlEQWait(ompi_pml_portals.portals_blocking_receive_queue, &ev);
assert(ev.type == PTL_EVENT_PUT_END);
OPAL_OUTPUT_VERBOSE((100, ompi_pml_portals.portals_output,
"recv: PUT_END event received"));
PtlMDUnlink(md_h);
cleanup:
ompi_pml_portals_free_md_recv(&convertor, &md, free_after);
OBJ_DESTRUCT(&convertor);
ompi_pml_portals_free_md_recv(&ompi_pml_portals.portals_blocking_receive_convertor, &md, free_after);
return OMPI_SUCCESS;
}

Просмотреть файл

@ -60,9 +60,8 @@ ompi_pml_portals_send(void *buf,
mca_pml_base_send_mode_t sendmode,
ompi_communicator_t* comm)
{
ompi_convertor_t convertor;
int ret, free_after;
static int msg_count = 0;
static int msg_count = 1;
uint64_t match_bits;
ptl_md_t md;
@ -75,34 +74,34 @@ ompi_pml_portals_send(void *buf,
if (MCA_PML_BASE_SEND_SYNCHRONOUS == sendmode) abort();
OBJ_CONSTRUCT(&convertor, ompi_convertor_t);
ompi_convertor_copy_and_prepare_for_send(comm->c_pml_procs[dst]->proc_ompi->proc_convertor,
datatype,
count,
buf,
0,
&convertor);
&ompi_pml_portals.portals_blocking_send_convertor);
PtlMEAttach(ompi_pml_portals.portals_ni_h,
PML_PTLS_INDEX_READ,
portals_proc->proc_id,
msg_count,
0,
PTL_UNLINK,
PTL_RETAIN,
PTL_INS_AFTER,
&me_h);
ompi_pml_portals_prepare_md_send(&convertor, &md, &free_after);
ompi_pml_portals_prepare_md_send(&ompi_pml_portals.portals_blocking_send_convertor,
&md, &free_after);
md.threshold = 2;
md.options |= (PTL_MD_OP_GET | PTL_MD_EVENT_START_DISABLE);
md.eq_handle = ompi_pml_portals.portals_blocking_send_queue;
PtlMDAttach(me_h, md, PTL_UNLINK, &md_h);
PtlMDAttach(me_h, md, PTL_RETAIN, &md_h);
PML_PTLS_SEND_BITS(match_bits, comm->c_contextid, comm->c_my_rank, tag);
printf("send to: %d, %d\n", portals_proc->proc_id.nid, portals_proc->proc_id.pid);
printf("send match bits: %lx\n", match_bits);
OPAL_OUTPUT_VERBOSE((100, ompi_pml_portals.portals_output,
"send to: %u, %u\n",
portals_proc->proc_id.nid, portals_proc->proc_id.pid));
OPAL_OUTPUT_VERBOSE((100, ompi_pml_portals.portals_output,
"send match bits: %lx\n", match_bits));
PtlPut(md_h, PTL_ACK_REQ, portals_proc->proc_id,
PML_PTLS_INDEX_RECV, 0,
match_bits, 0, msg_count);
@ -113,21 +112,21 @@ ompi_pml_portals_send(void *buf,
ret = PtlEQWait(ompi_pml_portals.portals_blocking_send_queue, &ev);
assert(ret == PTL_OK);
assert(ev.type == PTL_EVENT_SEND_END);
OPAL_OUTPUT_VERBOSE((100, ompi_pml_portals.portals_output,
"send: SEND_END event received"));
/* our ack / get event */
ret = PtlEQWait(ompi_pml_portals.portals_blocking_send_queue, &ev);
assert(ret == PTL_OK);
#if OMPI_PML_PORTALS_HAVE_EVENT_UNLINK
if (ev.type == PTL_EVENT_UNLINK) {
ret = PtlEQWait(ompi_pml_portals.portals_blocking_send_queue, &ev);
assert(ret == PTL_OK);
}
#endif
assert((ev.type == PTL_EVENT_ACK) || (ev.type == PTL_EVENT_GET_END));
OPAL_OUTPUT_VERBOSE((100, ompi_pml_portals.portals_output,
"send: GET_END/ACK event received"));
ompi_pml_portals_free_md_send(&md, free_after);
OBJ_DESTRUCT(&convertor);
PtlMDUnlink(md_h);
ompi_convertor_cleanup(&ompi_pml_portals.portals_blocking_send_convertor);
return OMPI_SUCCESS;
}