Merge pull request #2006 from hjelmn/osc_pt2pt_fix
osc/pt2pt: fix several bugs
Этот коммит содержится в:
Коммит
c082068953
@ -936,13 +936,14 @@ static inline bool ompi_osc_pt2pt_access_epoch_active (ompi_osc_pt2pt_module_t *
|
||||
static inline bool ompi_osc_pt2pt_peer_sends_active (ompi_osc_pt2pt_module_t *module, int rank)
|
||||
{
|
||||
ompi_osc_pt2pt_sync_t *sync;
|
||||
ompi_osc_pt2pt_peer_t *peer;
|
||||
|
||||
sync = ompi_osc_pt2pt_module_sync_lookup (module, rank, NULL);
|
||||
sync = ompi_osc_pt2pt_module_sync_lookup (module, rank, &peer);
|
||||
if (!sync) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return sync->eager_send_active;
|
||||
return sync->eager_send_active || ompi_osc_pt2pt_peer_eager_active (peer);
|
||||
}
|
||||
|
||||
END_C_DECLS
|
||||
|
@ -116,7 +116,7 @@ static inline int ompi_osc_pt2pt_put_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, co
|
||||
int ret;
|
||||
|
||||
/* if we are in active target mode wait until all post messages arrive */
|
||||
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
|
||||
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
|
||||
|
||||
ret = ompi_datatype_sndrcv ((void *)source, source_count, source_datatype,
|
||||
target, target_count, target_datatype);
|
||||
@ -140,7 +140,7 @@ static inline int ompi_osc_pt2pt_get_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, vo
|
||||
int ret;
|
||||
|
||||
/* if we are in active target mode wait until all post messages arrive */
|
||||
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
|
||||
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
|
||||
|
||||
ret = ompi_datatype_sndrcv (source, source_count, source_datatype,
|
||||
target, target_count, target_datatype);
|
||||
@ -162,7 +162,7 @@ static inline int ompi_osc_pt2pt_cas_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, co
|
||||
((unsigned long) target_disp * module->disp_unit);
|
||||
|
||||
/* if we are in active target mode wait until all post messages arrive */
|
||||
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
|
||||
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
|
||||
|
||||
ompi_osc_pt2pt_accumulate_lock (module);
|
||||
|
||||
@ -186,7 +186,7 @@ static inline int ompi_osc_pt2pt_acc_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, co
|
||||
int ret;
|
||||
|
||||
/* if we are in active target mode wait until all post messages arrive */
|
||||
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
|
||||
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
|
||||
|
||||
ompi_osc_pt2pt_accumulate_lock (module);
|
||||
|
||||
@ -336,7 +336,7 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_
|
||||
|
||||
if (is_long_msg) {
|
||||
/* wait for eager sends to be active before starting a long put */
|
||||
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
|
||||
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
@ -495,7 +495,7 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count,
|
||||
|
||||
if (is_long_msg) {
|
||||
/* wait for synchronization before posting a long message */
|
||||
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
|
||||
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
|
||||
}
|
||||
|
||||
header = (ompi_osc_pt2pt_header_acc_t*) ptr;
|
||||
@ -802,7 +802,7 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co
|
||||
|
||||
if (!release_req) {
|
||||
/* wait for epoch to begin before starting rget operation */
|
||||
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
|
||||
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
|
||||
}
|
||||
|
||||
header = (ompi_osc_pt2pt_header_get_t*) ptr;
|
||||
@ -968,7 +968,7 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin
|
||||
|
||||
if (!release_req) {
|
||||
/* wait for epoch to begin before starting operation */
|
||||
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
|
||||
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
|
||||
}
|
||||
|
||||
/* optimize the self case. TODO: optimize the local case */
|
||||
|
@ -151,6 +151,37 @@ int ompi_osc_pt2pt_frag_flush_target (ompi_osc_pt2pt_module_t *module, int targe
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ompi_osc_pt2pt_frag_flush_target_locked (ompi_osc_pt2pt_module_t *module, int target)
|
||||
{
|
||||
ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);
|
||||
ompi_osc_pt2pt_frag_t *frag;
|
||||
int ret = OMPI_SUCCESS;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"osc pt2pt: frag flush to target target %d. queue fragments: %lu",
|
||||
target, (unsigned long) opal_list_get_size (&peer->queued_frags)));
|
||||
|
||||
/* walk through the pending list and send */
|
||||
while (NULL != (frag = ((ompi_osc_pt2pt_frag_t *) opal_list_remove_first (&peer->queued_frags)))) {
|
||||
ret = frag_send(module, frag);
|
||||
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/* XXX -- TODO -- better error handling */
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* flush the active frag */
|
||||
ret = ompi_osc_pt2pt_flush_active_frag (module, peer);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"osc pt2pt: frag flush target %d finished", target));
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ompi_osc_pt2pt_frag_flush_all (ompi_osc_pt2pt_module_t *module)
|
||||
{
|
||||
|
@ -41,9 +41,10 @@ struct ompi_osc_pt2pt_frag_t {
|
||||
typedef struct ompi_osc_pt2pt_frag_t ompi_osc_pt2pt_frag_t;
|
||||
OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_frag_t);
|
||||
|
||||
extern int ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_frag_t *buffer);
|
||||
extern int ompi_osc_pt2pt_frag_flush_target(ompi_osc_pt2pt_module_t *module, int target);
|
||||
extern int ompi_osc_pt2pt_frag_flush_all(ompi_osc_pt2pt_module_t *module);
|
||||
int ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_frag_t *buffer);
|
||||
int ompi_osc_pt2pt_frag_flush_target(ompi_osc_pt2pt_module_t *module, int target);
|
||||
int ompi_osc_pt2pt_frag_flush_target_locked(ompi_osc_pt2pt_module_t *module, int target);
|
||||
int ompi_osc_pt2pt_frag_flush_all(ompi_osc_pt2pt_module_t *module);
|
||||
|
||||
static inline int ompi_osc_pt2pt_frag_finish (ompi_osc_pt2pt_module_t *module,
|
||||
ompi_osc_pt2pt_frag_t* buffer)
|
||||
|
@ -122,6 +122,12 @@ int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, int target, omp
|
||||
|
||||
int ret;
|
||||
|
||||
OPAL_THREAD_LOCK(&peer->lock);
|
||||
if (ompi_osc_pt2pt_peer_locked (peer)) {
|
||||
OPAL_THREAD_UNLOCK(&peer->lock);
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
(void) OPAL_THREAD_ADD32(&lock->sync_expected, 1);
|
||||
|
||||
assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK);
|
||||
@ -137,17 +143,24 @@ int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, int target, omp
|
||||
lock_req.lock_ptr = (uint64_t) (uintptr_t) lock;
|
||||
OSC_PT2PT_HTON(&lock_req, module, target);
|
||||
|
||||
do {
|
||||
ret = ompi_osc_pt2pt_control_send (module, target, &lock_req, sizeof (lock_req));
|
||||
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
||||
return ret;
|
||||
break;
|
||||
}
|
||||
|
||||
/* make sure the request gets sent, so we can start eager sending... */
|
||||
ret = ompi_osc_pt2pt_frag_flush_target (module, target);
|
||||
if (OPAL_LIKELY(OMPI_SUCCESS == ret)) {
|
||||
ompi_osc_pt2pt_peer_set_locked (peer, true);
|
||||
ret = ompi_osc_pt2pt_frag_flush_target_locked (module, target);
|
||||
} while (0);
|
||||
|
||||
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
||||
OPAL_THREAD_ADD32(&lock->sync_expected, -1);
|
||||
}
|
||||
|
||||
ompi_osc_pt2pt_peer_set_locked (peer, true);
|
||||
|
||||
OPAL_THREAD_UNLOCK(&peer->lock);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -316,6 +329,8 @@ static int ompi_osc_pt2pt_lock_internal (int lock_type, int target, int assert,
|
||||
if (OPAL_UNLIKELY(NULL == lock)) {
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
lock->peer_list.peer = ompi_osc_pt2pt_peer_lookup (module, target);
|
||||
} else {
|
||||
lock = &module->all_sync;
|
||||
}
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user