cleanup to correct mpi semantics
This commit was SVN r3214.
Этот коммит содержится в:
родитель
d107efd77b
Коммит
7628c9f16a
@ -32,6 +32,8 @@ ompi_errcode_intern_t ompi_err_would_block;
|
||||
ompi_errcode_intern_t ompi_err_in_errno;
|
||||
ompi_errcode_intern_t ompi_err_unreach;
|
||||
ompi_errcode_intern_t ompi_err_not_found;
|
||||
ompi_errcode_intern_t ompi_err_request;
|
||||
ompi_errcode_intern_t ompi_err_buffer;
|
||||
|
||||
static void ompi_errcode_intern_construct(ompi_errcode_intern_t* errcode);
|
||||
static void ompi_errcode_intern_destruct(ompi_errcode_intern_t* errcode);
|
||||
@ -183,6 +185,22 @@ int ompi_errcode_intern_init (void)
|
||||
ompi_pointer_array_set_item(&ompi_errcodes_intern, ompi_err_not_found.index,
|
||||
&ompi_err_not_found);
|
||||
|
||||
OBJ_CONSTRUCT(&ompi_err_buffer, ompi_errcode_intern_t);
|
||||
ompi_err_buffer.code = OMPI_ERR_BUFFER;
|
||||
ompi_err_buffer.mpi_code = MPI_ERR_BUFFER;
|
||||
ompi_err_buffer.index = pos++;
|
||||
strcpy(ompi_err_buffer.errstring, "OMPI_ERR_BUFFER");
|
||||
ompi_pointer_array_set_item(&ompi_errcodes_intern, ompi_err_buffer.index,
|
||||
&ompi_err_buffer);
|
||||
|
||||
OBJ_CONSTRUCT(&ompi_err_request, ompi_errcode_intern_t);
|
||||
ompi_err_request.code = OMPI_ERR_REQUEST;
|
||||
ompi_err_request.mpi_code = MPI_ERR_REQUEST;
|
||||
ompi_err_request.index = pos++;
|
||||
strcpy(ompi_err_request.errstring, "OMPI_ERR_REQUEST");
|
||||
ompi_pointer_array_set_item(&ompi_errcodes_intern, ompi_err_request.index,
|
||||
&ompi_err_request);
|
||||
|
||||
ompi_errcode_intern_lastused=pos;
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
@ -207,6 +225,8 @@ int ompi_errcode_intern_finalize(void)
|
||||
OBJ_DESTRUCT(&ompi_err_in_errno);
|
||||
OBJ_DESTRUCT(&ompi_err_unreach);
|
||||
OBJ_DESTRUCT(&ompi_err_not_found);
|
||||
OBJ_DESTRUCT(&ompi_err_buffer);
|
||||
OBJ_DESTRUCT(&ompi_err_request);
|
||||
|
||||
OBJ_DESTRUCT(&ompi_errcodes_intern);
|
||||
return OMPI_SUCCESS;
|
||||
|
@ -32,7 +32,7 @@ typedef void(*mca_allocator_base_module_free_fn_t)(struct mca_allocator_base_mod
|
||||
* compact/return memory to higher level allocator
|
||||
*/
|
||||
|
||||
typedef int (*mca_allocator_base_module_return_fn_t)(
|
||||
typedef int (*mca_allocator_base_module_compact_fn_t)(
|
||||
struct mca_allocator_base_module_t* allocator
|
||||
);
|
||||
|
||||
@ -55,7 +55,7 @@ struct mca_allocator_base_module_t {
|
||||
/**< Reallocate memory */
|
||||
mca_allocator_base_module_free_fn_t alc_free;
|
||||
/**< Free memory */
|
||||
mca_allocator_base_module_return_fn_t alc_return;
|
||||
mca_allocator_base_module_compact_fn_t alc_compact;
|
||||
/**< Return memory */
|
||||
mca_allocator_base_module_finalize_fn_t alc_finalize;
|
||||
/**< Finalize and free everything */
|
||||
|
@ -45,7 +45,7 @@ struct mca_allocator_base_module_t* mca_allocator_bucket_module_init(
|
||||
allocator->super.alc_alloc = mca_allocator_bucket_alloc_wrapper;
|
||||
allocator->super.alc_realloc = mca_allocator_bucket_realloc;
|
||||
allocator->super.alc_free = mca_allocator_bucket_free;
|
||||
allocator->super.alc_return = mca_allocator_bucket_cleanup;
|
||||
allocator->super.alc_compact = mca_allocator_bucket_cleanup;
|
||||
allocator->super.alc_finalize = mca_allocator_bucket_finalize;
|
||||
|
||||
return((mca_allocator_base_module_t *) allocator);
|
||||
|
@ -58,20 +58,22 @@ void * mca_allocator_bucket_alloc(mca_allocator_base_module_t * mem,
|
||||
size_t size)
|
||||
{
|
||||
mca_allocator_bucket_t * mem_options = (mca_allocator_bucket_t *) mem;
|
||||
int bucket_num = 0;
|
||||
/* initialize for the later bit shifts */
|
||||
size_t bucket_size = 1;
|
||||
int bucket_num = 0;
|
||||
size_t bucket_size = MCA_ALLOCATOR_BUCKET_1_SIZE;
|
||||
size_t allocated_size;
|
||||
mca_allocator_bucket_chunk_header_t * chunk;
|
||||
mca_allocator_bucket_chunk_header_t * first_chunk;
|
||||
mca_allocator_bucket_segment_head_t * segment_header;
|
||||
/* add the size of the header into the amount we need to request */
|
||||
size += sizeof(mca_allocator_bucket_chunk_header_t);
|
||||
|
||||
/* figure out which bucket it will come from. */
|
||||
while(size > MCA_ALLOCATOR_BUCKET_1_SIZE) {
|
||||
size >>= 1;
|
||||
while(size > bucket_size) {
|
||||
bucket_num++;
|
||||
bucket_size <<= 1;
|
||||
}
|
||||
|
||||
/* now that we know what bucket it will come from, we must get the lock */
|
||||
OMPI_THREAD_LOCK(&(mem_options->buckets[bucket_num].lock));
|
||||
/* see if there is already a free chunk */
|
||||
@ -82,11 +84,10 @@ void * mca_allocator_bucket_alloc(mca_allocator_base_module_t * mem,
|
||||
/* go past the header */
|
||||
chunk += 1;
|
||||
/*release the lock */
|
||||
OMPI_THREAD_UNLOCK(&(mem_options->buckets[bucket_num].lock));
|
||||
OMPI_THREAD_UNLOCK(&(mem_options->buckets[bucket_num].lock));
|
||||
return((void *) chunk);
|
||||
}
|
||||
/* figure out the size of bucket we need */
|
||||
bucket_size <<= (bucket_num + MCA_ALLOCATOR_BUCKET_1_BITSHIFTS);
|
||||
allocated_size = bucket_size;
|
||||
/* we have to add in the size of the segment header into the
|
||||
* amount we need to request */
|
||||
|
@ -28,22 +28,11 @@ static void* mca_pml_bsend_alloc_segment(size_t* size_inout)
|
||||
{
|
||||
void *addr;
|
||||
size_t size = *size_inout;
|
||||
size_t pages = 1;
|
||||
|
||||
/* determine number of pages to allocate */
|
||||
while(size > mca_pml_bsend_pagesz) {
|
||||
size >>= mca_pml_bsend_pagebits;
|
||||
pages++;
|
||||
}
|
||||
|
||||
if(mca_pml_bsend_addr + size > mca_pml_bsend_base + mca_pml_bsend_size) {
|
||||
if( mca_pml_bsend_addr + *size_inout <= mca_pml_bsend_base + mca_pml_bsend_size ) {
|
||||
size = *size_inout;
|
||||
} else {
|
||||
OMPI_THREAD_UNLOCK(&mca_pml_bsend_mutex);
|
||||
return NULL;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
/* allocate all that is left */
|
||||
size = mca_pml_bsend_size - (mca_pml_bsend_addr - mca_pml_bsend_base);
|
||||
addr = mca_pml_bsend_addr;
|
||||
mca_pml_bsend_addr += size;
|
||||
*size_inout = size;
|
||||
@ -56,7 +45,7 @@ static void* mca_pml_bsend_alloc_segment(size_t* size_inout)
|
||||
*/
|
||||
int mca_pml_base_bsend_init(bool* thread_safe)
|
||||
{
|
||||
int id = mca_base_param_register_string("pml", "base", "bsend_allocator", NULL, "bucket");
|
||||
int id = mca_base_param_register_string("pml", "base", "bsend_allocator", NULL, "basic");
|
||||
mca_allocator_base_module_t *allocator;
|
||||
char *name;
|
||||
size_t tmp;
|
||||
@ -183,55 +172,82 @@ int mca_pml_base_bsend_detach(void* addr, int* size)
|
||||
int mca_pml_base_bsend_request_init(ompi_request_t* request, bool persistent)
|
||||
{
|
||||
mca_pml_base_send_request_t* sendreq = (mca_pml_base_send_request_t*)request;
|
||||
struct iovec iov;
|
||||
void* buf;
|
||||
int rc, freeAfter;
|
||||
unsigned int max_data, iov_count;
|
||||
|
||||
OMPI_THREAD_LOCK(&mca_pml_bsend_mutex);
|
||||
if(NULL == mca_pml_bsend_addr) {
|
||||
OMPI_THREAD_UNLOCK(&mca_pml_bsend_mutex);
|
||||
return OMPI_ERR_BUFFER;
|
||||
}
|
||||
/* alloc buffer and pack user data */
|
||||
if(sendreq->req_count > 0) {
|
||||
OMPI_THREAD_LOCK(&mca_pml_bsend_mutex);
|
||||
if(NULL == mca_pml_bsend_addr) {
|
||||
sendreq->req_addr = NULL;
|
||||
OMPI_THREAD_UNLOCK(&mca_pml_bsend_mutex);
|
||||
return OMPI_ERR_BUFFER;
|
||||
}
|
||||
|
||||
/* allocate a buffer to hold packed message */
|
||||
buf = mca_pml_bsend_allocator->alc_alloc(mca_pml_bsend_allocator, sendreq->req_bytes_packed, 0);
|
||||
if(NULL == buf) {
|
||||
OMPI_THREAD_UNLOCK(&mca_pml_bsend_mutex);
|
||||
return OMPI_ERR_BUFFER;
|
||||
}
|
||||
/* allocate a buffer to hold packed message */
|
||||
sendreq->req_addr = mca_pml_bsend_allocator->alc_alloc(
|
||||
mca_pml_bsend_allocator, sendreq->req_bytes_packed, 0);
|
||||
if(NULL == sendreq->req_addr) {
|
||||
/* release resources when request is freed */
|
||||
sendreq->req_base.req_pml_complete = true;
|
||||
OMPI_THREAD_UNLOCK(&mca_pml_bsend_mutex);
|
||||
return OMPI_ERR_BUFFER;
|
||||
}
|
||||
|
||||
/* pack users message into buffer */
|
||||
iov.iov_base = buf;
|
||||
iov.iov_len = sendreq->req_bytes_packed;
|
||||
iov_count = 1;
|
||||
max_data = iov.iov_len;
|
||||
if((rc = ompi_convertor_pack(&sendreq->req_convertor, &iov, &iov_count,
|
||||
&max_data, &freeAfter)) <= 0) {
|
||||
mca_pml_bsend_allocator->alc_free(mca_pml_bsend_allocator, buf);
|
||||
OMPI_THREAD_UNLOCK(&mca_pml_bsend_mutex);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
/* setup request to reflect the contigous buffer */
|
||||
sendreq->req_count = sendreq->req_bytes_packed;
|
||||
sendreq->req_datatype = MPI_BYTE;
|
||||
|
||||
/* setup convertor to reflect contiguous buffer */
|
||||
if((rc = ompi_convertor_init_for_send(&sendreq->req_convertor, 0, MPI_BYTE,
|
||||
iov.iov_len, iov.iov_base,
|
||||
0, NULL /*never allocate*/)) != OMPI_SUCCESS) {
|
||||
mca_pml_bsend_allocator->alc_free(mca_pml_bsend_allocator, buf);
|
||||
/* increment count of pending requests */
|
||||
mca_pml_bsend_count++;
|
||||
OMPI_THREAD_UNLOCK(&mca_pml_bsend_mutex);
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* increment count of pending requests */
|
||||
mca_pml_bsend_count++;
|
||||
|
||||
/* set flag indicating mpi layer is done */
|
||||
sendreq->req_base.req_persistent = persistent;
|
||||
sendreq->req_base.req_ompi.req_complete = true;
|
||||
OMPI_THREAD_UNLOCK(&mca_pml_bsend_mutex);
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
/*
|
||||
* pack send buffer into buffer
|
||||
*/
|
||||
|
||||
int mca_pml_base_bsend_request_start(ompi_request_t* request)
|
||||
{
|
||||
mca_pml_base_send_request_t* sendreq = (mca_pml_base_send_request_t*)request;
|
||||
struct iovec iov;
|
||||
unsigned int max_data, iov_count;
|
||||
int rc, freeAfter;
|
||||
|
||||
if(sendreq->req_count > 0) {
|
||||
/* setup convertor to point to app buffer */
|
||||
ompi_convertor_init_for_send( &sendreq->req_convertor,
|
||||
0,
|
||||
sendreq->req_base.req_datatype,
|
||||
sendreq->req_base.req_count,
|
||||
sendreq->req_base.req_addr,
|
||||
0, NULL );
|
||||
|
||||
/* pack */
|
||||
iov.iov_base = sendreq->req_addr;
|
||||
iov.iov_len = sendreq->req_count;
|
||||
iov_count = 1;
|
||||
max_data = iov.iov_len;
|
||||
if((rc = ompi_convertor_pack(&sendreq->req_convertor, &iov, &iov_count,
|
||||
&max_data, &freeAfter)) <= 0) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
/* setup convertor to point to packed buffer */
|
||||
ompi_convertor_init_for_send( &sendreq->req_convertor,
|
||||
0,
|
||||
sendreq->req_datatype,
|
||||
sendreq->req_count,
|
||||
sendreq->req_addr,
|
||||
0, NULL );
|
||||
}
|
||||
sendreq->req_base.req_ompi.req_complete = true;
|
||||
return OMPI_SUCCESS;;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Request completed - free buffer and decrement pending count
|
||||
@ -239,12 +255,15 @@ int mca_pml_base_bsend_request_init(ompi_request_t* request, bool persistent)
|
||||
int mca_pml_base_bsend_request_fini(ompi_request_t* request)
|
||||
{
|
||||
mca_pml_base_send_request_t* sendreq = (mca_pml_base_send_request_t*)request;
|
||||
if(sendreq->req_count == 0 || sendreq->req_addr == NULL)
|
||||
return OMPI_SUCCESS;
|
||||
|
||||
/* remove from list of pending requests */
|
||||
OMPI_THREAD_LOCK(&mca_pml_bsend_mutex);
|
||||
|
||||
/* free buffer */
|
||||
mca_pml_bsend_allocator->alc_free(mca_pml_bsend_allocator, sendreq->req_convertor.pBaseBuf);
|
||||
mca_pml_bsend_allocator->alc_free(mca_pml_bsend_allocator, sendreq->req_addr);
|
||||
sendreq->req_addr = NULL;
|
||||
|
||||
/* decrement count of buffered requests */
|
||||
if(--mca_pml_bsend_count == 0)
|
||||
|
@ -13,6 +13,7 @@ int mca_pml_base_bsend_attach(void* addr, int size);
|
||||
int mca_pml_base_bsend_detach(void* addr, int* size);
|
||||
|
||||
int mca_pml_base_bsend_request_init(ompi_request_t*, bool persistent);
|
||||
int mca_pml_base_bsend_request_start(ompi_request_t*);
|
||||
int mca_pml_base_bsend_request_fini(ompi_request_t*);
|
||||
|
||||
|
||||
|
@ -45,10 +45,11 @@ typedef struct mca_pml_base_recv_request_t mca_pml_base_recv_request_t;
|
||||
comm, \
|
||||
persistent) \
|
||||
{ \
|
||||
/* increment reference count on communicator */ \
|
||||
OBJ_RETAIN(comm); \
|
||||
\
|
||||
OMPI_REQUEST_INIT(&(request)->req_base.req_ompi); \
|
||||
(request)->req_bytes_packed = 0; \
|
||||
(request)->req_bytes_received = 0; \
|
||||
(request)->req_bytes_delivered = 0; \
|
||||
(request)->req_base.req_sequence = 0; \
|
||||
(request)->req_base.req_addr = addr; \
|
||||
(request)->req_base.req_count = count; \
|
||||
@ -61,8 +62,6 @@ typedef struct mca_pml_base_recv_request_t mca_pml_base_recv_request_t;
|
||||
(request)->req_base.req_pml_complete = false; \
|
||||
(request)->req_base.req_free_called = false; \
|
||||
\
|
||||
/* increment reference count on communicator */ \
|
||||
OBJ_RETAIN(comm); \
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -19,6 +19,9 @@ extern ompi_class_t mca_pml_base_send_request_t_class;
|
||||
*/
|
||||
struct mca_pml_base_send_request_t {
|
||||
mca_pml_base_request_t req_base; /** base request type - common data structure for use by wait/test */
|
||||
void *req_addr; /**< pointer to send buffer - may not be application buffer */
|
||||
size_t req_count; /**< number of elements in send buffer */
|
||||
ompi_datatype_t* req_datatype; /**< pointer to datatype */
|
||||
size_t req_offset; /**< number of bytes that have already been assigned to a fragment */
|
||||
size_t req_bytes_packed; /**< packed size of a message given the datatype and count */
|
||||
size_t req_bytes_sent; /**< number of bytes that have been sent */
|
||||
@ -47,6 +50,9 @@ typedef struct mca_pml_base_send_request_t mca_pml_base_send_request_t;
|
||||
* @param comm (IN) Communicator
|
||||
* @param mode (IN) Send mode (STANDARD,BUFFERED,SYNCHRONOUS,READY)
|
||||
* @param persistent (IN) Is request persistent.
|
||||
*
|
||||
* Performa any one-time initialization. Note that per-use initialization
|
||||
* is done in the send request start routine.
|
||||
*/
|
||||
|
||||
#define MCA_PML_BASE_SEND_REQUEST_INIT( request, \
|
||||
@ -63,12 +69,10 @@ typedef struct mca_pml_base_send_request_t mca_pml_base_send_request_t;
|
||||
OBJ_RETAIN(comm); \
|
||||
\
|
||||
OMPI_REQUEST_INIT(&(request)->req_base.req_ompi); \
|
||||
request->req_offset = 0; \
|
||||
request->req_bytes_sent = 0; \
|
||||
request->req_addr = addr; \
|
||||
request->req_count = count; \
|
||||
request->req_datatype = datatype; \
|
||||
request->req_send_mode = mode; \
|
||||
request->req_peer_match.lval = 0; \
|
||||
request->req_peer_addr.lval = 0; \
|
||||
request->req_peer_size = 0; \
|
||||
request->req_base.req_addr = addr; \
|
||||
request->req_base.req_count = count; \
|
||||
request->req_base.req_datatype = datatype; \
|
||||
|
@ -228,7 +228,7 @@ extern int mca_pml_teg_start(
|
||||
{ \
|
||||
mca_pml_base_request_t* pml_request = *(mca_pml_base_request_t**)(request); \
|
||||
pml_request->req_free_called = true; \
|
||||
if(pml_request->req_pml_complete == true) \
|
||||
if( pml_request->req_pml_complete == true) \
|
||||
{ \
|
||||
OMPI_REQUEST_FINI(*(request)); \
|
||||
switch(pml_request->req_type) { \
|
||||
@ -251,7 +251,7 @@ extern int mca_pml_teg_start(
|
||||
break; \
|
||||
} \
|
||||
} \
|
||||
*(request) = NULL; \
|
||||
*(request) = MPI_REQUEST_NULL; \
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -14,12 +14,19 @@ static mca_ptl_base_recv_frag_t* mca_pml_teg_recv_request_match_specific_proc(
|
||||
mca_pml_base_recv_request_t* request, int proc);
|
||||
|
||||
|
||||
static int mca_pml_teg_recv_request_free(struct ompi_request_t* request)
|
||||
static int mca_pml_teg_recv_request_fini(struct ompi_request_t** request)
|
||||
{
|
||||
MCA_PML_TEG_FINI(&request);
|
||||
MCA_PML_TEG_FINI(request);
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
static int mca_pml_teg_recv_request_free(struct ompi_request_t** request)
|
||||
{
|
||||
MCA_PML_TEG_FREE(request);
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
static int mca_pml_teg_recv_request_cancel(struct ompi_request_t* request, int complete)
|
||||
{
|
||||
return OMPI_SUCCESS;
|
||||
@ -29,6 +36,7 @@ static void mca_pml_teg_recv_request_construct(mca_pml_base_recv_request_t* requ
|
||||
{
|
||||
request->req_base.req_type = MCA_PML_REQUEST_RECV;
|
||||
request->req_base.req_ompi.req_query = NULL;
|
||||
request->req_base.req_ompi.req_fini = mca_pml_teg_recv_request_fini;
|
||||
request->req_base.req_ompi.req_free = mca_pml_teg_recv_request_free;
|
||||
request->req_base.req_ompi.req_cancel = mca_pml_teg_recv_request_cancel;
|
||||
}
|
||||
|
@ -65,7 +65,14 @@ void mca_pml_teg_recv_request_match_specific(mca_pml_base_recv_request_t* reques
|
||||
*/
|
||||
static inline int mca_pml_teg_recv_request_start(mca_pml_base_recv_request_t* request)
|
||||
{
|
||||
/* init/re-init the request */
|
||||
request->req_bytes_received = 0;
|
||||
request->req_bytes_delivered = 0;
|
||||
request->req_base.req_pml_complete = false;
|
||||
request->req_base.req_ompi.req_complete = false;
|
||||
request->req_base.req_ompi.req_state = OMPI_REQUEST_ACTIVE;
|
||||
|
||||
/* attempt to match posted recv */
|
||||
if(request->req_base.req_peer == OMPI_ANY_SOURCE) {
|
||||
mca_pml_teg_recv_request_match_wild(request);
|
||||
} else {
|
||||
|
@ -15,12 +15,17 @@
|
||||
|
||||
|
||||
|
||||
static int mca_pml_teg_send_request_free(struct ompi_request_t* request)
|
||||
static int mca_pml_teg_send_request_fini(struct ompi_request_t** request)
|
||||
{
|
||||
MCA_PML_TEG_FINI(&request);
|
||||
MCA_PML_TEG_FINI(request);
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
static int mca_pml_teg_send_request_free(struct ompi_request_t** request)
|
||||
{
|
||||
MCA_PML_TEG_FREE(request);
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
static int mca_pml_teg_send_request_cancel(struct ompi_request_t* request, int complete)
|
||||
{
|
||||
@ -32,6 +37,7 @@ static void mca_pml_teg_send_request_construct(mca_pml_base_send_request_t* req)
|
||||
{
|
||||
req->req_base.req_type = MCA_PML_REQUEST_SEND;
|
||||
req->req_base.req_ompi.req_query = NULL;
|
||||
req->req_base.req_ompi.req_fini = mca_pml_teg_send_request_fini;
|
||||
req->req_base.req_ompi.req_free = mca_pml_teg_send_request_free;
|
||||
req->req_base.req_ompi.req_cancel = mca_pml_teg_send_request_cancel;
|
||||
}
|
||||
@ -140,7 +146,7 @@ void mca_pml_teg_send_request_progress(
|
||||
}
|
||||
} else if (req->req_base.req_free_called) {
|
||||
MCA_PML_TEG_FREE((ompi_request_t**)&req);
|
||||
}
|
||||
}
|
||||
OMPI_THREAD_UNLOCK(&ompi_request_lock);
|
||||
|
||||
/* check for pending requests that need to be progressed */
|
||||
|
@ -29,10 +29,8 @@ OBJ_CLASS_DECLARATION(mca_pml_teg_send_request_t);
|
||||
mca_ptl_proc_t* ptl_proc; \
|
||||
mca_pml_base_ptl_t* ptl_base; \
|
||||
\
|
||||
/*OMPI_THREAD_SCOPED_LOCK(&proc->proc_lock,*/ \
|
||||
/*(ptl_proc = mca_ptl_array_get_next(&proc->proc_ptl_first)));*/ \
|
||||
OMPI_THREAD_SCOPED_LOCK(&proc->proc_lock, \
|
||||
(ptl_proc = mca_ptl_array_get_index(&proc->proc_ptl_first, 0))); \
|
||||
(ptl_proc = mca_ptl_array_get_next(&proc->proc_ptl_first))); \
|
||||
ptl_base = ptl_proc->ptl_base; \
|
||||
/* \
|
||||
* check to see if there is a cache of send requests associated with \
|
||||
@ -114,14 +112,27 @@ static inline int mca_pml_teg_send_request_start(
|
||||
{
|
||||
mca_ptl_base_module_t* ptl = req->req_ptl;
|
||||
size_t first_fragment_size = ptl->ptl_first_frag_size;
|
||||
size_t offset = req->req_offset;
|
||||
int flags, rc;
|
||||
|
||||
/* initialize request state and message sequence number */
|
||||
/* init/reinit request - do this here instead of init
|
||||
* as a persistent request may be reused, and there is
|
||||
* no additional cost
|
||||
*/
|
||||
req->req_offset = 0;
|
||||
req->req_bytes_sent = 0;
|
||||
req->req_peer_match.lval = 0;
|
||||
req->req_peer_addr.lval = 0;
|
||||
req->req_peer_size = 0;
|
||||
req->req_base.req_pml_complete = false;
|
||||
req->req_base.req_ompi.req_complete = false;
|
||||
req->req_base.req_ompi.req_state = OMPI_REQUEST_ACTIVE;
|
||||
req->req_base.req_sequence = mca_pml_ptl_comm_send_sequence(
|
||||
req->req_base.req_comm->c_pml_comm,
|
||||
req->req_base.req_peer);
|
||||
req->req_base.req_comm->c_pml_comm, req->req_base.req_peer);
|
||||
|
||||
/* handle buffered send */
|
||||
if(req->req_send_mode == MCA_PML_BASE_SEND_BUFFERED) {
|
||||
mca_pml_base_bsend_request_start(&req->req_base.req_ompi);
|
||||
}
|
||||
|
||||
/* start the first fragment */
|
||||
if(first_fragment_size == 0 || req->req_bytes_packed <= first_fragment_size) {
|
||||
@ -132,7 +143,7 @@ static inline int mca_pml_teg_send_request_start(
|
||||
/* require match for first fragment of a multi-fragment */
|
||||
flags = MCA_PTL_FLAGS_ACK_MATCHED;
|
||||
}
|
||||
rc = ptl->ptl_send(ptl, req->req_peer, req, offset, first_fragment_size, flags);
|
||||
rc = ptl->ptl_send(ptl, req->req_peer, req, 0, first_fragment_size, flags);
|
||||
if(rc != OMPI_SUCCESS)
|
||||
return rc;
|
||||
return OMPI_SUCCESS;
|
||||
|
@ -40,7 +40,6 @@ int mca_pml_teg_start(size_t count, ompi_request_t** requests)
|
||||
OMPI_THREAD_UNLOCK(&ompi_request_lock);
|
||||
break;
|
||||
}
|
||||
OMPI_THREAD_UNLOCK(&ompi_request_lock);
|
||||
|
||||
/* allocate a new request */
|
||||
switch(pml_request->req_type) {
|
||||
@ -75,6 +74,7 @@ int mca_pml_teg_start(size_t count, ompi_request_t** requests)
|
||||
rc = OMPI_ERR_REQUEST;
|
||||
break;
|
||||
}
|
||||
OMPI_THREAD_UNLOCK(&ompi_request_lock);
|
||||
if(OMPI_SUCCESS != rc)
|
||||
return rc;
|
||||
pml_request = (mca_pml_base_request_t*)request;
|
||||
@ -87,16 +87,20 @@ int mca_pml_teg_start(size_t count, ompi_request_t** requests)
|
||||
|
||||
/* start the request */
|
||||
switch(pml_request->req_type) {
|
||||
case MCA_PML_REQUEST_SEND:
|
||||
if((rc = mca_pml_teg_send_request_start((mca_pml_base_send_request_t*)pml_request))
|
||||
!= OMPI_SUCCESS)
|
||||
case MCA_PML_REQUEST_SEND:
|
||||
{
|
||||
mca_pml_base_send_request_t* sendreq = (mca_pml_base_send_request_t*)pml_request;
|
||||
if((rc = mca_pml_teg_send_request_start(sendreq)) != OMPI_SUCCESS)
|
||||
return rc;
|
||||
break;
|
||||
}
|
||||
case MCA_PML_REQUEST_RECV:
|
||||
if((rc = mca_pml_teg_recv_request_start((mca_pml_base_recv_request_t*)pml_request))
|
||||
!= OMPI_SUCCESS)
|
||||
{
|
||||
mca_pml_base_recv_request_t* recvreq = (mca_pml_base_recv_request_t*)pml_request;
|
||||
if((rc = mca_pml_teg_recv_request_start(recvreq)) != OMPI_SUCCESS)
|
||||
return rc;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
return OMPI_ERR_REQUEST;
|
||||
}
|
||||
|
@ -150,12 +150,12 @@ void mca_ptl_self_matched( mca_ptl_base_module_t* ptl,
|
||||
* a contigous buffer and the convertor on the send request initialized to point
|
||||
* into this buffer.
|
||||
*/
|
||||
if( sendreq->req_send.req_base.req_datatype == recvreq->req_base.req_datatype &&
|
||||
sendreq->req_send.req_send_mode != MCA_PML_BASE_SEND_BUFFERED) {
|
||||
if( sendreq->req_send.req_datatype == recvreq->req_base.req_datatype ) {
|
||||
ompi_ddt_copy_content_same_ddt( recvreq->req_base.req_datatype,
|
||||
recvreq->req_base.req_count,
|
||||
recvreq->req_base.req_count > sendreq->req_send.req_count ?
|
||||
sendreq->req_send.req_count : recvreq->req_base.req_count,
|
||||
recvreq->req_base.req_addr,
|
||||
sendreq->req_send.req_base.req_addr );
|
||||
sendreq->req_send.req_addr );
|
||||
} else {
|
||||
ompi_convertor_t *pSendConvertor, *pRecvConvertor;
|
||||
struct iovec iov[1];
|
||||
|
@ -104,9 +104,9 @@ int mca_ptl_tcp_send_frag_init(
|
||||
ompi_convertor_init_for_send(
|
||||
convertor,
|
||||
0,
|
||||
sendreq->req_base.req_datatype,
|
||||
sendreq->req_base.req_count,
|
||||
sendreq->req_base.req_addr,
|
||||
sendreq->req_datatype,
|
||||
sendreq->req_count,
|
||||
sendreq->req_addr,
|
||||
offset,
|
||||
ptl_tcp_memalloc );
|
||||
|
||||
|
@ -52,12 +52,16 @@ int MPI_Bsend(void *buf, int count, MPI_Datatype type, int dest, int tag, MPI_Co
|
||||
goto error_return;
|
||||
|
||||
rc = mca_pml_base_bsend_request_init(request, false);
|
||||
if(OMPI_SUCCESS != rc)
|
||||
if(OMPI_SUCCESS != rc) {
|
||||
ompi_request_free(&request);
|
||||
goto error_return;
|
||||
}
|
||||
|
||||
rc = mca_pml.pml_start(1, &request);
|
||||
if(OMPI_SUCCESS != rc)
|
||||
if(OMPI_SUCCESS != rc) {
|
||||
ompi_request_free(&request);
|
||||
goto error_return;
|
||||
}
|
||||
|
||||
rc = ompi_request_wait(&request, MPI_STATUS_IGNORE);
|
||||
if(OMPI_SUCCESS != rc) {
|
||||
|
@ -25,6 +25,7 @@ int MPI_Bsend_init(void *buf, int count, MPI_Datatype type,
|
||||
{
|
||||
int rc;
|
||||
if (dest == MPI_PROC_NULL) {
|
||||
*request = MPI_REQUEST_NULL;
|
||||
return MPI_SUCCESS;
|
||||
}
|
||||
|
||||
@ -52,8 +53,10 @@ int MPI_Bsend_init(void *buf, int count, MPI_Datatype type,
|
||||
goto error_return;
|
||||
|
||||
rc = mca_pml_base_bsend_request_init(*request, true);
|
||||
if(OMPI_SUCCESS != rc)
|
||||
if(OMPI_SUCCESS != rc) {
|
||||
ompi_request_free(request);
|
||||
goto error_return;
|
||||
}
|
||||
|
||||
error_return:
|
||||
OMPI_ERRHANDLER_RETURN(rc, comm, rc, FUNC_NAME);
|
||||
|
@ -24,7 +24,7 @@ int MPI_Buffer_detach(void *buffer, int *size)
|
||||
{
|
||||
if (MPI_PARAM_CHECK) {
|
||||
OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
|
||||
if (NULL == buffer || NULL == size || *size < 0) {
|
||||
if (NULL == buffer || NULL == size) {
|
||||
return OMPI_ERRHANDLER_INVOKE(MPI_COMM_WORLD, MPI_ERR_ARG, FUNC_NAME);
|
||||
}
|
||||
}
|
||||
|
@ -26,6 +26,7 @@ int MPI_Ibsend(void *buf, int count, MPI_Datatype type, int dest,
|
||||
{
|
||||
int rc;
|
||||
if (dest == MPI_PROC_NULL) {
|
||||
*request = MPI_REQUEST_NULL;
|
||||
return MPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -25,7 +25,7 @@ int MPI_Irecv(void *buf, int count, MPI_Datatype type, int source,
|
||||
{
|
||||
int rc;
|
||||
if (source == MPI_PROC_NULL) {
|
||||
*request = &ompi_request_null;
|
||||
*request = MPI_REQUEST_NULL;
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -25,6 +25,7 @@ int MPI_Irsend(void *buf, int count, MPI_Datatype type, int dest,
|
||||
{
|
||||
int rc;
|
||||
if (dest == MPI_PROC_NULL) {
|
||||
*request = MPI_REQUEST_NULL;
|
||||
return MPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -25,6 +25,7 @@ int MPI_Isend(void *buf, int count, MPI_Datatype type, int dest,
|
||||
{
|
||||
int rc;
|
||||
if (dest == MPI_PROC_NULL) {
|
||||
*request = MPI_REQUEST_NULL;
|
||||
return MPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -26,6 +26,7 @@ int MPI_Issend(void *buf, int count, MPI_Datatype type, int dest,
|
||||
{
|
||||
int rc;
|
||||
if (dest == MPI_PROC_NULL) {
|
||||
*request = MPI_REQUEST_NULL;
|
||||
return MPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -30,9 +30,6 @@ int MPI_Request_free(MPI_Request *request)
|
||||
goto error_return;
|
||||
}
|
||||
}
|
||||
if( *request == NULL ) {
|
||||
return MPI_SUCCESS;
|
||||
}
|
||||
rc = ompi_request_free(request);
|
||||
|
||||
error_return:
|
||||
|
@ -26,6 +26,7 @@ int MPI_Send_init(void *buf, int count, MPI_Datatype type,
|
||||
{
|
||||
int rc;
|
||||
if (dest == MPI_PROC_NULL) {
|
||||
*request = MPI_REQUEST_NULL;
|
||||
return MPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -30,6 +30,12 @@ int MPI_Start(MPI_Request *request)
|
||||
}
|
||||
OMPI_ERRHANDLER_CHECK(rc, MPI_COMM_WORLD, rc, FUNC_NAME);
|
||||
}
|
||||
return mca_pml.pml_start(1, request);
|
||||
|
||||
switch((*request)->req_type) {
|
||||
case OMPI_REQUEST_PML:
|
||||
return mca_pml.pml_start(1, request);
|
||||
default:
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -10,13 +10,16 @@ static int ompi_grequest_query(ompi_request_t* req, ompi_status_public_t* status
|
||||
return rc;
|
||||
}
|
||||
|
||||
static int ompi_grequest_free(ompi_request_t* req)
|
||||
static int ompi_grequest_free(ompi_request_t** req)
|
||||
{
|
||||
int rc = OMPI_SUCCESS;
|
||||
ompi_grequest_t* greq = (ompi_grequest_t*)req;
|
||||
ompi_grequest_t* greq = *(ompi_grequest_t**)req;
|
||||
if(greq->greq_free != NULL)
|
||||
rc = greq->greq_free(greq->greq_state);
|
||||
OBJ_RELEASE(req);
|
||||
if(rc == OMPI_SUCCESS) {
|
||||
OBJ_RELEASE(greq);
|
||||
*req = MPI_REQUEST_NULL;
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
@ -33,6 +36,7 @@ static void ompi_grequest_construct(ompi_grequest_t* greq)
|
||||
{
|
||||
OMPI_REQUEST_INIT(&greq->greq_base);
|
||||
greq->greq_base.req_query = ompi_grequest_query;
|
||||
greq->greq_base.req_fini = ompi_grequest_free;
|
||||
greq->greq_base.req_free = ompi_grequest_free;
|
||||
greq->greq_base.req_cancel = ompi_grequest_cancel;
|
||||
greq->greq_base.req_type = OMPI_REQUEST_GEN;
|
||||
|
@ -15,7 +15,6 @@ int ompi_request_test(
|
||||
int *completed,
|
||||
ompi_status_public_t * status)
|
||||
{
|
||||
int rc;
|
||||
size_t i;
|
||||
ompi_request_t **rptr;
|
||||
ompi_request_t *request;
|
||||
@ -32,11 +31,7 @@ int ompi_request_test(
|
||||
if (MPI_STATUS_IGNORE != status) {
|
||||
*status = request->req_status;
|
||||
}
|
||||
rc = request->req_free(request);
|
||||
if(rc != OMPI_SUCCESS)
|
||||
return rc;
|
||||
*rptr = MPI_REQUEST_NULL;
|
||||
return OMPI_SUCCESS;
|
||||
return request->req_fini(rptr);
|
||||
}
|
||||
rptr++;
|
||||
}
|
||||
@ -83,21 +78,20 @@ int ompi_request_test_all(
|
||||
int rc;
|
||||
request = *rptr;
|
||||
statuses[i] = request->req_status;
|
||||
rc = request->req_free(request);
|
||||
rc = request->req_fini(rptr);
|
||||
if(rc != OMPI_SUCCESS)
|
||||
return rc;
|
||||
*rptr = MPI_REQUEST_NULL;
|
||||
rptr++;
|
||||
}
|
||||
} else {
|
||||
/* free request if required */
|
||||
rptr = requests;
|
||||
for (i = 0; i < count; i++) {
|
||||
ompi_request_t *request = *rptr;
|
||||
int rc = request->req_free(request);
|
||||
int rc;
|
||||
request = *rptr;
|
||||
rc = request->req_fini(rptr);
|
||||
if(rc != OMPI_SUCCESS)
|
||||
return rc;
|
||||
*rptr = MPI_REQUEST_NULL;
|
||||
rptr++;
|
||||
}
|
||||
}
|
||||
|
@ -83,8 +83,7 @@ finished:
|
||||
}
|
||||
|
||||
/* return request to pool */
|
||||
rc = request->req_free(request);
|
||||
*rptr = MPI_REQUEST_NULL;
|
||||
rc = request->req_fini(rptr);
|
||||
*index = completed;
|
||||
}
|
||||
return rc;
|
||||
@ -143,10 +142,7 @@ int ompi_request_wait_all(
|
||||
int rc;
|
||||
request = *rptr;
|
||||
statuses[i] = request->req_status;
|
||||
rc = request->req_free(request);
|
||||
if(rc != OMPI_SUCCESS)
|
||||
return rc;
|
||||
*rptr = MPI_REQUEST_NULL;
|
||||
rc = request->req_fini(rptr);
|
||||
rptr++;
|
||||
}
|
||||
} else {
|
||||
@ -155,10 +151,7 @@ int ompi_request_wait_all(
|
||||
for (i = 0; i < count; i++) {
|
||||
int rc;
|
||||
request = *rptr;
|
||||
rc = request->req_free(request);
|
||||
if(rc != OMPI_SUCCESS)
|
||||
return rc;
|
||||
*rptr = MPI_REQUEST_NULL;
|
||||
rc = request->req_fini(rptr);
|
||||
rptr++;
|
||||
}
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ static void ompi_request_construct(ompi_request_t* req)
|
||||
{
|
||||
OMPI_REQUEST_INIT(req);
|
||||
req->req_query = NULL;
|
||||
req->req_fini = NULL;
|
||||
req->req_free = NULL;
|
||||
req->req_cancel = NULL;
|
||||
}
|
||||
@ -29,7 +30,7 @@ static void ompi_request_destruct(ompi_request_t* req)
|
||||
OMPI_REQUEST_FINI(req);
|
||||
}
|
||||
|
||||
static int ompi_request_null_free(ompi_request_t* request)
|
||||
static int ompi_request_null_free(ompi_request_t** request)
|
||||
{
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
@ -62,6 +63,7 @@ int ompi_request_init(void)
|
||||
ompi_request_null.req_complete = true;
|
||||
ompi_request_null.req_type = OMPI_REQUEST_NULL;
|
||||
ompi_request_null.req_query = NULL;
|
||||
ompi_request_null.req_fini = ompi_request_null_free;
|
||||
ompi_request_null.req_free = ompi_request_null_free;
|
||||
ompi_request_null.req_cancel = ompi_request_null_cancel;
|
||||
|
||||
|
@ -63,7 +63,7 @@ typedef int (*ompi_request_query_fn_t)(
|
||||
/*
|
||||
* Required function to free the request and any associated resources.
|
||||
*/
|
||||
typedef int (*ompi_request_free_fn_t)(struct ompi_request_t* request);
|
||||
typedef int (*ompi_request_free_fn_t)(struct ompi_request_t** rptr);
|
||||
|
||||
/*
|
||||
* Optional function to cancel a pending request.
|
||||
@ -82,7 +82,8 @@ struct ompi_request_t {
|
||||
volatile ompi_request_state_t req_state; /**< enum indicate state of the request */
|
||||
int req_f_to_c_index; /**< Index in Fortran <-> C translation array */
|
||||
ompi_request_query_fn_t req_query; /**< Optional query function to retrieve status */
|
||||
ompi_request_free_fn_t req_free; /**< Required function to free request */
|
||||
ompi_request_free_fn_t req_fini; /**< Called by test/wait */
|
||||
ompi_request_free_fn_t req_free; /**< Called by free */
|
||||
ompi_request_cancel_fn_t req_cancel; /**< Optional function to cancel the request */
|
||||
};
|
||||
|
||||
@ -180,10 +181,7 @@ int ompi_request_complete(ompi_request_t* request);
|
||||
|
||||
static inline int ompi_request_free(ompi_request_t** request)
|
||||
{
|
||||
int rc = (*request)->req_free(*request);
|
||||
if(rc == OMPI_SUCCESS)
|
||||
*request = MPI_REQUEST_NULL;
|
||||
return rc;
|
||||
return (*request)->req_free(request);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -243,7 +241,6 @@ static inline int ompi_request_wait(
|
||||
ompi_request_t ** req_ptr,
|
||||
ompi_status_public_t * status)
|
||||
{
|
||||
int rc;
|
||||
ompi_request_t *req = *req_ptr;
|
||||
|
||||
#if OMPI_HAVE_THREADS
|
||||
@ -291,9 +288,7 @@ static inline int ompi_request_wait(
|
||||
}
|
||||
|
||||
/* return request to pool */
|
||||
rc = req->req_free(req);
|
||||
*req_ptr = MPI_REQUEST_NULL;
|
||||
return rc;
|
||||
return req->req_fini(req_ptr);
|
||||
}
|
||||
|
||||
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user