1
1

* Fix shutdown code in utcp portals code

* make all sends long sends for now in Portals MTL
* More optimized match check

This commit was SVN r10667.
Этот коммит содержится в:
Brian Barrett 2006-07-05 21:46:45 +00:00
родитель b1da6f8bc4
Коммит 3e29949cc8
5 изменённых файлов: 102 добавлений и 109 удалений

Просмотреть файл

@ -32,6 +32,7 @@
#define OMPI_MTL_PORTALS_SEND_TABLE_ID 2
#define OMPI_MTL_PORTALS_READ_TABLE_ID 3
#define OMPI_MTL_PORTALS_ACK_TABLE_ID 4
#elif OMPI_PORTALS_CRAYXT3
@ -43,6 +44,7 @@
#define OMPI_MTL_PORTALS_SEND_TABLE_ID 32
#define OMPI_MTL_PORTALS_READ_TABLE_ID 33
#define OMPI_MTL_PORTALS_ACK_TABLE_ID 34
#else

Просмотреть файл

@ -168,7 +168,7 @@ ompi_common_portals_ni_initialize(ptl_handle_ni_t *ni_handle)
{
int ret;
OPAL_THREAD_ADD32(&usage_count, 1);
OPAL_THREAD_ADD32(&ni_usage_count, 1);
if (PTL_INVALID_HANDLE != active_ni_h) {
*ni_handle = active_ni_h;
return OMPI_SUCCESS;
@ -313,12 +313,10 @@ ompi_common_portals_ni_finalize(void)
{
if (OPAL_THREAD_ADD32(&ni_usage_count, -1) <= 0) {
if (PTL_INVALID_HANDLE != active_ni_h) {
#if 0
if (PTL_OK != PtlNIFini(active_ni_h)) {
active_ni_h = PTL_INVALID_HANDLE;
return OMPI_ERROR;
}
#endif
active_ni_h = PTL_INVALID_HANDLE;
}
}
@ -331,7 +329,9 @@ int
ompi_common_portals_finalize(void)
{
if (OPAL_THREAD_ADD32(&usage_count, -1) <= 0) {
if (init_called) PtlFini();
if (init_called) {
PtlFini();
}
}
return OMPI_SUCCESS;

Просмотреть файл

@ -84,7 +84,7 @@ OBJ_CLASS_DECLARATION(ompi_mtl_portals_event_t);
#define PTL_PROTOCOL_IGNR PTL_PROTOCOL_MASK
#define PTL_CONTEXT_IGNR PTL_CONTEXT_MASK
#define PTL_SOURCE_IGNR PTL_SOURCE_MASK
#define PTL_TAG_IGNR 0x00000000EFFFFFFFULL
#define PTL_TAG_IGNR 0x000000007FFFFFFFULL
#define PTL_SHORT_MSG 0x1000000000000000ULL
#define PTL_LONG_MSG 0x2000000000000000ULL

Просмотреть файл

@ -31,37 +31,48 @@
#include "mtl_portals_endpoint.h"
#include "mtl_portals_request.h"
#define CHECK_MATCH(incoming_bits, match_bits, ignore_bits) \
(((incoming_bits ^ match_bits) & ~ignore_bits) == 0)
/* called when a receive should be progressed */
static int
ompi_mtl_portals_recv_progress(ptl_event_t *ev,
struct ompi_mtl_portals_request_t* ptl_request)
{
mca_pml_base_recv_request_t *recvreq =
(mca_pml_base_recv_request_t*) ptl_request->super.ompi_req;
switch (ev->type) {
case PTL_EVENT_PUT_END:
case PTL_EVENT_REPLY_END:
{
mca_pml_base_recv_request_t *recvreq =
(mca_pml_base_recv_request_t*) ptl_request->super.ompi_req;
/* make sure the data is in the right place */
ompi_mtl_datatype_unpack(&recvreq->req_convertor,
ev->md.start, ev->md.length);
/* make sure the data is in the right place */
ompi_mtl_datatype_unpack(&recvreq->req_convertor,
ev->md.start, ev->md.length);
/* set the status */
recvreq->req_base.req_ompi.req_status.MPI_SOURCE =
PTL_GET_SOURCE(ev->match_bits);
recvreq->req_base.req_ompi.req_status.MPI_TAG =
/* set the status */
recvreq->req_base.req_ompi.req_status.MPI_SOURCE =
PTL_GET_SOURCE(ev->match_bits);
recvreq->req_base.req_ompi.req_status.MPI_TAG =
PTL_GET_TAG(ev->match_bits);
/* BWB - fix me - this is right for put but not for
unexpected, I think */
recvreq->req_base.req_ompi.req_status.MPI_ERROR =
(ev->rlength > ev->mlength) ?
MPI_ERR_TRUNCATE : MPI_SUCCESS;
recvreq->req_base.req_ompi.req_status._count =
ev->mlength;
recvreq->req_base.req_ompi.req_status.MPI_ERROR =
(ev->rlength > ev->mlength) ?
MPI_ERR_TRUNCATE : MPI_SUCCESS;
recvreq->req_base.req_ompi.req_status._count =
ev->mlength;
ptl_request->super.completion_callback(&ptl_request->super);
break;
case PTL_EVENT_REPLY_END:
/* make sure the data is in the right place */
ompi_mtl_datatype_unpack(&recvreq->req_convertor,
ev->md.start, ev->md.length);
ptl_request->super.completion_callback(&ptl_request->super);
}
PtlMDUnlink(ev->md_handle);
/* set the status */
recvreq->req_base.req_ompi.req_status._count =
ev->mlength;
ptl_request->super.completion_callback(&ptl_request->super);
break;
default:
@ -87,19 +98,25 @@ ompi_mtl_portals_get_data(ompi_mtl_portals_event_t *recv_event,
abort();
} else {
size_t buflen;
mca_pml_base_recv_request_t *recvreq =
(mca_pml_base_recv_request_t*) ptl_request->super.ompi_req;
ret = ompi_mtl_datatype_recv_buf(convertor, &md.start, &buflen,
&ptl_request->free_after);
if (OMPI_SUCCESS != ret) abort();
md.length = buflen;
md.length = (recv_event->ev.rlength > buflen) ? buflen : recv_event->ev.rlength;
md.threshold = 2; /* send and get */
md.options = PTL_MD_EVENT_START_DISABLE;
md.user_ptr = ptl_request;
md.eq_handle = ompi_mtl_portals.ptl_eq_h;
/* retain because it's unclear how many events we'll get here ... */
ret = PtlMDBind(ompi_mtl_portals.ptl_ni_h, md,
PTL_UNLINK, &md_h);
PTL_RETAIN, &md_h);
if (PTL_OK != ret) abort();
ptl_request->event_callback = ompi_mtl_portals_recv_progress;
ret = PtlGet(md_h,
recv_event->ev.initiator,
OMPI_MTL_PORTALS_READ_TABLE_ID,
@ -108,7 +125,13 @@ ompi_mtl_portals_get_data(ompi_mtl_portals_event_t *recv_event,
0);
if (PTL_OK != ret) abort();
ptl_request->event_callback = ompi_mtl_portals_recv_progress;
recvreq->req_base.req_ompi.req_status.MPI_SOURCE =
PTL_GET_SOURCE(recv_event->ev.match_bits);
recvreq->req_base.req_ompi.req_status.MPI_TAG =
PTL_GET_TAG(recv_event->ev.match_bits);
recvreq->req_base.req_ompi.req_status.MPI_ERROR =
(recv_event->ev.rlength > buflen) ?
MPI_ERR_TRUNCATE : MPI_SUCCESS;
}
return OMPI_SUCCESS;
@ -155,13 +178,13 @@ ompi_mtl_portals_irecv(struct mca_mtl_base_module_t* mtl,
ompi_mtl_portals_event_t *recv_event =
(ompi_mtl_portals_event_t*) list_item;
if ((recv_event->ev.match_bits & ~ignore_bits) ==
(match_bits & ~ignore_bits)) {
if (CHECK_MATCH(recv_event->ev.match_bits, match_bits, ignore_bits)) {
/* we have a match... */
opal_list_remove_item(&(ompi_mtl_portals.unexpected_messages),
list_item);
ompi_mtl_portals_get_data(recv_event, convertor, ptl_request);
OMPI_FREE_LIST_RETURN(&ompi_mtl_portals.event_fl, list_item);
OMPI_FREE_LIST_RETURN(&ompi_mtl_portals.event_fl,
(ompi_free_list_item_t*) list_item);
goto cleanup;
}
list_item = next_item;
@ -170,13 +193,15 @@ ompi_mtl_portals_irecv(struct mca_mtl_base_module_t* mtl,
/* now check the unexpected queue */
restart_search:
while (true) {
ompi_free_list_item_t *item;
ompi_mtl_portals_event_t *recv_event;
OMPI_FREE_LIST_GET(&ompi_mtl_portals.event_fl, recv_event, ret);
OMPI_FREE_LIST_GET(&ompi_mtl_portals.event_fl, item, ret);
recv_event = (ompi_mtl_portals_event_t*) item;
ret = PtlEQGet(ompi_mtl_portals.ptl_unexpected_recv_eq_h,
&recv_event->ev);
if (PTL_OK == ret) {
if ((recv_event->ev.match_bits & ~ignore_bits) ==
(match_bits & ~ignore_bits)) {
if (CHECK_MATCH(recv_event->ev.match_bits, match_bits, ignore_bits)) {
/* we have a match... */
ompi_mtl_portals_get_data(recv_event, convertor, ptl_request);
goto cleanup;

Просмотреть файл

@ -28,31 +28,45 @@
#include "mtl_portals_request.h"
#include "mtl_portals_endpoint.h"
/* called when a send should be progressed */
/* called when no ack is necessary */
static int
ompi_mtl_portals_send_progress_no_ack(ptl_event_t *ev,
struct ompi_mtl_portals_request_t *ptl_request)
{
switch (ev->type) {
case PTL_EVENT_SEND_END:
{
/* the get finished, so we're done. */
if (ptl_request->free_after) {
free(ev->md.start);
}
ptl_request->super.completion_callback(&ptl_request->super);
}
default:
break;
}
return OMPI_SUCCESS;
}
/* called when a send that should wait for an ack or longer shold be progressed */
static int
ompi_mtl_portals_send_progress(ptl_event_t *ev,
struct ompi_mtl_portals_request_t* ptl_request)
{
switch (ev->type) {
case PTL_EVENT_ACK:
/* message received - if they receivd the entire message,
we're done. If not, wait for the get */
if (ev->md.length == ev->mlength) {
if (ptl_request->free_after) {
free(ev->md.start);
}
PtlMDUnlink(ev->md_handle);
ptl_request->super.completion_callback(&ptl_request->super);
}
break;
case PTL_EVENT_GET_END:
case PTL_EVENT_PUT_END:
/* we only receive an ack if the message was received into an
expected message. Otherwise, we don't get an ack, but mark
completion when the message was pulled (long message) or
acked via an explicit put (short synchronous message). */
{
/* the get finished, so we're done. */
if (ptl_request->free_after) {
free(ev->md.start);
}
PtlMDUnlink(ev->md_handle);
ptl_request->super.completion_callback(&ptl_request->super);
}
break;
@ -108,8 +122,9 @@ ompi_mtl_portals_isend(struct mca_mtl_base_module_t* mtl,
ptl_request->event_callback = ompi_mtl_portals_send_progress;
if (MCA_PML_BASE_SEND_READY == mode) {
/* ready send - same protocol regardless of length */
if ((MCA_PML_BASE_SEND_READY == mode)) {
/* ready send (length doesn't matter) or short non-sync send.
Eagerly send data and don't wait for completion */
PTL_SET_SEND_BITS(match_bits, comm->c_contextid,
comm->c_my_rank,
tag, PTL_READY_MSG);
@ -124,13 +139,15 @@ ompi_mtl_portals_isend(struct mca_mtl_base_module_t* mtl,
ret = PtlMDBind(ompi_mtl_portals.ptl_ni_h,
md,
PTL_RETAIN,
PTL_UNLINK,
&(md_h));
if (OMPI_SUCCESS != ret) {
if (ptl_request->free_after) free(md.start);
return ompi_common_portals_error_ptl_to_ompi(ret);
}
ptl_request->event_callback = ompi_mtl_portals_send_progress_no_ack;
ret = PtlPut(md_h,
PTL_NO_ACK_REQ,
endpoint->ptl_proc,
@ -144,17 +161,17 @@ ompi_mtl_portals_isend(struct mca_mtl_base_module_t* mtl,
if (ptl_request->free_after) free(md.start);
return ompi_common_portals_error_ptl_to_ompi(ret);
}
} else if (md.length > ompi_mtl_portals.eager_limit) {
/* it's a long message - same protocol for all send modes */
} else {
/* it's a long message - same protocol for all send modes
other than ready */
PTL_SET_SEND_BITS(match_bits, comm->c_contextid,
comm->c_my_rank,
tag, PTL_LONG_MSG);
#if OMPI_MTL_PORTALS_DEBUG
printf("long send bits: 0x%016llx\n", match_bits);
printf("long send bits: 0x%016llx (%d)\n", match_bits, dest);
#endif
md.threshold = 3; /* send, ack, get */
md.threshold = 2; /* send, {ack, get} */
md.options = PTL_MD_OP_GET | PTL_MD_EVENT_START_DISABLE;
md.user_ptr = ptl_request;
md.eq_handle = ompi_mtl_portals.ptl_eq_h;
@ -175,7 +192,7 @@ ompi_mtl_portals_isend(struct mca_mtl_base_module_t* mtl,
ret = PtlMDAttach(me_h,
md,
PTL_RETAIN,
PTL_UNLINK,
&(md_h));
if (OMPI_SUCCESS != ret) {
@ -197,57 +214,6 @@ ompi_mtl_portals_isend(struct mca_mtl_base_module_t* mtl,
if (ptl_request->free_after) free(md.start);
return ompi_common_portals_error_ptl_to_ompi(ret);
}
} else if (MCA_PML_BASE_SEND_SYNCHRONOUS) {
/* short synchronous message */
PTL_SET_SEND_BITS(match_bits, comm->c_contextid,
comm->c_my_rank,
tag, PTL_SHORT_MSG);
#if OMPI_MTL_PORTALS_DEBUG
printf("short ssend bits: 0x%016llx\n", match_bits);
#endif
/* BWB - fix me */
return OMPI_ERR_NOT_IMPLEMENTED;
} else {
/* short message for something not ack-worthy */
PTL_SET_SEND_BITS(match_bits, comm->c_contextid,
comm->c_my_rank,
tag, PTL_SHORT_MSG);
#if OMPI_MTL_PORTALS_DEBUG
printf("short send bits: 0x%016llx\n", match_bits);
#endif
return OMPI_ERR_NOT_IMPLEMENTED;
md.threshold = 1;
md.options = PTL_MD_EVENT_START_DISABLE;
md.user_ptr = ptl_request;
md.eq_handle = ompi_mtl_portals.ptl_eq_h;
ret = PtlMDBind(ompi_mtl_portals.ptl_ni_h,
md,
PTL_RETAIN,
&(md_h));
if (OMPI_SUCCESS != ret) {
if (ptl_request->free_after) free(md.start);
return ompi_common_portals_error_ptl_to_ompi(ret);
}
ret = PtlPut(md_h,
PTL_NO_ACK_REQ,
endpoint->ptl_proc,
OMPI_MTL_PORTALS_SEND_TABLE_ID,
0,
match_bits,
0,
0);
if (OMPI_SUCCESS != ret) {
PtlMDUnlink(md_h);
if (ptl_request->free_after) free(md.start);
return ompi_common_portals_error_ptl_to_ompi(ret);
}
}
return OMPI_SUCCESS;