
- fixed issue w/ btl send-in-place option that was affecting tcp

- reduced size of match header by an additional 4 bytes to 16 bytes (see the layout sketch below)
- corrections for buffered send (work in progress)

This commit was SVN r7371.
This commit is contained in:
Tim Woodall 2005-09-14 17:08:08 +00:00
parent e98415eb7b
commit c25fb5dab0
12 changed files with 331 additions and 33 deletions
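For context on the header-size reduction noted above, here is a minimal standalone sketch of the trimmed match header. The two one-byte fields of the common header are an assumption (they are not shown in this diff); the field list below matches the hunk in the hdr file further down.

#include <stdint.h>
#include <stdio.h>

/* assumed layout of mca_pml_ob1_common_hdr_t (not part of this diff) */
struct common_hdr { uint8_t hdr_type; uint8_t hdr_flags; };

/* match header after this commit: hdr_dst is gone, leaving
 * 1+1+2+4+4+2 = 14 bytes of fields, padded to 16 for int32_t alignment
 * (with hdr_dst it was 18 bytes, padded to 20) */
struct match_hdr {
    struct common_hdr hdr_common;  /* common attributes */
    uint16_t hdr_ctx;              /* communicator index */
    int32_t  hdr_src;              /* source rank */
    int32_t  hdr_tag;              /* user tag */
    uint16_t hdr_seq;              /* message sequence number */
};

int main(void)
{
    printf("match header size: %zu bytes\n", sizeof(struct match_hdr)); /* 16 on common ABIs */
    return 0;
}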

View file

@@ -892,14 +892,14 @@ int main( int argc, char* argv[] )
     pdt = create_inversed_vector( &ompi_mpi_int, 10 );
     if( outputFlags & CHECK_PACK_UNPACK ) {
         local_copy_ddt_count(pdt, 100);
-        local_copy_with_convertor(pdt, 100, 4008);
+        local_copy_with_convertor(pdt, 100, 956);
     }
     OBJ_RELEASE( pdt ); assert( pdt == NULL );
     printf( "\n\n/*\n * TEST STRANGE DATATYPE\n */\n\n" );
     pdt = create_strange_dt();
     if( outputFlags & CHECK_PACK_UNPACK ) {
         local_copy_ddt_count(pdt, 1);
-        local_copy_with_convertor(pdt, 1, 4008);
+        local_copy_with_convertor(pdt, 1, 956);
     }
     OBJ_RELEASE( pdt ); assert( pdt == NULL );
@@ -961,9 +961,11 @@ int main( int argc, char* argv[] )
         ompi_ddt_dump( pdt3 );
     }
+#if 0
     OBJ_RELEASE( pdt1 ); assert( pdt1 == NULL );
     OBJ_RELEASE( pdt2 ); assert( pdt2 == NULL );
     OBJ_RELEASE( pdt3 ); assert( pdt3 == NULL );
+#endif
     pdt = test_struct_char_double();
     if( outputFlags & CHECK_PACK_UNPACK ) {
@@ -979,7 +981,7 @@ int main( int argc, char* argv[] )
     pdt = test_create_blacs_type();
     if( outputFlags & CHECK_PACK_UNPACK ) {
-        local_copy_with_convertor( pdt, 4500, 1023 );
+        local_copy_with_convertor( pdt, 4500, 956 );
     }
     OBJ_RELEASE( pdt ); assert( pdt == NULL );

View file

@@ -285,7 +285,8 @@ main(int argc, char *argv[])
     struct iovec iov;
     uint32_t iov_count;
     int32_t free_after;
-    size_t max_data;
+    size_t max_data_pack;
+    size_t max_data_unpack;
     size_t bytes_remaining;
     loop_cnt++;  /* increase the number of runned tests */
@@ -306,16 +307,24 @@ main(int argc, char *argv[])
             length, recv_buffer);
     if(bytes_remaining > sizeof(eager))
-        max_data = sizeof(eager);
+        max_data_pack = sizeof(eager);
     else
-        max_data = bytes_remaining;
+        max_data_pack = bytes_remaining;
     iov.iov_base = eager;
-    iov.iov_len = max_data;
+    iov.iov_len = max_data_pack;
     iov_count = 1;
     ompi_convertor_pack(send_conv, &iov, &iov_count, &max_data, &free_after);
+    bytes_remaining -= max_data_pack;  /* sender schedules data */
+
+    iov.iov_base = eager;
+    iov.iov_len = max_data_pack;
+    max_data_unpack = max_data_pack;
     ompi_convertor_unpack(recv_conv, &iov, &iov_count, &max_data, &free_after);
-    bytes_remaining -= max_data;
+    if (max_data_pack != max_data_unpack) {
+        fprintf(stderr, "pack/unpack count mismatch: %lu != %lu\n", max_data_pack, max_data_unpack);
+    }
     while(bytes_remaining != 0) {
         if(bytes_remaining > sizeof(max_send)) {
@@ -329,8 +338,17 @@ main(int argc, char *argv[])
         iov_count = 1;
         ompi_convertor_pack(send_conv, &iov, &iov_count, &max_data, &free_after);
-        ompi_convertor_unpack(recv_conv, &iov, &iov_count, &max_data, &free_after);
         bytes_remaining -= max_data;
+
+        iov.iov_base = max_send;
+        iov.iov_len = max_data_pack;
+        iov_count = 1;
+        max_data_unpack = max_data_pack;
+        ompi_convertor_unpack(recv_conv, &iov, &iov_count, &max_data_unpack, &free_after);
+        if (max_data_pack != max_data_unpack) {
+            fprintf(stderr, "pack/unpack count mismatch: %lu != %lu\n", max_data_pack, max_data_unpack);
+        }
     }
     /* Error Test */

View file

@@ -657,6 +657,7 @@ static void mca_btl_tcp_endpoint_send_handler(int sd, short flags, void* user)
             if(mca_btl_tcp_frag_send(frag, btl_endpoint->endpoint_sd) == false) {
                 break;
             }
+            btl_endpoint->endpoint_send_frag = NULL;
             /* if required - update request status and release fragment */
     OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);

View file

@@ -255,6 +255,44 @@ int mca_pml_base_bsend_request_start(ompi_request_t* request)
 }
+
+/*
+ * allocate buffer
+ */
+
+int mca_pml_base_bsend_request_alloc(ompi_request_t* request)
+{
+    mca_pml_base_send_request_t* sendreq = (mca_pml_base_send_request_t*)request;
+    int rc;
+
+    /* has a buffer been provided */
+    OPAL_THREAD_LOCK(&mca_pml_bsend_mutex);
+    if(NULL == mca_pml_bsend_addr) {
+        sendreq->req_addr = NULL;
+        OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
+        return OMPI_ERR_BUFFER;
+    }
+
+    /* allocate a buffer to hold packed message */
+    sendreq->req_addr = mca_pml_bsend_allocator->alc_alloc(
+        mca_pml_bsend_allocator, sendreq->req_bytes_packed, 0, NULL);
+    if(NULL == sendreq->req_addr) {
+        /* release resources when request is freed */
+        sendreq->req_base.req_pml_complete = true;
+        OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
+        return OMPI_ERR_BUFFER;
+    }
+
+    /* increment count of pending requests */
+    mca_pml_bsend_count++;
+    OPAL_THREAD_UNLOCK(&mca_pml_bsend_mutex);
+
+    /* setup request to reflect the contiguous buffer */
+    sendreq->req_count = sendreq->req_bytes_packed;
+    sendreq->req_datatype = MPI_PACKED;
+    return OMPI_SUCCESS;
+}
+
 /*
  * Request completed - free buffer and decrement pending count
  */
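The new mca_pml_base_bsend_request_alloc above carves the packed message out of the user-attached buffer. As a reminder of the user-level contract this machinery serves, a small standard-MPI example (not part of this commit; run with at least two ranks):

#include <mpi.h>
#include <stdlib.h>

int main(int argc, char **argv)
{
    int rank, data = 42, bufsize;
    void *buf;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    /* attach a buffer large enough for one packed int plus overhead */
    MPI_Pack_size(1, MPI_INT, MPI_COMM_WORLD, &bufsize);
    bufsize += MPI_BSEND_OVERHEAD;
    buf = malloc(bufsize);
    MPI_Buffer_attach(buf, bufsize);

    if (rank == 0) {
        MPI_Bsend(&data, 1, MPI_INT, 1, 0, MPI_COMM_WORLD);   /* copied into the attached buffer */
    } else if (rank == 1) {
        MPI_Recv(&data, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    }

    MPI_Buffer_detach(&buf, &bufsize);
    free(buf);
    MPI_Finalize();
    return 0;
}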

View file

@@ -30,6 +30,7 @@ OMPI_DECLSPEC int mca_pml_base_bsend_fini(void);
 OMPI_DECLSPEC int mca_pml_base_bsend_attach(void* addr, int size);
 OMPI_DECLSPEC int mca_pml_base_bsend_detach(void* addr, int* size);
+OMPI_DECLSPEC int mca_pml_base_bsend_request_alloc(ompi_request_t*);
 OMPI_DECLSPEC int mca_pml_base_bsend_request_start(ompi_request_t*);
 OMPI_DECLSPEC int mca_pml_base_bsend_request_fini(ompi_request_t*);
 #if defined(c_plusplus) || defined(__cplusplus)

View file

@@ -233,7 +233,8 @@ extern int mca_pml_ob1_start(
         case MCA_PML_REQUEST_SEND: \
         { \
             mca_pml_ob1_send_request_t* sendreq = (mca_pml_ob1_send_request_t*)pml_request; \
-            if(sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED) { \
+            if(sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED && \
+               sendreq->req_send.req_addr != sendreq->req_send.req_base.req_addr) { \
                 mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq); \
             } \
             MCA_PML_OB1_SEND_REQUEST_RETURN(sendreq); \

View file

@@ -31,7 +31,11 @@ extern "C" {
 struct mca_pml_ob1_comm_proc_t {
     opal_object_t super;
     uint16_t expected_sequence;    /**< send message sequence number - receiver side */
+#if OMPI_HAVE_THREAD_SUPPORT
     volatile int32_t send_sequence; /**< send side sequence number */
+#else
+    int32_t send_sequence; /**< send side sequence number */
+#endif
     opal_list_t frags_cant_match;  /**< out-of-order fragment queues */
     opal_list_t specific_receives; /**< queues of unmatched specific receives */
     opal_list_t unexpected_frags;  /**< unexpected fragment queues */
@@ -45,7 +49,11 @@ typedef struct mca_pml_ob1_comm_proc_t mca_pml_ob1_comm_proc_t;
  */
 struct mca_pml_comm_t {
     opal_object_t super;
+#if OMPI_HAVE_THREAD_SUPPORT
     volatile uint32_t recv_sequence;  /**< recv request sequence number - receiver side */
+#else
+    uint32_t recv_sequence;  /**< recv request sequence number - receiver side */
+#endif
     opal_mutex_t matching_lock;   /**< matching lock */
     opal_list_t wild_receives;    /**< queue of unmatched wild (source process not specified) receives */
     mca_pml_ob1_comm_proc_t* procs;

View file

@@ -103,7 +103,6 @@ struct mca_pml_ob1_match_hdr_t {
     mca_pml_ob1_common_hdr_t hdr_common;   /**< common attributes */
     uint16_t hdr_ctx;                      /**< communicator index */
     int32_t  hdr_src;                      /**< source rank */
-    int32_t  hdr_dst;                      /**< destination rank */
     int32_t  hdr_tag;                      /**< user tag */
     uint16_t hdr_seq;                      /**< message sequence number */
 };
@@ -114,7 +113,6 @@ typedef struct mca_pml_ob1_match_hdr_t mca_pml_ob1_match_hdr_t;
     MCA_PML_OB1_COMMON_HDR_NTOH((h).hdr_common); \
     (h).hdr_ctx = ntohs((h).hdr_ctx); \
     (h).hdr_src = ntohl((h).hdr_src); \
-    (h).hdr_dst = ntohl((h).hdr_dst); \
     (h).hdr_tag = ntohl((h).hdr_tag); \
     (h).hdr_seq = ntohs((h).hdr_seq); \
 } while (0)
@@ -124,7 +122,6 @@ typedef struct mca_pml_ob1_match_hdr_t mca_pml_ob1_match_hdr_t;
     MCA_PML_OB1_COMMON_HDR_HTON((h).hdr_common); \
     (h).hdr_ctx = htons((h).hdr_ctx); \
     (h).hdr_src = htonl((h).hdr_src); \
-    (h).hdr_dst = htonl((h).hdr_dst); \
     (h).hdr_tag = htonl((h).hdr_tag); \
     (h).hdr_seq = htons((h).hdr_seq); \
 } while (0)

View file

@@ -26,6 +26,7 @@
 struct mca_pml_ob1_buffer_t {
     opal_list_item_t super;
+    size_t len;
     unsigned char addr[1];
 };
 typedef struct mca_pml_ob1_buffer_t mca_pml_ob1_buffer_t;

View file

@@ -34,7 +34,11 @@ struct mca_pml_ob1_recv_request_t {
     mca_pml_base_recv_request_t req_recv;
     struct ompi_proc_t *req_proc;
     ompi_ptr_t req_send;
+#if OMPI_HAVE_THREAD_SUPPORT
     volatile int32_t req_lock;
+#else
+    int32_t req_lock;
+#endif
     size_t  req_pipeline_depth;
     size_t  req_bytes_received;
     size_t  req_bytes_delivered;

View file

@@ -75,7 +75,7 @@ OBJ_CLASS_INSTANCE(
  * Completion of a short message - nothing left to schedule.
  */
-void mca_pml_ob1_match_completion(
+void mca_pml_ob1_match_completion_cache(
     struct mca_btl_base_module_t* btl,
     struct mca_btl_base_endpoint_t* ep,
     struct mca_btl_base_descriptor_t* descriptor,
@@ -101,6 +101,36 @@ void mca_pml_ob1_match_completion(
     OPAL_THREAD_UNLOCK(&ompi_request_lock);
 }
+
+/**
+ * Completion of a short message - nothing left to schedule.
+ */
+
+void mca_pml_ob1_match_completion_free(
+    struct mca_btl_base_module_t* btl,
+    struct mca_btl_base_endpoint_t* ep,
+    struct mca_btl_base_descriptor_t* descriptor,
+    int status)
+{
+    mca_pml_ob1_send_request_t* sendreq = (mca_pml_ob1_send_request_t*)descriptor->des_cbdata;
+    mca_bml_base_btl_t* bml_btl = (mca_bml_base_btl_t*) descriptor->des_context;
+
+    /* check completion status */
+    if(OMPI_SUCCESS != status) {
+        /* TSW - FIX */
+        opal_output(0, "%s:%d FATAL", __FILE__, __LINE__);
+        orte_errmgr.abort();
+    }
+
+    /* free the descriptor */
+    mca_bml_base_free( bml_btl, descriptor );
+
+    /* signal request completion */
+    OPAL_THREAD_LOCK(&ompi_request_lock);
+    sendreq->req_bytes_delivered = sendreq->req_send.req_bytes_packed;
+    MCA_PML_OB1_SEND_REQUEST_COMPLETE(sendreq);
+    OPAL_THREAD_UNLOCK(&ompi_request_lock);
+}
+
 /*
  * Completion of the first fragment of a long message that
  * requires an acknowledgement
@@ -277,7 +307,6 @@ static int mca_pml_ob1_send_request_start_rdma(
         hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_RGET;
         hdr->hdr_match.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid;
         hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
-        hdr->hdr_match.hdr_dst = sendreq->req_send.req_base.req_peer;
         hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag;
         hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence;
         hdr->hdr_rndv.hdr_msg_length = sendreq->req_send.req_bytes_packed;
@@ -306,7 +335,6 @@ static int mca_pml_ob1_send_request_start_rdma(
         hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_RNDV;
         hdr->hdr_match.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid;
         hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
-        hdr->hdr_match.hdr_dst = sendreq->req_send.req_base.req_peer;
         hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag;
         hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence;
         hdr->hdr_rndv.hdr_msg_length = sendreq->req_send.req_bytes_packed;
@@ -375,7 +403,6 @@ static int mca_pml_ob1_send_request_start_rndv(
     hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_RNDV;
     hdr->hdr_match.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid;
     hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
-    hdr->hdr_match.hdr_dst = sendreq->req_send.req_base.req_peer;
     hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag;
     hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence;
     hdr->hdr_rndv.hdr_msg_length = sendreq->req_send.req_bytes_packed;
@@ -396,6 +423,178 @@ static int mca_pml_ob1_send_request_start_rndv(
 }
+
+/**
+ *  Buffer the entire message and mark as complete.
+ */
+
+int mca_pml_ob1_send_request_start_buffered(
+    mca_pml_ob1_send_request_t* sendreq,
+    mca_bml_base_btl_t* bml_btl)
+{
+    size_t size = sendreq->req_send.req_bytes_packed;
+    bool ack = false;
+    int rc;
+
+    /* determine first fragment size */
+    if(size > bml_btl->btl_eager_limit - sizeof(mca_pml_ob1_rendezvous_hdr_t)) {
+        size = bml_btl->btl_eager_limit - sizeof(mca_pml_ob1_rendezvous_hdr_t);
+        ack = true;
+    } else if (sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_SYNCHRONOUS) {
+        ack = true;
+    }
+
+    /* for a short message there is no reason to do any intermediate buffering,
+     * simply copy into BTL's buffer
+     */
+    if (ack == false) {
+        mca_btl_base_descriptor_t* descriptor;
+        mca_btl_base_segment_t* segment;
+        mca_pml_ob1_hdr_t* hdr;
+        struct iovec iov;
+        unsigned int iov_count;
+        size_t max_data;
+        int32_t free_after;
+
+        /* allocate descriptor */
+        mca_bml_base_alloc(bml_btl, &descriptor, sizeof(mca_pml_ob1_match_hdr_t) + size);
+        if(NULL == descriptor) {
+            return OMPI_ERR_OUT_OF_RESOURCE;
+        }
+        segment = descriptor->des_src;
+
+        /* pack the data into the BTL supplied buffer */
+        iov.iov_base = (void*)((unsigned char*)segment->seg_addr.pval + sizeof(mca_pml_ob1_match_hdr_t));
+        iov.iov_len = size;
+        iov_count = 1;
+        max_data = size;
+        if((rc = ompi_convertor_pack(
+            &sendreq->req_send.req_convertor,
+            &iov,
+            &iov_count,
+            &max_data,
+            &free_after)) < 0) {
+            mca_bml_base_free(bml_btl, descriptor);
+            return rc;
+        }
+
+        /* build match header */
+        hdr = (mca_pml_ob1_hdr_t*)segment->seg_addr.pval;
+        hdr->hdr_common.hdr_flags = 0;
+        hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_MATCH;
+        hdr->hdr_match.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid;
+        hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
+        hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag;
+        hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence;
+
+        /* update lengths */
+        segment->seg_len = sizeof(mca_pml_ob1_match_hdr_t) + max_data;
+        sendreq->req_send_offset = max_data;
+        sendreq->req_rdma_offset = max_data;
+
+        /* short message */
+        descriptor->des_cbfunc = mca_pml_ob1_match_completion_cache;
+        descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
+        descriptor->des_cbdata = sendreq;
+
+        /* send */
+        rc = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_PML);
+        if(OMPI_SUCCESS != rc) {
+            mca_bml_base_free(bml_btl, descriptor );
+        }
+
+    /* longer message - pack first fragment into BTL buffer */
+    } else {
+        mca_btl_base_descriptor_t* descriptor;
+        mca_btl_base_segment_t* segment;
+        mca_pml_ob1_hdr_t* hdr;
+        struct iovec iov;
+        unsigned int iov_count;
+        size_t max_data;
+        int32_t free_after;
+
+        /* allocate descriptor */
+        mca_bml_base_alloc(bml_btl, &descriptor, sizeof(mca_pml_ob1_rendezvous_hdr_t) + size);
+        if(NULL == descriptor) {
+            return OMPI_ERR_OUT_OF_RESOURCE;
+        }
+        segment = descriptor->des_src;
+
+        /* pack the data into the BTL supplied buffer */
+        iov.iov_base = (void*)((unsigned char*)segment->seg_addr.pval +
+            sizeof(mca_pml_ob1_rendezvous_hdr_t));
+        iov.iov_len = size;
+        iov_count = 1;
+        max_data = size;
+        if((rc = ompi_convertor_pack(
+            &sendreq->req_send.req_convertor,
+            &iov,
+            &iov_count,
+            &max_data,
+            &free_after)) < 0) {
+            mca_bml_base_free(bml_btl, descriptor);
+            return rc;
+        }
+
+        /* build rendezvous header */
+        hdr = (mca_pml_ob1_hdr_t*)segment->seg_addr.pval;
+        hdr->hdr_common.hdr_flags = 0;
+        hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_RNDV;
+        hdr->hdr_match.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid;
+        hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
+        hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag;
+        hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence;
+        hdr->hdr_rndv.hdr_msg_length = sendreq->req_send.req_bytes_packed;
+        hdr->hdr_rndv.hdr_src_req.pval = sendreq;
+
+        /* update lengths */
+        segment->seg_len = sizeof(mca_pml_ob1_rendezvous_hdr_t) + max_data;
+        sendreq->req_send_offset = max_data;
+
+        descriptor->des_cbfunc = mca_pml_ob1_rndv_completion;
+        descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
+        descriptor->des_cbdata = sendreq;
+
+        /* buffer the remainder of the message */
+        rc = mca_pml_base_bsend_request_alloc((ompi_request_t*)sendreq);
+        if(OMPI_SUCCESS != rc) {
+            mca_bml_base_free(bml_btl, descriptor);
+            return rc;
+        }
+        iov.iov_base = ((unsigned char*)sendreq->req_send.req_addr) + sendreq->req_send_offset;
+        iov.iov_len = max_data = sendreq->req_send.req_bytes_packed - sendreq->req_send_offset;
+        if((rc = ompi_convertor_pack(
+            &sendreq->req_send.req_convertor,
+            &iov,
+            &iov_count,
+            &max_data,
+            &free_after)) < 0) {
+            mca_bml_base_free(bml_btl, descriptor);
+            return rc;
+        }
+
+        /* re-init convertor for packed data */
+        ompi_convertor_prepare_for_send(
+            &sendreq->req_send.req_convertor,
+            sendreq->req_send.req_datatype,
+            sendreq->req_send.req_count,
+            sendreq->req_send.req_addr);
+
+        /* send */
+        rc = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_PML);
+        if(OMPI_SUCCESS != rc) {
+            mca_bml_base_free(bml_btl, descriptor );
+        }
+    }
+
+    /* request is complete at mpi level */
+    ompi_request_complete((ompi_request_t*)sendreq);
+    return rc;
+}
+
 /**
  *  BTL requires "specially" allocated memory. Request a segment that
  *  is used for initial hdr and any eager data.
@@ -455,7 +654,6 @@ int mca_pml_ob1_send_request_start_copy(
     hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_MATCH;
     hdr->hdr_match.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid;
     hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
-    hdr->hdr_match.hdr_dst = sendreq->req_send.req_base.req_peer;
     hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag;
     hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence;
@@ -465,7 +663,7 @@ int mca_pml_ob1_send_request_start_copy(
     sendreq->req_rdma_offset = max_data;

     /* short message */
-    descriptor->des_cbfunc = mca_pml_ob1_match_completion;
+    descriptor->des_cbfunc = mca_pml_ob1_match_completion_cache;

     /* request is complete at mpi level */
     ompi_request_complete((ompi_request_t*)sendreq);
@@ -550,12 +748,11 @@ int mca_pml_ob1_send_request_start_prepare(
     hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_MATCH;
     hdr->hdr_match.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid;
     hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
-    hdr->hdr_match.hdr_dst = sendreq->req_send.req_base.req_peer;
     hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag;
     hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence;

     /* short message */
-    descriptor->des_cbfunc = mca_pml_ob1_match_completion;
+    descriptor->des_cbfunc = mca_pml_ob1_match_completion_free;

     /* update lengths */
     sendreq->req_send_offset = size;
@@ -623,7 +820,7 @@ int mca_pml_ob1_send_request_schedule(mca_pml_ob1_send_request_t* sendreq)
         int rc;
         size_t size;
         mca_bml_base_btl_t* bml_btl = mca_bml_base_btl_array_get_next(&bml_endpoint->btl_send);
-        size_t num_btl_avail = bml_endpoint->btl_rdma.arr_size;
+        size_t num_btl_avail = bml_endpoint->btl_send.arr_size;

         if(num_btl_avail == 1 || bytes_remaining < bml_btl->btl_min_send_size) {
             size = bytes_remaining;
@@ -641,6 +838,20 @@ int mca_pml_ob1_send_request_schedule(mca_pml_ob1_send_request_t* sendreq)
         if (bml_btl->btl_max_send_size != 0 &&
             size > bml_btl->btl_max_send_size - sizeof(mca_pml_ob1_frag_hdr_t)) {
             size = bml_btl->btl_max_send_size - sizeof(mca_pml_ob1_frag_hdr_t);
+
+            /* very expensive - need to send on a convertor boundary */
+            if(sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED) {
+                ompi_convertor_t convertor;
+                size_t position = sendreq->req_send_offset + size;
+                ompi_convertor_copy_and_prepare_for_send(
+                    &sendreq->req_send.req_convertor,
+                    sendreq->req_send.req_base.req_datatype,
+                    sendreq->req_send.req_base.req_count,
+                    sendreq->req_send.req_base.req_addr,
+                    &convertor);
+                ompi_convertor_set_position(&convertor, &position);
+                size = position - sendreq->req_send_offset;
+            }
         }
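The "send on a convertor boundary" block in the scheduling hunk above appears to trim each scheduled fragment so that its end falls on a position the datatype convertor can actually stop at, rather than mid-element. A much-simplified standalone sketch of that rounding idea, assuming a fixed element size and an already element-aligned send offset (the real code uses ompi_convertor_set_position on arbitrary datatypes):

#include <stdio.h>
#include <stddef.h>

/* Trim a proposed fragment size down to a whole number of elements.
 * Assumes the current send offset is already element-aligned. */
static size_t trim_to_element_boundary(size_t size, size_t elem_size)
{
    size_t trimmed = size - (size % elem_size);
    return trimmed ? trimmed : size;  /* remainder smaller than one element: send as-is */
}

int main(void)
{
    /* e.g. a 65536-byte max send size minus a 20-byte fragment header, 8-byte elements */
    printf("%zu\n", trim_to_element_boundary(65536 - 20, 8)); /* prints 65512 */
    return 0;
}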

View file

@@ -38,9 +38,14 @@ struct mca_pml_ob1_send_request_t {
     mca_pml_base_send_request_t req_send;
     ompi_proc_t* req_proc;
     mca_bml_base_endpoint_t* req_endpoint;
-    volatile int32_t req_state;
     ompi_ptr_t req_recv;
+#if OMPI_HAVE_THREAD_SUPPORT
+    volatile int32_t req_state;
     volatile int32_t req_lock;
+#else
+    volatile int32_t req_state;
+    volatile int32_t req_lock;
+#endif
     size_t req_pipeline_depth;
     size_t req_bytes_delivered;
     size_t req_send_offset;
@@ -154,12 +159,11 @@ do {
         hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_MATCH; \
         hdr->hdr_match.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid; \
         hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; \
-        hdr->hdr_match.hdr_dst = sendreq->req_send.req_base.req_peer; \
         hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; \
         hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence; \
         \
         /* short message */ \
-        descriptor->des_cbfunc = mca_pml_ob1_match_completion; \
+        descriptor->des_cbfunc = mca_pml_ob1_match_completion_cache; \
         descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; \
         descriptor->des_cbdata = sendreq; \
         \
@@ -173,13 +177,9 @@ do {
             } \
             \
         } else { \
-            /* handle buffered send */ \
             if(sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED) { \
-                mca_pml_base_bsend_request_start(&sendreq->req_send.req_base.req_ompi); \
-            } \
-            \
-            /* start request */ \
-            if(bml_btl->btl_flags & MCA_BTL_FLAGS_SEND_INPLACE) { \
+                rc = mca_pml_ob1_send_request_start_buffered( sendreq, bml_btl ); \
+            } else if(bml_btl->btl_flags & MCA_BTL_FLAGS_SEND_INPLACE) { \
                 rc = mca_pml_ob1_send_request_start_prepare( sendreq, bml_btl ); \
             } else { \
                 rc = mca_pml_ob1_send_request_start_copy( sendreq, bml_btl ); \
@@ -208,7 +208,8 @@ do {
         } \
     } else if((sendreq)->req_send.req_base.req_free_called) { \
         MCA_PML_OB1_FREE((ompi_request_t**)&sendreq); \
-    } else if ((sendreq)->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED) { \
+    } else if ((sendreq)->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED && \
+               (sendreq)->req_send.req_addr != (sendreq)->req_send.req_base.req_addr) { \
         mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq); \
     } \
 }
@@ -301,6 +302,10 @@ do { \
  *  Start the specified request
  */
+int mca_pml_ob1_send_request_start_buffered(
+    mca_pml_ob1_send_request_t* sendreq,
+    mca_bml_base_btl_t* bml_btl);
+
 int mca_pml_ob1_send_request_start_copy(
     mca_pml_ob1_send_request_t* sendreq,
     mca_bml_base_btl_t* bml_btl);
@@ -317,8 +322,19 @@ int mca_pml_ob1_send_request_schedule(
 /**
  * Completion callback on match header
+ * Cache descriptor.
  */
-void mca_pml_ob1_match_completion(
+void mca_pml_ob1_match_completion_cache(
+    struct mca_btl_base_module_t* btl,
+    struct mca_btl_base_endpoint_t* ep,
+    struct mca_btl_base_descriptor_t* descriptor,
+    int status);
+
+/**
+ * Completion callback on match header
+ * Free descriptor.
+ */
+void mca_pml_ob1_match_completion_free(
     struct mca_btl_base_module_t* btl,
     struct mca_btl_base_endpoint_t* ep,
     struct mca_btl_base_descriptor_t* descriptor,