1
1

Merge pull request #3110 from hjelmn/osc_pt2pt

osc/pt2pt: flush pending fragments on lock ack
Этот коммит содержится в:
Nathan Hjelm 2017-03-07 14:44:09 -07:00 коммит произвёл GitHub
родитель e2ba60b778 0195d15401
Коммит 7240bee0e0
3 изменённых файлов: 54 добавлений и 7 удалений

Просмотреть файл

@ -117,16 +117,12 @@ static int ompi_osc_pt2pt_flush_active_frag (ompi_osc_pt2pt_module_t *module, om
return ret; return ret;
} }
int ompi_osc_pt2pt_frag_flush_target (ompi_osc_pt2pt_module_t *module, int target) int ompi_osc_pt2pt_frag_flush_pending (ompi_osc_pt2pt_module_t *module, int target)
{ {
ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target); ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);
ompi_osc_pt2pt_frag_t *frag; ompi_osc_pt2pt_frag_t *frag;
int ret = OMPI_SUCCESS; int ret = OMPI_SUCCESS;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc pt2pt: frag flush to target target %d. queue fragments: %lu",
target, (unsigned long) opal_list_get_size (&peer->queued_frags)));
/* walk through the pending list and send */ /* walk through the pending list and send */
OPAL_THREAD_LOCK(&peer->lock); OPAL_THREAD_LOCK(&peer->lock);
while (NULL != (frag = ((ompi_osc_pt2pt_frag_t *) opal_list_remove_first (&peer->queued_frags)))) { while (NULL != (frag = ((ompi_osc_pt2pt_frag_t *) opal_list_remove_first (&peer->queued_frags)))) {
@ -137,11 +133,40 @@ int ompi_osc_pt2pt_frag_flush_target (ompi_osc_pt2pt_module_t *module, int targe
} }
OPAL_THREAD_UNLOCK(&peer->lock); OPAL_THREAD_UNLOCK(&peer->lock);
/* XXX -- TODO -- better error handling */ return ret;
}
int ompi_osc_pt2pt_frag_flush_pending_all (ompi_osc_pt2pt_module_t *module)
{
int ret;
for (int i = 0 ; i < ompi_comm_size (module->comm) ; ++i) {
ret = ompi_osc_pt2pt_frag_flush_pending (module, i);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return ret;
}
}
return ret;
}
int ompi_osc_pt2pt_frag_flush_target (ompi_osc_pt2pt_module_t *module, int target)
{
ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);
ompi_osc_pt2pt_frag_t *frag;
int ret = OMPI_SUCCESS;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc pt2pt: frag flush to target target %d. queue fragments: %lu",
target, (unsigned long) opal_list_get_size (&peer->queued_frags)));
ret = ompi_osc_pt2pt_frag_flush_pending (module, target);
if (OMPI_SUCCESS != ret) { if (OMPI_SUCCESS != ret) {
/* XXX -- TODO -- better error handling */
return ret; return ret;
} }
/* flush the active frag */ /* flush the active frag */
ret = ompi_osc_pt2pt_flush_active_frag (module, peer); ret = ompi_osc_pt2pt_flush_active_frag (module, peer);

Просмотреть файл

@ -44,6 +44,8 @@ OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_frag_t);
int ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_frag_t *buffer); int ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_frag_t *buffer);
int ompi_osc_pt2pt_frag_flush_target(ompi_osc_pt2pt_module_t *module, int target); int ompi_osc_pt2pt_frag_flush_target(ompi_osc_pt2pt_module_t *module, int target);
int ompi_osc_pt2pt_frag_flush_all(ompi_osc_pt2pt_module_t *module); int ompi_osc_pt2pt_frag_flush_all(ompi_osc_pt2pt_module_t *module);
int ompi_osc_pt2pt_frag_flush_pending (ompi_osc_pt2pt_module_t *module, int target);
int ompi_osc_pt2pt_frag_flush_pending_all (ompi_osc_pt2pt_module_t *module);
static inline int ompi_osc_pt2pt_frag_finish (ompi_osc_pt2pt_module_t *module, static inline int ompi_osc_pt2pt_frag_finish (ompi_osc_pt2pt_module_t *module,
ompi_osc_pt2pt_frag_t* buffer) ompi_osc_pt2pt_frag_t* buffer)
@ -107,7 +109,7 @@ static inline ompi_osc_pt2pt_frag_t *ompi_osc_pt2pt_frag_alloc_non_buffered (omp
* soon as it is sent. this allows request-based rma fragments to be completed * soon as it is sent. this allows request-based rma fragments to be completed
* so MPI_Test/MPI_Wait/etc will work as expected. * so MPI_Test/MPI_Wait/etc will work as expected.
*/ */
static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, int target, static inline int _ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, int target,
size_t request_len, ompi_osc_pt2pt_frag_t **buffer, size_t request_len, ompi_osc_pt2pt_frag_t **buffer,
char **ptr, bool long_send, bool buffered) char **ptr, bool long_send, bool buffered)
{ {
@ -164,4 +166,23 @@ static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, in
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, int target,
size_t request_len, ompi_osc_pt2pt_frag_t **buffer,
char **ptr, bool long_send, bool buffered)
{
int ret;
do {
ret = ompi_osc_pt2pt_frag_alloc (module, target, request_len , buffer, ptr, long_send, buffered);
if (OPAL_LIKELY(OMPI_SUCCESS == ret || OMPI_ERR_OUT_OF_RESOURCE != ret)) {
break;
}
ompi_osc_pt2pt_frag_flush_pending_all (module);
opal_progress ();
} while (1);
return ret;
}
#endif #endif

Просмотреть файл

@ -819,6 +819,7 @@ void ompi_osc_pt2pt_process_lock_ack (ompi_osc_pt2pt_module_t *module,
assert (NULL != lock); assert (NULL != lock);
ompi_osc_pt2pt_peer_set_eager_active (peer, true); ompi_osc_pt2pt_peer_set_eager_active (peer, true);
ompi_osc_pt2pt_frag_flush_pending (module, peer->rank);
ompi_osc_pt2pt_sync_expected (lock); ompi_osc_pt2pt_sync_expected (lock);
} }