
Substantial changes to the CM PML allow us to have a very thin request for
all but buffered and persistent requests. Unfortunately we were not able to
reuse the pml_base_request_t as it was just too heavy for our needs. Lots of
code for 2/10 usec ;-)

This commit was SVN r10810.
This commit is contained in:
Galen Shipman 2006-07-14 19:32:26 +00:00
parent c22b0d516e
commit 6ed255f114
15 changed files with 1004 additions and 457 deletions
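
For orientation, here is a condensed sketch of the new request layering this commit introduces. Field names are taken from the pml_cm_sendreq.h hunk further down; this is not the complete set of definitions from the commit, and the members of mca_pml_cm_request_t itself are only inferred from their uses in the hunks below.

    /* common CM request base (pml_cm_request.h, added by this commit);
     * members such as req_ompi, req_convertor, req_pml_complete and
     * req_free_called are inferred from their uses in the hunks below */
    struct mca_pml_cm_send_request_t {
        mca_pml_cm_request_t        req_base;
        mca_pml_base_send_mode_t    req_send_mode;
    };

    /* thin request: just the base plus the MTL's per-request memory,
     * used for ordinary (non-buffered, non-persistent) sends */
    struct mca_pml_cm_thin_send_request_t {
        mca_pml_cm_send_request_t   req_send;
        mca_mtl_request_t           req_mtl;
    };

    /* heavy request: additionally keeps the user's arguments so buffered
     * and persistent sends can be packed and (re)started later */
    struct mca_pml_cm_hvy_send_request_t {
        mca_pml_cm_send_request_t   req_send;
        void                       *req_addr;      /* application buffer */
        size_t                      req_count;
        int32_t                     req_peer;
        int32_t                     req_tag;
        struct ompi_communicator_t *req_comm;
        struct ompi_datatype_t     *req_datatype;
        void                       *req_buff;      /* possibly a bsend buffer */
        bool                        req_blocking;
        mca_mtl_request_t           req_mtl;
    };

The receive requests are split the same way in pml_cm_recvreq.h, and the component now keeps four free lists (thin/heavy x send/recv) instead of two.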

View file

@ -203,6 +203,19 @@ int ompi_mtl_mx_progress( void ) {
if(mtl_mx_request->free_after) {
free(mtl_mx_request->mx_segment[0].segment_ptr);
}
switch (mx_status.code) {
case MX_STATUS_SUCCESS:
mtl_mx_request->super.ompi_req->req_status.MPI_ERROR =
OMPI_SUCCESS;
break;
case MX_STATUS_TRUNCATED:
mtl_mx_request->super.ompi_req->req_status.MPI_ERROR =
MPI_ERR_TRUNCATE;
break;
default:
mtl_mx_request->super.ompi_req->req_status.MPI_ERROR =
MPI_ERR_INTERN;
}
mtl_mx_request->super.completion_callback(&mtl_mx_request->super);
}
if(OMPI_MTL_MX_IRECV == mtl_mx_request->type) {

View file

@ -283,6 +283,57 @@ int mca_pml_base_bsend_request_alloc(ompi_request_t* request)
return OMPI_SUCCESS;
}
/*
* allocate buffer
*/
void* mca_pml_base_bsend_request_alloc_buf( size_t length )
{
void* buf = NULL;
/* has a buffer been provided */
OPAL_THREAD_LOCK(&mca_pml_bsend_mutex);
if(NULL == mca_pml_bsend_addr) {
OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
return NULL;
}
/* allocate a buffer to hold packed message */
buf = mca_pml_bsend_allocator->alc_alloc(
mca_pml_bsend_allocator, length, 0, NULL);
if(NULL == buf) {
/* release resources when request is freed */
OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
return NULL;
}
/* increment count of pending requests */
mca_pml_bsend_count++;
OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
return buf;
}
/*
* Request completed - free buffer and decrement pending count
*/
int mca_pml_base_bsend_request_free(void* addr)
{
/* remove from list of pending requests */
OPAL_THREAD_LOCK(&mca_pml_bsend_mutex);
/* free buffer */
mca_pml_bsend_allocator->alc_free(mca_pml_bsend_allocator, addr);
/* decrement count of buffered requests */
if(--mca_pml_bsend_count == 0)
opal_condition_signal(&mca_pml_bsend_condition);
OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
return OMPI_SUCCESS;
}
/*
* Request completed - free buffer and decrement pending count

View file

@ -34,6 +34,9 @@ OMPI_DECLSPEC int mca_pml_base_bsend_detach(void* addr, int* size);
OMPI_DECLSPEC int mca_pml_base_bsend_request_alloc(ompi_request_t*);
OMPI_DECLSPEC int mca_pml_base_bsend_request_start(ompi_request_t*);
OMPI_DECLSPEC int mca_pml_base_bsend_request_fini(ompi_request_t*);
OMPI_DECLSPEC void* mca_pml_base_bsend_request_alloc_buf( size_t length );
OMPI_DECLSPEC int mca_pml_base_bsend_request_free(void* addr);
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif

View file

@ -33,6 +33,8 @@ local_sources = \
pml_cm_recv.c \
pml_cm_recvreq.h \
pml_cm_recvreq.c \
pml_cm_request.h \
pml_cm_request.c \
pml_cm_send.c \
pml_cm_sendreq.h \
pml_cm_sendreq.c \

View file

@ -168,27 +168,35 @@ mca_pml_cm_dump(struct ompi_communicator_t* comm, int verbose)
void
mca_pml_cm_request_completion(struct mca_mtl_request_t *mtl_request)
mca_pml_cm_thin_send_request_completion(struct mca_mtl_request_t *mtl_request)
{
mca_pml_base_request_t *base_request =
(mca_pml_base_request_t*) mtl_request->ompi_req;
switch (base_request->req_type) {
case MCA_PML_REQUEST_SEND:
{
mca_pml_cm_send_request_t* sendreq =
(mca_pml_cm_send_request_t*) base_request;
MCA_PML_CM_SEND_REQUEST_PML_COMPLETE(sendreq);
}
break;
case MCA_PML_REQUEST_RECV:
{
mca_pml_cm_recv_request_t* recvreq =
(mca_pml_cm_recv_request_t*) base_request;
MCA_PML_CM_RECV_REQUEST_PML_COMPLETE(recvreq);
}
break;
default:
break;
}
mca_pml_cm_send_request_t *base_request =
(mca_pml_cm_send_request_t*) mtl_request->ompi_req;
MCA_PML_CM_THIN_SEND_REQUEST_PML_COMPLETE(((mca_pml_cm_thin_send_request_t*) base_request));
}
void
mca_pml_cm_hvy_send_request_completion(struct mca_mtl_request_t *mtl_request)
{
mca_pml_cm_send_request_t *base_request =
(mca_pml_cm_send_request_t*) mtl_request->ompi_req;
MCA_PML_CM_HVY_SEND_REQUEST_PML_COMPLETE(((mca_pml_cm_hvy_send_request_t*) base_request));
}
void
mca_pml_cm_thin_recv_request_completion(struct mca_mtl_request_t *mtl_request)
{
mca_pml_cm_request_t *base_request =
(mca_pml_cm_request_t*) mtl_request->ompi_req;
MCA_PML_CM_THIN_RECV_REQUEST_PML_COMPLETE(((mca_pml_cm_thin_recv_request_t*) base_request));
}
void
mca_pml_cm_hvy_recv_request_completion(struct mca_mtl_request_t *mtl_request)
{
mca_pml_cm_request_t *base_request =
(mca_pml_cm_request_t*) mtl_request->ompi_req;
MCA_PML_CM_HVY_RECV_REQUEST_PML_COMPLETE(((mca_pml_cm_hvy_recv_request_t*) base_request));
}

View file

@ -30,9 +30,11 @@ struct mca_mtl_request_t;
struct ompi_pml_cm_t {
mca_pml_base_module_t super;
/** free list of send request structures */
ompi_free_list_t cm_send_requests;
ompi_free_list_t cm_thin_send_requests;
ompi_free_list_t cm_hvy_send_requests;
/** free list of recv request structures */
ompi_free_list_t cm_recv_requests;
ompi_free_list_t cm_thin_recv_requests;
ompi_free_list_t cm_hvy_recv_requests;
};
typedef struct ompi_pml_cm_t ompi_pml_cm_t;
extern ompi_pml_cm_t ompi_pml_cm;
@ -116,7 +118,11 @@ extern int mca_pml_cm_dump(struct ompi_communicator_t* comm,
extern int mca_pml_cm_cancel(struct ompi_request_t *request, int flag);
extern void mca_pml_cm_request_completion(struct mca_mtl_request_t *mtl_request);
extern void mca_pml_cm_thin_send_request_completion(struct mca_mtl_request_t *mtl_request);
extern void mca_pml_cm_hvy_send_request_completion(struct mca_mtl_request_t *mtl_request);
extern void mca_pml_cm_thin_recv_request_completion(struct mca_mtl_request_t *mtl_request);
extern void mca_pml_cm_hvy_recv_request_completion(struct mca_mtl_request_t *mtl_request);
#if defined(c_plusplus) || defined(__cplusplus)
}

View file

@ -21,31 +21,10 @@ int
mca_pml_cm_cancel(struct ompi_request_t *request, int flag)
{
int ret;
mca_pml_base_request_t *base_request =
(mca_pml_base_request_t*) request;
switch (base_request->req_type) {
case MCA_PML_REQUEST_SEND:
{
mca_pml_cm_send_request_t* sendreq =
(mca_pml_cm_send_request_t*) request;
ret = OMPI_MTL_CALL(cancel(ompi_mtl,
&sendreq->req_mtl,
flag));
}
break;
case MCA_PML_REQUEST_RECV:
{
mca_pml_cm_recv_request_t* recvreq =
(mca_pml_cm_recv_request_t*) request;
ret = OMPI_MTL_CALL(cancel(ompi_mtl,
&recvreq->req_mtl,
flag));
}
break;
default:
ret = OMPI_SUCCESS;
}
/* mca_pml_cm_request_t *base_request = */
/* (mca_pml_cm_request_t*) request; */
/* ret = OMPI_MTL_CALL(cancel(ompi_mtl, */
/* &base_request->req_mtl, */
/* flag)); */
return ret;
}

View file

@ -99,19 +99,37 @@ mca_pml_cm_component_init(int* priority,
ompi_pml_cm.super.pml_max_tag = ompi_mtl->mtl_max_tag;
/* BWB - FIX ME - add mca parameters for free list water marks */
OBJ_CONSTRUCT(&ompi_pml_cm.cm_send_requests, ompi_free_list_t);
ompi_free_list_init(&ompi_pml_cm.cm_send_requests,
sizeof(mca_pml_cm_send_request_t) +
OBJ_CONSTRUCT(&ompi_pml_cm.cm_thin_send_requests, ompi_free_list_t);
ompi_free_list_init(&ompi_pml_cm.cm_thin_send_requests,
sizeof(mca_pml_cm_thin_send_request_t) +
ompi_mtl->mtl_request_size,
OBJ_CLASS(mca_pml_cm_send_request_t),
OBJ_CLASS(mca_pml_cm_thin_send_request_t),
1, -1, 1,
NULL);
OBJ_CONSTRUCT(&ompi_pml_cm.cm_recv_requests, ompi_free_list_t);
ompi_free_list_init(&ompi_pml_cm.cm_recv_requests,
sizeof(mca_pml_cm_recv_request_t) +
OBJ_CONSTRUCT(&ompi_pml_cm.cm_hvy_send_requests, ompi_free_list_t);
ompi_free_list_init(&ompi_pml_cm.cm_hvy_send_requests,
sizeof(mca_pml_cm_hvy_send_request_t) +
ompi_mtl->mtl_request_size,
OBJ_CLASS(mca_pml_cm_recv_request_t),
OBJ_CLASS(mca_pml_cm_hvy_send_request_t),
1, -1, 1,
NULL);
OBJ_CONSTRUCT(&ompi_pml_cm.cm_thin_recv_requests, ompi_free_list_t);
ompi_free_list_init(&ompi_pml_cm.cm_thin_recv_requests,
sizeof(mca_pml_cm_thin_recv_request_t) +
ompi_mtl->mtl_request_size,
OBJ_CLASS(mca_pml_cm_thin_recv_request_t),
1, -1, 1,
NULL);
OBJ_CONSTRUCT(&ompi_pml_cm.cm_hvy_recv_requests, ompi_free_list_t);
ompi_free_list_init(&ompi_pml_cm.cm_hvy_recv_requests,
sizeof(mca_pml_cm_hvy_recv_request_t) +
ompi_mtl->mtl_request_size,
OBJ_CLASS(mca_pml_cm_hvy_recv_request_t),
1, -1, 1,
NULL);
@ -132,8 +150,10 @@ mca_pml_cm_component_fini(void)
/* shut down buffered send code */
mca_pml_base_bsend_fini();
OBJ_DESTRUCT(&ompi_pml_cm.cm_send_requests);
OBJ_DESTRUCT(&ompi_pml_cm.cm_recv_requests);
OBJ_DESTRUCT(&ompi_pml_cm.cm_thin_send_requests);
OBJ_DESTRUCT(&ompi_pml_cm.cm_hvy_send_requests);
OBJ_DESTRUCT(&ompi_pml_cm.cm_thin_recv_requests);
OBJ_DESTRUCT(&ompi_pml_cm.cm_hvy_recv_requests);
if (NULL != ompi_mtl && NULL != ompi_mtl->mtl_finalize) {
return ompi_mtl->mtl_finalize(ompi_mtl);

View file

@ -21,21 +21,25 @@
int
mca_pml_cm_irecv_init(void *addr,
size_t count,
ompi_datatype_t * datatype,
int src,
int tag,
struct ompi_communicator_t *comm,
struct ompi_request_t **request)
size_t count,
ompi_datatype_t * datatype,
int src,
int tag,
struct ompi_communicator_t *comm,
struct ompi_request_t **request)
{
int ret;
mca_pml_cm_recv_request_t *recvreq;
MCA_PML_CM_RECV_REQUEST_ALLOC(recvreq, ret);
mca_pml_cm_hvy_recv_request_t *recvreq;
ompi_proc_t* ompi_proc;
MCA_PML_CM_HVY_RECV_REQUEST_ALLOC(recvreq, ret);
if (NULL == recvreq || OMPI_SUCCESS != ret) return ret;
MCA_PML_CM_RECV_REQUEST_INIT(recvreq, addr, count,
datatype, src, tag, comm, true);
MCA_PML_CM_HVY_RECV_REQUEST_INIT(recvreq, ompi_proc, comm, tag, src,
datatype, addr, count, true);
recvreq->req_base.req_pml_type = MCA_PML_CM_REQUEST_RECV;
*request = (ompi_request_t*) recvreq;
return OMPI_SUCCESS;
@ -44,23 +48,30 @@ mca_pml_cm_irecv_init(void *addr,
int
mca_pml_cm_irecv(void *addr,
size_t count,
ompi_datatype_t * datatype,
int src,
int tag,
struct ompi_communicator_t *comm,
struct ompi_request_t **request)
size_t count,
ompi_datatype_t * datatype,
int src,
int tag,
struct ompi_communicator_t *comm,
struct ompi_request_t **request)
{
int ret;
mca_pml_cm_recv_request_t *recvreq;
MCA_PML_CM_RECV_REQUEST_ALLOC(recvreq, ret);
mca_pml_cm_thin_recv_request_t *recvreq;
ompi_proc_t* ompi_proc;
MCA_PML_CM_THIN_RECV_REQUEST_ALLOC(recvreq, ret);
if (NULL == recvreq || OMPI_SUCCESS != ret) return ret;
MCA_PML_CM_RECV_REQUEST_INIT(recvreq, addr, count,
datatype, src, tag, comm, false);
MCA_PML_CM_RECV_REQUEST_START(recvreq, ret);
MCA_PML_CM_THIN_RECV_REQUEST_INIT(recvreq,
ompi_proc,
comm,
tag,
src,
datatype,
addr,
count);
MCA_PML_CM_THIN_RECV_REQUEST_START(recvreq, comm, tag, src, ret);
if (OMPI_SUCCESS == ret) *request = (ompi_request_t*) recvreq;
@ -69,7 +80,7 @@ mca_pml_cm_irecv(void *addr,
int
mca_pml_cm_recv(void *buf,
mca_pml_cm_recv(void *addr,
size_t count,
ompi_datatype_t * datatype,
int src,
@ -78,41 +89,49 @@ mca_pml_cm_recv(void *buf,
ompi_status_public_t * status)
{
int ret;
mca_pml_cm_recv_request_t *recvreq;
MCA_PML_CM_RECV_REQUEST_ALLOC(recvreq, ret);
mca_pml_cm_thin_recv_request_t *recvreq;
ompi_proc_t* ompi_proc;
MCA_PML_CM_THIN_RECV_REQUEST_ALLOC(recvreq, ret);
if (NULL == recvreq || OMPI_SUCCESS != ret) return ret;
MCA_PML_CM_RECV_REQUEST_INIT(recvreq, buf, count,
datatype, src, tag, comm, false);
MCA_PML_CM_RECV_REQUEST_START(recvreq, ret);
MCA_PML_CM_THIN_RECV_REQUEST_INIT(recvreq,
ompi_proc,
comm,
tag,
src,
datatype,
addr,
count);
MCA_PML_CM_THIN_RECV_REQUEST_START(recvreq, comm, tag, src, ret);
if (OMPI_SUCCESS != ret) {
/* BWB - XXX - need cleanup of request here */
MCA_PML_CM_RECV_REQUEST_RETURN(recvreq);
MCA_PML_CM_THIN_RECV_REQUEST_RETURN(recvreq);
}
if (recvreq->req_recv.req_base.req_ompi.req_complete == false) {
if (recvreq->req_base.req_ompi.req_complete == false) {
/* give up and sleep until completion */
if (opal_using_threads()) {
opal_mutex_lock(&ompi_request_lock);
ompi_request_waiting++;
while (recvreq->req_recv.req_base.req_ompi.req_complete == false)
while (recvreq->req_base.req_ompi.req_complete == false)
opal_condition_wait(&ompi_request_cond, &ompi_request_lock);
ompi_request_waiting--;
opal_mutex_unlock(&ompi_request_lock);
} else {
ompi_request_waiting++;
while (recvreq->req_recv.req_base.req_ompi.req_complete == false)
while (recvreq->req_base.req_ompi.req_complete == false)
opal_condition_wait(&ompi_request_cond, &ompi_request_lock);
ompi_request_waiting--;
}
}
if (NULL != status) { /* return status */
*status = recvreq->req_recv.req_base.req_ompi.req_status;
*status = recvreq->req_base.req_ompi.req_status;
}
ret = recvreq->req_recv.req_base.req_ompi.req_status.MPI_ERROR;
ret = recvreq->req_base.req_ompi.req_status.MPI_ERROR;
ompi_request_free( (ompi_request_t**)&recvreq );
return ret;

View file

@ -23,16 +23,35 @@
static int
mca_pml_cm_recv_request_free(struct ompi_request_t** request)
mca_pml_cm_thin_recv_request_free(struct ompi_request_t** request)
{
mca_pml_cm_recv_request_t* recvreq = *(mca_pml_cm_recv_request_t**)request;
mca_pml_cm_request_t* recvreq = *(mca_pml_cm_request_t**)request;
assert( false == recvreq->req_recv.req_base.req_free_called );
assert( false == recvreq->req_free_called );
OPAL_THREAD_LOCK(&ompi_request_lock);
recvreq->req_recv.req_base.req_free_called = true;
if( true == recvreq->req_recv.req_base.req_pml_complete ) {
MCA_PML_CM_RECV_REQUEST_RETURN( recvreq );
recvreq->req_free_called = true;
if( true == recvreq->req_pml_complete ) {
MCA_PML_CM_THIN_RECV_REQUEST_RETURN((mca_pml_cm_thin_recv_request_t*) recvreq );
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
*request = MPI_REQUEST_NULL;
return OMPI_SUCCESS;
}
static int
mca_pml_cm_hvy_recv_request_free(struct ompi_request_t** request)
{
mca_pml_cm_request_t* recvreq = *(mca_pml_cm_request_t**)request;
assert( false == recvreq->req_free_called );
OPAL_THREAD_LOCK(&ompi_request_lock);
recvreq->req_free_called = true;
if( true == recvreq->req_pml_complete ) {
MCA_PML_CM_HVY_RECV_REQUEST_RETURN((mca_pml_cm_hvy_recv_request_t*) recvreq );
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
@ -43,25 +62,35 @@ mca_pml_cm_recv_request_free(struct ompi_request_t** request)
static void
recvreq_construct(mca_pml_cm_recv_request_t* recvreq)
mca_pml_cm_thin_recv_request_construct(mca_pml_cm_thin_recv_request_t* recvreq)
{
recvreq->req_mtl.ompi_req = (ompi_request_t*) recvreq;
recvreq->req_mtl.completion_callback = mca_pml_cm_request_completion;
recvreq->req_mtl.completion_callback = mca_pml_cm_thin_recv_request_completion;
recvreq->req_recv.req_base.req_ompi.req_free = mca_pml_cm_recv_request_free;
recvreq->req_recv.req_base.req_ompi.req_cancel = mca_pml_cm_cancel;
recvreq->req_base.req_ompi.req_free = mca_pml_cm_thin_recv_request_free;
recvreq->req_base.req_ompi.req_cancel = mca_pml_cm_cancel;
}
static void
recvreq_destruct(mca_pml_cm_recv_request_t* recvreq)
mca_pml_cm_hvy_recv_request_construct(mca_pml_cm_hvy_recv_request_t* recvreq)
{
recvreq->req_mtl.ompi_req = NULL;
recvreq->req_mtl.completion_callback = NULL;
recvreq->req_mtl.ompi_req = (ompi_request_t*) recvreq;
recvreq->req_mtl.completion_callback = mca_pml_cm_hvy_recv_request_completion;
recvreq->req_base.req_ompi.req_free = mca_pml_cm_hvy_recv_request_free;
recvreq->req_base.req_ompi.req_cancel = mca_pml_cm_cancel;
}
OBJ_CLASS_INSTANCE(mca_pml_cm_recv_request_t,
mca_pml_base_recv_request_t,
recvreq_construct,
recvreq_destruct);
OBJ_CLASS_INSTANCE(mca_pml_cm_thin_recv_request_t,
mca_pml_cm_request_t,
mca_pml_cm_thin_recv_request_construct,
NULL);
OBJ_CLASS_INSTANCE(mca_pml_cm_hvy_recv_request_t,
mca_pml_cm_request_t,
mca_pml_cm_hvy_recv_request_construct,
NULL);

View file

@ -19,16 +19,33 @@
#ifndef PML_CM_RECVREQ_H
#define PML_CM_RECVREQ_H
#include "pml_cm_request.h"
#include "ompi/mca/pml/base/pml_base_recvreq.h"
#include "ompi/mca/mtl/mtl.h"
struct mca_pml_cm_recv_request_t {
mca_pml_base_recv_request_t req_recv;
mca_mtl_request_t req_mtl;
struct mca_pml_cm_thin_recv_request_t {
mca_pml_cm_request_t req_base;
mca_mtl_request_t req_mtl; /**< the mtl specific memory */
};
typedef struct mca_pml_cm_recv_request_t mca_pml_cm_recv_request_t;
OBJ_CLASS_DECLARATION(mca_pml_cm_recv_request_t);
typedef struct mca_pml_cm_thin_recv_request_t mca_pml_cm_thin_recv_request_t;
OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_pml_cm_thin_recv_request_t);
struct mca_pml_cm_hvy_recv_request_t {
mca_pml_cm_request_t req_base;
void *req_addr; /**< pointer to application buffer */
size_t req_count; /**< count of user datatype elements */
int32_t req_peer; /**< peer process - rank w/in this communicator */
int32_t req_tag; /**< user defined tag */
struct ompi_communicator_t *req_comm; /**< communicator pointer */
struct ompi_datatype_t *req_datatype; /**< pointer to data type */
void *req_buff; /**< pointer to send buffer - may not be application buffer */
size_t req_bytes_packed; /**< packed size of a message given the datatype and count */
bool req_blocking;
mca_mtl_request_t req_mtl; /**< the mtl specific memory */
};
typedef struct mca_pml_cm_hvy_recv_request_t mca_pml_cm_hvy_recv_request_t;
OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_pml_cm_hvy_recv_request_t);
/**
* Allocate a recv request from the modules free list.
@ -36,11 +53,18 @@ OBJ_CLASS_DECLARATION(mca_pml_cm_recv_request_t);
* @param rc (OUT) OMPI_SUCCESS or error status on failure.
* @return Receive request.
*/
#define MCA_PML_CM_RECV_REQUEST_ALLOC(recvreq, rc) \
#define MCA_PML_CM_THIN_RECV_REQUEST_ALLOC(recvreq, rc) \
do { \
ompi_free_list_item_t*item; \
OMPI_FREE_LIST_GET(&ompi_pml_cm.cm_recv_requests, item, rc); \
recvreq = (mca_pml_cm_recv_request_t*) item; \
OMPI_FREE_LIST_GET(&ompi_pml_cm.cm_thin_recv_requests, item, rc); \
recvreq = (mca_pml_cm_thin_recv_request_t*) item; \
} while (0)
#define MCA_PML_CM_HVY_RECV_REQUEST_ALLOC(recvreq, rc) \
do { \
ompi_free_list_item_t*item; \
OMPI_FREE_LIST_GET(&ompi_pml_cm.cm_hvy_recv_requests, item, rc); \
recvreq = (mca_pml_cm_hvy_recv_request_t*) item; \
} while (0)
@ -56,41 +80,71 @@ do { \
* @param comm (IN) Communicator.
* @param persistent (IN) Is this a ersistent request.
*/
#define MCA_PML_CM_RECV_REQUEST_INIT( request, \
addr, \
count, \
datatype, \
src, \
tag, \
comm, \
persistent) \
#define MCA_PML_CM_THIN_RECV_REQUEST_INIT( request, \
ompi_proc, \
comm, \
tag, \
src, \
datatype, \
addr, \
count ) \
do { \
MCA_PML_BASE_RECV_REQUEST_INIT( &(request)->req_recv, \
addr, \
count, \
datatype, \
src, \
tag, \
comm, \
persistent); \
/* BWB - fix me - need real remote proc */ \
if (MPI_ANY_SOURCE == src) { \
(request)->req_recv.req_base.req_proc = \
OMPI_REQUEST_INIT(&(request)->req_base.req_ompi, false); \
(request)->req_base.req_pml_complete = false; \
(request)->req_base.req_free_called = false; \
\
if( MPI_ANY_SOURCE == src ) { \
ompi_proc = \
comm->c_pml_procs[comm->c_my_rank]->proc_ompi; \
} else { \
(request)->req_recv.req_base.req_proc = \
ompi_proc = \
comm->c_pml_procs[src]->proc_ompi; \
} \
\
ompi_convertor_copy_and_prepare_for_recv( \
(request)->req_recv.req_base.req_proc->proc_convertor, \
(request)->req_recv.req_base.req_datatype, \
(request)->req_recv.req_base.req_count, \
(request)->req_recv.req_base.req_addr, \
0, \
&(request)->req_recv.req_convertor ); \
ompi_proc->proc_convertor, \
datatype, \
count, \
addr, \
0, \
&(request)->req_base.req_convertor ); \
} while(0)
#define MCA_PML_CM_HVY_RECV_REQUEST_INIT( request, \
ompi_proc, \
comm, \
tag, \
src, \
datatype, \
addr, \
count, \
persistent) \
do { \
OMPI_REQUEST_INIT(&(request)->req_base.req_ompi, persistent); \
(request)->req_base.req_pml_complete = (persistent ? true : false); \
(request)->req_base.req_free_called = false; \
request->req_comm = comm; \
request->req_tag = tag; \
request->req_peer = src; \
request->req_datatype = datatype; \
request->req_addr = addr; \
request->req_count = count; \
\
if( MPI_ANY_SOURCE == src ) { \
ompi_proc = \
comm->c_pml_procs[comm->c_my_rank]->proc_ompi; \
} else { \
ompi_proc = \
comm->c_pml_procs[src]->proc_ompi; \
} \
ompi_convertor_copy_and_prepare_for_recv( \
ompi_proc->proc_convertor, \
datatype, \
count, \
addr, \
0, \
&(request)->req_base.req_convertor ); \
} while(0)
/**
* Start an initialized request.
@ -98,16 +152,50 @@ do { \
* @param request Receive request.
* @return OMPI_SUCESS or error status on failure.
*/
#define MCA_PML_CM_RECV_REQUEST_START(request, ret) \
#define MCA_PML_CM_THIN_RECV_REQUEST_START(request, comm, tag, src, ret) \
do { \
/* init/re-init the request */ \
MCA_PML_BASE_RECV_START( &(request)->req_recv.req_base ); \
request->req_base.req_pml_complete = false; \
request->req_base.req_ompi.req_complete = false; \
request->req_base.req_ompi.req_state = OMPI_REQUEST_ACTIVE; \
\
/* always set the req_status.MPI_TAG to ANY_TAG before starting the \
* request. This field is used if cancelled to find out if the request \
* has been matched or not. \
*/ \
request->req_base.req_ompi.req_status.MPI_TAG = OMPI_ANY_TAG; \
request->req_base.req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS; \
request->req_base.req_ompi.req_status._cancelled = 0; \
ret = OMPI_MTL_CALL(irecv(ompi_mtl, \
recvreq->req_recv.req_base.req_comm, \
recvreq->req_recv.req_base.req_peer, \
recvreq->req_recv.req_base.req_tag, \
&recvreq->req_recv.req_convertor, \
&recvreq->req_mtl)); \
comm, \
src, \
tag, \
&recvreq->req_base.req_convertor, \
&recvreq->req_mtl)); \
} while (0)
#define MCA_PML_CM_HVY_RECV_REQUEST_START(request, ret) \
do { \
/* opal_output(0, "posting hvy request %d\n", request); */ \
/* init/re-init the request */ \
request->req_base.req_pml_complete = false; \
request->req_base.req_ompi.req_complete = false; \
request->req_base.req_ompi.req_state = OMPI_REQUEST_ACTIVE; \
\
/* always set the req_status.MPI_TAG to ANY_TAG before starting the \
* request. This field is used if cancelled to find out if the request \
* has been matched or not. \
*/ \
request->req_base.req_ompi.req_status.MPI_TAG = OMPI_ANY_TAG; \
request->req_base.req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS; \
request->req_base.req_ompi.req_status._cancelled = 0; \
ret = OMPI_MTL_CALL(irecv(ompi_mtl, \
request->req_comm, \
request->req_peer, \
request->req_tag, \
&recvreq->req_base.req_convertor, \
&recvreq->req_mtl)); \
} while (0)
@ -116,10 +204,33 @@ do { \
*
* @param recvreq (IN) Receive request.
*/
#define MCA_PML_CM_RECV_REQUEST_MPI_COMPLETE( recvreq ) \
#define MCA_PML_CM_THIN_RECV_REQUEST_MPI_COMPLETE( recvreq ) \
do { \
MCA_PML_BASE_REQUEST_MPI_COMPLETE( &(recvreq->req_recv.req_base.req_ompi) ); \
MCA_PML_BASE_REQUEST_MPI_COMPLETE( &(recvreq->req_base.req_ompi) ); \
} while (0)
/**
* Return a recv request to the modules free list.
*
* @param recvreq (IN) Receive request.
*/
#define MCA_PML_CM_THIN_RECV_REQUEST_PML_COMPLETE(recvreq) \
do { \
assert( false == recvreq->req_base.req_pml_complete ); \
\
OPAL_THREAD_LOCK(&ompi_request_lock); \
\
if( true == recvreq->req_base.req_free_called ) { \
MCA_PML_CM_THIN_RECV_REQUEST_RETURN( recvreq ); \
} else { \
recvreq->req_base.req_pml_complete = true; \
MCA_PML_BASE_REQUEST_MPI_COMPLETE( &(recvreq->req_base.req_ompi) ); \
} \
OPAL_THREAD_UNLOCK(&ompi_request_lock); \
} while(0)
/**
@ -127,23 +238,23 @@ do { \
*
* @param recvreq (IN) Receive request.
*/
#define MCA_PML_CM_RECV_REQUEST_PML_COMPLETE(recvreq) \
#define MCA_PML_CM_HVY_RECV_REQUEST_PML_COMPLETE(recvreq) \
do { \
assert( false == recvreq->req_recv.req_base.req_pml_complete ); \
assert( false == recvreq->req_base.req_pml_complete ); \
\
OPAL_THREAD_LOCK(&ompi_request_lock); \
\
if( true == recvreq->req_recv.req_base.req_free_called ) { \
MCA_PML_CM_RECV_REQUEST_RETURN( recvreq ); \
if( true == recvreq->req_base.req_free_called ) { \
MCA_PML_CM_HVY_RECV_REQUEST_RETURN( recvreq ); \
} else { \
/* initialize request status */ \
if(recvreq->req_recv.req_base.req_ompi.req_persistent) { \
if(recvreq->req_base.req_ompi.req_persistent) { \
/* rewind convertor */ \
size_t offset = 0; \
ompi_convertor_set_position(&recvreq->req_recv.req_convertor, &offset); \
ompi_convertor_set_position(&recvreq->req_base.req_convertor, &offset); \
} \
recvreq->req_recv.req_base.req_pml_complete = true; \
MCA_PML_CM_RECV_REQUEST_MPI_COMPLETE( recvreq ); \
recvreq->req_base.req_pml_complete = true; \
MCA_PML_BASE_REQUEST_MPI_COMPLETE( &(recvreq->req_base.req_ompi) ); \
} \
OPAL_THREAD_UNLOCK(&ompi_request_lock); \
} while(0)
@ -152,12 +263,24 @@ do { \
/**
* Free the PML receive request
*/
#define MCA_PML_CM_RECV_REQUEST_RETURN(recvreq) \
#define MCA_PML_CM_HVY_RECV_REQUEST_RETURN(recvreq) \
{ \
MCA_PML_BASE_RECV_REQUEST_FINI(&(recvreq)->req_recv); \
OMPI_FREE_LIST_RETURN( &ompi_pml_cm.cm_recv_requests, \
OMPI_REQUEST_FINI(&(recvreq)->req_base.req_ompi); \
ompi_convertor_cleanup( &((recvreq)->req_base.req_convertor) ); \
OMPI_FREE_LIST_RETURN( &ompi_pml_cm.cm_hvy_recv_requests, \
(ompi_free_list_item_t*)(recvreq)); \
}
/**
* Free the PML receive request
*/
#define MCA_PML_CM_THIN_RECV_REQUEST_RETURN(recvreq) \
{ \
OMPI_REQUEST_FINI(&(recvreq)->req_base.req_ompi); \
ompi_convertor_cleanup( &((recvreq)->req_base.req_convertor) ); \
OMPI_FREE_LIST_RETURN( &ompi_pml_cm.cm_thin_recv_requests, \
(ompi_free_list_item_t*)(recvreq)); \
}
#endif

View file

@ -28,14 +28,17 @@ mca_pml_cm_isend_init(void* buf,
ompi_request_t** request)
{
int ret;
mca_pml_cm_send_request_t *sendreq;
MCA_PML_CM_SEND_REQUEST_ALLOC(comm, dst, sendreq, ret);
mca_pml_cm_hvy_send_request_t *sendreq;
ompi_proc_t* ompi_proc;
MCA_PML_CM_HVY_SEND_REQUEST_ALLOC(sendreq, comm, dst, ompi_proc, ret);
if (NULL == sendreq || OMPI_SUCCESS != ret) return ret;
MCA_PML_CM_SEND_REQUEST_INIT(sendreq, buf, count,
datatype, dst, tag, comm,
sendmode, false, true);
MCA_PML_CM_HVY_SEND_REQUEST_INIT(sendreq, ompi_proc, comm, tag, dst,
datatype, sendmode, true, false, buf, count);
sendreq->req_send.req_base.req_pml_type = MCA_PML_CM_REQUEST_SEND;
*request = (ompi_request_t*) sendreq;
return OMPI_SUCCESS;
@ -53,19 +56,60 @@ mca_pml_cm_isend(void* buf,
ompi_request_t** request)
{
int ret;
mca_pml_cm_send_request_t *sendreq;
MCA_PML_CM_SEND_REQUEST_ALLOC(comm, dst, sendreq, ret);
if (NULL == sendreq || OMPI_SUCCESS != ret) return ret;
MCA_PML_CM_SEND_REQUEST_INIT(sendreq, buf, count,
datatype, dst, tag, comm,
sendmode, false, false);
MCA_PML_CM_SEND_REQUEST_START(sendreq, ret);
if (OMPI_SUCCESS == ret) *request = (ompi_request_t*) sendreq;
if(sendmode == MCA_PML_BASE_SEND_BUFFERED ) {
mca_pml_cm_hvy_send_request_t* sendreq;
ompi_proc_t* ompi_proc;
MCA_PML_CM_HVY_SEND_REQUEST_ALLOC(sendreq, comm, dst, ompi_proc, ret);
if (NULL == sendreq || OMPI_SUCCESS != ret) return ret;
MCA_PML_CM_HVY_SEND_REQUEST_INIT(sendreq,
ompi_proc,
comm,
tag,
dst,
datatype,
sendmode,
false,
false,
buf,
count);
MCA_PML_CM_HVY_SEND_REQUEST_START( sendreq, ret);
if (OMPI_SUCCESS == ret) *request = (ompi_request_t*) sendreq;
} else {
mca_pml_cm_thin_send_request_t* sendreq;
ompi_proc_t* ompi_proc;
MCA_PML_CM_THIN_SEND_REQUEST_ALLOC(sendreq, comm, dst, ompi_proc, ret);
if (NULL == sendreq || OMPI_SUCCESS != ret) return ret;
MCA_PML_CM_THIN_SEND_REQUEST_INIT(sendreq,
ompi_proc,
comm,
tag,
dst,
datatype,
sendmode,
buf,
count);
MCA_PML_CM_THIN_SEND_REQUEST_START(
sendreq,
comm,
tag,
dst,
sendmode,
false,
ret);
if (OMPI_SUCCESS == ret) *request = (ompi_request_t*) sendreq;
}
return ret;
}
@ -80,62 +124,95 @@ mca_pml_cm_send(void *buf,
ompi_communicator_t* comm)
{
int ret;
mca_pml_cm_send_request_t *sendreq;
MCA_PML_CM_SEND_REQUEST_ALLOC(comm, dst, sendreq, ret);
if (NULL == sendreq || OMPI_SUCCESS != ret) return ret;
MCA_PML_CM_SEND_REQUEST_INIT(sendreq, buf, count,
datatype, dst, tag, comm,
sendmode, true, false);
if (sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED ) {
MCA_PML_CM_SEND_REQUEST_START(sendreq, ret);
if(sendmode == MCA_PML_BASE_SEND_BUFFERED) {
mca_pml_cm_hvy_send_request_t *sendreq;
ompi_proc_t * ompi_proc;
MCA_PML_CM_HVY_SEND_REQUEST_ALLOC(sendreq, comm, dst, ompi_proc, ret);
if (NULL == sendreq || OMPI_SUCCESS != ret) return ret;
MCA_PML_CM_HVY_SEND_REQUEST_INIT(sendreq,
ompi_proc,
comm,
tag,
dst,
datatype,
sendmode,
false,
false,
buf,
count);
MCA_PML_CM_HVY_SEND_REQUEST_START(sendreq, ret);
if (OMPI_SUCCESS != ret) {
MCA_PML_CM_SEND_REQUEST_RETURN(sendreq);
MCA_PML_CM_HVY_SEND_REQUEST_RETURN(sendreq);
return ret;
}
} else if (NULL == ompi_mtl->mtl_send) {
MCA_PML_CM_SEND_REQUEST_START(sendreq, ret);
if (OMPI_SUCCESS != ret) {
MCA_PML_CM_SEND_REQUEST_RETURN(sendreq);
return ret;
}
if (sendreq->req_send.req_base.req_ompi.req_complete == false) {
/* give up and sleep until completion */
if (opal_using_threads()) {
opal_mutex_lock(&ompi_request_lock);
ompi_request_waiting++;
while (sendreq->req_send.req_base.req_ompi.req_complete == false)
opal_condition_wait(&ompi_request_cond, &ompi_request_lock);
ompi_request_waiting--;
opal_mutex_unlock(&ompi_request_lock);
} else {
ompi_request_waiting++;
while (sendreq->req_send.req_base.req_ompi.req_complete == false)
opal_condition_wait(&ompi_request_cond, &ompi_request_lock);
ompi_request_waiting--;
ompi_request_free( (ompi_request_t**)&sendreq );
return ret;
} else {
mca_pml_cm_thin_send_request_t *sendreq;
ompi_proc_t * ompi_proc;
MCA_PML_CM_THIN_SEND_REQUEST_ALLOC(sendreq, comm, dst, ompi_proc, ret);
if (NULL == sendreq || OMPI_SUCCESS != ret) return ret;
MCA_PML_CM_THIN_SEND_REQUEST_INIT(sendreq,
ompi_proc,
comm,
tag,
dst,
datatype,
sendmode,
buf,
count);
if (NULL == ompi_mtl->mtl_send) {
MCA_PML_CM_THIN_SEND_REQUEST_START(sendreq,
comm,
tag,
dst,
sendmode,
false,
ret);
if (OMPI_SUCCESS != ret) {
MCA_PML_CM_THIN_SEND_REQUEST_RETURN(sendreq);
return ret;
}
if (sendreq->req_send.req_base.req_ompi.req_complete == false) {
/* give up and sleep until completion */
if (opal_using_threads()) {
opal_mutex_lock(&ompi_request_lock);
ompi_request_waiting++;
while (sendreq->req_send.req_base.req_ompi.req_complete == false)
opal_condition_wait(&ompi_request_cond, &ompi_request_lock);
ompi_request_waiting--;
opal_mutex_unlock(&ompi_request_lock);
} else {
ompi_request_waiting++;
while (sendreq->req_send.req_base.req_ompi.req_complete == false)
opal_condition_wait(&ompi_request_cond, &ompi_request_lock);
ompi_request_waiting--;
}
}
} else {
MCA_PML_CM_SEND_REQUEST_START_SETUP((&sendreq->req_send));
if (OMPI_SUCCESS != ret) {
MCA_PML_CM_THIN_SEND_REQUEST_RETURN(sendreq);
return ret;
}
ret = OMPI_MTL_CALL(send(ompi_mtl,
comm,
dst,
tag,
&sendreq->req_send.req_base.req_convertor,
sendmode));
MCA_PML_CM_THIN_SEND_REQUEST_PML_COMPLETE(sendreq);
}
} else {
MCA_PML_CM_SEND_REQUEST_START_SETUP(sendreq, ret);
if (OMPI_SUCCESS != ret) {
MCA_PML_CM_SEND_REQUEST_RETURN(sendreq);
return ret;
}
ret = OMPI_MTL_CALL(send(ompi_mtl,
sendreq->req_send.req_base.req_comm,
sendreq->req_send.req_base.req_peer,
sendreq->req_send.req_base.req_tag,
&sendreq->req_send.req_convertor,
sendreq->req_send.req_send_mode));
MCA_PML_CM_SEND_REQUEST_PML_COMPLETE(sendreq);
ompi_request_free( (ompi_request_t**)&sendreq );
return ret;
}
ompi_request_free( (ompi_request_t**)&sendreq );
return ret;
return OMPI_ERROR;
}

View file

@ -22,6 +22,49 @@
#include "pml_cm_sendreq.h"
static int
mca_pml_cm_thin_send_request_free(struct ompi_request_t** request);
static int
mca_pml_cm_hvy_send_request_free(struct ompi_request_t** request);
static void mca_pml_cm_thin_send_request_construct(mca_pml_cm_thin_send_request_t* req);
static void mca_pml_cm_hvy_send_request_construct(mca_pml_cm_hvy_send_request_t* sendreq);
OBJ_CLASS_INSTANCE(mca_pml_cm_send_request_t,
mca_pml_cm_request_t,
NULL,
NULL);
OBJ_CLASS_INSTANCE(mca_pml_cm_thin_send_request_t,
mca_pml_cm_send_request_t,
mca_pml_cm_thin_send_request_construct,
NULL);
OBJ_CLASS_INSTANCE(mca_pml_cm_hvy_send_request_t,
mca_pml_cm_send_request_t,
mca_pml_cm_hvy_send_request_construct,
NULL);
static void mca_pml_cm_thin_send_request_construct(mca_pml_cm_thin_send_request_t* sendreq)
{
/* no need to reinit for every send -- never changes */
sendreq->req_mtl.ompi_req = (ompi_request_t*) sendreq;
sendreq->req_mtl.completion_callback = mca_pml_cm_thin_send_request_completion;
sendreq->req_send.req_base.req_ompi.req_free = mca_pml_cm_thin_send_request_free;
sendreq->req_send.req_base.req_ompi.req_cancel = mca_pml_cm_cancel;
}
static void mca_pml_cm_hvy_send_request_construct(mca_pml_cm_hvy_send_request_t* sendreq)
{
/* no need to reinit for every send -- never changes */
sendreq->req_mtl.ompi_req = (ompi_request_t*) sendreq;
sendreq->req_mtl.completion_callback = mca_pml_cm_hvy_send_request_completion;
sendreq->req_send.req_base.req_ompi.req_free = mca_pml_cm_hvy_send_request_free;
sendreq->req_send.req_base.req_ompi.req_cancel = mca_pml_cm_cancel;
}
/*
* The free call mark the final stage in a request
* life-cycle. Starting from this point the request is completed at
@ -30,46 +73,37 @@
* added to the free request list.
*/
static int
mca_pml_cm_send_request_free(struct ompi_request_t** request)
mca_pml_cm_thin_send_request_free(struct ompi_request_t** request)
{
mca_pml_cm_send_request_t* sendreq = *(mca_pml_cm_send_request_t**)request;
assert( false == sendreq->req_send.req_base.req_free_called );
assert( false == sendreq->req_base.req_free_called );
OPAL_THREAD_LOCK(&ompi_request_lock);
sendreq->req_send.req_base.req_free_called = true;
if( true == sendreq->req_send.req_base.req_pml_complete ) {
MCA_PML_CM_SEND_REQUEST_RETURN( sendreq );
sendreq->req_base.req_free_called = true;
if( true == sendreq->req_base.req_pml_complete ) {
MCA_PML_CM_THIN_SEND_REQUEST_RETURN( ((mca_pml_cm_thin_send_request_t*) sendreq) );
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
*request = MPI_REQUEST_NULL;
return OMPI_SUCCESS;
}
static void
sendreq_construct(mca_pml_cm_send_request_t* sendreq)
static int
mca_pml_cm_hvy_send_request_free(struct ompi_request_t** request)
{
sendreq->req_mtl.ompi_req = (ompi_request_t*) sendreq;
sendreq->req_mtl.completion_callback = mca_pml_cm_request_completion;
sendreq->req_send.req_base.req_ompi.req_free = mca_pml_cm_send_request_free;
sendreq->req_send.req_base.req_ompi.req_cancel = mca_pml_cm_cancel;
mca_pml_cm_send_request_t* sendreq = *(mca_pml_cm_send_request_t**)request;
assert( false == sendreq->req_base.req_free_called );
OPAL_THREAD_LOCK(&ompi_request_lock);
sendreq->req_base.req_free_called = true;
if( true == sendreq->req_base.req_pml_complete ) {
MCA_PML_CM_HVY_SEND_REQUEST_RETURN( ((mca_pml_cm_hvy_send_request_t*) sendreq) );
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
*request = MPI_REQUEST_NULL;
return OMPI_SUCCESS;
}
static void
sendreq_destruct(mca_pml_cm_send_request_t* sendreq)
{
sendreq->req_mtl.ompi_req = NULL;
sendreq->req_mtl.completion_callback = NULL;
}
OBJ_CLASS_INSTANCE(mca_pml_cm_send_request_t,
mca_pml_base_send_request_t,
sendreq_construct,
sendreq_destruct);

View file

@ -19,120 +19,251 @@
#ifndef PML_CM_SENDREQ_H
#define PML_CM_SENDREQ_H
#include "pml_cm_request.h"
#include "ompi/mca/pml/base/pml_base_sendreq.h"
#include "ompi/mca/pml/base/pml_base_bsend.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/mtl/mtl.h"
struct mca_pml_cm_send_request_t {
mca_pml_base_send_request_t req_send;
mca_mtl_request_t req_mtl;
bool req_blocking;
struct mca_pml_cm_send_request_t {
mca_pml_cm_request_t req_base;
mca_pml_base_send_mode_t req_send_mode;
};
typedef struct mca_pml_cm_send_request_t mca_pml_cm_send_request_t;
OBJ_CLASS_DECLARATION(mca_pml_cm_send_request_t);
OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_pml_cm_send_request_t);
#define MCA_PML_CM_SEND_REQUEST_ALLOC(comm, dst, sendreq, rc) \
struct mca_pml_cm_thin_send_request_t {
mca_pml_cm_send_request_t req_send;
mca_mtl_request_t req_mtl; /**< the mtl specific memory */
};
typedef struct mca_pml_cm_thin_send_request_t mca_pml_cm_thin_send_request_t;
OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_pml_cm_thin_send_request_t);
struct mca_pml_cm_hvy_send_request_t {
mca_pml_cm_send_request_t req_send;
void *req_addr; /**< pointer to application buffer */
size_t req_count; /**< count of user datatype elements */
int32_t req_peer; /**< peer process - rank w/in this communicator */
int32_t req_tag; /**< user defined tag */
struct ompi_communicator_t *req_comm; /**< communicator pointer */
struct ompi_datatype_t *req_datatype; /**< pointer to data type */
void *req_buff; /**< pointer to send buffer - may not be application buffer */
bool req_blocking;
mca_mtl_request_t req_mtl; /**< the mtl specific memory */
};
typedef struct mca_pml_cm_hvy_send_request_t mca_pml_cm_hvy_send_request_t;
OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_pml_cm_hvy_send_request_t);
#define MCA_PML_CM_THIN_SEND_REQUEST_ALLOC(sendreq, comm, dst, \
ompi_proc, rc) \
{ \
ompi_proc_t *proc = \
comm->c_pml_procs[dst]->proc_ompi; \
ompi_free_list_item_t* item; \
do{ \
ompi_free_list_item_t* item; \
ompi_proc = \
comm->c_pml_procs[dst]->proc_ompi; \
\
if(NULL == proc) { \
if(NULL == ompi_proc) { \
rc = OMPI_ERR_OUT_OF_RESOURCE; \
sendreq = NULL; \
} else { \
rc = OMPI_SUCCESS; \
OMPI_FREE_LIST_WAIT(&ompi_pml_cm.cm_thin_send_requests, \
item, rc); \
sendreq = (mca_pml_cm_thin_send_request_t*)item; \
} \
}while(0); \
}
#define MCA_PML_CM_HVY_SEND_REQUEST_ALLOC(sendreq, comm, dst, \
ompi_proc, rc) \
{ \
ompi_free_list_item_t* item; \
ompi_proc = \
comm->c_pml_procs[dst]->proc_ompi; \
if(NULL == ompi_proc) { \
rc = OMPI_ERR_OUT_OF_RESOURCE; \
sendreq = NULL; \
} else { \
rc = OMPI_SUCCESS; \
OMPI_FREE_LIST_WAIT(&ompi_pml_cm.cm_send_requests, item, rc); \
sendreq = (mca_pml_cm_send_request_t*)item; \
sendreq->req_send.req_base.req_proc = proc; \
OMPI_FREE_LIST_WAIT(&ompi_pml_cm.cm_hvy_send_requests, \
item, rc); \
sendreq = (mca_pml_cm_hvy_send_request_t*)item; \
} \
}
#define MCA_PML_CM_SEND_REQUEST_INIT( sendreq, \
buf, \
count, \
datatype, \
dst, \
tag, \
comm, \
sendmode, \
blocking, \
persistent) \
#define MCA_PML_CM_SEND_REQUEST_INIT_COMMON(req_send, \
ompi_proc, \
comm, \
tag, \
datatype, \
sendmode, \
buf, \
count) \
{ \
MCA_PML_BASE_SEND_REQUEST_INIT(&sendreq->req_send, \
buf, \
count, \
datatype, \
dst, \
tag, \
comm, \
sendmode, \
persistent); \
/* BWB - XXX - fix me later */ \
if (count == 0) { \
ompi_convertor_copy_and_prepare_for_send( \
(sendreq)->req_send.req_base.req_proc->proc_convertor, \
(sendreq)->req_send.req_base.req_datatype, \
(sendreq)->req_send.req_base.req_count, \
(sendreq)->req_send.req_base.req_addr, \
0, \
&(sendreq)->req_send.req_convertor ); \
ompi_convertor_get_packed_size( &(sendreq)->req_send.req_convertor, \
&((sendreq)->req_send.req_bytes_packed) ); \
} \
ompi_convertor_copy_and_prepare_for_send( \
ompi_proc->proc_convertor, \
datatype, \
count, \
buf, \
0, \
&req_send->req_base.req_convertor ); \
req_send->req_base.req_ompi.req_status.MPI_SOURCE = \
comm->c_my_rank; \
req_send->req_base.req_ompi.req_status.MPI_TAG = tag; \
req_send->req_base.req_ompi.req_status._count = count; \
req_send->req_send_mode = sendmode; \
req_send->req_base.req_free_called = false; \
}
#define MCA_PML_CM_HVY_SEND_REQUEST_INIT( sendreq, \
ompi_proc, \
comm, \
tag, \
dst, \
datatype, \
sendmode, \
persistent, \
blocking, \
buf, \
count) \
{ \
do { \
OMPI_REQUEST_INIT(&(sendreq->req_send.req_base.req_ompi), \
persistent); \
sendreq->req_comm = comm; \
sendreq->req_tag = tag; \
sendreq->req_peer = dst; \
sendreq->req_datatype = datatype; \
sendreq->req_addr = buf; \
sendreq->req_count = count; \
MCA_PML_CM_SEND_REQUEST_INIT_COMMON( \
(&sendreq->req_send), \
ompi_proc, \
comm, \
tag, \
datatype, \
sendmode, \
buf, \
count); \
ompi_convertor_get_packed_size( \
&sendreq->req_send.req_base.req_convertor, \
&sendreq->req_count ); \
\
sendreq->req_blocking = blocking; \
\
sendreq->req_blocking = blocking; \
sendreq->req_send.req_base.req_pml_complete = \
(persistent ? true:false); \
}while(0); \
}
#define MCA_PML_CM_SEND_REQUEST_START_SETUP(sendreq, ret) \
do { \
MCA_PML_BASE_SEND_START( &sendreq->req_send.req_base ); \
ret = OMPI_SUCCESS; \
if (sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED) { \
ret =mca_pml_base_bsend_request_start(&sendreq->req_send.req_base.req_ompi); \
} \
#define MCA_PML_CM_THIN_SEND_REQUEST_INIT( sendreq, \
ompi_proc, \
comm, \
tag, \
dst, \
datatype, \
sendmode, \
buf, \
count) \
{ \
do { \
OMPI_REQUEST_INIT(&(sendreq->req_send.req_base.req_ompi), \
false); \
MCA_PML_CM_SEND_REQUEST_INIT_COMMON( \
(&sendreq->req_send), \
ompi_proc, \
comm, \
tag, \
datatype, \
sendmode, \
buf, \
count); \
sendreq->req_send.req_base.req_pml_complete = false; \
}while(0); \
}
#define MCA_PML_CM_SEND_REQUEST_START_SETUP(req_send) \
do { \
\
req_send->req_base.req_pml_complete = false; \
req_send->req_base.req_ompi.req_complete = false; \
req_send->req_base.req_ompi.req_state = \
OMPI_REQUEST_ACTIVE; \
req_send->req_base.req_ompi.req_status._cancelled = 0; \
\
} while (0)
#define MCA_PML_CM_SEND_REQUEST_START(sendreq, ret) \
#define MCA_PML_CM_THIN_SEND_REQUEST_START(sendreq, \
comm, \
tag, \
dst, \
sendmode, \
blocking, \
ret) \
do { \
MCA_PML_CM_SEND_REQUEST_START_SETUP(sendreq, ret); \
if (OMPI_SUCCESS == ret) { \
ret = OMPI_MTL_CALL(isend(ompi_mtl, \
sendreq->req_send.req_base.req_comm, \
sendreq->req_send.req_base.req_peer, \
sendreq->req_send.req_base.req_tag, \
&sendreq->req_send.req_convertor, \
sendreq->req_send.req_send_mode, \
sendreq->req_blocking, \
&sendreq->req_mtl)); \
if(OMPI_SUCCESS == ret && \
sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED) { \
MCA_PML_CM_SEND_REQUEST_MPI_COMPLETE(sendreq); \
} \
MCA_PML_CM_SEND_REQUEST_START_SETUP((&sendreq->req_send)); \
ret = OMPI_MTL_CALL(isend(ompi_mtl, \
comm, \
dst, \
tag, \
&sendreq->req_send.req_base.req_convertor, \
sendmode, \
blocking, \
&sendreq->req_mtl)); \
} while (0)
#define MCA_PML_CM_HVY_SEND_REQUEST_BSEND_ALLOC(sendreq) \
do { \
struct iovec iov; \
unsigned int iov_count; \
size_t max_data; \
int freeAfter; \
\
if(sendreq->req_count > 0) { \
sendreq->req_addr = \
mca_pml_base_bsend_request_alloc_buf(sendreq->req_count); \
iov.iov_base = sendreq->req_addr; \
max_data = iov.iov_len = sendreq->req_count; \
iov_count = 1; \
ompi_convertor_pack( &sendreq->req_send.req_base.req_convertor, \
&iov, \
&iov_count, \
&max_data, &freeAfter); \
ompi_convertor_prepare_for_send( &sendreq->req_send.req_base.req_convertor, MPI_PACKED,\
max_data, sendreq->req_addr ); \
} \
} while (0)
} while(0);
/*
* Mark a send request as completed at the MPI level.
*/
#define MCA_PML_CM_SEND_REQUEST_MPI_COMPLETE(sendreq) \
#define MCA_PML_CM_HVY_SEND_REQUEST_START(sendreq, ret) \
do { \
(sendreq)->req_send.req_base.req_ompi.req_status.MPI_SOURCE = \
(sendreq)->req_send.req_base.req_comm->c_my_rank; \
(sendreq)->req_send.req_base.req_ompi.req_status.MPI_TAG = \
(sendreq)->req_send.req_base.req_tag; \
(sendreq)->req_send.req_base.req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS; \
(sendreq)->req_send.req_base.req_ompi.req_status._count = \
(sendreq)->req_send.req_bytes_packed; \
MCA_PML_BASE_REQUEST_MPI_COMPLETE( &((sendreq)->req_send.req_base.req_ompi) ); \
} while(0)
MCA_PML_CM_SEND_REQUEST_START_SETUP((&sendreq->req_send)); \
if (sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED) { \
MCA_PML_CM_HVY_SEND_REQUEST_BSEND_ALLOC(sendreq); \
} \
ret = OMPI_MTL_CALL(isend(ompi_mtl, \
sendreq->req_comm, \
sendreq->req_peer, \
sendreq->req_tag, \
&sendreq->req_send.req_base.req_convertor, \
sendreq->req_send.req_send_mode, \
sendreq->req_blocking, \
&sendreq->req_mtl)); \
if(OMPI_SUCCESS == ret && \
sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED) { \
MCA_PML_BASE_REQUEST_MPI_COMPLETE(&(sendreq->req_send.req_base.req_ompi)); \
} \
} while (0)
/*
* The PML has completed a send request. Note that this request
@ -141,43 +272,85 @@ do { \
* This macro will never be called directly from the upper level, as it should
* only be an internal call to the PML.
*/
#define MCA_PML_CM_SEND_REQUEST_PML_COMPLETE(sendreq) \
#define MCA_PML_CM_HVY_SEND_REQUEST_PML_COMPLETE(sendreq) \
do { \
assert( false == sendreq->req_send.req_base.req_pml_complete ); \
assert( false == sendreq->req_send.req_base.req_pml_complete ); \
\
if (sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED) { \
mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq); \
} \
if (sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED && \
sendreq->req_count > 0 ) { \
mca_pml_base_bsend_request_free(sendreq->req_addr); \
} \
\
OPAL_THREAD_LOCK(&ompi_request_lock); \
if( false == sendreq->req_send.req_base.req_ompi.req_complete ) { \
/* Should only be called for long messages (maybe synchronous) */ \
MCA_PML_CM_SEND_REQUEST_MPI_COMPLETE(sendreq); \
} \
sendreq->req_send.req_base.req_pml_complete = true; \
OPAL_THREAD_LOCK(&ompi_request_lock); \
if( false == sendreq->req_send.req_base.req_ompi.req_complete ) { \
/* Should only be called for long messages (maybe synchronous) */ \
MCA_PML_BASE_REQUEST_MPI_COMPLETE(&(sendreq->req_send.req_base.req_ompi)); \
} \
sendreq->req_send.req_base.req_pml_complete = true; \
\
if( sendreq->req_send.req_base.req_free_called ) { \
MCA_PML_CM_SEND_REQUEST_RETURN( sendreq ); \
} else { \
if(sendreq->req_send.req_base.req_ompi.req_persistent) { \
/* rewind convertor */ \
size_t offset = 0; \
ompi_convertor_set_position(&sendreq->req_send.req_convertor, &offset); \
} \
if( sendreq->req_send.req_base.req_free_called ) { \
MCA_PML_CM_HVY_SEND_REQUEST_RETURN( sendreq ); \
} else { \
if(sendreq->req_send.req_base.req_ompi.req_persistent) { \
/* rewind convertor */ \
size_t offset = 0; \
ompi_convertor_set_position(&sendreq->req_send.req_base.req_convertor, \
&offset); \
} \
OPAL_THREAD_UNLOCK(&ompi_request_lock); \
} while (0)
} \
OPAL_THREAD_UNLOCK(&ompi_request_lock); \
} while (0)
/*
* Release resources associated with a request
*/
#define MCA_PML_CM_SEND_REQUEST_RETURN(sendreq) \
{ \
/* Let the base handle the reference counts */ \
MCA_PML_BASE_SEND_REQUEST_FINI((&(sendreq)->req_send)); \
OMPI_FREE_LIST_RETURN( \
&ompi_pml_cm.cm_send_requests, (ompi_free_list_item_t*)sendreq); \
}
#define MCA_PML_CM_HVY_SEND_REQUEST_RETURN(sendreq) \
{ \
/* Let the base handle the reference counts */ \
OMPI_REQUEST_FINI(&sendreq->req_send.req_base.req_ompi); \
ompi_convertor_cleanup( &(sendreq->req_send.req_base.req_convertor) ); \
OMPI_FREE_LIST_RETURN( \
&ompi_pml_cm.cm_hvy_send_requests, \
(ompi_free_list_item_t*)sendreq); \
}
/*
* The PML has completed a send request. Note that this request
* may have been orphaned by the user or have already completed
* at the MPI level.
* This macro will never be called directly from the upper level, as it should
* only be an internal call to the PML.
*/
#define MCA_PML_CM_THIN_SEND_REQUEST_PML_COMPLETE(sendreq) \
do { \
assert( false == sendreq->req_send.req_base.req_pml_complete ); \
\
OPAL_THREAD_LOCK(&ompi_request_lock); \
if( false == sendreq->req_send.req_base.req_ompi.req_complete ) { \
/* Should only be called for long messages (maybe synchronous) */ \
MCA_PML_BASE_REQUEST_MPI_COMPLETE(&(sendreq->req_send.req_base.req_ompi)); \
} \
sendreq->req_send.req_base.req_pml_complete = true; \
\
if( sendreq->req_send.req_base.req_free_called ) { \
MCA_PML_CM_THIN_SEND_REQUEST_RETURN( sendreq ); \
} \
OPAL_THREAD_UNLOCK(&ompi_request_lock); \
} while (0)
/*
* Release resources associated with a request
*/
#define MCA_PML_CM_THIN_SEND_REQUEST_RETURN(sendreq) \
{ \
/* Let the base handle the reference counts */ \
OMPI_REQUEST_FINI(&sendreq->req_send.req_base.req_ompi); \
ompi_convertor_cleanup( &(sendreq->req_send.req_base.req_convertor) ); \
OMPI_FREE_LIST_RETURN( \
&ompi_pml_cm.cm_thin_send_requests, \
(ompi_free_list_item_t*)sendreq); \
}
#endif

View file

@ -32,12 +32,13 @@ mca_pml_cm_start(size_t count, ompi_request_t** requests)
{
int rc;
size_t i;
for (i = 0 ; i < count ; i++) {
mca_pml_base_request_t *pml_request =
(mca_pml_base_request_t*)requests[i];
if (NULL == pml_request) continue;
mca_pml_cm_request_t *pml_request =
(mca_pml_cm_request_t*)requests[i];
if (NULL == pml_request) {
/* opal_output(0, "hmm, null request!\n"); */
continue;
}
/* If the persistent request is currebtly active - obtain the
* request lock and verify the status is incomplete. if the
* pml layer has not completed the request - mark the request
@ -45,88 +46,97 @@ mca_pml_cm_start(size_t count, ompi_request_t** requests)
* completes - and create a new request.
*/
switch (pml_request->req_ompi.req_state) {
case OMPI_REQUEST_INACTIVE:
if (pml_request->req_pml_complete == true)
break;
case OMPI_REQUEST_ACTIVE: {
/* otherwise fall through */
ompi_request_t *request;
OPAL_THREAD_LOCK(&ompi_request_lock);
if (pml_request->req_pml_complete == false) {
/* free request after it completes */
pml_request->req_free_called = true;
} else {
/* can reuse the existing request */
OPAL_THREAD_UNLOCK(&ompi_request_lock);
break;
}
/* allocate a new request */
switch (pml_request->req_type) {
case MCA_PML_REQUEST_SEND: {
mca_pml_base_send_mode_t sendmode =
((mca_pml_base_send_request_t*)pml_request)->req_send_mode;
rc = mca_pml_cm_isend_init(
pml_request->req_addr,
pml_request->req_count,
pml_request->req_datatype,
pml_request->req_peer,
pml_request->req_tag,
sendmode,
pml_request->req_comm,
&request);
break;
}
case MCA_PML_REQUEST_RECV:
rc = mca_pml_cm_irecv_init(
pml_request->req_addr,
pml_request->req_count,
pml_request->req_datatype,
pml_request->req_peer,
pml_request->req_tag,
pml_request->req_comm,
&request);
break;
default:
rc = OMPI_ERR_REQUEST;
break;
}
case OMPI_REQUEST_INACTIVE:
/* opal_output(0, "ereIbe!\n"); */
if (pml_request->req_pml_complete == true)
break;
case OMPI_REQUEST_ACTIVE: {
/* otherwise fall through */
ompi_request_t *request;
/* opal_output(0, "ereIam!\n"); */
OPAL_THREAD_LOCK(&ompi_request_lock);
if (pml_request->req_pml_complete == false) {
/* free request after it completes */
pml_request->req_free_called = true;
} else {
/* can reuse the existing request */
OPAL_THREAD_UNLOCK(&ompi_request_lock);
if(OMPI_SUCCESS != rc)
return rc;
pml_request = (mca_pml_base_request_t*)request;
requests[i] = request;
break;
}
/* allocate a new request */
switch (pml_request->req_pml_type) {
case MCA_PML_CM_REQUEST_SEND: {
mca_pml_cm_hvy_send_request_t* sendreq = (mca_pml_cm_hvy_send_request_t*) pml_request;
rc = mca_pml_cm_isend_init(
sendreq->req_addr,
sendreq->req_count,
sendreq->req_datatype,
sendreq->req_peer,
sendreq->req_tag,
sendreq->req_send.req_send_mode,
sendreq->req_comm,
&request);
break;
}
case MCA_PML_CM_REQUEST_RECV:{
mca_pml_cm_hvy_recv_request_t* recvreq = (mca_pml_cm_hvy_recv_request_t*) pml_request;
rc = mca_pml_cm_irecv_init(
recvreq->req_addr,
recvreq->req_count,
recvreq->req_datatype,
recvreq->req_peer,
recvreq->req_tag,
recvreq->req_comm,
&request);
break;
}
default:
return OMPI_ERR_REQUEST;
rc = OMPI_ERR_REQUEST;
break;
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
if(OMPI_SUCCESS != rc)
return rc;
pml_request = (mca_pml_cm_request_t*)request;
requests[i] = request;
break;
}
default:
/* opal_output(0,"unknown request state!\n"); */
return OMPI_ERR_REQUEST;
}
/* start the request */
switch (pml_request->req_type) {
case MCA_PML_REQUEST_SEND:
switch (pml_request->req_pml_type) {
case MCA_PML_CM_REQUEST_SEND:
{
mca_pml_cm_send_request_t* sendreq =
(mca_pml_cm_send_request_t*)pml_request;
MCA_PML_CM_SEND_REQUEST_START(sendreq, rc);
mca_pml_cm_hvy_send_request_t* sendreq =
(mca_pml_cm_hvy_send_request_t*)pml_request;
MCA_PML_CM_HVY_SEND_REQUEST_START(sendreq, rc);
if(rc != OMPI_SUCCESS)
return rc;
break;
}
case MCA_PML_REQUEST_RECV:
case MCA_PML_CM_REQUEST_RECV:
{
mca_pml_cm_recv_request_t* recvreq =
(mca_pml_cm_recv_request_t*)pml_request;
MCA_PML_CM_RECV_REQUEST_START(recvreq, rc);
mca_pml_cm_hvy_recv_request_t* recvreq =
(mca_pml_cm_hvy_recv_request_t*)pml_request;
/* opal_output(0, "calling recv request start\n"); */
MCA_PML_CM_HVY_RECV_REQUEST_START(recvreq, rc);
if(rc != OMPI_SUCCESS)
return rc;
break;
}
default:
return OMPI_ERR_REQUEST;
default:
/* opal_output(0, "can't start an unknown request type!"); */
return OMPI_ERR_REQUEST;
}
}
return OMPI_SUCCESS;
}