
Merge pull request #1341 from hjelmn/osc_pt2pt_fixes

osc/pt2pt: bug fixes
This commit is contained in:
Nathan Hjelm 2016-02-04 19:10:03 -07:00
parent f38ad4adf3 5b9c82a964
commit a1e784d76f
5 changed files with 100 additions and 66 deletions

View file

@@ -631,8 +631,8 @@ static inline void osc_pt2pt_add_pending (ompi_osc_pt2pt_pending_t *pending)
opal_list_append (&mca_osc_pt2pt_component.pending_operations, &pending->super));
}
#define OSC_PT2PT_FRAG_TAG 0x80000
#define OSC_PT2PT_FRAG_MASK 0x7ffff
#define OSC_PT2PT_FRAG_TAG 0x10000
#define OSC_PT2PT_FRAG_MASK 0x0ffff
/**
* get_tag:
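
The hunk above narrows the tag space reserved for fragment traffic. As a rough sketch of how a tag/mask pair like this keeps per-operation tags disjoint from the dedicated fragment tag, assuming operation tags come from a masked counter (the helper name, counter argument, and increment below are illustrative, not code from this commit):

/* illustrative only: operation tags stay within OSC_PT2PT_FRAG_MASK and so can
 * never collide with OSC_PT2PT_FRAG_TAG, which presumably marks whole fragment
 * messages; the real generator is the get_tag() routine referenced above */
static inline int example_next_op_tag (volatile int32_t *tag_counter)
{
    int32_t tmp = OPAL_THREAD_ADD32(tag_counter, 1);
    return tmp & OSC_PT2PT_FRAG_MASK;   /* always less than OSC_PT2PT_FRAG_TAG */
}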

View file

@@ -147,6 +147,7 @@ int ompi_osc_pt2pt_fence(int assert, ompi_win_t *win)
/* short-circuit the noprecede case */
if (0 != (assert & MPI_MODE_NOPRECEDE)) {
module->comm->c_coll.coll_barrier (module->comm, module->comm->c_coll.coll_barrier_module);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc pt2pt: fence end (short circuit)"));
return ret;
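
For reference, the branch above handles a fence that only opens an access epoch; the added barrier presumably keeps a fast rank from issuing RMA data before every peer has entered the fence. A minimal user-level sketch (not part of this commit) of the pattern that exercises it; the window, buffer, and neighbor rank are placeholders:

#include <mpi.h>

/* hypothetical fence-mode epoch: each rank puts one int into its right
 * neighbor's window; 'win' is assumed to expose at least one MPI_INT */
static void example_fence_epoch (MPI_Win win, int *buf, int rank, int size)
{
    MPI_Win_fence (MPI_MODE_NOPRECEDE, win);   /* open epoch: no preceding local RMA */
    MPI_Put (buf, 1, MPI_INT, (rank + 1) % size, 0, 1, MPI_INT, win);
    MPI_Win_fence (MPI_MODE_NOSUCCEED, win);   /* close epoch: no RMA until the next fence */
}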

View file

@@ -58,6 +58,9 @@ static int ompi_osc_pt2pt_req_comm_complete (ompi_request_t *request)
"ompi_osc_pt2pt_req_comm_complete called tag = %d",
request->req_status.MPI_TAG));
/* update the cbdata for ompi_osc_pt2pt_comm_complete */
request->req_complete_cb_data = pt2pt_request->module;
if (0 == OPAL_THREAD_ADD32(&pt2pt_request->outstanding_requests, -1)) {
ompi_osc_pt2pt_request_complete (pt2pt_request, request->req_status.MPI_ERROR);
}
@@ -218,8 +221,8 @@ static inline int ompi_osc_pt2pt_gacc_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, c
((unsigned long) target_disp * module->disp_unit);
int ret;
/* if we are in active target mode wait until all post messages arrive */
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
OPAL_OUTPUT_VERBOSE((MCA_BASE_VERBOSE_TRACE, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_gacc_self: starting local "
"get accumulate"));
ompi_osc_pt2pt_accumulate_lock (module);
@@ -250,6 +253,9 @@ static inline int ompi_osc_pt2pt_gacc_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, c
ompi_osc_pt2pt_accumulate_unlock (module);
OPAL_OUTPUT_VERBOSE((MCA_BASE_VERBOSE_TRACE, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_gacc_self: local get "
"accumulate complete"));
if (request) {
/* NTH: is it ok to use an ompi error code here? */
ompi_osc_pt2pt_request_complete (request, ret);
@@ -310,14 +316,14 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_
payload_len = origin_dt->super.size * origin_count;
frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + ddt_len + payload_len;
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false);
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false, true);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + ddt_len;
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true);
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true, false);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
/* allocate space for the header plus space to store ddt_len */
frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + 8;
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true);
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true, false);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
@@ -469,14 +475,14 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count,
payload_len = origin_dt->super.size * origin_count;
frag_len = sizeof(*header) + ddt_len + payload_len;
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false);
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false, true);
if (OMPI_SUCCESS != ret) {
frag_len = sizeof(*header) + ddt_len;
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true);
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true, !request);
if (OMPI_SUCCESS != ret) {
/* allocate space for the header plus space to store ddt_len */
frag_len = sizeof(*header) + 8;
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true);
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true, !request);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
@@ -488,7 +494,7 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count,
tag = get_rtag (module);
}
if (is_long_msg || is_long_datatype) {
if (is_long_msg) {
/* wait for synchronization before posting a long message */
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
}
@@ -631,7 +637,7 @@ int ompi_osc_pt2pt_compare_and_swap (const void *origin_addr, const void *compar
}
frag_len = sizeof(ompi_osc_pt2pt_header_cswap_t) + ddt_len + payload_len;
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false);
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false, false);
if (OMPI_SUCCESS != ret) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
@@ -663,9 +669,7 @@ int ompi_osc_pt2pt_compare_and_swap (const void *origin_addr, const void *compar
return ret;
}
ret = ompi_osc_pt2pt_frag_finish(module, frag);
return ret;
return ompi_osc_pt2pt_frag_finish (module, frag);
}
@@ -779,11 +783,11 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co
ddt_len = ompi_datatype_pack_description_length(target_dt);
frag_len = sizeof(ompi_osc_pt2pt_header_get_t) + ddt_len;
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false);
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false, release_req);
if (OMPI_SUCCESS != ret) {
/* allocate space for the header plus space to store ddt_len */
frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + 8;
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false);
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false, release_req);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
@@ -961,6 +965,11 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin
return OMPI_SUCCESS;
}
if (!release_req) {
/* wait for epoch to begin before starting operation */
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
}
/* optimize the self case. TODO: optimize the local case */
if (ompi_comm_rank (module->comm) == target_rank) {
*request = &pt2pt_request->super;
@@ -987,14 +996,14 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin
}
frag_len = sizeof(*header) + ddt_len + payload_len;
ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr, false);
ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr, false, release_req);
if (OMPI_SUCCESS != ret) {
frag_len = sizeof(*header) + ddt_len;
ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr, true);
ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr, true, release_req);
if (OMPI_SUCCESS != ret) {
/* allocate space for the header plus space to store ddt_len */
frag_len = sizeof(*header) + 8;
ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr, true);
ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr, true, release_req);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
@@ -1014,11 +1023,6 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin
/* increment the number of outgoing fragments */
ompi_osc_signal_outgoing (module, target_rank, pt2pt_request->outstanding_requests);
if (!release_req) {
/* wait for epoch to begin before starting operation */
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
}
header = (ompi_osc_pt2pt_header_acc_t *) ptr;
header->base.flags = 0;
header->len = frag_len;
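
Across put, accumulate, and get_accumulate this file threads the new final argument through the same three-step allocation fallback. A condensed sketch of that shared pattern as it reads after the change, with error paths abbreviated and 'buffered' standing in for the per-call-site choice shown in the hunks above:

/* 1) try header + packed datatype description + payload fully inline;
 * 2) if that does not fit, keep the datatype inline and send the payload long;
 * 3) if even that fails, reserve 8 bytes for ddt_len and send both long */
frag_len = sizeof(*header) + ddt_len + payload_len;
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false, buffered);
if (OMPI_SUCCESS != ret) {
    frag_len = sizeof(*header) + ddt_len;
    ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true, buffered);
    if (OMPI_SUCCESS != ret) {
        frag_len = sizeof(*header) + 8;
        ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true, buffered);
        if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
            return OMPI_ERR_OUT_OF_RESOURCE;
        }
    }
}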

View file

@@ -213,7 +213,7 @@ int ompi_osc_pt2pt_control_send (ompi_osc_pt2pt_module_t *module, int target,
char *ptr;
int ret;
ret = ompi_osc_pt2pt_frag_alloc(module, target, len, &frag, &ptr, false);
ret = ompi_osc_pt2pt_frag_alloc(module, target, len, &frag, &ptr, false, true);
if (OPAL_LIKELY(OMPI_SUCCESS == ret)) {
memcpy (ptr, data, len);

View file

@@ -57,16 +57,62 @@ static inline int ompi_osc_pt2pt_frag_finish (ompi_osc_pt2pt_module_t *module,
return OMPI_SUCCESS;
}
static inline ompi_osc_pt2pt_frag_t *ompi_osc_pt2pt_frag_alloc_non_buffered (ompi_osc_pt2pt_module_t *module,
ompi_osc_pt2pt_peer_t *peer,
size_t request_len)
{
ompi_osc_pt2pt_frag_t *curr;
/* to ensure ordering flush the buffer on the peer */
curr = peer->active_frag;
if (NULL != curr && opal_atomic_cmpset (&peer->active_frag, curr, NULL)) {
/* If there's something pending, the pending finish will
start the buffer. Otherwise, we need to start it now. */
int ret = ompi_osc_pt2pt_frag_finish (module, curr);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return NULL;
}
}
curr = (ompi_osc_pt2pt_frag_t *) opal_free_list_get (&mca_osc_pt2pt_component.frags);
if (OPAL_UNLIKELY(NULL == curr)) {
return NULL;
}
curr->target = peer->rank;
curr->header = (ompi_osc_pt2pt_frag_header_t*) curr->buffer;
curr->top = (char*) (curr->header + 1);
curr->remain_len = mca_osc_pt2pt_component.buffer_size;
curr->module = module;
curr->pending = 1;
curr->header->base.type = OMPI_OSC_PT2PT_HDR_TYPE_FRAG;
curr->header->base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID;
if (module->passive_target_access_epoch) {
curr->header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET;
}
curr->header->source = ompi_comm_rank(module->comm);
curr->header->num_ops = 1;
return curr;
}
/*
* Note: module lock must be held during this operation
* Note: this function takes the module lock
*
* buffered sends will cache the fragment on the peer object associated with the
* target. unbuffered-sends will cause the target fragment to be flushed and
* will not be cached on the peer. this causes the fragment to be flushed as
* soon as it is sent. this allows request-based rma fragments to be completed
* so MPI_Test/MPI_Wait/etc will work as expected.
*/
static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, int target,
size_t request_len, ompi_osc_pt2pt_frag_t **buffer,
char **ptr, bool long_send)
char **ptr, bool long_send, bool buffered)
{
ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);
ompi_osc_pt2pt_frag_t *curr;
int ret;
/* osc pt2pt headers can have 64-bit values. these will need to be aligned
* on an 8-byte boundary on some architectures so we up align the allocation
@@ -77,51 +123,34 @@ static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, in
return OMPI_ERR_OUT_OF_RESOURCE;
}
OPAL_OUTPUT_VERBOSE((MCA_BASE_VERBOSE_TRACE, ompi_osc_base_framework.framework_output,
"attempting to allocate buffer for %lu bytes to target %d. long send: %d, "
"buffered: %d", (unsigned long) request_len, target, long_send, buffered));
OPAL_THREAD_LOCK(&module->lock);
curr = peer->active_frag;
if (NULL == curr || curr->remain_len < request_len || (long_send && curr->pending_long_sends == 32)) {
if (NULL != curr && opal_atomic_cmpset (&peer->active_frag, curr, NULL)) {
/* If there's something pending, the pending finish will
start the buffer. Otherwise, we need to start it now. */
ret = ompi_osc_pt2pt_frag_finish (module, curr);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
if (buffered) {
curr = peer->active_frag;
if (NULL == curr || curr->remain_len < request_len || (long_send && curr->pending_long_sends == 32)) {
curr = ompi_osc_pt2pt_frag_alloc_non_buffered (module, peer, request_len);
if (OPAL_UNLIKELY(NULL == curr)) {
OPAL_THREAD_UNLOCK(&module->lock);
return ret;
return OMPI_ERR_OUT_OF_RESOURCE;
}
curr->pending_long_sends = long_send;
peer->active_frag = curr;
} else {
OPAL_THREAD_ADD32(&curr->header->num_ops, 1);
curr->pending_long_sends += long_send;
}
curr = (ompi_osc_pt2pt_frag_t *) opal_free_list_get (&mca_osc_pt2pt_component.frags);
OPAL_THREAD_ADD32(&curr->pending, 1);
} else {
curr = ompi_osc_pt2pt_frag_alloc_non_buffered (module, peer, request_len);
if (OPAL_UNLIKELY(NULL == curr)) {
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
curr->target = target;
curr->header = (ompi_osc_pt2pt_frag_header_t*) curr->buffer;
curr->top = (char*) (curr->header + 1);
curr->remain_len = mca_osc_pt2pt_component.buffer_size;
curr->module = module;
curr->pending = 2;
curr->pending_long_sends = long_send;
curr->header->base.type = OMPI_OSC_PT2PT_HDR_TYPE_FRAG;
curr->header->base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID;
if (module->passive_target_access_epoch) {
curr->header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET;
}
curr->header->source = ompi_comm_rank(module->comm);
curr->header->num_ops = 1;
if (curr->remain_len < request_len) {
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
}
peer->active_frag = curr;
} else {
OPAL_THREAD_ADD32(&curr->pending, 1);
OPAL_THREAD_ADD32(&curr->header->num_ops, 1);
curr->pending_long_sends += long_send;
}
*ptr = curr->top;
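
Reading the new flag back against the call sites earlier in the commit, a short usage sketch of the two modes the comment block above describes (variable names are illustrative and error handling is omitted):

/* buffered: the fragment may stay cached as peer->active_frag, letting later
 * small operations to the same target coalesce (e.g. the control-send path) */
ret = ompi_osc_pt2pt_frag_alloc (module, target, len, &frag, &ptr, false, true);

/* unbuffered: any active fragment is flushed and this one is not cached, so a
 * request-based operation can complete and be observed by MPI_Test/MPI_Wait */
ret = ompi_osc_pt2pt_frag_alloc (module, target, len, &frag, &ptr, false, false);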