osc/pt2pt: threading fixes and code cleanup
Этот коммит содержится в:
родитель
3d79806805
Коммит
e68ed2876c
@ -59,7 +59,7 @@ struct ompi_osc_pt2pt_component_t {
|
||||
int module_count;
|
||||
|
||||
/** free list of ompi_osc_pt2pt_frag_t structures */
|
||||
opal_free_list_t frags;
|
||||
ompi_free_list_t frags;
|
||||
|
||||
/** Free list of requests */
|
||||
ompi_free_list_t requests;
|
||||
@ -67,17 +67,14 @@ struct ompi_osc_pt2pt_component_t {
|
||||
/** PT2PT component buffer size */
|
||||
unsigned int buffer_size;
|
||||
|
||||
/** Lock for pending_operations */
|
||||
opal_mutex_t pending_operations_lock;
|
||||
|
||||
/** List of operations that need to be processed */
|
||||
opal_list_t pending_operations;
|
||||
|
||||
/** Is the progress function enabled? */
|
||||
bool progress_enable;
|
||||
|
||||
/** List of requests that need to be freed */
|
||||
opal_list_t request_gc;
|
||||
|
||||
/** List of buffers that need to be freed */
|
||||
opal_list_t buffer_gc;
|
||||
};
|
||||
typedef struct ompi_osc_pt2pt_component_t ompi_osc_pt2pt_component_t;
|
||||
|
||||
@ -89,7 +86,9 @@ struct ompi_osc_pt2pt_peer_t {
|
||||
/** Number of acks pending. New requests can not be sent out if there are
|
||||
* acks pending (to fulfill the ordering constraints of accumulate) */
|
||||
uint32_t num_acks_pending;
|
||||
int32_t passive_incoming_frag_count;
|
||||
bool access_epoch;
|
||||
bool eager_send_active;
|
||||
};
|
||||
typedef struct ompi_osc_pt2pt_peer_t ompi_osc_pt2pt_peer_t;
|
||||
|
||||
@ -133,6 +132,9 @@ struct ompi_osc_pt2pt_module_t {
|
||||
peer. Not in peer data to make fence more manageable. */
|
||||
uint32_t *epoch_outgoing_frag_count;
|
||||
|
||||
/** Lock for queued_frags */
|
||||
opal_mutex_t queued_frags_lock;
|
||||
|
||||
/** List of full communication buffers queued to be sent. Should
|
||||
be maintained in order (at least in per-target order). */
|
||||
opal_list_t queued_frags;
|
||||
@ -152,9 +154,6 @@ struct ompi_osc_pt2pt_module_t {
|
||||
/* Next incoming buffer count at which we want a signal on cond */
|
||||
uint32_t active_incoming_frag_signal_count;
|
||||
|
||||
uint32_t *passive_incoming_frag_count;
|
||||
uint32_t *passive_incoming_frag_signal_count;
|
||||
|
||||
/* Number of flush ack requests send since beginning of time */
|
||||
uint64_t flush_ack_requested_count;
|
||||
/* Number of flush ack replies received since beginning of
|
||||
@ -171,8 +170,6 @@ struct ompi_osc_pt2pt_module_t {
|
||||
/** Indicates the window is in an all access epoch (fence, lock_all) */
|
||||
bool all_access_epoch;
|
||||
|
||||
bool *passive_eager_send_active;
|
||||
|
||||
/* ********************* PWSC data ************************ */
|
||||
struct ompi_group_t *pw_group;
|
||||
struct ompi_group_t *sc_group;
|
||||
@ -189,9 +186,11 @@ struct ompi_osc_pt2pt_module_t {
|
||||
|
||||
/** Status of the local window lock. One of 0 (unlocked),
|
||||
MPI_LOCK_EXCLUSIVE, or MPI_LOCK_SHARED. */
|
||||
int lock_status;
|
||||
/** number of peers who hold a shared lock on the local window */
|
||||
int32_t shared_count;
|
||||
int32_t lock_status;
|
||||
|
||||
/** lock for locks_pending list */
|
||||
opal_mutex_t locks_pending_lock;
|
||||
|
||||
/** target side list of lock requests we couldn't satisfy yet */
|
||||
opal_list_t locks_pending;
|
||||
|
||||
@ -210,6 +209,15 @@ struct ompi_osc_pt2pt_module_t {
|
||||
/* enforce pscw matching */
|
||||
/** list of unmatched post messages */
|
||||
opal_list_t pending_posts;
|
||||
|
||||
/** Lock for garbage collection lists */
|
||||
opal_mutex_t gc_lock;
|
||||
|
||||
/** List of requests that need to be freed */
|
||||
opal_list_t request_gc;
|
||||
|
||||
/** List of buffers that need to be freed */
|
||||
opal_list_t buffer_gc;
|
||||
};
|
||||
typedef struct ompi_osc_pt2pt_module_t ompi_osc_pt2pt_module_t;
|
||||
OMPI_MODULE_DECLSPEC extern ompi_osc_pt2pt_component_t mca_osc_pt2pt_component;
|
||||
@ -431,11 +439,12 @@ static inline void mark_incoming_completion (ompi_osc_pt2pt_module_t *module, in
|
||||
opal_condition_broadcast(&module->cond);
|
||||
}
|
||||
} else {
|
||||
ompi_osc_pt2pt_peer_t *peer = module->peers + source;
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"mark_incoming_completion marking passive incoming complete. source = %d, count = %d",
|
||||
source, (int) module->passive_incoming_frag_count[source] + 1));
|
||||
OPAL_THREAD_ADD32((int32_t *) (module->passive_incoming_frag_count + source), 1);
|
||||
if (module->passive_incoming_frag_count[source] >= module->passive_incoming_frag_signal_count[source]) {
|
||||
source, (int) peer->passive_incoming_frag_count + 1));
|
||||
OPAL_THREAD_ADD32((int32_t *) &peer->passive_incoming_frag_count, 1);
|
||||
if (0 == peer->passive_incoming_frag_count) {
|
||||
opal_condition_broadcast(&module->cond);
|
||||
}
|
||||
}
|
||||
@ -576,36 +585,42 @@ static inline void osc_pt2pt_copy_for_send (void *target, size_t target_len, voi
|
||||
* and buffers on the module's garbage collection lists and release then
|
||||
* at a later time.
|
||||
*/
|
||||
static inline void osc_pt2pt_gc_clean (void)
|
||||
static inline void osc_pt2pt_gc_clean (ompi_osc_pt2pt_module_t *module)
|
||||
{
|
||||
ompi_request_t *request;
|
||||
opal_list_item_t *item;
|
||||
|
||||
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.lock);
|
||||
OPAL_THREAD_LOCK(&module->gc_lock);
|
||||
|
||||
while (NULL != (request = (ompi_request_t *) opal_list_remove_first (&mca_osc_pt2pt_component.request_gc))) {
|
||||
while (NULL != (request = (ompi_request_t *) opal_list_remove_first (&module->request_gc))) {
|
||||
OPAL_THREAD_UNLOCK(&module->gc_lock);
|
||||
ompi_request_free (&request);
|
||||
OPAL_THREAD_LOCK(&module->gc_lock);
|
||||
}
|
||||
|
||||
while (NULL != (item = opal_list_remove_first (&mca_osc_pt2pt_component.buffer_gc))) {
|
||||
while (NULL != (item = opal_list_remove_first (&module->buffer_gc))) {
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
|
||||
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.lock);
|
||||
OPAL_THREAD_UNLOCK(&module->gc_lock);
|
||||
}
|
||||
|
||||
static inline void osc_pt2pt_gc_add_request (ompi_request_t *request)
|
||||
static inline void osc_pt2pt_gc_add_request (ompi_osc_pt2pt_module_t *module, ompi_request_t *request)
|
||||
{
|
||||
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.lock);
|
||||
opal_list_append (&mca_osc_pt2pt_component.request_gc, (opal_list_item_t *) request);
|
||||
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.lock);
|
||||
OPAL_THREAD_SCOPED_LOCK(&module->gc_lock,
|
||||
opal_list_append (&module->request_gc, (opal_list_item_t *) request));
|
||||
}
|
||||
|
||||
static inline void osc_pt2pt_gc_add_buffer (opal_list_item_t *buffer)
|
||||
static inline void osc_pt2pt_gc_add_buffer (ompi_osc_pt2pt_module_t *module, opal_list_item_t *buffer)
|
||||
{
|
||||
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.lock);
|
||||
opal_list_append (&mca_osc_pt2pt_component.buffer_gc, buffer);
|
||||
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.lock);
|
||||
OPAL_THREAD_SCOPED_LOCK(&module->gc_lock,
|
||||
opal_list_append (&module->buffer_gc, buffer));
|
||||
}
|
||||
|
||||
static inline void osc_pt2pt_add_pending (ompi_osc_pt2pt_pending_t *pending)
|
||||
{
|
||||
OPAL_THREAD_SCOPED_LOCK(&mca_osc_pt2pt_component.pending_operations_lock,
|
||||
opal_list_append (&mca_osc_pt2pt_component.pending_operations, &pending->super));
|
||||
}
|
||||
|
||||
#define OSC_PT2PT_FRAG_TAG 0x10000
|
||||
|
@ -122,16 +122,16 @@ ompi_osc_pt2pt_fence(int assert, ompi_win_t *win)
|
||||
|
||||
/* short-circuit the noprecede case */
|
||||
if (0 != (assert & MPI_MODE_NOPRECEDE)) {
|
||||
ret = module->comm->c_coll.coll_barrier(module->comm,
|
||||
module->comm->c_coll.coll_barrier_module);
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"osc pt2pt: fence end (short circuit)"));
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* try to start all the requests. */
|
||||
/* try to start all requests. */
|
||||
ret = ompi_osc_pt2pt_frag_flush_all(module);
|
||||
if (OMPI_SUCCESS != ret) goto cleanup;
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"osc pt2pt: fence done sending"));
|
||||
@ -141,10 +141,12 @@ ompi_osc_pt2pt_fence(int assert, ompi_win_t *win)
|
||||
&incoming_reqs, 1, MPI_UINT32_T,
|
||||
MPI_SUM, module->comm,
|
||||
module->comm->c_coll.coll_reduce_scatter_block_module);
|
||||
if (OMPI_SUCCESS != ret) goto cleanup;
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
return ret;
|
||||
}
|
||||
|
||||
OPAL_THREAD_LOCK(&module->lock);
|
||||
|
||||
bzero(module->epoch_outgoing_frag_count,
|
||||
sizeof(uint32_t) * ompi_comm_size(module->comm));
|
||||
|
||||
@ -161,22 +163,21 @@ ompi_osc_pt2pt_fence(int assert, ompi_win_t *win)
|
||||
opal_condition_wait(&module->cond, &module->lock);
|
||||
}
|
||||
|
||||
ret = OMPI_SUCCESS;
|
||||
module->active_incoming_frag_signal_count = 0;
|
||||
|
||||
if (assert & MPI_MODE_NOSUCCEED) {
|
||||
/* as specified in MPI-3 p 438 3-5 the fence can end an epoch. it isn't explicitly
|
||||
* stated that MPI_MODE_NOSUCCEED ends the epoch but it is a safe assumption. */
|
||||
module->active_eager_send_active = false;
|
||||
module->all_access_epoch = false;
|
||||
}
|
||||
}
|
||||
opal_condition_broadcast (&module->cond);
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
|
||||
cleanup:
|
||||
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
|
||||
"osc pt2pt: fence end: %d", ret));
|
||||
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
|
||||
return ret;
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
@ -226,9 +227,13 @@ ompi_osc_pt2pt_start(ompi_group_t *group,
|
||||
ompi_proc_t *pending_proc = ompi_comm_peer_lookup (module->comm, pending_post->rank);
|
||||
|
||||
if (group_contains_proc (module->sc_group, pending_proc)) {
|
||||
ompi_osc_pt2pt_peer_t *peer = module->peers + pending_post->rank;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "Consumed unexpected post message from %d",
|
||||
pending_post->rank));
|
||||
++module->num_post_msgs;
|
||||
peer->eager_send_active = true;
|
||||
|
||||
opal_list_remove_item (&module->pending_posts, &pending_post->super);
|
||||
OBJ_RELEASE(pending_post);
|
||||
}
|
||||
@ -291,6 +296,7 @@ ompi_osc_pt2pt_complete(ompi_win_t *win)
|
||||
"waiting for post messages. num_post_msgs = %d", module->num_post_msgs));
|
||||
opal_condition_wait(&module->cond, &module->lock);
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"ompi_osc_pt2pt_complete sending complete message"));
|
||||
@ -302,7 +308,6 @@ ompi_osc_pt2pt_complete(ompi_win_t *win)
|
||||
|
||||
At the same time, clean out the outgoing count for the next
|
||||
round. */
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
for (i = 0 ; i < ompi_group_size(module->sc_group) ; ++i) {
|
||||
if (my_rank == ranks[i]) {
|
||||
/* shortcut for self */
|
||||
@ -325,15 +330,17 @@ ompi_osc_pt2pt_complete(ompi_win_t *win)
|
||||
sizeof(ompi_osc_pt2pt_header_complete_t));
|
||||
if (OMPI_SUCCESS != ret) goto cleanup;
|
||||
}
|
||||
OPAL_THREAD_LOCK(&module->lock);
|
||||
|
||||
/* start all requests */
|
||||
ret = ompi_osc_pt2pt_frag_flush_all(module);
|
||||
if (OMPI_SUCCESS != ret) goto cleanup;
|
||||
|
||||
OPAL_THREAD_LOCK(&module->lock);
|
||||
/* zero the fragment counts here to ensure they are zerod */
|
||||
for (i = 0 ; i < ompi_group_size(module->sc_group) ; ++i) {
|
||||
peer = module->peers + ranks[i];
|
||||
module->epoch_outgoing_frag_count[ranks[i]] = 0;
|
||||
peer->eager_send_active = false;
|
||||
}
|
||||
|
||||
/* wait for outgoing requests to complete. Don't wait for incoming, as
|
||||
@ -347,7 +354,7 @@ ompi_osc_pt2pt_complete(ompi_win_t *win)
|
||||
module->sc_group = NULL;
|
||||
|
||||
/* unlock here, as group cleanup can take a while... */
|
||||
OPAL_THREAD_UNLOCK(&(module->lock));
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
|
||||
/* phase 2 cleanup group */
|
||||
ompi_group_decrement_proc_count(group);
|
||||
@ -362,8 +369,6 @@ ompi_osc_pt2pt_complete(ompi_win_t *win)
|
||||
cleanup:
|
||||
if (NULL != ranks) free(ranks);
|
||||
|
||||
OPAL_THREAD_UNLOCK(&(module->lock));
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -504,7 +509,6 @@ ompi_osc_pt2pt_test(ompi_win_t *win,
|
||||
module->active_incoming_frag_count != module->active_incoming_frag_signal_count) {
|
||||
*flag = 0;
|
||||
ret = OMPI_SUCCESS;
|
||||
goto cleanup;
|
||||
} else {
|
||||
*flag = 1;
|
||||
|
||||
@ -519,7 +523,6 @@ ompi_osc_pt2pt_test(ompi_win_t *win,
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
cleanup:
|
||||
OPAL_THREAD_UNLOCK(&(module->lock));
|
||||
|
||||
return ret;
|
||||
@ -528,6 +531,7 @@ ompi_osc_pt2pt_test(ompi_win_t *win,
|
||||
int osc_pt2pt_incoming_post (ompi_osc_pt2pt_module_t *module, int source)
|
||||
{
|
||||
ompi_proc_t *source_proc = ompi_comm_peer_lookup (module->comm, source);
|
||||
ompi_osc_pt2pt_peer_t *peer = module->peers + source;
|
||||
|
||||
OPAL_THREAD_LOCK(&module->lock);
|
||||
|
||||
@ -547,6 +551,9 @@ int osc_pt2pt_incoming_post (ompi_osc_pt2pt_module_t *module, int source)
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
assert (!peer->eager_send_active);
|
||||
peer->eager_send_active = true;
|
||||
|
||||
module->num_post_msgs++;
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"received post message. num_post_msgs = %d", module->num_post_msgs));
|
||||
|
@ -49,14 +49,12 @@ static int ompi_osc_pt2pt_req_comm_complete (ompi_request_t *request)
|
||||
|
||||
mark_outgoing_completion (module);
|
||||
|
||||
OPAL_THREAD_LOCK(&ompi_request_lock);
|
||||
if (0 == --pt2pt_request->outstanding_requests) {
|
||||
if (0 == OPAL_THREAD_ADD32(&pt2pt_request->outstanding_requests, -1)) {
|
||||
ompi_osc_pt2pt_request_complete (pt2pt_request, request->req_status.MPI_ERROR);
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&ompi_request_lock);
|
||||
|
||||
/* put this request on the garbage colletion list */
|
||||
osc_pt2pt_gc_add_request (request);
|
||||
osc_pt2pt_gc_add_request (module, request);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
@ -64,11 +62,19 @@ static int ompi_osc_pt2pt_req_comm_complete (ompi_request_t *request)
|
||||
static int ompi_osc_pt2pt_dt_send_complete (ompi_request_t *request)
|
||||
{
|
||||
ompi_datatype_t *datatype = (ompi_datatype_t *) request->req_complete_cb_data;
|
||||
ompi_osc_pt2pt_module_t *module = NULL;
|
||||
|
||||
OBJ_RELEASE(datatype);
|
||||
|
||||
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.lock);
|
||||
opal_hash_table_get_value_uint32(&mca_osc_pt2pt_component.modules,
|
||||
ompi_comm_get_cid(request->req_mpi_object.comm),
|
||||
(void **) &module);
|
||||
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.lock);
|
||||
assert (NULL != module);
|
||||
|
||||
/* put this request on the garbage colletion list */
|
||||
osc_pt2pt_gc_add_request (request);
|
||||
osc_pt2pt_gc_add_request (module, request);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
@ -327,8 +333,6 @@ static inline int ompi_osc_pt2pt_put_w_req (void *origin_addr, int origin_count,
|
||||
payload_len = origin_dt->super.size * origin_count;
|
||||
frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + ddt_len + payload_len;
|
||||
|
||||
OPAL_THREAD_LOCK(&module->lock);
|
||||
|
||||
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr);
|
||||
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
||||
frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + ddt_len;
|
||||
@ -338,7 +342,6 @@ static inline int ompi_osc_pt2pt_put_w_req (void *origin_addr, int origin_count,
|
||||
frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + 8;
|
||||
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr);
|
||||
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
@ -352,15 +355,15 @@ static inline int ompi_osc_pt2pt_put_w_req (void *origin_addr, int origin_count,
|
||||
/* flush will be called at the end of this function. make sure the post message has
|
||||
* arrived. */
|
||||
if ((is_long_msg || request) && module->sc_group) {
|
||||
OPAL_THREAD_LOCK(&module->lock);
|
||||
while (0 != module->num_post_msgs) {
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"waiting for post messages. num_post_msgs = %d", module->num_post_msgs));
|
||||
opal_condition_wait(&module->cond, &module->lock);
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
}
|
||||
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"osc pt2pt: put long protocol: %d, large datatype: %d",
|
||||
(int) is_long_msg, (int) is_long_datatype));
|
||||
@ -433,15 +436,12 @@ static inline int ompi_osc_pt2pt_put_w_req (void *origin_addr, int origin_count,
|
||||
header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_VALID;
|
||||
}
|
||||
|
||||
OPAL_THREAD_LOCK(&module->lock);
|
||||
|
||||
ret = ompi_osc_pt2pt_frag_finish(module, frag);
|
||||
|
||||
if (request || is_long_msg) {
|
||||
/* need to flush now in case the caller decides to wait on the request */
|
||||
ompi_osc_pt2pt_frag_flush_target (module, target);
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
|
||||
return ret;
|
||||
}
|
||||
@ -512,8 +512,6 @@ ompi_osc_pt2pt_accumulate_w_req (void *origin_addr, int origin_count,
|
||||
ddt_len = ompi_datatype_pack_description_length(target_dt);
|
||||
payload_len = origin_dt->super.size * origin_count;
|
||||
|
||||
OPAL_THREAD_LOCK(&module->lock);
|
||||
|
||||
frag_len = sizeof(*header) + ddt_len + payload_len;
|
||||
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr);
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
@ -524,7 +522,6 @@ ompi_osc_pt2pt_accumulate_w_req (void *origin_addr, int origin_count,
|
||||
frag_len = sizeof(*header) + 8;
|
||||
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr);
|
||||
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
@ -538,15 +535,15 @@ ompi_osc_pt2pt_accumulate_w_req (void *origin_addr, int origin_count,
|
||||
/* flush will be called at the end of this function. make sure the post message has
|
||||
* arrived. */
|
||||
if ((is_long_msg || request) && module->sc_group) {
|
||||
OPAL_THREAD_LOCK(&module->lock);
|
||||
while (0 != module->num_post_msgs) {
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"waiting for post messages. num_post_msgs = %d", module->num_post_msgs));
|
||||
opal_condition_wait(&module->cond, &module->lock);
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
}
|
||||
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
|
||||
header = (ompi_osc_pt2pt_header_acc_t*) ptr;
|
||||
header->base.flags = 0;
|
||||
header->len = frag_len;
|
||||
@ -623,8 +620,6 @@ ompi_osc_pt2pt_accumulate_w_req (void *origin_addr, int origin_count,
|
||||
header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_VALID;
|
||||
}
|
||||
|
||||
OPAL_THREAD_LOCK(&module->lock);
|
||||
|
||||
ret = ompi_osc_pt2pt_frag_finish(module, frag);
|
||||
|
||||
if (is_long_msg || request) {
|
||||
@ -632,8 +627,6 @@ ompi_osc_pt2pt_accumulate_w_req (void *origin_addr, int origin_count,
|
||||
ompi_osc_pt2pt_frag_flush_target (module, target);
|
||||
}
|
||||
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -700,12 +693,9 @@ int ompi_osc_pt2pt_compare_and_swap (void *origin_addr, void *compare_addr,
|
||||
/* we need to send both the origin and compare buffers */
|
||||
payload_len = dt->super.size * 2;
|
||||
|
||||
OPAL_THREAD_LOCK(&module->lock);
|
||||
|
||||
frag_len = sizeof(ompi_osc_pt2pt_header_cswap_t) + ddt_len + payload_len;
|
||||
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr);
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
@ -733,12 +723,10 @@ int ompi_osc_pt2pt_compare_and_swap (void *origin_addr, void *compare_addr,
|
||||
ret = ompi_osc_pt2pt_irecv_w_cb (result_addr, 1, dt, target, tag, module->comm,
|
||||
NULL, ompi_osc_pt2pt_req_comm_complete, request);
|
||||
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
return ret;
|
||||
}
|
||||
|
||||
ret = ompi_osc_pt2pt_frag_finish(module, frag);
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
|
||||
return ret;
|
||||
}
|
||||
@ -857,8 +845,6 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co
|
||||
* must fit in a single frag */
|
||||
ddt_len = ompi_datatype_pack_description_length(target_dt);
|
||||
|
||||
OPAL_THREAD_LOCK(&module->lock);
|
||||
|
||||
frag_len = sizeof(ompi_osc_pt2pt_header_get_t) + ddt_len;
|
||||
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr);
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
@ -866,7 +852,6 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co
|
||||
frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + 8;
|
||||
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr);
|
||||
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
@ -888,8 +873,6 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co
|
||||
}
|
||||
}
|
||||
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
|
||||
header = (ompi_osc_pt2pt_header_get_t*) ptr;
|
||||
header->base.type = OMPI_OSC_PT2PT_HDR_TYPE_GET;
|
||||
header->base.flags = 0;
|
||||
@ -936,14 +919,12 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co
|
||||
*request = &pt2pt_request->super;
|
||||
}
|
||||
|
||||
OPAL_THREAD_LOCK(&module->lock);
|
||||
ret = ompi_osc_pt2pt_frag_finish(module, frag);
|
||||
|
||||
if (!release_req) {
|
||||
/* need to flush now in case the caller decides to wait on the request */
|
||||
ompi_osc_pt2pt_frag_flush_target (module, target);
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
|
||||
return ret;
|
||||
}
|
||||
@ -1087,8 +1068,6 @@ int ompi_osc_pt2pt_rget_accumulate_internal (void *origin_addr, int origin_count
|
||||
payload_len = 0;
|
||||
}
|
||||
|
||||
OPAL_THREAD_LOCK(&module->lock);
|
||||
|
||||
frag_len = sizeof(*header) + ddt_len + payload_len;
|
||||
ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr);
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
@ -1099,7 +1078,6 @@ int ompi_osc_pt2pt_rget_accumulate_internal (void *origin_addr, int origin_count
|
||||
frag_len = sizeof(*header) + 8;
|
||||
ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr);
|
||||
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
@ -1121,15 +1099,15 @@ int ompi_osc_pt2pt_rget_accumulate_internal (void *origin_addr, int origin_count
|
||||
/* flush will be called at the end of this function. make sure the post message has
|
||||
* arrived. */
|
||||
if (!release_req && module->sc_group) {
|
||||
OPAL_THREAD_LOCK(&module->lock);
|
||||
while (0 != module->num_post_msgs) {
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"waiting for post messages. num_post_msgs = %d", module->num_post_msgs));
|
||||
opal_condition_wait(&module->cond, &module->lock);
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
}
|
||||
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
|
||||
header = (ompi_osc_pt2pt_header_acc_t *) ptr;
|
||||
header->base.flags = 0;
|
||||
header->len = frag_len;
|
||||
@ -1191,14 +1169,12 @@ int ompi_osc_pt2pt_rget_accumulate_internal (void *origin_addr, int origin_count
|
||||
*request = (ompi_request_t *) pt2pt_request;
|
||||
}
|
||||
|
||||
OPAL_THREAD_LOCK(&module->lock);
|
||||
ret = ompi_osc_pt2pt_frag_finish(module, frag);
|
||||
|
||||
if (!release_req) {
|
||||
/* need to flush now in case the caller decides to wait on the request */
|
||||
ompi_osc_pt2pt_frag_flush_target (module, target_rank);
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
@ -200,14 +200,15 @@ component_register(void)
|
||||
|
||||
static int component_progress (void)
|
||||
{
|
||||
int count = opal_list_get_size (&mca_osc_pt2pt_component.pending_operations);
|
||||
ompi_osc_pt2pt_pending_t *pending, *next;
|
||||
|
||||
if (0 == opal_list_get_size (&mca_osc_pt2pt_component.pending_operations)) {
|
||||
if (0 == count) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* process one incoming request */
|
||||
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.lock);
|
||||
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.pending_operations_lock);
|
||||
OPAL_LIST_FOREACH_SAFE(pending, next, &mca_osc_pt2pt_component.pending_operations, ompi_osc_pt2pt_pending_t) {
|
||||
int ret;
|
||||
|
||||
@ -231,7 +232,7 @@ static int component_progress (void)
|
||||
OBJ_RELEASE(pending);
|
||||
}
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.lock);
|
||||
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.pending_operations_lock);
|
||||
|
||||
return 1;
|
||||
}
|
||||
@ -244,8 +245,7 @@ component_init(bool enable_progress_threads,
|
||||
|
||||
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.lock, opal_mutex_t);
|
||||
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.pending_operations, opal_list_t);
|
||||
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.request_gc, opal_list_t);
|
||||
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.buffer_gc, opal_list_t);
|
||||
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.pending_operations_lock, opal_mutex_t);
|
||||
|
||||
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.modules,
|
||||
opal_hash_table_t);
|
||||
@ -254,11 +254,13 @@ component_init(bool enable_progress_threads,
|
||||
mca_osc_pt2pt_component.progress_enable = false;
|
||||
mca_osc_pt2pt_component.module_count = 0;
|
||||
|
||||
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.frags, opal_free_list_t);
|
||||
ret = opal_free_list_init(&mca_osc_pt2pt_component.frags,
|
||||
sizeof(ompi_osc_pt2pt_frag_t),
|
||||
OBJ_CLASS(ompi_osc_pt2pt_frag_t),
|
||||
1, -1, 1);
|
||||
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.frags, ompi_free_list_t);
|
||||
ret = ompi_free_list_init_new (&mca_osc_pt2pt_component.frags,
|
||||
sizeof(ompi_osc_pt2pt_frag_t), 8,
|
||||
OBJ_CLASS(ompi_osc_pt2pt_frag_t),
|
||||
mca_osc_pt2pt_component.buffer_size +
|
||||
sizeof (ompi_osc_pt2pt_frag_header_t),
|
||||
8, 1, -1, 1, 0);
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
|
||||
"%s:%d: ompi_free_list_init failed: %d",
|
||||
@ -303,8 +305,7 @@ component_finalize(void)
|
||||
OBJ_DESTRUCT(&mca_osc_pt2pt_component.lock);
|
||||
OBJ_DESTRUCT(&mca_osc_pt2pt_component.requests);
|
||||
OBJ_DESTRUCT(&mca_osc_pt2pt_component.pending_operations);
|
||||
OBJ_DESTRUCT(&mca_osc_pt2pt_component.request_gc);
|
||||
OBJ_DESTRUCT(&mca_osc_pt2pt_component.buffer_gc);
|
||||
OBJ_DESTRUCT(&mca_osc_pt2pt_component.pending_operations_lock);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
@ -329,17 +330,11 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit
|
||||
ompi_osc_pt2pt_module_t *module = NULL;
|
||||
int ret;
|
||||
char *name;
|
||||
bool no_locks = false;
|
||||
|
||||
/* We don't support shared windows; that's for the sm onesided
|
||||
component */
|
||||
if (MPI_WIN_FLAVOR_SHARED == flavor) return OMPI_ERR_NOT_SUPPORTED;
|
||||
|
||||
if (check_config_value_bool("no_locks", info)) {
|
||||
no_locks = true;
|
||||
ompi_osc_pt2pt_no_locks = true;
|
||||
}
|
||||
|
||||
/* create module structure with all fields initialized to zero */
|
||||
module = (ompi_osc_pt2pt_module_t*)
|
||||
calloc(1, sizeof(ompi_osc_pt2pt_module_t));
|
||||
@ -354,10 +349,15 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit
|
||||
OBJ_CONSTRUCT(&module->cond, opal_condition_t);
|
||||
OBJ_CONSTRUCT(&module->acc_lock, opal_mutex_t);
|
||||
OBJ_CONSTRUCT(&module->queued_frags, opal_list_t);
|
||||
OBJ_CONSTRUCT(&module->queued_frags_lock, opal_mutex_t);
|
||||
OBJ_CONSTRUCT(&module->locks_pending, opal_list_t);
|
||||
OBJ_CONSTRUCT(&module->locks_pending_lock, opal_mutex_t);
|
||||
OBJ_CONSTRUCT(&module->outstanding_locks, opal_list_t);
|
||||
OBJ_CONSTRUCT(&module->pending_acc, opal_list_t);
|
||||
OBJ_CONSTRUCT(&module->pending_posts, opal_list_t);
|
||||
OBJ_CONSTRUCT(&module->request_gc, opal_list_t);
|
||||
OBJ_CONSTRUCT(&module->buffer_gc, opal_list_t);
|
||||
OBJ_CONSTRUCT(&module->gc_lock, opal_mutex_t);
|
||||
|
||||
/* options */
|
||||
/* FIX ME: should actually check this value... */
|
||||
@ -405,20 +405,6 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (!no_locks) {
|
||||
module->passive_incoming_frag_count = calloc(ompi_comm_size(comm), sizeof(uint32_t));
|
||||
if (NULL == module->passive_incoming_frag_count) {
|
||||
ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
module->passive_incoming_frag_signal_count = calloc(ompi_comm_size(comm), sizeof(uint32_t));
|
||||
if (NULL == module->passive_incoming_frag_signal_count) {
|
||||
ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
|
||||
goto cleanup;
|
||||
}
|
||||
}
|
||||
|
||||
/* the statement below (from Brian) does not seem correct so disable active target on the
|
||||
* window. if this end up being incorrect please revert this one change */
|
||||
module->active_eager_send_active = false;
|
||||
@ -429,14 +415,6 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit
|
||||
module->active_eager_send_active = true;
|
||||
#endif
|
||||
|
||||
if (!no_locks) {
|
||||
module->passive_eager_send_active = malloc(sizeof(bool) * ompi_comm_size(comm));
|
||||
if (NULL == module->passive_eager_send_active) {
|
||||
ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
|
||||
goto cleanup;
|
||||
}
|
||||
}
|
||||
|
||||
/* lock data */
|
||||
if (check_config_value_bool("no_locks", info)) {
|
||||
win->w_flags |= OMPI_WIN_NO_LOCKS;
|
||||
|
@ -26,7 +26,6 @@
|
||||
#include "osc_pt2pt.h"
|
||||
#include "osc_pt2pt_header.h"
|
||||
#include "osc_pt2pt_data_move.h"
|
||||
#include "osc_pt2pt_obj_convert.h"
|
||||
#include "osc_pt2pt_frag.h"
|
||||
#include "osc_pt2pt_request.h"
|
||||
|
||||
@ -225,8 +224,6 @@ int ompi_osc_pt2pt_control_send (ompi_osc_pt2pt_module_t *module, int target,
|
||||
char *ptr;
|
||||
int ret;
|
||||
|
||||
OPAL_THREAD_LOCK(&module->lock);
|
||||
|
||||
ret = ompi_osc_pt2pt_frag_alloc(module, target, len, &frag, &ptr);
|
||||
if (OPAL_LIKELY(OMPI_SUCCESS == ret)) {
|
||||
memcpy (ptr, data, len);
|
||||
@ -234,8 +231,6 @@ int ompi_osc_pt2pt_control_send (ompi_osc_pt2pt_module_t *module, int target,
|
||||
ret = ompi_osc_pt2pt_frag_finish(module, frag);
|
||||
}
|
||||
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -254,7 +249,7 @@ static int ompi_osc_pt2pt_control_send_unbuffered_cb (ompi_request_t *request)
|
||||
free (ctx);
|
||||
|
||||
/* put this request on the garbage colletion list */
|
||||
osc_pt2pt_gc_add_request (request);
|
||||
osc_pt2pt_gc_add_request (module, request);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
@ -453,7 +448,7 @@ static int osc_pt2pt_incoming_req_complete (ompi_request_t *request)
|
||||
mark_incoming_completion (module, rank);
|
||||
|
||||
/* put this request on the garbage colletion list */
|
||||
osc_pt2pt_gc_add_request (request);
|
||||
osc_pt2pt_gc_add_request (module, request);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
@ -476,7 +471,7 @@ static int osc_pt2pt_get_post_send_cb (ompi_request_t *request)
|
||||
mark_incoming_completion (module, rank);
|
||||
|
||||
/* put this request on the garbage colletion list */
|
||||
osc_pt2pt_gc_add_request (request);
|
||||
osc_pt2pt_gc_add_request (module, request);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
@ -683,8 +678,7 @@ static int accumulate_cb (ompi_request_t *request)
|
||||
|
||||
mark_incoming_completion (module, rank);
|
||||
|
||||
OPAL_THREAD_LOCK(&module->lock);
|
||||
if (0 == --acc_data->request_count) {
|
||||
if (0 == OPAL_THREAD_ADD32(&acc_data->request_count, -1)) {
|
||||
/* no more requests needed before the buffer can be accumulated */
|
||||
|
||||
if (acc_data->source) {
|
||||
@ -695,13 +689,11 @@ static int accumulate_cb (ompi_request_t *request)
|
||||
/* drop the accumulate lock */
|
||||
ompi_osc_pt2pt_accumulate_unlock (module);
|
||||
|
||||
osc_pt2pt_gc_add_buffer (&acc_data->super);
|
||||
osc_pt2pt_gc_add_buffer (module, &acc_data->super);
|
||||
}
|
||||
|
||||
/* put this request on the garbage colletion list */
|
||||
osc_pt2pt_gc_add_request (request);
|
||||
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
osc_pt2pt_gc_add_request (module, request);
|
||||
|
||||
return ret;
|
||||
}
|
||||
@ -710,6 +702,7 @@ static int accumulate_cb (ompi_request_t *request)
|
||||
static int ompi_osc_pt2pt_acc_op_queue (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_header_t *header, int source,
|
||||
char *data, size_t data_len, ompi_datatype_t *datatype)
|
||||
{
|
||||
ompi_osc_pt2pt_peer_t *peer = module->peers + source;
|
||||
osc_pt2pt_pending_acc_t *pending_acc;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
@ -720,11 +713,9 @@ static int ompi_osc_pt2pt_acc_op_queue (ompi_osc_pt2pt_module_t *module, ompi_os
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
if (!ompi_osc_pt2pt_no_locks) {
|
||||
/* NTH: ensure we don't leave wait/process_flush/etc until this
|
||||
* accumulate operation is complete. */
|
||||
module->passive_incoming_frag_signal_count[source]++;
|
||||
}
|
||||
/* NTH: ensure we don't leave wait/process_flush/etc until this
|
||||
* accumulate operation is complete. */
|
||||
OPAL_THREAD_ADD32(&peer->passive_incoming_frag_count, -1);
|
||||
|
||||
pending_acc->source = source;
|
||||
|
||||
@ -757,9 +748,7 @@ static int ompi_osc_pt2pt_acc_op_queue (ompi_osc_pt2pt_module_t *module, ompi_os
|
||||
}
|
||||
|
||||
/* add to the pending acc queue */
|
||||
OPAL_THREAD_LOCK(&module->lock);
|
||||
opal_list_append (&module->pending_acc, &pending_acc->super);
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
OPAL_THREAD_SCOPED_LOCK(&module->lock, opal_list_append (&module->pending_acc, &pending_acc->super));
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
@ -776,7 +765,7 @@ static int replace_cb (ompi_request_t *request)
|
||||
mark_incoming_completion (module, rank);
|
||||
|
||||
/* put this request on the garbage colletion list */
|
||||
osc_pt2pt_gc_add_request (request);
|
||||
osc_pt2pt_gc_add_request (module, request);
|
||||
|
||||
/* unlock the accumulate lock */
|
||||
ompi_osc_pt2pt_accumulate_unlock (module);
|
||||
@ -1143,10 +1132,8 @@ int ompi_osc_pt2pt_progress_pending_acc (ompi_osc_pt2pt_module_t *module)
|
||||
assert (0);
|
||||
}
|
||||
|
||||
if (!ompi_osc_pt2pt_no_locks) {
|
||||
/* signal that an operation is complete */
|
||||
mark_incoming_completion (module, pending_acc->source);
|
||||
}
|
||||
/* signal that an operation is complete */
|
||||
mark_incoming_completion (module, pending_acc->source);
|
||||
|
||||
pending_acc->data = NULL;
|
||||
OBJ_RELEASE(pending_acc);
|
||||
@ -1340,18 +1327,15 @@ static inline int process_complete (ompi_osc_pt2pt_module_t *module, int source,
|
||||
source, complete_header->frag_count, module->active_incoming_frag_signal_count,
|
||||
module->active_incoming_frag_count));
|
||||
|
||||
OPAL_THREAD_LOCK(&module->lock);
|
||||
|
||||
/* the current fragment is not part of the frag_count so we need to add it here */
|
||||
module->active_incoming_frag_signal_count += complete_header->frag_count + 1;
|
||||
module->num_complete_msgs++;
|
||||
OPAL_THREAD_ADD32((int32_t *) &module->active_incoming_frag_signal_count,
|
||||
complete_header->frag_count + 1);
|
||||
|
||||
if (0 == module->num_complete_msgs) {
|
||||
|
||||
if (0 == OPAL_THREAD_ADD32((int32_t *) &module->num_complete_msgs, 1)) {
|
||||
opal_condition_broadcast (&module->cond);
|
||||
}
|
||||
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
|
||||
return sizeof (*complete_header);
|
||||
}
|
||||
|
||||
@ -1361,17 +1345,18 @@ static inline int process_complete (ompi_osc_pt2pt_module_t *module, int source,
|
||||
static inline int process_flush (ompi_osc_pt2pt_module_t *module, int source,
|
||||
ompi_osc_pt2pt_header_flush_t *flush_header)
|
||||
{
|
||||
ompi_osc_pt2pt_peer_t *peer = module->peers + source;
|
||||
int ret;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"process_flush header = {.frag_count = %d}", flush_header->frag_count));
|
||||
|
||||
/* increase signal count by incoming frags */
|
||||
module->passive_incoming_frag_signal_count[source] += flush_header->frag_count;
|
||||
OPAL_THREAD_ADD32(&peer->passive_incoming_frag_count, -(int32_t) flush_header->frag_count);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"%d: process_flush: received message from %d. passive_incoming_frag_signal_count = %d, passive_incoming_frag_count = %d",
|
||||
ompi_comm_rank(module->comm), source, module->passive_incoming_frag_signal_count[source], module->passive_incoming_frag_count[source]));
|
||||
"%d: process_flush: received message from %d. passive_incoming_frag_count = %d",
|
||||
ompi_comm_rank(module->comm), source, peer->passive_incoming_frag_count));
|
||||
|
||||
ret = ompi_osc_pt2pt_process_flush (module, source, flush_header);
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
@ -1382,35 +1367,30 @@ static inline int process_flush (ompi_osc_pt2pt_module_t *module, int source,
|
||||
pending->source = source;
|
||||
pending->header.flush = *flush_header;
|
||||
|
||||
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.lock);
|
||||
opal_list_append (&mca_osc_pt2pt_component.pending_operations, &pending->super);
|
||||
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.lock);
|
||||
|
||||
/* we now have to count the current fragment */
|
||||
module->passive_incoming_frag_signal_count[source]++;
|
||||
} else {
|
||||
/* need to account for the current fragment */
|
||||
module->passive_incoming_frag_count[source] = -1;
|
||||
osc_pt2pt_add_pending (pending);
|
||||
}
|
||||
|
||||
/* signal incomming will increment this counter */
|
||||
OPAL_THREAD_ADD32(&peer->passive_incoming_frag_count, -1);
|
||||
|
||||
return sizeof (*flush_header);
|
||||
}
|
||||
|
||||
static inline int process_unlock (ompi_osc_pt2pt_module_t *module, int source,
|
||||
ompi_osc_pt2pt_header_unlock_t *unlock_header)
|
||||
{
|
||||
ompi_osc_pt2pt_peer_t *peer = module->peers + source;
|
||||
int ret;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"process_unlock header = {.frag_count = %d}", unlock_header->frag_count));
|
||||
|
||||
/* increase signal count by incoming frags */
|
||||
module->passive_incoming_frag_signal_count[source] += unlock_header->frag_count;
|
||||
OPAL_THREAD_ADD32(&peer->passive_incoming_frag_count, -(int32_t) unlock_header->frag_count);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
|
||||
"osc pt2pt: processing unlock request from %d. frag count = %d, signal_count = %d, processed_count = %d",
|
||||
source, unlock_header->frag_count, (int) module->passive_incoming_frag_signal_count[source],
|
||||
(int) module->passive_incoming_frag_count[source]));
|
||||
"osc pt2pt: processing unlock request from %d. frag count = %d, processed_count = %d",
|
||||
source, unlock_header->frag_count, (int) peer->passive_incoming_frag_count));
|
||||
|
||||
ret = ompi_osc_pt2pt_process_unlock (module, source, unlock_header);
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
@ -1421,17 +1401,12 @@ static inline int process_unlock (ompi_osc_pt2pt_module_t *module, int source,
|
||||
pending->source = source;
|
||||
pending->header.unlock = *unlock_header;
|
||||
|
||||
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.lock);
|
||||
opal_list_append (&mca_osc_pt2pt_component.pending_operations, &pending->super);
|
||||
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.lock);
|
||||
|
||||
/* we now have to count the current fragment */
|
||||
module->passive_incoming_frag_signal_count[source]++;
|
||||
} else {
|
||||
/* need to account for the current fragment */
|
||||
module->passive_incoming_frag_count[source] = -1;
|
||||
osc_pt2pt_add_pending (pending);
|
||||
}
|
||||
|
||||
/* signal incomming will increment this counter */
|
||||
OPAL_THREAD_ADD32(&peer->passive_incoming_frag_count, -1);
|
||||
|
||||
return sizeof (*unlock_header);
|
||||
}
|
||||
|
||||
@ -1463,10 +1438,10 @@ static int process_large_datatype_request_cb (ompi_request_t *request)
|
||||
}
|
||||
|
||||
/* put this request on the garbage colletion list */
|
||||
osc_pt2pt_gc_add_request (request);
|
||||
osc_pt2pt_gc_add_request (module, request);
|
||||
|
||||
/* free the datatype buffer */
|
||||
osc_pt2pt_gc_add_buffer (&ddt_buffer->super);
|
||||
osc_pt2pt_gc_add_buffer (module, &ddt_buffer->super);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
@ -1679,10 +1654,6 @@ static int ompi_osc_pt2pt_callback (ompi_request_t *request)
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"finished processing incoming messages"));
|
||||
|
||||
/* restart the receive request */
|
||||
OPAL_THREAD_LOCK(&module->lock);
|
||||
|
||||
/* post messages come unbuffered and should NOT increment the incoming completion
|
||||
* counters */
|
||||
if (OMPI_OSC_PT2PT_HDR_TYPE_POST != base_header->type) {
|
||||
@ -1690,14 +1661,12 @@ static int ompi_osc_pt2pt_callback (ompi_request_t *request)
|
||||
source : MPI_PROC_NULL);
|
||||
}
|
||||
|
||||
osc_pt2pt_gc_clean ();
|
||||
osc_pt2pt_gc_clean (module);
|
||||
|
||||
/* put this request on the garbage colletion list */
|
||||
osc_pt2pt_gc_add_request (request);
|
||||
osc_pt2pt_gc_add_request (module, request);
|
||||
ompi_osc_pt2pt_frag_start_receive (module);
|
||||
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
|
||||
OPAL_THREAD_LOCK(&ompi_request_lock);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
@ -1734,20 +1703,15 @@ isend_completion_cb(ompi_request_t *request)
|
||||
mark_outgoing_completion(module);
|
||||
|
||||
/* put this request on the garbage colletion list */
|
||||
osc_pt2pt_gc_add_request (request);
|
||||
osc_pt2pt_gc_add_request (module, request);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
ompi_osc_pt2pt_component_isend(ompi_osc_pt2pt_module_t *module,
|
||||
void *buf,
|
||||
size_t count,
|
||||
struct ompi_datatype_t *datatype,
|
||||
int dest,
|
||||
int tag,
|
||||
struct ompi_communicator_t *comm)
|
||||
int ompi_osc_pt2pt_component_isend (ompi_osc_pt2pt_module_t *module, void *buf,
|
||||
size_t count, struct ompi_datatype_t *datatype,
|
||||
int dest, int tag, struct ompi_communicator_t *comm)
|
||||
{
|
||||
return ompi_osc_pt2pt_isend_w_cb (buf, count, datatype, dest, tag, comm,
|
||||
isend_completion_cb, module);
|
||||
|
@ -21,18 +21,11 @@
|
||||
#include "osc_pt2pt_data_move.h"
|
||||
|
||||
static void ompi_osc_pt2pt_frag_constructor (ompi_osc_pt2pt_frag_t *frag){
|
||||
frag->buffer = malloc (mca_osc_pt2pt_component.buffer_size + sizeof (ompi_osc_pt2pt_frag_header_t));
|
||||
assert (frag->buffer);
|
||||
frag->buffer = frag->super.ptr;
|
||||
}
|
||||
|
||||
static void ompi_osc_pt2pt_frag_destructor (ompi_osc_pt2pt_frag_t *frag) {
|
||||
if (NULL != frag->buffer) {
|
||||
free (frag->buffer);
|
||||
}
|
||||
}
|
||||
|
||||
OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_frag_t, opal_list_item_t,
|
||||
ompi_osc_pt2pt_frag_constructor, ompi_osc_pt2pt_frag_destructor);
|
||||
OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_frag_t, ompi_free_list_item_t,
|
||||
ompi_osc_pt2pt_frag_constructor, NULL);
|
||||
|
||||
static int frag_send_cb (ompi_request_t *request)
|
||||
{
|
||||
@ -45,11 +38,11 @@ static int frag_send_cb (ompi_request_t *request)
|
||||
frag->target, (void *) frag, (void *) request));
|
||||
|
||||
mark_outgoing_completion(module);
|
||||
OPAL_FREE_LIST_RETURN(&mca_osc_pt2pt_component.frags, &frag->super);
|
||||
OMPI_FREE_LIST_RETURN_MT(&mca_osc_pt2pt_component.frags, &frag->super);
|
||||
|
||||
|
||||
/* put this request on the garbage colletion list */
|
||||
osc_pt2pt_gc_add_request (request);
|
||||
osc_pt2pt_gc_add_request (module, request);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
@ -77,12 +70,12 @@ frag_send(ompi_osc_pt2pt_module_t *module,
|
||||
|
||||
int
|
||||
ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module,
|
||||
ompi_osc_pt2pt_frag_t *frag)
|
||||
ompi_osc_pt2pt_frag_t *frag)
|
||||
{
|
||||
ompi_osc_pt2pt_peer_t *peer = module->peers + frag->target;
|
||||
int ret;
|
||||
|
||||
assert(0 == frag->pending);
|
||||
assert(module->peers[frag->target].active_frag != frag);
|
||||
assert(0 == frag->pending && peer->active_frag != frag);
|
||||
|
||||
/* we need to signal now that a frag is outgoing to ensure the count sent
|
||||
* with the unlock message is correct */
|
||||
@ -90,16 +83,10 @@ ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module,
|
||||
|
||||
/* if eager sends are not active, can't send yet, so buffer and
|
||||
get out... */
|
||||
if (module->passive_target_access_epoch) {
|
||||
if (!module->passive_eager_send_active[frag->target]) {
|
||||
opal_list_append(&module->queued_frags, &frag->super);
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
} else {
|
||||
if (!module->active_eager_send_active) {
|
||||
opal_list_append(&module->queued_frags, &frag->super);
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
if (!(peer->eager_send_active || module->all_access_epoch)) {
|
||||
OPAL_THREAD_SCOPED_LOCK(&module->queued_frags_lock,
|
||||
opal_list_append(&module->queued_frags, (opal_list_item_t *) frag));
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
ret = frag_send(module, frag);
|
||||
@ -113,43 +100,50 @@ ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module,
|
||||
int
|
||||
ompi_osc_pt2pt_frag_flush_target(ompi_osc_pt2pt_module_t *module, int target)
|
||||
{
|
||||
ompi_osc_pt2pt_frag_t *next, *frag = module->peers[target].active_frag;
|
||||
int ret = OMPI_SUCCESS;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"osc pt2pt: frag flush target begin"));
|
||||
|
||||
/* flush the active frag */
|
||||
if (NULL != module->peers[target].active_frag) {
|
||||
ompi_osc_pt2pt_frag_t *frag = module->peers[target].active_frag;
|
||||
|
||||
if (0 != frag->pending) {
|
||||
/* communication going on while synchronizing; this is a bug */
|
||||
if (NULL != frag) {
|
||||
if (1 != frag->pending) {
|
||||
/* communication going on while synchronizing; this is an rma usage bug */
|
||||
return OMPI_ERR_RMA_SYNC;
|
||||
}
|
||||
|
||||
module->peers[target].active_frag = NULL;
|
||||
|
||||
ret = ompi_osc_pt2pt_frag_start(module, frag);
|
||||
if (OMPI_SUCCESS != ret) return ret;
|
||||
if (opal_atomic_cmpset (&module->peers[target].active_frag, frag, NULL)) {
|
||||
OPAL_THREAD_ADD32(&frag->pending, -1);
|
||||
ret = ompi_osc_pt2pt_frag_start(module, frag);
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"osc pt2pt: frag flush target finished active frag"));
|
||||
|
||||
/* walk through the pending list and send */
|
||||
ompi_osc_pt2pt_frag_t *frag, *next;
|
||||
OPAL_LIST_FOREACH_SAFE(frag, next, &module->queued_frags, ompi_osc_pt2pt_frag_t) {
|
||||
if (frag->target == target) {
|
||||
opal_list_remove_item(&module->queued_frags, &frag->super);
|
||||
ret = frag_send(module, frag);
|
||||
if (OMPI_SUCCESS != ret) return ret;
|
||||
OPAL_THREAD_LOCK(&module->queued_frags_lock);
|
||||
if (opal_list_get_size (&module->queued_frags)) {
|
||||
OPAL_LIST_FOREACH_SAFE(frag, next, &module->queued_frags, ompi_osc_pt2pt_frag_t) {
|
||||
if (frag->target == target) {
|
||||
opal_list_remove_item(&module->queued_frags, (opal_list_item_t *) frag);
|
||||
ret = frag_send(module, frag);
|
||||
if (OPAL_UNLIKELY(OMPI_SUCCESS != frag)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&module->queued_frags_lock);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"osc pt2pt: frag flush target finished"));
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
@ -165,18 +159,24 @@ ompi_osc_pt2pt_frag_flush_all(ompi_osc_pt2pt_module_t *module)
|
||||
|
||||
/* flush the active frag */
|
||||
for (i = 0 ; i < ompi_comm_size(module->comm) ; ++i) {
|
||||
if (NULL != module->peers[i].active_frag) {
|
||||
ompi_osc_pt2pt_frag_t *frag = module->peers[i].active_frag;
|
||||
ompi_osc_pt2pt_frag_t *frag = module->peers[i].active_frag;
|
||||
|
||||
if (0 != frag->pending) {
|
||||
if (NULL != frag) {
|
||||
if (1 != frag->pending) {
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
/* communication going on while synchronizing; this is a bug */
|
||||
return OMPI_ERR_RMA_SYNC;
|
||||
}
|
||||
|
||||
module->peers[i].active_frag = NULL;
|
||||
if (!opal_atomic_cmpset_ptr (&module->peers[i].active_frag, frag, NULL)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
OPAL_THREAD_ADD32(&frag->pending, -1);
|
||||
ret = ompi_osc_pt2pt_frag_start(module, frag);
|
||||
if (OMPI_SUCCESS != ret) return ret;
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -184,18 +184,20 @@ ompi_osc_pt2pt_frag_flush_all(ompi_osc_pt2pt_module_t *module)
|
||||
"osc pt2pt: frag flush all finished active frag"));
|
||||
|
||||
/* try to start all the queued frags */
|
||||
OPAL_LIST_FOREACH_SAFE(frag, next, &module->queued_frags, ompi_osc_pt2pt_frag_t) {
|
||||
opal_list_remove_item(&module->queued_frags, &frag->super);
|
||||
ret = frag_send(module, frag);
|
||||
if (OMPI_SUCCESS != ret) {
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"osc pt2pt: failure for frag send: %d", ret));
|
||||
return ret;
|
||||
OPAL_THREAD_LOCK(&module->queued_frags_lock);
|
||||
if (opal_list_get_size (&module->queued_frags)) {
|
||||
OPAL_LIST_FOREACH_SAFE(frag, next, &module->queued_frags, ompi_osc_pt2pt_frag_t) {
|
||||
opal_list_remove_item(&module->queued_frags, (opal_list_item_t *) frag);
|
||||
ret = frag_send(module, frag);
|
||||
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&module->queued_frags_lock);
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
||||
"osc pt2pt: frag flush all done"));
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
return ret;
|
||||
}
|
||||
|
@ -21,7 +21,7 @@
|
||||
|
||||
/** Communication buffer for packing messages */
|
||||
struct ompi_osc_pt2pt_frag_t {
|
||||
opal_list_item_t super;
|
||||
ompi_free_list_item_t super;
|
||||
/* target rank of buffer */
|
||||
int target;
|
||||
unsigned char *buffer;
|
||||
@ -33,7 +33,7 @@ struct ompi_osc_pt2pt_frag_t {
|
||||
char *top;
|
||||
|
||||
/* Number of operations which have started writing into the frag, but not yet completed doing so */
|
||||
int pending;
|
||||
int32_t pending;
|
||||
ompi_osc_pt2pt_frag_header_t *header;
|
||||
ompi_osc_pt2pt_module_t *module;
|
||||
};
|
||||
@ -64,22 +64,29 @@ static inline int ompi_osc_pt2pt_frag_alloc(ompi_osc_pt2pt_module_t *module, int
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
||||
OPAL_THREAD_LOCK(&module->lock);
|
||||
if (NULL == curr || curr->remain_len < request_len) {
|
||||
opal_free_list_item_t *item;
|
||||
ompi_free_list_item_t *item = NULL;
|
||||
|
||||
if (NULL != curr) {
|
||||
curr->remain_len = 0;
|
||||
module->peers[target].active_frag = NULL;
|
||||
opal_atomic_mb ();
|
||||
|
||||
/* If there's something pending, the pending finish will
|
||||
start the buffer. Otherwise, we need to start it now. */
|
||||
if (0 == curr->pending) {
|
||||
module->peers[target].active_frag = NULL;
|
||||
if (0 == OPAL_THREAD_ADD32(&curr->pending, -1)) {
|
||||
ret = ompi_osc_pt2pt_frag_start(module, curr);
|
||||
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
OPAL_FREE_LIST_GET(&mca_osc_pt2pt_component.frags,
|
||||
item, ret);
|
||||
if (OMPI_SUCCESS != ret) return ret;
|
||||
OMPI_FREE_LIST_GET_MT(&mca_osc_pt2pt_component.frags, item);
|
||||
if (OPAL_UNLIKELY(NULL == item)) {
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
curr = module->peers[target].active_frag =
|
||||
(ompi_osc_pt2pt_frag_t*) item;
|
||||
|
||||
@ -89,7 +96,7 @@ static inline int ompi_osc_pt2pt_frag_alloc(ompi_osc_pt2pt_module_t *module, int
|
||||
curr->top = (char*) (curr->header + 1);
|
||||
curr->remain_len = mca_osc_pt2pt_component.buffer_size;
|
||||
curr->module = module;
|
||||
curr->pending = 0;
|
||||
curr->pending = 1;
|
||||
|
||||
curr->header->base.type = OMPI_OSC_PT2PT_HDR_TYPE_FRAG;
|
||||
curr->header->base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID;
|
||||
@ -101,6 +108,7 @@ static inline int ompi_osc_pt2pt_frag_alloc(ompi_osc_pt2pt_module_t *module, int
|
||||
curr->header->windx = ompi_comm_get_cid(module->comm);
|
||||
|
||||
if (curr->remain_len < request_len) {
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
|
||||
}
|
||||
}
|
||||
@ -110,8 +118,10 @@ static inline int ompi_osc_pt2pt_frag_alloc(ompi_osc_pt2pt_module_t *module, int
|
||||
|
||||
curr->top += request_len;
|
||||
curr->remain_len -= request_len;
|
||||
curr->pending++;
|
||||
curr->header->num_ops++;
|
||||
OPAL_THREAD_UNLOCK(&module->lock);
|
||||
|
||||
OPAL_THREAD_ADD32(&curr->pending, 1);
|
||||
OPAL_THREAD_ADD32(&curr->header->num_ops, 1);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
@ -123,12 +133,7 @@ static inline int ompi_osc_pt2pt_frag_alloc(ompi_osc_pt2pt_module_t *module, int
|
||||
static inline int ompi_osc_pt2pt_frag_finish(ompi_osc_pt2pt_module_t *module,
|
||||
ompi_osc_pt2pt_frag_t* buffer)
|
||||
{
|
||||
if (0 == --buffer->pending && 0 == buffer->remain_len) {
|
||||
if (OPAL_LIKELY(buffer == module->peers[buffer->target].active_frag)) {
|
||||
/* this is the active fragment. need to set the current fragment to null
|
||||
* or it will be started multiple times */
|
||||
module->peers[buffer->target].active_frag = NULL;
|
||||
}
|
||||
if (0 == OPAL_THREAD_ADD32(&buffer->pending, -1)) {
|
||||
return ompi_osc_pt2pt_frag_start(module, buffer);
|
||||
}
|
||||
|
||||
|
@ -120,7 +120,7 @@ typedef struct ompi_osc_pt2pt_header_post_t ompi_osc_pt2pt_header_post_t;
|
||||
struct ompi_osc_pt2pt_header_lock_t {
|
||||
ompi_osc_pt2pt_header_base_t base;
|
||||
int32_t lock_type;
|
||||
uint64_t serial_number;
|
||||
uint64_t lock_ptr;
|
||||
};
|
||||
typedef struct ompi_osc_pt2pt_header_lock_t ompi_osc_pt2pt_header_lock_t;
|
||||
|
||||
@ -128,7 +128,7 @@ struct ompi_osc_pt2pt_header_lock_ack_t {
|
||||
ompi_osc_pt2pt_header_base_t base;
|
||||
uint16_t windx;
|
||||
uint32_t source;
|
||||
uint64_t serial_number;
|
||||
uint64_t lock_ptr;
|
||||
};
|
||||
typedef struct ompi_osc_pt2pt_header_lock_ack_t ompi_osc_pt2pt_header_lock_ack_t;
|
||||
|
||||
@ -136,11 +136,13 @@ struct ompi_osc_pt2pt_header_unlock_t {
|
||||
ompi_osc_pt2pt_header_base_t base;
|
||||
int32_t lock_type;
|
||||
uint32_t frag_count;
|
||||
uint64_t lock_ptr;
|
||||
};
|
||||
typedef struct ompi_osc_pt2pt_header_unlock_t ompi_osc_pt2pt_header_unlock_t;
|
||||
|
||||
struct ompi_osc_pt2pt_header_unlock_ack_t {
|
||||
ompi_osc_pt2pt_header_base_t base;
|
||||
uint64_t lock_ptr;
|
||||
};
|
||||
typedef struct ompi_osc_pt2pt_header_unlock_ack_t ompi_osc_pt2pt_header_unlock_ack_t;
|
||||
|
||||
@ -161,8 +163,8 @@ struct ompi_osc_pt2pt_frag_header_t {
|
||||
ompi_osc_pt2pt_header_base_t base;
|
||||
uint16_t windx; /* cid of communicator backing window (our window id) */
|
||||
uint32_t source; /* rank in window of source process */
|
||||
uint16_t num_ops; /* number of operations in this buffer */
|
||||
uint16_t pad[3]; /* ensure the fragment header is a multiple of 8 bytes */
|
||||
int32_t num_ops; /* number of operations in this buffer */
|
||||
uint32_t pad; /* ensure the fragment header is a multiple of 8 bytes */
|
||||
};
|
||||
typedef struct ompi_osc_pt2pt_frag_header_t ompi_osc_pt2pt_frag_header_t;
|
||||
|
||||
|
@ -49,7 +49,6 @@ ompi_osc_pt2pt_free(ompi_win_t *win)
|
||||
{
|
||||
int ret = OMPI_SUCCESS;
|
||||
ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
|
||||
opal_list_item_t *item;
|
||||
|
||||
if (NULL == module) {
|
||||
return OMPI_SUCCESS;
|
||||
@ -67,43 +66,38 @@ ompi_osc_pt2pt_free(ompi_win_t *win)
|
||||
}
|
||||
|
||||
/* remove from component information */
|
||||
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.lock);
|
||||
opal_hash_table_remove_value_uint32(&mca_osc_pt2pt_component.modules,
|
||||
ompi_comm_get_cid(module->comm));
|
||||
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.lock);
|
||||
OPAL_THREAD_SCOPED_LOCK(&mca_osc_pt2pt_component.lock,
|
||||
opal_hash_table_remove_value_uint32(&mca_osc_pt2pt_component.modules,
|
||||
ompi_comm_get_cid(module->comm)));
|
||||
}
|
||||
|
||||
win->w_osc_module = NULL;
|
||||
|
||||
OBJ_DESTRUCT(&module->outstanding_locks);
|
||||
OBJ_DESTRUCT(&module->locks_pending);
|
||||
OBJ_DESTRUCT(&module->locks_pending_lock);
|
||||
OBJ_DESTRUCT(&module->acc_lock);
|
||||
OBJ_DESTRUCT(&module->cond);
|
||||
OBJ_DESTRUCT(&module->lock);
|
||||
|
||||
/* it is erroneous to close a window with active operations on it so we should
|
||||
* probably produce an error here instead of cleaning up */
|
||||
while (NULL != (item = opal_list_remove_first (&module->pending_acc))) {
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
OPAL_LIST_DESTRUCT(&module->pending_acc);
|
||||
OPAL_LIST_DESTRUCT(&module->pending_posts);
|
||||
OPAL_LIST_DESTRUCT(&module->queued_frags);
|
||||
OBJ_DESTRUCT(&module->queued_frags_lock);
|
||||
|
||||
OBJ_DESTRUCT(&module->pending_acc);
|
||||
|
||||
while (NULL != (item = opal_list_remove_first (&module->pending_posts))) {
|
||||
OBJ_RELEASE(item);
|
||||
}
|
||||
|
||||
OBJ_DESTRUCT(&module->pending_posts);
|
||||
|
||||
osc_pt2pt_gc_clean ();
|
||||
osc_pt2pt_gc_clean (module);
|
||||
OPAL_LIST_DESTRUCT(&module->request_gc);
|
||||
OPAL_LIST_DESTRUCT(&module->buffer_gc);
|
||||
OBJ_DESTRUCT(&module->gc_lock);
|
||||
|
||||
if (NULL != module->peers) {
|
||||
free(module->peers);
|
||||
}
|
||||
if (NULL != module->passive_eager_send_active) free(module->passive_eager_send_active);
|
||||
if (NULL != module->passive_incoming_frag_count) free(module->passive_incoming_frag_count);
|
||||
if (NULL != module->passive_incoming_frag_signal_count) free(module->passive_incoming_frag_signal_count);
|
||||
|
||||
if (NULL != module->epoch_outgoing_frag_count) free(module->epoch_outgoing_frag_count);
|
||||
|
||||
if (NULL != module->frag_request) {
|
||||
module->frag_request->req_complete_cb = NULL;
|
||||
ompi_request_cancel (module->frag_request);
|
||||
|
Разница между файлами не показана из-за своего большого размера
Загрузить разницу
@ -26,7 +26,7 @@ struct ompi_osc_pt2pt_request_t {
|
||||
int origin_count;
|
||||
struct ompi_datatype_t *origin_dt;
|
||||
ompi_osc_pt2pt_module_t* module;
|
||||
int outstanding_requests;
|
||||
int32_t outstanding_requests;
|
||||
bool internal;
|
||||
};
|
||||
typedef struct ompi_osc_pt2pt_request_t ompi_osc_pt2pt_request_t;
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user