
Merge pull request #1177 from bosilca/topic/large_msg

Topic/large msg
This commit is contained in:
bosilca 2017-09-05 13:30:19 -04:00 committed by GitHub
parent 62739c6513 d10522a01c
Commit dc538e9675
10 changed files with 50 additions and 46 deletions

View file

@@ -55,8 +55,8 @@ struct mca_pml_ob1_t {
int free_list_num; /* initial size of free list */
int free_list_max; /* maximum size of free list */
int free_list_inc; /* number of elements to grow free list */
size_t send_pipeline_depth;
size_t recv_pipeline_depth;
int32_t send_pipeline_depth;
int32_t recv_pipeline_depth;
size_t rdma_retries_limit;
int max_rdma_per_request;
int max_send_per_range;

View file

@@ -184,8 +184,8 @@ static int mca_pml_ob1_component_register(void)
mca_pml_ob1_param_register_int("free_list_max", -1, &mca_pml_ob1.free_list_max);
mca_pml_ob1_param_register_int("free_list_inc", 64, &mca_pml_ob1.free_list_inc);
mca_pml_ob1_param_register_int("priority", 20, &mca_pml_ob1.priority);
mca_pml_ob1_param_register_sizet("send_pipeline_depth", 3, &mca_pml_ob1.send_pipeline_depth);
mca_pml_ob1_param_register_sizet("recv_pipeline_depth", 4, &mca_pml_ob1.recv_pipeline_depth);
mca_pml_ob1_param_register_int("send_pipeline_depth", 3, &mca_pml_ob1.send_pipeline_depth);
mca_pml_ob1_param_register_int("recv_pipeline_depth", 4, &mca_pml_ob1.recv_pipeline_depth);
/* NTH: we can get into a live-lock situation in the RDMA failure path so disable
RDMA retries for now. Falling back to send may suck but it is better than

View file

@@ -190,7 +190,7 @@ static void mca_pml_ob1_put_completion (mca_pml_ob1_rdma_frag_t *frag, int64_t r
mca_pml_ob1_recv_request_t* recvreq = (mca_pml_ob1_recv_request_t *) frag->rdma_req;
mca_bml_base_btl_t *bml_btl = frag->rdma_bml;
OPAL_THREAD_SUB_SIZE_T(&recvreq->req_pipeline_depth, 1);
OPAL_THREAD_ADD32(&recvreq->req_pipeline_depth, -1);
MCA_PML_OB1_RDMA_FRAG_RETURN(frag);
@@ -198,7 +198,7 @@ static void mca_pml_ob1_put_completion (mca_pml_ob1_rdma_frag_t *frag, int64_t r
assert ((uint64_t) rdma_size == frag->rdma_length);
/* check completion status */
OPAL_THREAD_ADD_SIZE_T(&recvreq->req_bytes_received, (size_t) rdma_size);
OPAL_THREAD_ADD_SIZE_T(&recvreq->req_bytes_received, rdma_size);
if (recv_request_pml_complete_check(recvreq) == false &&
recvreq->req_rdma_offset < recvreq->req_send_offset) {
/* schedule additional rdma operations */
@@ -951,7 +951,7 @@ int mca_pml_ob1_recv_request_schedule_once( mca_pml_ob1_recv_request_t* recvreq,
}
while(bytes_remaining > 0 &&
recvreq->req_pipeline_depth < mca_pml_ob1.recv_pipeline_depth) {
recvreq->req_pipeline_depth < mca_pml_ob1.recv_pipeline_depth) {
mca_pml_ob1_rdma_frag_t *frag = NULL;
mca_btl_base_module_t *btl;
int rc, rdma_idx;
@@ -983,14 +983,10 @@ int mca_pml_ob1_recv_request_schedule_once( mca_pml_ob1_recv_request_t* recvreq,
} while(!size);
btl = bml_btl->btl;
/* NTH: This conditional used to check if there was a registration in
* recvreq->req_rdma[rdma_idx].btl_reg. If one existed it was due to
* the btl not needing registration (equivalent to btl->btl_register_mem
* != NULL). This new check is equivalent. Note: I feel this protocol
* needs work to improve resource usage when running with a
* leave pinned protocol. */
if (btl->btl_register_mem && (btl->btl_rdma_pipeline_frag_size != 0) &&
(size > btl->btl_rdma_pipeline_frag_size)) {
/* NTH: Note: I feel this protocol needs work to improve resource
* usage when running with a leave pinned protocol. */
/* GB: We should always abide by the BTL RDMA pipeline fragment limit (if one is set) */
if ((btl->btl_rdma_pipeline_frag_size != 0) && (size > btl->btl_rdma_pipeline_frag_size)) {
size = btl->btl_rdma_pipeline_frag_size;
}
@@ -1028,7 +1024,7 @@ int mca_pml_ob1_recv_request_schedule_once( mca_pml_ob1_recv_request_t* recvreq,
if (OPAL_LIKELY(OMPI_SUCCESS == rc)) {
/* update request state */
recvreq->req_rdma_offset += size;
OPAL_THREAD_ADD_SIZE_T(&recvreq->req_pipeline_depth, 1);
OPAL_THREAD_ADD32(&recvreq->req_pipeline_depth, 1);
recvreq->req_rdma[rdma_idx].length -= size;
bytes_remaining -= size;
} else {

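The scheduling loop above only issues new RDMA operations while the request's pipeline depth stays below mca_pml_ob1.recv_pipeline_depth, clamps every fragment to the BTL's btl_rdma_pipeline_frag_size, and with this change tracks the depth through 32-bit atomics (OPAL_THREAD_ADD32) while byte counts stay size_t. A minimal, self-contained sketch of that pattern, assuming C11 atomics; the limits, the struct, and the issue_put() callback are illustrative stand-ins rather than the OMPI types:

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stddef.h>
    #include <stdint.h>

    /* Illustrative stand-ins for mca_pml_ob1.recv_pipeline_depth and
     * btl->btl_rdma_pipeline_frag_size; the values are examples only. */
    #define PIPELINE_DEPTH_LIMIT 4
    #define FRAG_SIZE_LIMIT ((size_t)(128 * 1024))

    typedef struct {
        _Atomic int32_t pipeline_depth;  /* outstanding RDMA fragments: small, fits in 32 bits */
        _Atomic size_t  bytes_received;  /* running byte count: still needs size_t */
    } recv_request_sketch_t;

    /* Schedule as many fragments as the pipeline allows.  issue_put() stands in
     * for posting one RDMA put on the selected BTL. */
    static void schedule_once(recv_request_sketch_t *req, size_t bytes_remaining,
                              bool (*issue_put)(size_t length))
    {
        while (bytes_remaining > 0 &&
               atomic_load(&req->pipeline_depth) < PIPELINE_DEPTH_LIMIT) {
            size_t size = bytes_remaining;
            if (FRAG_SIZE_LIMIT != 0 && size > FRAG_SIZE_LIMIT) {
                size = FRAG_SIZE_LIMIT;          /* abide by the BTL fragment limit */
            }
            if (!issue_put(size)) {
                break;                           /* resource shortage: retry on the next event */
            }
            atomic_fetch_add(&req->pipeline_depth, 1);  /* analogue of OPAL_THREAD_ADD32(..., 1) */
            bytes_remaining -= size;
        }
    }

    /* Completion side: drop the depth and account the bytes, mirroring
     * mca_pml_ob1_put_completion() above. */
    static void put_completion(recv_request_sketch_t *req, size_t rdma_size)
    {
        atomic_fetch_add(&req->pipeline_depth, -1);
        atomic_fetch_add(&req->bytes_received, rdma_size);
    }

Keeping the depth in an int32_t is what lets the completion path use a plain 32-bit atomic decrement instead of the size_t wrappers.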
View file

@@ -41,12 +41,12 @@ BEGIN_C_DECLS
struct mca_pml_ob1_recv_request_t {
mca_pml_base_recv_request_t req_recv;
opal_ptr_t remote_req_send;
int32_t req_lock;
size_t req_pipeline_depth;
size_t req_bytes_received; /**< amount of data transferred into the user buffer */
size_t req_bytes_expected; /**< local size of the data as suggested by the user */
size_t req_rdma_offset;
size_t req_send_offset;
int32_t req_lock;
int32_t req_pipeline_depth;
size_t req_bytes_received; /**< amount of data transferred into the user buffer */
size_t req_bytes_expected; /**< local size of the data as suggested by the user */
size_t req_rdma_offset;
size_t req_send_offset;
uint32_t req_rdma_cnt;
uint32_t req_rdma_idx;
bool req_pending;

View file

@@ -313,7 +313,7 @@ mca_pml_ob1_frag_completion( mca_btl_base_module_t* btl,
des->des_segment_count,
sizeof(mca_pml_ob1_frag_hdr_t));
OPAL_THREAD_SUB_SIZE_T(&sendreq->req_pipeline_depth, 1);
OPAL_THREAD_ADD32(&sendreq->req_pipeline_depth, -1);
OPAL_THREAD_ADD_SIZE_T(&sendreq->req_bytes_delivered, req_bytes_delivered);
if(send_request_pml_complete_check(sendreq) == false) {
@@ -913,13 +913,13 @@ mca_pml_ob1_send_request_schedule_once(mca_pml_ob1_send_request_t* sendreq)
/* check pipeline_depth here before attempting to get any locks */
if(true == sendreq->req_throttle_sends &&
sendreq->req_pipeline_depth >= mca_pml_ob1.send_pipeline_depth)
sendreq->req_pipeline_depth >= mca_pml_ob1.send_pipeline_depth)
return OMPI_SUCCESS;
range = get_send_range(sendreq);
while(range && (false == sendreq->req_throttle_sends ||
sendreq->req_pipeline_depth < mca_pml_ob1.send_pipeline_depth)) {
sendreq->req_pipeline_depth < mca_pml_ob1.send_pipeline_depth)) {
mca_pml_ob1_frag_hdr_t* hdr;
mca_btl_base_descriptor_t* des;
int rc, btl_idx;
@@ -1044,7 +1044,7 @@ cannot_pack:
range->range_btls[btl_idx].length -= size;
range->range_send_length -= size;
range->range_send_offset += size;
OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth, 1);
OPAL_THREAD_ADD32(&sendreq->req_pipeline_depth, 1);
if(range->range_send_length == 0) {
range = get_next_send_range(sendreq, range);
prev_bytes_remaining = 0;
@@ -1060,7 +1060,7 @@ cannot_pack:
range->range_btls[btl_idx].length -= size;
range->range_send_length -= size;
range->range_send_offset += size;
OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth, 1);
OPAL_THREAD_ADD32(&sendreq->req_pipeline_depth, 1);
if(range->range_send_length == 0) {
range = get_next_send_range(sendreq, range);
prev_bytes_remaining = 0;

View file

@@ -45,11 +45,11 @@ struct mca_pml_ob1_send_request_t {
mca_pml_base_send_request_t req_send;
mca_bml_base_endpoint_t* req_endpoint;
opal_ptr_t req_recv;
int32_t req_state;
int32_t req_lock;
bool req_throttle_sends;
size_t req_pipeline_depth;
size_t req_bytes_delivered;
int32_t req_state;
int32_t req_lock;
bool req_throttle_sends;
int32_t req_pipeline_depth;
size_t req_bytes_delivered;
uint32_t req_rdma_cnt;
mca_pml_ob1_send_pending_t req_pending;
opal_mutex_t req_send_range_lock;

View file

@@ -447,7 +447,7 @@ int64_t opal_atomic_sub_64(volatile int64_t *addr, int64_t delta);
*/
#if defined(DOXYGEN) || OPAL_ENABLE_DEBUG
static inline size_t
opal_atomic_add_size_t(volatile size_t *addr, int delta)
opal_atomic_add_size_t(volatile size_t *addr, size_t delta)
{
#if SIZEOF_SIZE_T == 4
return (size_t) opal_atomic_add_32((int32_t*) addr, delta);
@@ -458,7 +458,7 @@ opal_atomic_add_size_t(volatile size_t *addr, int delta)
#endif
}
static inline size_t
opal_atomic_sub_size_t(volatile size_t *addr, int delta)
opal_atomic_sub_size_t(volatile size_t *addr, size_t delta)
{
#if SIZEOF_SIZE_T == 4
return (size_t) opal_atomic_sub_32((int32_t*) addr, delta);

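The wrappers above dispatch on SIZEOF_SIZE_T to reuse the 32- or 64-bit atomics, and widening delta from int to size_t keeps large byte counts from being narrowed at the call site. A minimal portable analogue, assuming C11 atomics rather than the OPAL primitives; the function names are illustrative, and the return-the-updated-value behaviour is an assumption made to match the debug wrappers shown here:

    #include <stdatomic.h>
    #include <stddef.h>

    /* With C11 atomics the compiler selects the right width for size_t, so no
     * SIZEOF_SIZE_T dispatch is needed.  The delta is a size_t, like the new
     * OPAL signature, so callers can pass byte counts larger than INT_MAX. */
    static inline size_t
    atomic_add_size(_Atomic size_t *addr, size_t delta)
    {
        return atomic_fetch_add(addr, delta) + delta;   /* return the updated value */
    }

    static inline size_t
    atomic_sub_size(_Atomic size_t *addr, size_t delta)
    {
        return atomic_fetch_sub(addr, delta) - delta;
    }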
View file

@@ -318,7 +318,12 @@ static int mca_btl_tcp_component_register(void)
mca_btl_tcp_module.super.btl_rndv_eager_limit = 64*1024;
mca_btl_tcp_module.super.btl_max_send_size = 128*1024;
mca_btl_tcp_module.super.btl_rdma_pipeline_send_length = 128*1024;
mca_btl_tcp_module.super.btl_rdma_pipeline_frag_size = INT_MAX;
/* Some OSes have hard coded limits on how many bytes can be manipulated
* by each writev operation. Force a reasonable limit, to prevent overflowing
* a signed 32-bit integer (limit comes from BSD and OS X). We remove 1k to
* make some room for our internal headers.
*/
mca_btl_tcp_module.super.btl_rdma_pipeline_frag_size = ((1UL<<31) - 1024);
mca_btl_tcp_module.super.btl_min_rdma_pipeline_size = 0;
mca_btl_tcp_module.super.btl_flags = MCA_BTL_FLAGS_PUT |
MCA_BTL_FLAGS_SEND_INPLACE |
@@ -335,7 +340,11 @@ static int mca_btl_tcp_component_register(void)
mca_btl_base_param_register(&mca_btl_tcp_component.super.btl_version,
&mca_btl_tcp_module.super);
if (mca_btl_tcp_module.super.btl_rdma_pipeline_frag_size > ((1UL<<31) - 1024) ) {
/* Assume a hard limit. A test in configure would be a better solution, but until then
* kicking in the pipeline RDMA for extremely large data is good enough. */
mca_btl_tcp_module.super.btl_rdma_pipeline_frag_size = ((1UL<<31) - 1024);
}
mca_btl_tcp_param_register_int ("disable_family", NULL, 0, OPAL_INFO_LVL_2, &mca_btl_tcp_component.tcp_disable_family);
return mca_btl_tcp_component_verify();

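The new default works out to (1UL << 31) - 1024 = 2,147,482,624 bytes, i.e. 1 KiB short of 2 GiB, so the byte count handed to a single writev() still fits in a signed 32-bit integer even after the BTL's own headers are added, and the register hook clamps any user-supplied value to the same ceiling. A small stand-alone sketch of that cap, with an illustrative constant and helper name:

    #include <stdint.h>
    #include <stdio.h>

    /* 1 KiB short of 2 GiB: (1UL << 31) - 1024 = 2147482624 bytes.  The slack
     * leaves room for the BTL headers so one writev() never exceeds what a
     * signed 32-bit count can express. */
    #define TCP_FRAG_SIZE_CAP ((1UL << 31) - 1024)

    static uint64_t clamp_frag_size(uint64_t requested)
    {
        return requested > TCP_FRAG_SIZE_CAP ? TCP_FRAG_SIZE_CAP : requested;
    }

    int main(void)
    {
        printf("cap = %lu bytes\n", (unsigned long)TCP_FRAG_SIZE_CAP);   /* 2147482624 */
        printf("8 GiB request -> %llu bytes\n",
               (unsigned long long)clamp_frag_size(8ULL << 30));         /* clamped to the cap */
        return 0;
    }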
View file

@@ -112,11 +112,11 @@ size_t mca_btl_tcp_frag_dump(mca_btl_tcp_frag_t* frag, char* msg, char* buf, siz
bool mca_btl_tcp_frag_send(mca_btl_tcp_frag_t* frag, int sd)
{
ssize_t cnt = -1;
ssize_t cnt;
size_t i, num_vecs;
/* non-blocking write, but continue if interrupted */
while(cnt < 0) {
do {
cnt = writev(sd, frag->iov_ptr, frag->iov_cnt);
if(cnt < 0) {
switch(opal_socket_errno) {
@@ -140,11 +140,11 @@ bool mca_btl_tcp_frag_send(mca_btl_tcp_frag_t* frag, int sd)
return false;
}
}
}
} while(cnt < 0);
/* if the write didn't complete - update the iovec state */
num_vecs = frag->iov_cnt;
for(i=0; i<num_vecs; i++) {
for( i = 0; i < num_vecs; i++) {
if(cnt >= (ssize_t)frag->iov_ptr->iov_len) {
cnt -= frag->iov_ptr->iov_len;
frag->iov_ptr++;
@@ -166,8 +166,8 @@ bool mca_btl_tcp_frag_send(mca_btl_tcp_frag_t* frag, int sd)
bool mca_btl_tcp_frag_recv(mca_btl_tcp_frag_t* frag, int sd)
{
mca_btl_base_endpoint_t* btl_endpoint = frag->endpoint;
int i, num_vecs, dont_copy_data = 0;
ssize_t cnt;
int32_t i, num_vecs, dont_copy_data = 0;
repeat:
num_vecs = frag->iov_cnt;
@@ -208,8 +208,7 @@ bool mca_btl_tcp_frag_recv(mca_btl_tcp_frag_t* frag, int sd)
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
/* non-blocking read, but continue if interrupted */
cnt = -1;
while( cnt < 0 ) {
do {
cnt = readv(sd, frag->iov_ptr, num_vecs);
if( 0 < cnt ) goto advance_iov_position;
if( cnt == 0 ) {
@@ -247,7 +246,7 @@ bool mca_btl_tcp_frag_recv(mca_btl_tcp_frag_t* frag, int sd)
mca_btl_tcp_endpoint_close(btl_endpoint);
return false;
}
}
} while( cnt < 0 );
advance_iov_position:
/* if the read didn't complete - update the iovec state */

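Both transfer paths switch from a while (cnt < 0) pre-test loop to a do/while post-test loop: call writev()/readv() once, retry only when interrupted by a signal, give up for now on EAGAIN, then walk the iovec array past whatever did complete. A self-contained sketch of that retry-and-advance pattern for the send side, assuming plain POSIX calls instead of the BTL frag structures; the helper name and its return convention are illustrative:

    #include <errno.h>
    #include <stdbool.h>
    #include <stdint.h>
    #include <sys/types.h>
    #include <sys/uio.h>

    /* Write as much of iov[0..*iov_cnt) as the socket accepts right now.
     * Returns true when everything was written, false when the caller should
     * wait for the next writable event (or treat the connection as broken). */
    static bool send_iov_once(int sd, struct iovec **iov_ptr, uint32_t *iov_cnt)
    {
        ssize_t cnt;

        /* non-blocking write, but continue if interrupted */
        do {
            cnt = writev(sd, *iov_ptr, (int)*iov_cnt);
            if (cnt < 0) {
                if (EINTR == errno) continue;   /* interrupted: retry */
                return false;   /* EAGAIN/EWOULDBLOCK or a real error: try later / close */
            }
        } while (cnt < 0);

        /* if the write didn't complete - update the iovec state */
        while (*iov_cnt > 0 && cnt >= (ssize_t)(*iov_ptr)->iov_len) {
            cnt -= (*iov_ptr)->iov_len;
            (*iov_ptr)++;
            (*iov_cnt)--;
        }
        if (*iov_cnt > 0) {
            (*iov_ptr)->iov_base = (char *)(*iov_ptr)->iov_base + cnt;
            (*iov_ptr)->iov_len -= (size_t)cnt;
            return false;                        /* partial write: come back later */
        }
        return true;
    }

The iov_cnt/iov_idx fields becoming 32-bit in the last hunk also lines up with writev()'s int iovcnt parameter.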
View file

@@ -53,8 +53,8 @@ struct mca_btl_tcp_frag_t {
mca_btl_tcp_hdr_t hdr;
struct iovec iov[MCA_BTL_TCP_FRAG_IOVEC_NUMBER + 1];
struct iovec *iov_ptr;
size_t iov_cnt;
size_t iov_idx;
uint32_t iov_cnt;
uint32_t iov_idx;
size_t size;
uint16_t next_step;
int rc;