osc/pt2pt: fix synchronization bugs
The fragment flush code tries to send the active fragment before sending any queued fragments. This could cause osc messages to arrive out-of-order at the target (bad). Ensure ordering by always sending the active fragment after sending queued fragments. This commit also fixes a bug where a synchronization message (unlock, flush, complete) cannot be packed at the end of an existing active fragment. In this case the source process will end up sending 1 more fragment than claimed in the synchronization message. To fix the issue a check has been added that fixes the fragment count if this situation is detected. Signed-off-by: Nathan Hjelm <hjelmn@lanl.gov>
Этот коммит содержится в:
родитель
108bcb70b0
Коммит
80ed805a16
@ -80,18 +80,35 @@ typedef struct ompi_osc_pt2pt_component_t ompi_osc_pt2pt_component_t;
|
||||
|
||||
|
||||
struct ompi_osc_pt2pt_peer_t {
|
||||
/** Pointer to the current send fragment for each outgoing target */
|
||||
/** make this an opal object */
|
||||
opal_object_t super;
|
||||
|
||||
/** pointer to the current send fragment for each outgoing target */
|
||||
struct ompi_osc_pt2pt_frag_t *active_frag;
|
||||
|
||||
/** Number of acks pending. New requests can not be sent out if there are
|
||||
/** lock for this peer */
|
||||
opal_mutex_t lock;
|
||||
|
||||
/** fragments queued to this target */
|
||||
opal_list_t queued_frags;
|
||||
|
||||
/** number of acks pending. New requests can not be sent out if there are
|
||||
* acks pending (to fulfill the ordering constraints of accumulate) */
|
||||
uint32_t num_acks_pending;
|
||||
|
||||
/** number of fragments incomming (negative - expected, positive - unsynchronized) */
|
||||
int32_t passive_incoming_frag_count;
|
||||
|
||||
/** peer is in an access epoch */
|
||||
bool access_epoch;
|
||||
|
||||
/** eager sends are active to this peer */
|
||||
bool eager_send_active;
|
||||
};
|
||||
typedef struct ompi_osc_pt2pt_peer_t ompi_osc_pt2pt_peer_t;
|
||||
|
||||
OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_peer_t);
|
||||
|
||||
#define SEQ_INVALID 0xFFFFFFFFFFFFFFFFULL
|
||||
|
||||
/** Module structure. Exactly one of these is associated with each
|
||||
@ -132,13 +149,6 @@ struct ompi_osc_pt2pt_module_t {
|
||||
peer. Not in peer data to make fence more manageable. */
|
||||
uint32_t *epoch_outgoing_frag_count;
|
||||
|
||||
/** Lock for queued_frags */
|
||||
opal_mutex_t queued_frags_lock;
|
||||
|
||||
/** List of full communication buffers queued to be sent. Should
|
||||
be maintained in order (at least in per-target order). */
|
||||
opal_list_t queued_frags;
|
||||
|
||||
/** cyclic counter for a unique tage for long messages. */
|
||||
unsigned int tag_counter;
|
||||
|
||||
|
@ -297,7 +297,7 @@ ompi_osc_pt2pt_complete(ompi_win_t *win)
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"ompi_osc_pt2pt_complete sending complete message"));
|
||||
"ompi_osc_pt2pt_complete sending complete messages"));
|
||||
|
||||
/* for each process in group, send a control message with number
|
||||
of updates coming, then start all the requests. Note that the
|
||||
@ -320,19 +320,27 @@ ompi_osc_pt2pt_complete(ompi_win_t *win)
|
||||
|
||||
peer = module->peers + ranks[i];
|
||||
|
||||
/* XXX -- TODO -- since fragment are always delivered in order we do not need to count anything but long
|
||||
* requests. once that is done this can be removed. */
|
||||
if (peer->active_frag && (peer->active_frag->remain_len < sizeof (complete_req))) {
|
||||
++complete_req.frag_count;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"ompi_osc_pt2pt_complete sending complete message to %d. frag_count: %u",
|
||||
ranks[i], complete_req.frag_count));
|
||||
|
||||
|
||||
peer->access_epoch = false;
|
||||
|
||||
ret = ompi_osc_pt2pt_control_send(module,
|
||||
ranks[i],
|
||||
&complete_req,
|
||||
sizeof(ompi_osc_pt2pt_header_complete_t));
|
||||
ret = ompi_osc_pt2pt_control_send (module, ranks[i], &complete_req,
|
||||
sizeof(ompi_osc_pt2pt_header_complete_t));
|
||||
if (OMPI_SUCCESS != ret) goto cleanup;
|
||||
|
||||
ret = ompi_osc_pt2pt_frag_flush_target (module, ranks[i]);
|
||||
if (OMPI_SUCCESS != ret) goto cleanup;
|
||||
}
|
||||
|
||||
/* start all requests */
|
||||
ret = ompi_osc_pt2pt_frag_flush_all(module);
|
||||
if (OMPI_SUCCESS != ret) goto cleanup;
|
||||
|
||||
OPAL_THREAD_LOCK(&module->lock);
|
||||
/* zero the fragment counts here to ensure they are zerod */
|
||||
for (i = 0 ; i < ompi_group_size(module->sc_group) ; ++i) {
|
||||
|
@ -348,8 +348,6 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit
|
||||
OBJ_CONSTRUCT(&module->lock, opal_mutex_t);
|
||||
OBJ_CONSTRUCT(&module->cond, opal_condition_t);
|
||||
OBJ_CONSTRUCT(&module->acc_lock, opal_mutex_t);
|
||||
OBJ_CONSTRUCT(&module->queued_frags, opal_list_t);
|
||||
OBJ_CONSTRUCT(&module->queued_frags_lock, opal_mutex_t);
|
||||
OBJ_CONSTRUCT(&module->locks_pending, opal_list_t);
|
||||
OBJ_CONSTRUCT(&module->locks_pending_lock, opal_mutex_t);
|
||||
OBJ_CONSTRUCT(&module->outstanding_locks, opal_list_t);
|
||||
@ -398,6 +396,10 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
for (int i = 0 ; i < ompi_comm_size (comm) ; ++i) {
|
||||
OBJ_CONSTRUCT(module->peers + i, ompi_osc_pt2pt_peer_t);
|
||||
}
|
||||
|
||||
/* peer op count data */
|
||||
module->epoch_outgoing_frag_count = calloc (ompi_comm_size(comm), sizeof(uint32_t));
|
||||
if (NULL == module->epoch_outgoing_frag_count) {
|
||||
@ -497,3 +499,19 @@ ompi_osc_pt2pt_get_info(struct ompi_win_t *win, struct ompi_info_t **info_used)
|
||||
}
|
||||
|
||||
OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_pending_t, opal_list_item_t, NULL, NULL);
|
||||
|
||||
void ompi_osc_pt2pt_peer_construct (ompi_osc_pt2pt_peer_t *peer)
|
||||
{
|
||||
OBJ_CONSTRUCT(&peer->queued_frags, opal_list_t);
|
||||
OBJ_CONSTRUCT(&peer->lock, opal_mutex_t);
|
||||
}
|
||||
|
||||
void ompi_osc_pt2pt_peer_destruct (ompi_osc_pt2pt_peer_t *peer)
|
||||
{
|
||||
OBJ_DESTRUCT(&peer->queued_frags);
|
||||
OBJ_DESTRUCT(&peer->lock);
|
||||
}
|
||||
|
||||
OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_peer_t, opal_object_t,
|
||||
ompi_osc_pt2pt_peer_construct,
|
||||
ompi_osc_pt2pt_peer_destruct);
|
||||
|
@ -49,9 +49,7 @@ static int frag_send_cb (ompi_request_t *request)
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
static int
|
||||
frag_send(ompi_osc_pt2pt_module_t *module,
|
||||
ompi_osc_pt2pt_frag_t *frag)
|
||||
static int frag_send (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_frag_t *frag)
|
||||
{
|
||||
int count;
|
||||
|
||||
@ -61,32 +59,36 @@ frag_send(ompi_osc_pt2pt_module_t *module,
|
||||
"osc pt2pt: frag_send called to %d, frag = %p, count = %d",
|
||||
frag->target, (void *) frag, count));
|
||||
|
||||
/* we need to signal now that a frag is outgoing to ensure the count sent
|
||||
* with the unlock message is correct */
|
||||
ompi_osc_signal_outgoing (module, frag->target, 1);
|
||||
|
||||
return ompi_osc_pt2pt_isend_w_cb (frag->buffer, count, MPI_BYTE, frag->target, OSC_PT2PT_FRAG_TAG,
|
||||
module->comm, frag_send_cb, frag);
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module,
|
||||
ompi_osc_pt2pt_frag_t *frag)
|
||||
int ompi_osc_pt2pt_frag_start (ompi_osc_pt2pt_module_t *module,
|
||||
ompi_osc_pt2pt_frag_t *frag)
|
||||
{
|
||||
ompi_osc_pt2pt_peer_t *peer = module->peers + frag->target;
|
||||
int ret;
|
||||
|
||||
assert(0 == frag->pending && peer->active_frag != frag);
|
||||
|
||||
/* we need to signal now that a frag is outgoing to ensure the count sent
|
||||
* with the unlock message is correct */
|
||||
ompi_osc_signal_outgoing (module, frag->target, 1);
|
||||
|
||||
/* if eager sends are not active, can't send yet, so buffer and
|
||||
get out... */
|
||||
if (!(peer->eager_send_active || module->all_access_epoch)) {
|
||||
OPAL_THREAD_SCOPED_LOCK(&module->queued_frags_lock,
|
||||
opal_list_append(&module->queued_frags, (opal_list_item_t *) frag));
|
||||
if (!(peer->eager_send_active || module->all_access_epoch) || opal_list_get_size (&peer->queued_frags)) {
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "queuing fragment to peer %d",
|
||||
frag->target));
|
||||
OPAL_THREAD_SCOPED_LOCK(&peer->lock,
|
||||
opal_list_append(&peer->queued_frags, (opal_list_item_t *) frag));
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "sending fragment to peer %d",
|
||||
frag->target));
|
||||
|
||||
ret = frag_send(module, frag);
|
||||
|
||||
opal_condition_broadcast(&module->cond);
|
||||
@ -94,105 +96,103 @@ ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module,
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
ompi_osc_pt2pt_frag_flush_target(ompi_osc_pt2pt_module_t *module, int target)
|
||||
static int ompi_osc_pt2pt_flush_active_frag (ompi_osc_pt2pt_module_t *module, int target)
|
||||
{
|
||||
ompi_osc_pt2pt_frag_t *next, *frag = module->peers[target].active_frag;
|
||||
ompi_osc_pt2pt_frag_t *active_frag = module->peers[target].active_frag;
|
||||
int ret = OMPI_SUCCESS;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"osc pt2pt: frag flush target begin"));
|
||||
if (NULL == active_frag) {
|
||||
/* nothing to do */
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
/* flush the active frag */
|
||||
if (NULL != frag) {
|
||||
if (1 != frag->pending) {
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"osc pt2pt: flushing active fragment to target. pending: %d", target,
|
||||
active_frag->pending));
|
||||
|
||||
if (opal_atomic_cmpset (&module->peers[target].active_frag, active_frag, NULL)) {
|
||||
if (0 != OPAL_THREAD_ADD32(&active_frag->pending, -1)) {
|
||||
/* communication going on while synchronizing; this is an rma usage bug */
|
||||
return OMPI_ERR_RMA_SYNC;
|
||||
}
|
||||
|
||||
if (opal_atomic_cmpset (&module->peers[target].active_frag, frag, NULL)) {
|
||||
OPAL_THREAD_ADD32(&frag->pending, -1);
|
||||
ret = ompi_osc_pt2pt_frag_start(module, frag);
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
ompi_osc_signal_outgoing (module, target, 1);
|
||||
ret = frag_send (module, active_frag);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ompi_osc_pt2pt_frag_flush_target (ompi_osc_pt2pt_module_t *module, int target)
|
||||
{
|
||||
ompi_osc_pt2pt_peer_t *peer = module->peers + target;
|
||||
ompi_osc_pt2pt_frag_t *next, *frag;
|
||||
int ret = OMPI_SUCCESS;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"osc pt2pt: frag flush target finished active frag"));
|
||||
"osc pt2pt: frag flush to target target %d. queue fragments: %u",
|
||||
target, opal_list_get_size (&peer->queued_frags)));
|
||||
|
||||
/* walk through the pending list and send */
|
||||
OPAL_THREAD_LOCK(&module->queued_frags_lock);
|
||||
if (opal_list_get_size (&module->queued_frags)) {
|
||||
OPAL_LIST_FOREACH_SAFE(frag, next, &module->queued_frags, ompi_osc_pt2pt_frag_t) {
|
||||
if (frag->target == target) {
|
||||
opal_list_remove_item(&module->queued_frags, (opal_list_item_t *) frag);
|
||||
ret = frag_send(module, frag);
|
||||
if (OPAL_UNLIKELY(OMPI_SUCCESS != frag)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
OPAL_THREAD_LOCK(&peer->lock);
|
||||
while (NULL != (frag = ((ompi_osc_pt2pt_frag_t *) opal_list_remove_first (&peer->queued_frags)))) {
|
||||
ret = frag_send(module, frag);
|
||||
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&module->queued_frags_lock);
|
||||
OPAL_THREAD_UNLOCK(&peer->lock);
|
||||
|
||||
/* XXX -- TODO -- better error handling */
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* flush the active frag */
|
||||
ret = ompi_osc_pt2pt_flush_active_frag (module, target);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"osc pt2pt: frag flush target finished"));
|
||||
"osc pt2pt: frag flush target %d finished", target));
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
ompi_osc_pt2pt_frag_flush_all(ompi_osc_pt2pt_module_t *module)
|
||||
int ompi_osc_pt2pt_frag_flush_all (ompi_osc_pt2pt_module_t *module)
|
||||
{
|
||||
int ret = OMPI_SUCCESS;
|
||||
int i;
|
||||
ompi_osc_pt2pt_frag_t *frag, *next;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"osc pt2pt: frag flush all begin"));
|
||||
|
||||
/* flush the active frag */
|
||||
for (i = 0 ; i < ompi_comm_size(module->comm) ; ++i) {
|
||||
ompi_osc_pt2pt_frag_t *frag = module->peers[i].active_frag;
|
||||
|
||||
if (NULL != frag) {
|
||||
if (1 != frag->pending) {
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
/* communication going on while synchronizing; this is a bug */
|
||||
return OMPI_ERR_RMA_SYNC;
|
||||
}
|
||||
|
||||
if (!opal_atomic_cmpset_ptr (&module->peers[i].active_frag, frag, NULL)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
OPAL_THREAD_ADD32(&frag->pending, -1);
|
||||
ret = ompi_osc_pt2pt_frag_start(module, frag);
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"osc pt2pt: frag flush all finished active frag"));
|
||||
|
||||
/* try to start all the queued frags */
|
||||
OPAL_THREAD_LOCK(&module->queued_frags_lock);
|
||||
if (opal_list_get_size (&module->queued_frags)) {
|
||||
OPAL_LIST_FOREACH_SAFE(frag, next, &module->queued_frags, ompi_osc_pt2pt_frag_t) {
|
||||
opal_list_remove_item(&module->queued_frags, (opal_list_item_t *) frag);
|
||||
for (int i = 0 ; i < ompi_comm_size (module->comm) ; ++i) {
|
||||
ompi_osc_pt2pt_peer_t *peer = module->peers + i;
|
||||
|
||||
while (NULL != (frag = ((ompi_osc_pt2pt_frag_t *) opal_list_remove_first (&peer->queued_frags)))) {
|
||||
ret = frag_send(module, frag);
|
||||
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/* XXX -- TODO -- better error handling */
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"osc pt2pt: flushing all active fragments"));
|
||||
|
||||
/* flush the active frag */
|
||||
for (int i = 0 ; i < ompi_comm_size(module->comm) ; ++i) {
|
||||
ret = ompi_osc_pt2pt_flush_active_frag (module, i);
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&module->queued_frags_lock);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"osc pt2pt: frag flush all done"));
|
||||
|
@ -44,15 +44,14 @@ extern int ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module, ompi_osc_p
|
||||
extern int ompi_osc_pt2pt_frag_flush_target(ompi_osc_pt2pt_module_t *module, int target);
|
||||
extern int ompi_osc_pt2pt_frag_flush_all(ompi_osc_pt2pt_module_t *module);
|
||||
|
||||
|
||||
/*
|
||||
* Note: module lock must be held during this operation
|
||||
*/
|
||||
static inline int ompi_osc_pt2pt_frag_alloc(ompi_osc_pt2pt_module_t *module, int target,
|
||||
size_t request_len, ompi_osc_pt2pt_frag_t **buffer,
|
||||
char **ptr)
|
||||
static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, int target,
|
||||
size_t request_len, ompi_osc_pt2pt_frag_t **buffer,
|
||||
char **ptr)
|
||||
{
|
||||
ompi_osc_pt2pt_frag_t *curr = module->peers[target].active_frag;
|
||||
ompi_osc_pt2pt_frag_t *curr;
|
||||
int ret;
|
||||
|
||||
/* osc pt2pt headers can have 64-bit values. these will need to be aligned
|
||||
@ -65,6 +64,7 @@ static inline int ompi_osc_pt2pt_frag_alloc(ompi_osc_pt2pt_module_t *module, int
|
||||
}
|
||||
|
||||
OPAL_THREAD_LOCK(&module->lock);
|
||||
curr = module->peers[target].active_frag;
|
||||
if (NULL == curr || curr->remain_len < request_len) {
|
||||
opal_free_list_item_t *item = NULL;
|
||||
|
||||
|
@ -84,8 +84,6 @@ ompi_osc_pt2pt_free(ompi_win_t *win)
|
||||
* probably produce an error here instead of cleaning up */
|
||||
OPAL_LIST_DESTRUCT(&module->pending_acc);
|
||||
OPAL_LIST_DESTRUCT(&module->pending_posts);
|
||||
OPAL_LIST_DESTRUCT(&module->queued_frags);
|
||||
OBJ_DESTRUCT(&module->queued_frags_lock);
|
||||
|
||||
osc_pt2pt_gc_clean (module);
|
||||
OPAL_LIST_DESTRUCT(&module->request_gc);
|
||||
@ -93,6 +91,10 @@ ompi_osc_pt2pt_free(ompi_win_t *win)
|
||||
OBJ_DESTRUCT(&module->gc_lock);
|
||||
|
||||
if (NULL != module->peers) {
|
||||
for (int i = 0 ; i < ompi_comm_size (module->comm) ; ++i) {
|
||||
OBJ_DESTRUCT(module->peers + i);
|
||||
}
|
||||
|
||||
free(module->peers);
|
||||
}
|
||||
|
||||
|
@ -190,6 +190,7 @@ static inline int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, i
|
||||
|
||||
static inline int ompi_osc_pt2pt_unlock_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_outstanding_lock_t *lock)
|
||||
{
|
||||
ompi_osc_pt2pt_peer_t *peer = module->peers + target;
|
||||
ompi_osc_pt2pt_header_unlock_t unlock_req;
|
||||
int32_t frag_count = opal_atomic_swap_32 ((int32_t *) module->epoch_outgoing_frag_count + target, -1);
|
||||
|
||||
@ -199,10 +200,53 @@ static inline int ompi_osc_pt2pt_unlock_remote (ompi_osc_pt2pt_module_t *module,
|
||||
unlock_req.lock_type = lock->type;
|
||||
unlock_req.lock_ptr = (uint64_t) (uintptr_t) lock;
|
||||
|
||||
if (peer->active_frag && peer->active_frag->remain_len < sizeof (unlock_req)) {
|
||||
/* the peer should expect one more packet */
|
||||
++unlock_req.frag_count;
|
||||
--module->epoch_outgoing_frag_count[target];
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
|
||||
"osc pt2pt: unlocking target %d, frag count: %d", target,
|
||||
unlock_req.frag_count));
|
||||
|
||||
/* send control message with unlock request and count */
|
||||
return ompi_osc_pt2pt_control_send (module, target, &unlock_req, sizeof (unlock_req));
|
||||
}
|
||||
|
||||
static inline int ompi_osc_pt2pt_flush_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_outstanding_lock_t *lock)
|
||||
{
|
||||
ompi_osc_pt2pt_peer_t *peer = module->peers + target;
|
||||
ompi_osc_pt2pt_header_flush_t flush_req;
|
||||
int32_t frag_count = opal_atomic_swap_32 ((int32_t *) module->epoch_outgoing_frag_count + target, -1);
|
||||
int ret;
|
||||
|
||||
flush_req.base.type = OMPI_OSC_PT2PT_HDR_TYPE_FLUSH_REQ;
|
||||
flush_req.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID | OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET;
|
||||
flush_req.frag_count = frag_count;
|
||||
flush_req.serial_number = lock->serial_number;
|
||||
|
||||
/* XXX -- TODO -- since fragment are always delivered in order we do not need to count anything but long
|
||||
* requests. once that is done this can be removed. */
|
||||
if (peer->active_frag && (peer->active_frag->remain_len < sizeof (flush_req))) {
|
||||
/* the peer should expect one more packet */
|
||||
++flush_req.frag_count;
|
||||
--module->epoch_outgoing_frag_count[target];
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "flushing to target %d, frag_count: %d",
|
||||
target, flush_req.frag_count));
|
||||
|
||||
/* send control message with unlock request and count */
|
||||
ret = ompi_osc_pt2pt_control_send (module, target, &flush_req, sizeof (flush_req));
|
||||
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* start all sendreqs to target */
|
||||
return ompi_osc_pt2pt_frag_flush_target (module, target);
|
||||
}
|
||||
|
||||
static int ompi_osc_pt2pt_lock_internal_execute (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_outstanding_lock_t *lock)
|
||||
{
|
||||
int my_rank = ompi_comm_rank (module->comm);
|
||||
@ -342,20 +386,23 @@ static int ompi_osc_pt2pt_unlock_internal (int target, ompi_win_t *win)
|
||||
|
||||
opal_list_remove_item (&module->outstanding_locks, &lock->super);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
|
||||
"ompi_osc_pt2pt_unlock_internal: lock acks received: %d, expected: %d",
|
||||
lock->lock_acks_received, lock_acks_expected));
|
||||
|
||||
/* wait until ack has arrived from target */
|
||||
while (lock->lock_acks_received != lock_acks_expected) {
|
||||
opal_condition_wait(&module->cond, &module->lock);
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
|
||||
"ompi_osc_pt2pt_unlock_internal: all lock acks received"));
|
||||
|
||||
if (lock->assert & MPI_MODE_NOCHECK) {
|
||||
/* flush instead */
|
||||
ompi_osc_pt2pt_flush_lock (module, lock, target);
|
||||
} else if (my_rank != target) {
|
||||
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
|
||||
"osc pt2pt: unlock %d, lock_acks_received = %d", target,
|
||||
lock->lock_acks_received));
|
||||
|
||||
if (-1 == target) {
|
||||
/* send unlock messages to all of my peers */
|
||||
for (int i = 0 ; i < ompi_comm_size(module->comm) ; ++i) {
|
||||
@ -449,7 +496,6 @@ int ompi_osc_pt2pt_sync (struct ompi_win_t *win)
|
||||
static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_outstanding_lock_t *lock,
|
||||
int target)
|
||||
{
|
||||
ompi_osc_pt2pt_header_flush_t flush_req;
|
||||
int peer_count, ret, flush_count;
|
||||
int my_rank = ompi_comm_rank (module->comm);
|
||||
|
||||
@ -470,10 +516,6 @@ static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_
|
||||
lock->flushing = true;
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
|
||||
flush_req.base.type = OMPI_OSC_PT2PT_HDR_TYPE_FLUSH_REQ;
|
||||
flush_req.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID | OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET;
|
||||
flush_req.serial_number = lock->serial_number;
|
||||
|
||||
if (-1 == target) {
|
||||
/* NTH: no local flush */
|
||||
flush_count = ompi_comm_size(module->comm) - 1;
|
||||
@ -482,31 +524,16 @@ static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_
|
||||
continue;
|
||||
}
|
||||
|
||||
flush_req.frag_count = opal_atomic_swap_32 ((int32_t *) module->epoch_outgoing_frag_count + i, -1);
|
||||
|
||||
/* send control message with flush request and count */
|
||||
ret = ompi_osc_pt2pt_control_send (module, i, &flush_req, sizeof (flush_req));
|
||||
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* start all sendreqs to target */
|
||||
ret = ompi_osc_pt2pt_frag_flush_target (module, i);
|
||||
ret = ompi_osc_pt2pt_flush_remote (module, i, lock);
|
||||
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
flush_req.frag_count = opal_atomic_swap_32 ((int32_t *) module->epoch_outgoing_frag_count + target, -1);
|
||||
flush_count = 1;
|
||||
/* send control message with flush request and count */
|
||||
ret = ompi_osc_pt2pt_control_send (module, target, &flush_req, sizeof (flush_req));
|
||||
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* start all sendreqs to target */
|
||||
ret = ompi_osc_pt2pt_frag_flush_target (module, target);
|
||||
/* send control message with flush request and count */
|
||||
ret = ompi_osc_pt2pt_flush_remote (module, target, lock);
|
||||
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
||||
return ret;
|
||||
}
|
||||
@ -800,7 +827,7 @@ void ompi_osc_pt2pt_process_lock_ack (ompi_osc_pt2pt_module_t *module,
|
||||
ompi_osc_pt2pt_outstanding_lock_t *lock;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"ompi_osc_pt2pt_process_unlock_ack: processing lock ack from %d for lock %" PRIu64,
|
||||
"ompi_osc_pt2pt_process_lock_ack: processing lock ack from %d for lock %" PRIu64,
|
||||
lock_ack_header->source, lock_ack_header->lock_ptr));
|
||||
|
||||
lock = (ompi_osc_pt2pt_outstanding_lock_t *) (uintptr_t) lock_ack_header->lock_ptr;
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user