
osc/pt2pt: threading fixes and code cleanup

This commit is contained in:
Nathan Hjelm 2014-11-13 15:13:10 -07:00 committed by Nathan Hjelm
parent 60648e4231
commit 22625b005b
11 changed files with 585 additions and 683 deletions
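
The recurring fix in this commit replaces lock-protected counter updates with a single atomic add-and-test. A minimal standalone sketch of that pattern, using C11 atomics and a hypothetical thread_add32 in place of Open MPI's OPAL_THREAD_ADD32 (which likewise returns the post-add value when threading is enabled):

#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

static _Atomic int32_t outstanding_requests;

/* returns the post-add value, matching OPAL_THREAD_ADD32's contract */
static int32_t thread_add32 (_Atomic int32_t *addr, int32_t delta)
{
    return atomic_fetch_add (addr, delta) + delta;
}

static void request_complete_cb (void)
{
    /* before: lock; if (0 == --outstanding_requests) ...; unlock;
     * after:  one atomic decrement-and-test with no lock held */
    if (0 == thread_add32 (&outstanding_requests, -1)) {
        puts ("all outstanding requests complete");
    }
}

int main (void)
{
    atomic_store (&outstanding_requests, 2);
    request_complete_cb ();
    request_complete_cb ();   /* second completion reaches zero */
    return 0;
}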

View file

@ -59,7 +59,7 @@ struct ompi_osc_pt2pt_component_t {
int module_count;
/** free list of ompi_osc_pt2pt_frag_t structures */
opal_free_list_t frags;
ompi_free_list_t frags;
/** Free list of requests */
ompi_free_list_t requests;
@ -67,17 +67,14 @@ struct ompi_osc_pt2pt_component_t {
/** PT2PT component buffer size */
unsigned int buffer_size;
/** Lock for pending_operations */
opal_mutex_t pending_operations_lock;
/** List of operations that need to be processed */
opal_list_t pending_operations;
/** Is the progress function enabled? */
bool progress_enable;
/** List of requests that need to be freed */
opal_list_t request_gc;
/** List of buffers that need to be freed */
opal_list_t buffer_gc;
};
typedef struct ompi_osc_pt2pt_component_t ompi_osc_pt2pt_component_t;
@ -89,7 +86,9 @@ struct ompi_osc_pt2pt_peer_t {
/** Number of acks pending. New requests can not be sent out if there are
* acks pending (to fulfill the ordering constraints of accumulate) */
uint32_t num_acks_pending;
int32_t passive_incoming_frag_count;
bool access_epoch;
bool eager_send_active;
};
typedef struct ompi_osc_pt2pt_peer_t ompi_osc_pt2pt_peer_t;
@ -133,6 +132,9 @@ struct ompi_osc_pt2pt_module_t {
peer. Not in peer data to make fence more manageable. */
uint32_t *epoch_outgoing_frag_count;
/** Lock for queued_frags */
opal_mutex_t queued_frags_lock;
/** List of full communication buffers queued to be sent. Should
be maintained in order (at least in per-target order). */
opal_list_t queued_frags;
@ -152,9 +154,6 @@ struct ompi_osc_pt2pt_module_t {
/* Next incoming buffer count at which we want a signal on cond */
uint32_t active_incoming_frag_signal_count;
uint32_t *passive_incoming_frag_count;
uint32_t *passive_incoming_frag_signal_count;
/* Number of flush ack requests send since beginning of time */
uint64_t flush_ack_requested_count;
/* Number of flush ack replies received since beginning of
@ -171,8 +170,6 @@ struct ompi_osc_pt2pt_module_t {
/** Indicates the window is in an all access epoch (fence, lock_all) */
bool all_access_epoch;
bool *passive_eager_send_active;
/* ********************* PWSC data ************************ */
struct ompi_group_t *pw_group;
struct ompi_group_t *sc_group;
@ -189,9 +186,11 @@ struct ompi_osc_pt2pt_module_t {
/** Status of the local window lock. One of 0 (unlocked),
MPI_LOCK_EXCLUSIVE, or MPI_LOCK_SHARED. */
int lock_status;
/** number of peers who hold a shared lock on the local window */
int32_t shared_count;
int32_t lock_status;
/** lock for locks_pending list */
opal_mutex_t locks_pending_lock;
/** target side list of lock requests we couldn't satisfy yet */
opal_list_t locks_pending;
@ -210,6 +209,15 @@ struct ompi_osc_pt2pt_module_t {
/* enforce pscw matching */
/** list of unmatched post messages */
opal_list_t pending_posts;
/** Lock for garbage collection lists */
opal_mutex_t gc_lock;
/** List of requests that need to be freed */
opal_list_t request_gc;
/** List of buffers that need to be freed */
opal_list_t buffer_gc;
};
typedef struct ompi_osc_pt2pt_module_t ompi_osc_pt2pt_module_t;
OMPI_MODULE_DECLSPEC extern ompi_osc_pt2pt_component_t mca_osc_pt2pt_component;
@ -431,11 +439,12 @@ static inline void mark_incoming_completion (ompi_osc_pt2pt_module_t *module, in
opal_condition_broadcast(&module->cond);
}
} else {
ompi_osc_pt2pt_peer_t *peer = module->peers + source;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"mark_incoming_completion marking passive incoming complete. source = %d, count = %d",
source, (int) module->passive_incoming_frag_count[source] + 1));
OPAL_THREAD_ADD32((int32_t *) (module->passive_incoming_frag_count + source), 1);
if (module->passive_incoming_frag_count[source] >= module->passive_incoming_frag_signal_count[source]) {
source, (int) peer->passive_incoming_frag_count + 1));
OPAL_THREAD_ADD32((int32_t *) &peer->passive_incoming_frag_count, 1);
if (0 == peer->passive_incoming_frag_count) {
opal_condition_broadcast(&module->cond);
}
}
@ -576,36 +585,42 @@ static inline void osc_pt2pt_copy_for_send (void *target, size_t target_len, voi
* and buffers on the module's garbage collection lists and release them
* at a later time.
*/
static inline void osc_pt2pt_gc_clean (void)
static inline void osc_pt2pt_gc_clean (ompi_osc_pt2pt_module_t *module)
{
ompi_request_t *request;
opal_list_item_t *item;
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.lock);
OPAL_THREAD_LOCK(&module->gc_lock);
while (NULL != (request = (ompi_request_t *) opal_list_remove_first (&mca_osc_pt2pt_component.request_gc))) {
while (NULL != (request = (ompi_request_t *) opal_list_remove_first (&module->request_gc))) {
OPAL_THREAD_UNLOCK(&module->gc_lock);
ompi_request_free (&request);
OPAL_THREAD_LOCK(&module->gc_lock);
}
while (NULL != (item = opal_list_remove_first (&mca_osc_pt2pt_component.buffer_gc))) {
while (NULL != (item = opal_list_remove_first (&module->buffer_gc))) {
OBJ_RELEASE(item);
}
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.lock);
OPAL_THREAD_UNLOCK(&module->gc_lock);
}
static inline void osc_pt2pt_gc_add_request (ompi_request_t *request)
static inline void osc_pt2pt_gc_add_request (ompi_osc_pt2pt_module_t *module, ompi_request_t *request)
{
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.lock);
opal_list_append (&mca_osc_pt2pt_component.request_gc, (opal_list_item_t *) request);
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.lock);
OPAL_THREAD_SCOPED_LOCK(&module->gc_lock,
opal_list_append (&module->request_gc, (opal_list_item_t *) request));
}
static inline void osc_pt2pt_gc_add_buffer (opal_list_item_t *buffer)
static inline void osc_pt2pt_gc_add_buffer (ompi_osc_pt2pt_module_t *module, opal_list_item_t *buffer)
{
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.lock);
opal_list_append (&mca_osc_pt2pt_component.buffer_gc, buffer);
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.lock);
OPAL_THREAD_SCOPED_LOCK(&module->gc_lock,
opal_list_append (&module->buffer_gc, buffer));
}
static inline void osc_pt2pt_add_pending (ompi_osc_pt2pt_pending_t *pending)
{
OPAL_THREAD_SCOPED_LOCK(&mca_osc_pt2pt_component.pending_operations_lock,
opal_list_append (&mca_osc_pt2pt_component.pending_operations, &pending->super));
}
#define OSC_PT2PT_FRAG_TAG 0x10000
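
The garbage-collection helpers above drop gc_lock around ompi_request_free because freeing a request can re-enter the progress engine and append more garbage. A pthread model of that drop-and-retake pattern (gc_item and free_request are hypothetical stand-ins):

#include <pthread.h>
#include <stdlib.h>

struct gc_item { struct gc_item *next; };

static pthread_mutex_t gc_lock = PTHREAD_MUTEX_INITIALIZER;
static struct gc_item *request_gc;   /* intrusive list of garbage */

/* stand-in for ompi_request_free; may append new garbage internally */
static void free_request (struct gc_item *item) { free (item); }

static void gc_clean (void)
{
    pthread_mutex_lock (&gc_lock);
    while (NULL != request_gc) {
        struct gc_item *item = request_gc;
        request_gc = item->next;
        /* drop the lock before freeing: the free callback may add more
         * garbage, which would deadlock on a non-recursive lock */
        pthread_mutex_unlock (&gc_lock);
        free_request (item);
        pthread_mutex_lock (&gc_lock);
    }
    pthread_mutex_unlock (&gc_lock);
}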

View file

@ -122,16 +122,16 @@ ompi_osc_pt2pt_fence(int assert, ompi_win_t *win)
/* short-circuit the noprecede case */
if (0 != (assert & MPI_MODE_NOPRECEDE)) {
ret = module->comm->c_coll.coll_barrier(module->comm,
module->comm->c_coll.coll_barrier_module);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc pt2pt: fence end (short circuit)"));
return ret;
}
/* try to start all the requests. */
/* try to start all requests. */
ret = ompi_osc_pt2pt_frag_flush_all(module);
if (OMPI_SUCCESS != ret) goto cleanup;
if (OMPI_SUCCESS != ret) {
return ret;
}
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc pt2pt: fence done sending"));
@ -141,10 +141,12 @@ ompi_osc_pt2pt_fence(int assert, ompi_win_t *win)
&incoming_reqs, 1, MPI_UINT32_T,
MPI_SUM, module->comm,
module->comm->c_coll.coll_reduce_scatter_block_module);
if (OMPI_SUCCESS != ret) goto cleanup;
if (OMPI_SUCCESS != ret) {
OPAL_THREAD_UNLOCK(&module->lock);
return ret;
}
OPAL_THREAD_LOCK(&module->lock);
bzero(module->epoch_outgoing_frag_count,
sizeof(uint32_t) * ompi_comm_size(module->comm));
@ -161,7 +163,7 @@ ompi_osc_pt2pt_fence(int assert, ompi_win_t *win)
opal_condition_wait(&module->cond, &module->lock);
}
ret = OMPI_SUCCESS;
module->active_incoming_frag_signal_count = 0;
if (assert & MPI_MODE_NOSUCCEED) {
/* as specified in MPI-3 p 438 3-5 the fence can end an epoch. it isn't explicitly
@ -169,14 +171,13 @@ ompi_osc_pt2pt_fence(int assert, ompi_win_t *win)
module->active_eager_send_active = false;
module->all_access_epoch = false;
}
opal_condition_broadcast (&module->cond);
OPAL_THREAD_UNLOCK(&module->lock);
cleanup:
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"osc pt2pt: fence end: %d", ret));
OPAL_THREAD_UNLOCK(&module->lock);
return ret;
return OMPI_SUCCESS;
}
@ -226,9 +227,13 @@ ompi_osc_pt2pt_start(ompi_group_t *group,
ompi_proc_t *pending_proc = ompi_comm_peer_lookup (module->comm, pending_post->rank);
if (group_contains_proc (module->sc_group, pending_proc)) {
ompi_osc_pt2pt_peer_t *peer = module->peers + pending_post->rank;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "Consumed unexpected post message from %d",
pending_post->rank));
++module->num_post_msgs;
peer->eager_send_active = true;
opal_list_remove_item (&module->pending_posts, &pending_post->super);
OBJ_RELEASE(pending_post);
}
@ -291,6 +296,7 @@ ompi_osc_pt2pt_complete(ompi_win_t *win)
"waiting for post messages. num_post_msgs = %d", module->num_post_msgs));
opal_condition_wait(&module->cond, &module->lock);
}
OPAL_THREAD_UNLOCK(&module->lock);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"ompi_osc_pt2pt_complete sending complete message"));
@ -302,7 +308,6 @@ ompi_osc_pt2pt_complete(ompi_win_t *win)
At the same time, clean out the outgoing count for the next
round. */
OPAL_THREAD_UNLOCK(&module->lock);
for (i = 0 ; i < ompi_group_size(module->sc_group) ; ++i) {
if (my_rank == ranks[i]) {
/* shortcut for self */
@ -325,15 +330,17 @@ ompi_osc_pt2pt_complete(ompi_win_t *win)
sizeof(ompi_osc_pt2pt_header_complete_t));
if (OMPI_SUCCESS != ret) goto cleanup;
}
OPAL_THREAD_LOCK(&module->lock);
/* start all requests */
ret = ompi_osc_pt2pt_frag_flush_all(module);
if (OMPI_SUCCESS != ret) goto cleanup;
OPAL_THREAD_LOCK(&module->lock);
/* zero the fragment counts here to ensure they are zeroed */
for (i = 0 ; i < ompi_group_size(module->sc_group) ; ++i) {
peer = module->peers + ranks[i];
module->epoch_outgoing_frag_count[ranks[i]] = 0;
peer->eager_send_active = false;
}
/* wait for outgoing requests to complete. Don't wait for incoming, as
@ -347,7 +354,7 @@ ompi_osc_pt2pt_complete(ompi_win_t *win)
module->sc_group = NULL;
/* unlock here, as group cleanup can take a while... */
OPAL_THREAD_UNLOCK(&(module->lock));
OPAL_THREAD_UNLOCK(&module->lock);
/* phase 2 cleanup group */
ompi_group_decrement_proc_count(group);
@ -362,8 +369,6 @@ ompi_osc_pt2pt_complete(ompi_win_t *win)
cleanup:
if (NULL != ranks) free(ranks);
OPAL_THREAD_UNLOCK(&(module->lock));
return ret;
}
@ -504,7 +509,6 @@ ompi_osc_pt2pt_test(ompi_win_t *win,
module->active_incoming_frag_count != module->active_incoming_frag_signal_count) {
*flag = 0;
ret = OMPI_SUCCESS;
goto cleanup;
} else {
*flag = 1;
@ -519,7 +523,6 @@ ompi_osc_pt2pt_test(ompi_win_t *win,
return OMPI_SUCCESS;
}
cleanup:
OPAL_THREAD_UNLOCK(&(module->lock));
return ret;
@ -528,6 +531,7 @@ ompi_osc_pt2pt_test(ompi_win_t *win,
int osc_pt2pt_incoming_post (ompi_osc_pt2pt_module_t *module, int source)
{
ompi_proc_t *source_proc = ompi_comm_peer_lookup (module->comm, source);
ompi_osc_pt2pt_peer_t *peer = module->peers + source;
OPAL_THREAD_LOCK(&module->lock);
@ -547,6 +551,9 @@ int osc_pt2pt_incoming_post (ompi_osc_pt2pt_module_t *module, int source)
return OMPI_SUCCESS;
}
assert (!peer->eager_send_active);
peer->eager_send_active = true;
module->num_post_msgs++;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"received post message. num_post_msgs = %d", module->num_post_msgs));

View file

@ -49,14 +49,12 @@ static int ompi_osc_pt2pt_req_comm_complete (ompi_request_t *request)
mark_outgoing_completion (module);
OPAL_THREAD_LOCK(&ompi_request_lock);
if (0 == --pt2pt_request->outstanding_requests) {
if (0 == OPAL_THREAD_ADD32(&pt2pt_request->outstanding_requests, -1)) {
ompi_osc_pt2pt_request_complete (pt2pt_request, request->req_status.MPI_ERROR);
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
/* put this request on the garbage collection list */
osc_pt2pt_gc_add_request (request);
osc_pt2pt_gc_add_request (module, request);
return OMPI_SUCCESS;
}
@ -64,11 +62,19 @@ static int ompi_osc_pt2pt_req_comm_complete (ompi_request_t *request)
static int ompi_osc_pt2pt_dt_send_complete (ompi_request_t *request)
{
ompi_datatype_t *datatype = (ompi_datatype_t *) request->req_complete_cb_data;
ompi_osc_pt2pt_module_t *module = NULL;
OBJ_RELEASE(datatype);
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.lock);
opal_hash_table_get_value_uint32(&mca_osc_pt2pt_component.modules,
ompi_comm_get_cid(request->req_mpi_object.comm),
(void **) &module);
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.lock);
assert (NULL != module);
/* put this request on the garbage collection list */
osc_pt2pt_gc_add_request (request);
osc_pt2pt_gc_add_request (module, request);
return OMPI_SUCCESS;
}
@ -327,8 +333,6 @@ static inline int ompi_osc_pt2pt_put_w_req (void *origin_addr, int origin_count,
payload_len = origin_dt->super.size * origin_count;
frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + ddt_len + payload_len;
OPAL_THREAD_LOCK(&module->lock);
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + ddt_len;
@ -338,7 +342,6 @@ static inline int ompi_osc_pt2pt_put_w_req (void *origin_addr, int origin_count,
frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + 8;
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
@ -352,14 +355,14 @@ static inline int ompi_osc_pt2pt_put_w_req (void *origin_addr, int origin_count,
/* flush will be called at the end of this function. make sure the post message has
* arrived. */
if ((is_long_msg || request) && module->sc_group) {
OPAL_THREAD_LOCK(&module->lock);
while (0 != module->num_post_msgs) {
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"waiting for post messages. num_post_msgs = %d", module->num_post_msgs));
opal_condition_wait(&module->cond, &module->lock);
}
}
OPAL_THREAD_UNLOCK(&module->lock);
}
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc pt2pt: put long protocol: %d, large datatype: %d",
@ -433,15 +436,12 @@ static inline int ompi_osc_pt2pt_put_w_req (void *origin_addr, int origin_count,
header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_VALID;
}
OPAL_THREAD_LOCK(&module->lock);
ret = ompi_osc_pt2pt_frag_finish(module, frag);
if (request || is_long_msg) {
/* need to flush now in case the caller decides to wait on the request */
ompi_osc_pt2pt_frag_flush_target (module, target);
}
OPAL_THREAD_UNLOCK(&module->lock);
return ret;
}
@ -512,8 +512,6 @@ ompi_osc_pt2pt_accumulate_w_req (void *origin_addr, int origin_count,
ddt_len = ompi_datatype_pack_description_length(target_dt);
payload_len = origin_dt->super.size * origin_count;
OPAL_THREAD_LOCK(&module->lock);
frag_len = sizeof(*header) + ddt_len + payload_len;
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr);
if (OMPI_SUCCESS != ret) {
@ -524,7 +522,6 @@ ompi_osc_pt2pt_accumulate_w_req (void *origin_addr, int origin_count,
frag_len = sizeof(*header) + 8;
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
@ -538,14 +535,14 @@ ompi_osc_pt2pt_accumulate_w_req (void *origin_addr, int origin_count,
/* flush will be called at the end of this function. make sure the post message has
* arrived. */
if ((is_long_msg || request) && module->sc_group) {
OPAL_THREAD_LOCK(&module->lock);
while (0 != module->num_post_msgs) {
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"waiting for post messages. num_post_msgs = %d", module->num_post_msgs));
opal_condition_wait(&module->cond, &module->lock);
}
}
OPAL_THREAD_UNLOCK(&module->lock);
}
header = (ompi_osc_pt2pt_header_acc_t*) ptr;
header->base.flags = 0;
@ -623,8 +620,6 @@ ompi_osc_pt2pt_accumulate_w_req (void *origin_addr, int origin_count,
header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_VALID;
}
OPAL_THREAD_LOCK(&module->lock);
ret = ompi_osc_pt2pt_frag_finish(module, frag);
if (is_long_msg || request) {
@ -632,8 +627,6 @@ ompi_osc_pt2pt_accumulate_w_req (void *origin_addr, int origin_count,
ompi_osc_pt2pt_frag_flush_target (module, target);
}
OPAL_THREAD_UNLOCK(&module->lock);
return ret;
}
@ -700,12 +693,9 @@ int ompi_osc_pt2pt_compare_and_swap (void *origin_addr, void *compare_addr,
/* we need to send both the origin and compare buffers */
payload_len = dt->super.size * 2;
OPAL_THREAD_LOCK(&module->lock);
frag_len = sizeof(ompi_osc_pt2pt_header_cswap_t) + ddt_len + payload_len;
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr);
if (OMPI_SUCCESS != ret) {
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
@ -733,12 +723,10 @@ int ompi_osc_pt2pt_compare_and_swap (void *origin_addr, void *compare_addr,
ret = ompi_osc_pt2pt_irecv_w_cb (result_addr, 1, dt, target, tag, module->comm,
NULL, ompi_osc_pt2pt_req_comm_complete, request);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
OPAL_THREAD_UNLOCK(&module->lock);
return ret;
}
ret = ompi_osc_pt2pt_frag_finish(module, frag);
OPAL_THREAD_UNLOCK(&module->lock);
return ret;
}
@ -857,8 +845,6 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co
* must fit in a single frag */
ddt_len = ompi_datatype_pack_description_length(target_dt);
OPAL_THREAD_LOCK(&module->lock);
frag_len = sizeof(ompi_osc_pt2pt_header_get_t) + ddt_len;
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr);
if (OMPI_SUCCESS != ret) {
@ -866,7 +852,6 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co
frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + 8;
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
@ -888,8 +873,6 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co
}
}
OPAL_THREAD_UNLOCK(&module->lock);
header = (ompi_osc_pt2pt_header_get_t*) ptr;
header->base.type = OMPI_OSC_PT2PT_HDR_TYPE_GET;
header->base.flags = 0;
@ -936,14 +919,12 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co
*request = &pt2pt_request->super;
}
OPAL_THREAD_LOCK(&module->lock);
ret = ompi_osc_pt2pt_frag_finish(module, frag);
if (!release_req) {
/* need to flush now in case the caller decides to wait on the request */
ompi_osc_pt2pt_frag_flush_target (module, target);
}
OPAL_THREAD_UNLOCK(&module->lock);
return ret;
}
@ -1087,8 +1068,6 @@ int ompi_osc_pt2pt_rget_accumulate_internal (void *origin_addr, int origin_count
payload_len = 0;
}
OPAL_THREAD_LOCK(&module->lock);
frag_len = sizeof(*header) + ddt_len + payload_len;
ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr);
if (OMPI_SUCCESS != ret) {
@ -1099,7 +1078,6 @@ int ompi_osc_pt2pt_rget_accumulate_internal (void *origin_addr, int origin_count
frag_len = sizeof(*header) + 8;
ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}
@ -1121,14 +1099,14 @@ int ompi_osc_pt2pt_rget_accumulate_internal (void *origin_addr, int origin_count
/* flush will be called at the end of this function. make sure the post message has
* arrived. */
if (!release_req && module->sc_group) {
OPAL_THREAD_LOCK(&module->lock);
while (0 != module->num_post_msgs) {
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"waiting for post messages. num_post_msgs = %d", module->num_post_msgs));
opal_condition_wait(&module->cond, &module->lock);
}
}
OPAL_THREAD_UNLOCK(&module->lock);
}
header = (ompi_osc_pt2pt_header_acc_t *) ptr;
header->base.flags = 0;
@ -1191,14 +1169,12 @@ int ompi_osc_pt2pt_rget_accumulate_internal (void *origin_addr, int origin_count
*request = (ompi_request_t *) pt2pt_request;
}
OPAL_THREAD_LOCK(&module->lock);
ret = ompi_osc_pt2pt_frag_finish(module, frag);
if (!release_req) {
/* need to flush now in case the caller decides to wait on the request */
ompi_osc_pt2pt_frag_flush_target (module, target_rank);
}
OPAL_THREAD_UNLOCK(&module->lock);
return ret;
}
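
ompi_osc_pt2pt_dt_send_complete above now recovers the module from the component hash table, keyed by the communicator CID carried in the request. A hypothetical helper factoring out that lookup; it assumes the surrounding osc_pt2pt headers and uses only calls that appear in the diff:

static ompi_osc_pt2pt_module_t *module_for_comm (struct ompi_communicator_t *comm)
{
    ompi_osc_pt2pt_module_t *module = NULL;

    /* the component lock protects the CID -> module hash table */
    OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.lock);
    opal_hash_table_get_value_uint32 (&mca_osc_pt2pt_component.modules,
                                      ompi_comm_get_cid (comm),
                                      (void **) &module);
    OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.lock);

    return module;   /* NULL if no window is backed by this communicator */
}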

View file

@ -200,14 +200,15 @@ component_register(void)
static int component_progress (void)
{
int count = opal_list_get_size (&mca_osc_pt2pt_component.pending_operations);
ompi_osc_pt2pt_pending_t *pending, *next;
if (0 == opal_list_get_size (&mca_osc_pt2pt_component.pending_operations)) {
if (0 == count) {
return 0;
}
/* process one incoming request */
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.lock);
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.pending_operations_lock);
OPAL_LIST_FOREACH_SAFE(pending, next, &mca_osc_pt2pt_component.pending_operations, ompi_osc_pt2pt_pending_t) {
int ret;
@ -231,7 +232,7 @@ static int component_progress (void)
OBJ_RELEASE(pending);
}
}
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.lock);
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.pending_operations_lock);
return 1;
}
@ -244,8 +245,7 @@ component_init(bool enable_progress_threads,
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.lock, opal_mutex_t);
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.pending_operations, opal_list_t);
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.request_gc, opal_list_t);
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.buffer_gc, opal_list_t);
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.pending_operations_lock, opal_mutex_t);
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.modules,
opal_hash_table_t);
@ -254,11 +254,13 @@ component_init(bool enable_progress_threads,
mca_osc_pt2pt_component.progress_enable = false;
mca_osc_pt2pt_component.module_count = 0;
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.frags, opal_free_list_t);
ret = opal_free_list_init(&mca_osc_pt2pt_component.frags,
sizeof(ompi_osc_pt2pt_frag_t),
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.frags, ompi_free_list_t);
ret = ompi_free_list_init_new (&mca_osc_pt2pt_component.frags,
sizeof(ompi_osc_pt2pt_frag_t), 8,
OBJ_CLASS(ompi_osc_pt2pt_frag_t),
1, -1, 1);
mca_osc_pt2pt_component.buffer_size +
sizeof (ompi_osc_pt2pt_frag_header_t),
8, 1, -1, 1, 0);
if (OMPI_SUCCESS != ret) {
opal_output_verbose(1, ompi_osc_base_framework.framework_output,
"%s:%d: ompi_free_list_init failed: %d",
@ -303,8 +305,7 @@ component_finalize(void)
OBJ_DESTRUCT(&mca_osc_pt2pt_component.lock);
OBJ_DESTRUCT(&mca_osc_pt2pt_component.requests);
OBJ_DESTRUCT(&mca_osc_pt2pt_component.pending_operations);
OBJ_DESTRUCT(&mca_osc_pt2pt_component.request_gc);
OBJ_DESTRUCT(&mca_osc_pt2pt_component.buffer_gc);
OBJ_DESTRUCT(&mca_osc_pt2pt_component.pending_operations_lock);
return OMPI_SUCCESS;
}
@ -329,17 +330,11 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit
ompi_osc_pt2pt_module_t *module = NULL;
int ret;
char *name;
bool no_locks = false;
/* We don't support shared windows; that's for the sm onesided
component */
if (MPI_WIN_FLAVOR_SHARED == flavor) return OMPI_ERR_NOT_SUPPORTED;
if (check_config_value_bool("no_locks", info)) {
no_locks = true;
ompi_osc_pt2pt_no_locks = true;
}
/* create module structure with all fields initialized to zero */
module = (ompi_osc_pt2pt_module_t*)
calloc(1, sizeof(ompi_osc_pt2pt_module_t));
@ -354,10 +349,15 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit
OBJ_CONSTRUCT(&module->cond, opal_condition_t);
OBJ_CONSTRUCT(&module->acc_lock, opal_mutex_t);
OBJ_CONSTRUCT(&module->queued_frags, opal_list_t);
OBJ_CONSTRUCT(&module->queued_frags_lock, opal_mutex_t);
OBJ_CONSTRUCT(&module->locks_pending, opal_list_t);
OBJ_CONSTRUCT(&module->locks_pending_lock, opal_mutex_t);
OBJ_CONSTRUCT(&module->outstanding_locks, opal_list_t);
OBJ_CONSTRUCT(&module->pending_acc, opal_list_t);
OBJ_CONSTRUCT(&module->pending_posts, opal_list_t);
OBJ_CONSTRUCT(&module->request_gc, opal_list_t);
OBJ_CONSTRUCT(&module->buffer_gc, opal_list_t);
OBJ_CONSTRUCT(&module->gc_lock, opal_mutex_t);
/* options */
/* FIX ME: should actually check this value... */
@ -405,20 +405,6 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit
goto cleanup;
}
if (!no_locks) {
module->passive_incoming_frag_count = calloc(ompi_comm_size(comm), sizeof(uint32_t));
if (NULL == module->passive_incoming_frag_count) {
ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
goto cleanup;
}
module->passive_incoming_frag_signal_count = calloc(ompi_comm_size(comm), sizeof(uint32_t));
if (NULL == module->passive_incoming_frag_signal_count) {
ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
goto cleanup;
}
}
/* the statement below (from Brian) does not seem correct so disable active target on the
* window. if this ends up being incorrect please revert this one change */
module->active_eager_send_active = false;
@ -429,14 +415,6 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit
module->active_eager_send_active = true;
#endif
if (!no_locks) {
module->passive_eager_send_active = malloc(sizeof(bool) * ompi_comm_size(comm));
if (NULL == module->passive_eager_send_active) {
ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
goto cleanup;
}
}
/* lock data */
if (check_config_value_bool("no_locks", info)) {
win->w_flags |= OMPI_WIN_NO_LOCKS;
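
The frags list moves from opal_free_list_t to ompi_free_list_t so each list item carries its own send buffer, allocated by the free list itself (see the frag constructor change later in this commit, which sets frag->buffer = frag->super.ptr). An annotated reading of the init call above; the parameter meanings are my interpretation of the historical ompi_free_list_init_new API, not taken from the diff:

ret = ompi_free_list_init_new (&mca_osc_pt2pt_component.frags,
                               sizeof (ompi_osc_pt2pt_frag_t),        /* item size */
                               8,                                     /* item alignment */
                               OBJ_CLASS(ompi_osc_pt2pt_frag_t),      /* item class */
                               mca_osc_pt2pt_component.buffer_size +
                               sizeof (ompi_osc_pt2pt_frag_header_t), /* payload size */
                               8,                                     /* payload alignment */
                               1,                                     /* initial items */
                               -1,                                    /* max items: unbounded */
                               1,                                     /* items per allocation */
                               0);                                    /* no mpool */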

View file

@ -26,7 +26,6 @@
#include "osc_pt2pt.h"
#include "osc_pt2pt_header.h"
#include "osc_pt2pt_data_move.h"
#include "osc_pt2pt_obj_convert.h"
#include "osc_pt2pt_frag.h"
#include "osc_pt2pt_request.h"
@ -225,8 +224,6 @@ int ompi_osc_pt2pt_control_send (ompi_osc_pt2pt_module_t *module, int target,
char *ptr;
int ret;
OPAL_THREAD_LOCK(&module->lock);
ret = ompi_osc_pt2pt_frag_alloc(module, target, len, &frag, &ptr);
if (OPAL_LIKELY(OMPI_SUCCESS == ret)) {
memcpy (ptr, data, len);
@ -234,8 +231,6 @@ int ompi_osc_pt2pt_control_send (ompi_osc_pt2pt_module_t *module, int target,
ret = ompi_osc_pt2pt_frag_finish(module, frag);
}
OPAL_THREAD_UNLOCK(&module->lock);
return ret;
}
@ -254,7 +249,7 @@ static int ompi_osc_pt2pt_control_send_unbuffered_cb (ompi_request_t *request)
free (ctx);
/* put this request on the garbage collection list */
osc_pt2pt_gc_add_request (request);
osc_pt2pt_gc_add_request (module, request);
return OMPI_SUCCESS;
}
@ -453,7 +448,7 @@ static int osc_pt2pt_incoming_req_complete (ompi_request_t *request)
mark_incoming_completion (module, rank);
/* put this request on the garbage collection list */
osc_pt2pt_gc_add_request (request);
osc_pt2pt_gc_add_request (module, request);
return OMPI_SUCCESS;
}
@ -476,7 +471,7 @@ static int osc_pt2pt_get_post_send_cb (ompi_request_t *request)
mark_incoming_completion (module, rank);
/* put this request on the garbage collection list */
osc_pt2pt_gc_add_request (request);
osc_pt2pt_gc_add_request (module, request);
return OMPI_SUCCESS;
}
@ -683,8 +678,7 @@ static int accumulate_cb (ompi_request_t *request)
mark_incoming_completion (module, rank);
OPAL_THREAD_LOCK(&module->lock);
if (0 == --acc_data->request_count) {
if (0 == OPAL_THREAD_ADD32(&acc_data->request_count, -1)) {
/* no more requests needed before the buffer can be accumulated */
if (acc_data->source) {
@ -695,13 +689,11 @@ static int accumulate_cb (ompi_request_t *request)
/* drop the accumulate lock */
ompi_osc_pt2pt_accumulate_unlock (module);
osc_pt2pt_gc_add_buffer (&acc_data->super);
osc_pt2pt_gc_add_buffer (module, &acc_data->super);
}
/* put this request on the garbage collection list */
osc_pt2pt_gc_add_request (request);
OPAL_THREAD_UNLOCK(&module->lock);
osc_pt2pt_gc_add_request (module, request);
return ret;
}
@ -710,6 +702,7 @@ static int accumulate_cb (ompi_request_t *request)
static int ompi_osc_pt2pt_acc_op_queue (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_header_t *header, int source,
char *data, size_t data_len, ompi_datatype_t *datatype)
{
ompi_osc_pt2pt_peer_t *peer = module->peers + source;
osc_pt2pt_pending_acc_t *pending_acc;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
@ -720,11 +713,9 @@ static int ompi_osc_pt2pt_acc_op_queue (ompi_osc_pt2pt_module_t *module, ompi_os
return OMPI_ERR_OUT_OF_RESOURCE;
}
if (!ompi_osc_pt2pt_no_locks) {
/* NTH: ensure we don't leave wait/process_flush/etc until this
* accumulate operation is complete. */
module->passive_incoming_frag_signal_count[source]++;
}
OPAL_THREAD_ADD32(&peer->passive_incoming_frag_count, -1);
pending_acc->source = source;
@ -757,9 +748,7 @@ static int ompi_osc_pt2pt_acc_op_queue (ompi_osc_pt2pt_module_t *module, ompi_os
}
/* add to the pending acc queue */
OPAL_THREAD_LOCK(&module->lock);
opal_list_append (&module->pending_acc, &pending_acc->super);
OPAL_THREAD_UNLOCK(&module->lock);
OPAL_THREAD_SCOPED_LOCK(&module->lock, opal_list_append (&module->pending_acc, &pending_acc->super));
return OMPI_SUCCESS;
}
@ -776,7 +765,7 @@ static int replace_cb (ompi_request_t *request)
mark_incoming_completion (module, rank);
/* put this request on the garbage collection list */
osc_pt2pt_gc_add_request (request);
osc_pt2pt_gc_add_request (module, request);
/* unlock the accumulate lock */
ompi_osc_pt2pt_accumulate_unlock (module);
@ -1143,10 +1132,8 @@ int ompi_osc_pt2pt_progress_pending_acc (ompi_osc_pt2pt_module_t *module)
assert (0);
}
if (!ompi_osc_pt2pt_no_locks) {
/* signal that an operation is complete */
mark_incoming_completion (module, pending_acc->source);
}
pending_acc->data = NULL;
OBJ_RELEASE(pending_acc);
@ -1340,18 +1327,15 @@ static inline int process_complete (ompi_osc_pt2pt_module_t *module, int source,
source, complete_header->frag_count, module->active_incoming_frag_signal_count,
module->active_incoming_frag_count));
OPAL_THREAD_LOCK(&module->lock);
/* the current fragment is not part of the frag_count so we need to add it here */
module->active_incoming_frag_signal_count += complete_header->frag_count + 1;
module->num_complete_msgs++;
OPAL_THREAD_ADD32((int32_t *) &module->active_incoming_frag_signal_count,
complete_header->frag_count + 1);
if (0 == module->num_complete_msgs) {
if (0 == OPAL_THREAD_ADD32((int32_t *) &module->num_complete_msgs, 1)) {
opal_condition_broadcast (&module->cond);
}
OPAL_THREAD_UNLOCK(&module->lock);
return sizeof (*complete_header);
}
@ -1361,17 +1345,18 @@ static inline int process_complete (ompi_osc_pt2pt_module_t *module, int source,
static inline int process_flush (ompi_osc_pt2pt_module_t *module, int source,
ompi_osc_pt2pt_header_flush_t *flush_header)
{
ompi_osc_pt2pt_peer_t *peer = module->peers + source;
int ret;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"process_flush header = {.frag_count = %d}", flush_header->frag_count));
/* increase signal count by incoming frags */
module->passive_incoming_frag_signal_count[source] += flush_header->frag_count;
OPAL_THREAD_ADD32(&peer->passive_incoming_frag_count, -(int32_t) flush_header->frag_count);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"%d: process_flush: received message from %d. passive_incoming_frag_signal_count = %d, passive_incoming_frag_count = %d",
ompi_comm_rank(module->comm), source, module->passive_incoming_frag_signal_count[source], module->passive_incoming_frag_count[source]));
"%d: process_flush: received message from %d. passive_incoming_frag_count = %d",
ompi_comm_rank(module->comm), source, peer->passive_incoming_frag_count));
ret = ompi_osc_pt2pt_process_flush (module, source, flush_header);
if (OMPI_SUCCESS != ret) {
@ -1382,35 +1367,30 @@ static inline int process_flush (ompi_osc_pt2pt_module_t *module, int source,
pending->source = source;
pending->header.flush = *flush_header;
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.lock);
opal_list_append (&mca_osc_pt2pt_component.pending_operations, &pending->super);
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.lock);
/* we now have to count the current fragment */
module->passive_incoming_frag_signal_count[source]++;
} else {
/* need to account for the current fragment */
module->passive_incoming_frag_count[source] = -1;
osc_pt2pt_add_pending (pending);
}
/* incoming completion will increment this counter */
OPAL_THREAD_ADD32(&peer->passive_incoming_frag_count, -1);
return sizeof (*flush_header);
}
static inline int process_unlock (ompi_osc_pt2pt_module_t *module, int source,
ompi_osc_pt2pt_header_unlock_t *unlock_header)
{
ompi_osc_pt2pt_peer_t *peer = module->peers + source;
int ret;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"process_unlock header = {.frag_count = %d}", unlock_header->frag_count));
/* increase signal count by incoming frags */
module->passive_incoming_frag_signal_count[source] += unlock_header->frag_count;
OPAL_THREAD_ADD32(&peer->passive_incoming_frag_count, -(int32_t) unlock_header->frag_count);
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"osc pt2pt: processing unlock request from %d. frag count = %d, signal_count = %d, processed_count = %d",
source, unlock_header->frag_count, (int) module->passive_incoming_frag_signal_count[source],
(int) module->passive_incoming_frag_count[source]));
"osc pt2pt: processing unlock request from %d. frag count = %d, processed_count = %d",
source, unlock_header->frag_count, (int) peer->passive_incoming_frag_count));
ret = ompi_osc_pt2pt_process_unlock (module, source, unlock_header);
if (OMPI_SUCCESS != ret) {
@ -1421,17 +1401,12 @@ static inline int process_unlock (ompi_osc_pt2pt_module_t *module, int source,
pending->source = source;
pending->header.unlock = *unlock_header;
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.lock);
opal_list_append (&mca_osc_pt2pt_component.pending_operations, &pending->super);
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.lock);
/* we now have to count the current fragment */
module->passive_incoming_frag_signal_count[source]++;
} else {
/* need to account for the current fragment */
module->passive_incoming_frag_count[source] = -1;
osc_pt2pt_add_pending (pending);
}
/* incoming completion will increment this counter */
OPAL_THREAD_ADD32(&peer->passive_incoming_frag_count, -1);
return sizeof (*unlock_header);
}
@ -1463,10 +1438,10 @@ static int process_large_datatype_request_cb (ompi_request_t *request)
}
/* put this request on the garbage collection list */
osc_pt2pt_gc_add_request (request);
osc_pt2pt_gc_add_request (module, request);
/* free the datatype buffer */
osc_pt2pt_gc_add_buffer (&ddt_buffer->super);
osc_pt2pt_gc_add_buffer (module, &ddt_buffer->super);
return OMPI_SUCCESS;
}
@ -1679,10 +1654,6 @@ static int ompi_osc_pt2pt_callback (ompi_request_t *request)
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"finished processing incoming messages"));
/* restart the receive request */
OPAL_THREAD_LOCK(&module->lock);
/* post messages come unbuffered and should NOT increment the incoming completion
* counters */
if (OMPI_OSC_PT2PT_HDR_TYPE_POST != base_header->type) {
@ -1690,14 +1661,12 @@ static int ompi_osc_pt2pt_callback (ompi_request_t *request)
source : MPI_PROC_NULL);
}
osc_pt2pt_gc_clean ();
osc_pt2pt_gc_clean (module);
/* put this request on the garbage collection list */
osc_pt2pt_gc_add_request (request);
osc_pt2pt_gc_add_request (module, request);
ompi_osc_pt2pt_frag_start_receive (module);
OPAL_THREAD_UNLOCK(&module->lock);
OPAL_THREAD_LOCK(&ompi_request_lock);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
@ -1734,20 +1703,15 @@ isend_completion_cb(ompi_request_t *request)
mark_outgoing_completion(module);
/* put this request on the garbage collection list */
osc_pt2pt_gc_add_request (request);
osc_pt2pt_gc_add_request (module, request);
return OMPI_SUCCESS;
}
int
ompi_osc_pt2pt_component_isend(ompi_osc_pt2pt_module_t *module,
void *buf,
size_t count,
struct ompi_datatype_t *datatype,
int dest,
int tag,
struct ompi_communicator_t *comm)
int ompi_osc_pt2pt_component_isend (ompi_osc_pt2pt_module_t *module, void *buf,
size_t count, struct ompi_datatype_t *datatype,
int dest, int tag, struct ompi_communicator_t *comm)
{
return ompi_osc_pt2pt_isend_w_cb (buf, count, datatype, dest, tag, comm,
isend_completion_cb, module);
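
The per-peer passive accounting in this file is inverted relative to the old signal/processed counter pair: flush and unlock handlers subtract the announced fragment count (plus one for the message itself), each completed fragment adds one back, and a zero balance means everything has arrived. A simplified C11 model, with thread_add32 as a stand-in for OPAL_THREAD_ADD32:

#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

static _Atomic int32_t passive_incoming_frag_count;

static int32_t thread_add32 (_Atomic int32_t *a, int32_t v)
{
    return atomic_fetch_add (a, v) + v;
}

static void process_flush_model (int32_t frag_count)
{
    /* debit the announced fragments, plus one for the flush message,
     * whose own completion will also credit the counter */
    thread_add32 (&passive_incoming_frag_count, -frag_count);
    thread_add32 (&passive_incoming_frag_count, -1);
}

static void mark_incoming_completion_model (void)
{
    /* credit one completed fragment; a zero balance wakes the waiter */
    if (0 == thread_add32 (&passive_incoming_frag_count, 1)) {
        puts ("all passive-target fragments accounted for");
    }
}

int main (void)
{
    process_flush_model (2);
    for (int i = 0; i < 3; ++i) {   /* 2 frags + the flush message */
        mark_incoming_completion_model ();
    }
    return 0;
}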

View file

@ -21,18 +21,11 @@
#include "osc_pt2pt_data_move.h"
static void ompi_osc_pt2pt_frag_constructor (ompi_osc_pt2pt_frag_t *frag){
frag->buffer = malloc (mca_osc_pt2pt_component.buffer_size + sizeof (ompi_osc_pt2pt_frag_header_t));
assert (frag->buffer);
frag->buffer = frag->super.ptr;
}
static void ompi_osc_pt2pt_frag_destructor (ompi_osc_pt2pt_frag_t *frag) {
if (NULL != frag->buffer) {
free (frag->buffer);
}
}
OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_frag_t, opal_list_item_t,
ompi_osc_pt2pt_frag_constructor, ompi_osc_pt2pt_frag_destructor);
OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_frag_t, ompi_free_list_item_t,
ompi_osc_pt2pt_frag_constructor, NULL);
static int frag_send_cb (ompi_request_t *request)
{
@ -45,11 +38,11 @@ static int frag_send_cb (ompi_request_t *request)
frag->target, (void *) frag, (void *) request));
mark_outgoing_completion(module);
OPAL_FREE_LIST_RETURN(&mca_osc_pt2pt_component.frags, &frag->super);
OMPI_FREE_LIST_RETURN_MT(&mca_osc_pt2pt_component.frags, &frag->super);
/* put this request on the garbage collection list */
osc_pt2pt_gc_add_request (request);
osc_pt2pt_gc_add_request (module, request);
return OMPI_SUCCESS;
}
@ -79,10 +72,10 @@ int
ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module,
ompi_osc_pt2pt_frag_t *frag)
{
ompi_osc_pt2pt_peer_t *peer = module->peers + frag->target;
int ret;
assert(0 == frag->pending);
assert(module->peers[frag->target].active_frag != frag);
assert(0 == frag->pending && peer->active_frag != frag);
/* we need to signal now that a frag is outgoing to ensure the count sent
* with the unlock message is correct */
@ -90,17 +83,11 @@ ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module,
/* if eager sends are not active, can't send yet, so buffer and
get out... */
if (module->passive_target_access_epoch) {
if (!module->passive_eager_send_active[frag->target]) {
opal_list_append(&module->queued_frags, &frag->super);
if (!(peer->eager_send_active || module->all_access_epoch)) {
OPAL_THREAD_SCOPED_LOCK(&module->queued_frags_lock,
opal_list_append(&module->queued_frags, (opal_list_item_t *) frag));
return OMPI_SUCCESS;
}
} else {
if (!module->active_eager_send_active) {
opal_list_append(&module->queued_frags, &frag->super);
return OMPI_SUCCESS;
}
}
ret = frag_send(module, frag);
@ -113,43 +100,50 @@ ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module,
int
ompi_osc_pt2pt_frag_flush_target(ompi_osc_pt2pt_module_t *module, int target)
{
ompi_osc_pt2pt_frag_t *next, *frag = module->peers[target].active_frag;
int ret = OMPI_SUCCESS;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc pt2pt: frag flush target begin"));
/* flush the active frag */
if (NULL != module->peers[target].active_frag) {
ompi_osc_pt2pt_frag_t *frag = module->peers[target].active_frag;
if (0 != frag->pending) {
/* communication going on while synchronizing; this is a bug */
if (NULL != frag) {
if (1 != frag->pending) {
/* communication going on while synchronizing; this is an RMA usage bug */
return OMPI_ERR_RMA_SYNC;
}
module->peers[target].active_frag = NULL;
if (opal_atomic_cmpset_ptr (&module->peers[target].active_frag, frag, NULL)) {
OPAL_THREAD_ADD32(&frag->pending, -1);
ret = ompi_osc_pt2pt_frag_start(module, frag);
if (OMPI_SUCCESS != ret) return ret;
if (OMPI_SUCCESS != ret) {
return ret;
}
}
}
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc pt2pt: frag flush target finished active frag"));
/* walk through the pending list and send */
ompi_osc_pt2pt_frag_t *frag, *next;
OPAL_THREAD_LOCK(&module->queued_frags_lock);
if (opal_list_get_size (&module->queued_frags)) {
OPAL_LIST_FOREACH_SAFE(frag, next, &module->queued_frags, ompi_osc_pt2pt_frag_t) {
if (frag->target == target) {
opal_list_remove_item(&module->queued_frags, &frag->super);
opal_list_remove_item(&module->queued_frags, (opal_list_item_t *) frag);
ret = frag_send(module, frag);
if (OMPI_SUCCESS != ret) return ret;
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
break;
}
}
}
}
OPAL_THREAD_UNLOCK(&module->queued_frags_lock);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc pt2pt: frag flush target finished"));
return OMPI_SUCCESS;
return ret;
}
@ -165,18 +159,24 @@ ompi_osc_pt2pt_frag_flush_all(ompi_osc_pt2pt_module_t *module)
/* flush the active frag */
for (i = 0 ; i < ompi_comm_size(module->comm) ; ++i) {
if (NULL != module->peers[i].active_frag) {
ompi_osc_pt2pt_frag_t *frag = module->peers[i].active_frag;
if (0 != frag->pending) {
if (NULL != frag) {
if (1 != frag->pending) {
OPAL_THREAD_UNLOCK(&module->lock);
/* communication going on while synchronizing; this is a bug */
return OMPI_ERR_RMA_SYNC;
}
module->peers[i].active_frag = NULL;
if (!opal_atomic_cmpset_ptr (&module->peers[i].active_frag, frag, NULL)) {
continue;
}
OPAL_THREAD_ADD32(&frag->pending, -1);
ret = ompi_osc_pt2pt_frag_start(module, frag);
if (OMPI_SUCCESS != ret) return ret;
if (OMPI_SUCCESS != ret) {
return ret;
}
}
}
@ -184,18 +184,20 @@ ompi_osc_pt2pt_frag_flush_all(ompi_osc_pt2pt_module_t *module)
"osc pt2pt: frag flush all finished active frag"));
/* try to start all the queued frags */
OPAL_THREAD_LOCK(&module->queued_frags_lock);
if (opal_list_get_size (&module->queued_frags)) {
OPAL_LIST_FOREACH_SAFE(frag, next, &module->queued_frags, ompi_osc_pt2pt_frag_t) {
opal_list_remove_item(&module->queued_frags, &frag->super);
opal_list_remove_item(&module->queued_frags, (opal_list_item_t *) frag);
ret = frag_send(module, frag);
if (OMPI_SUCCESS != ret) {
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc pt2pt: failure for frag send: %d", ret));
return ret;
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
break;
}
}
}
OPAL_THREAD_UNLOCK(&module->queued_frags_lock);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc pt2pt: frag flush all done"));
return OMPI_SUCCESS;
return ret;
}
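
Detaching the active fragment is now a compare-and-swap, so when several threads race to flush the same peer only one of them detaches and sends the frag. A C11 model of the claim (opal_atomic_cmpset_ptr returns nonzero on success):

#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

struct frag;                              /* opaque; layout irrelevant here */
static _Atomic(struct frag *) active_frag;

/* succeeds for exactly one caller; a losing thread sees that another
 * thread already detached the fragment and simply moves on */
static bool claim_active_frag (struct frag *expected)
{
    return atomic_compare_exchange_strong (&active_frag, &expected, NULL);
}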

View file

@ -21,7 +21,7 @@
/** Communication buffer for packing messages */
struct ompi_osc_pt2pt_frag_t {
opal_list_item_t super;
ompi_free_list_item_t super;
/* target rank of buffer */
int target;
unsigned char *buffer;
@ -33,7 +33,7 @@ struct ompi_osc_pt2pt_frag_t {
char *top;
/* Number of operations which have started writing into the frag, but not yet completed doing so */
int pending;
int32_t pending;
ompi_osc_pt2pt_frag_header_t *header;
ompi_osc_pt2pt_module_t *module;
};
@ -64,22 +64,29 @@ static inline int ompi_osc_pt2pt_frag_alloc(ompi_osc_pt2pt_module_t *module, int
return OMPI_ERR_OUT_OF_RESOURCE;
}
OPAL_THREAD_LOCK(&module->lock);
if (NULL == curr || curr->remain_len < request_len) {
opal_free_list_item_t *item;
ompi_free_list_item_t *item = NULL;
if (NULL != curr) {
curr->remain_len = 0;
module->peers[target].active_frag = NULL;
opal_atomic_mb ();
/* If there's something pending, the pending finish will
start the buffer. Otherwise, we need to start it now. */
if (0 == curr->pending) {
module->peers[target].active_frag = NULL;
if (0 == OPAL_THREAD_ADD32(&curr->pending, -1)) {
ret = ompi_osc_pt2pt_frag_start(module, curr);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return ret;
}
}
}
OPAL_FREE_LIST_GET(&mca_osc_pt2pt_component.frags,
item, ret);
if (OMPI_SUCCESS != ret) return ret;
OMPI_FREE_LIST_GET_MT(&mca_osc_pt2pt_component.frags, item);
if (OPAL_UNLIKELY(NULL == item)) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
curr = module->peers[target].active_frag =
(ompi_osc_pt2pt_frag_t*) item;
@ -89,7 +96,7 @@ static inline int ompi_osc_pt2pt_frag_alloc(ompi_osc_pt2pt_module_t *module, int
curr->top = (char*) (curr->header + 1);
curr->remain_len = mca_osc_pt2pt_component.buffer_size;
curr->module = module;
curr->pending = 0;
curr->pending = 1;
curr->header->base.type = OMPI_OSC_PT2PT_HDR_TYPE_FRAG;
curr->header->base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID;
@ -101,6 +108,7 @@ static inline int ompi_osc_pt2pt_frag_alloc(ompi_osc_pt2pt_module_t *module, int
curr->header->windx = ompi_comm_get_cid(module->comm);
if (curr->remain_len < request_len) {
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
}
}
@ -110,8 +118,10 @@ static inline int ompi_osc_pt2pt_frag_alloc(ompi_osc_pt2pt_module_t *module, int
curr->top += request_len;
curr->remain_len -= request_len;
curr->pending++;
curr->header->num_ops++;
OPAL_THREAD_UNLOCK(&module->lock);
OPAL_THREAD_ADD32(&curr->pending, 1);
OPAL_THREAD_ADD32(&curr->header->num_ops, 1);
return OMPI_SUCCESS;
}
@ -123,12 +133,7 @@ static inline int ompi_osc_pt2pt_frag_alloc(ompi_osc_pt2pt_module_t *module, int
static inline int ompi_osc_pt2pt_frag_finish(ompi_osc_pt2pt_module_t *module,
ompi_osc_pt2pt_frag_t* buffer)
{
if (0 == --buffer->pending && 0 == buffer->remain_len) {
if (OPAL_LIKELY(buffer == module->peers[buffer->target].active_frag)) {
/* this is the active fragment. need to set the current fragment to null
* or it will be started multiple times */
module->peers[buffer->target].active_frag = NULL;
}
if (0 == OPAL_THREAD_ADD32(&buffer->pending, -1)) {
return ompi_osc_pt2pt_frag_start(module, buffer);
}
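
With this change a fragment's pending field becomes a reference count that starts at 1: the allocator holds the initial reference, every operation packed into the frag takes one, and whoever drops the count to zero starts the send, exactly once. A standalone model of that lifecycle (names are illustrative, with C11 atomics standing in for OPAL_THREAD_ADD32):

#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

static _Atomic int32_t pending;

static int32_t thread_add32 (_Atomic int32_t *a, int32_t v)
{
    return atomic_fetch_add (a, v) + v;
}

static void frag_alloc_model (void)  { atomic_store (&pending, 1); }
static void op_pack_model (void)     { thread_add32 (&pending, 1); }

static void op_finish_model (void)
{
    if (0 == thread_add32 (&pending, -1)) {
        puts ("frag started");        /* ompi_osc_pt2pt_frag_start */
    }
}

/* the allocator drops its own reference when it retires the frag */
static void frag_retire_model (void) { op_finish_model (); }

int main (void)
{
    frag_alloc_model ();
    op_pack_model ();     /* one operation packs into the frag */
    op_finish_model ();   /* operation finishes: pending 2 -> 1 */
    frag_retire_model (); /* allocator retires:  pending 1 -> 0, send */
    return 0;
}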

View file

@ -120,7 +120,7 @@ typedef struct ompi_osc_pt2pt_header_post_t ompi_osc_pt2pt_header_post_t;
struct ompi_osc_pt2pt_header_lock_t {
ompi_osc_pt2pt_header_base_t base;
int32_t lock_type;
uint64_t serial_number;
uint64_t lock_ptr;
};
typedef struct ompi_osc_pt2pt_header_lock_t ompi_osc_pt2pt_header_lock_t;
@ -128,7 +128,7 @@ struct ompi_osc_pt2pt_header_lock_ack_t {
ompi_osc_pt2pt_header_base_t base;
uint16_t windx;
uint32_t source;
uint64_t serial_number;
uint64_t lock_ptr;
};
typedef struct ompi_osc_pt2pt_header_lock_ack_t ompi_osc_pt2pt_header_lock_ack_t;
@ -136,11 +136,13 @@ struct ompi_osc_pt2pt_header_unlock_t {
ompi_osc_pt2pt_header_base_t base;
int32_t lock_type;
uint32_t frag_count;
uint64_t lock_ptr;
};
typedef struct ompi_osc_pt2pt_header_unlock_t ompi_osc_pt2pt_header_unlock_t;
struct ompi_osc_pt2pt_header_unlock_ack_t {
ompi_osc_pt2pt_header_base_t base;
uint64_t lock_ptr;
};
typedef struct ompi_osc_pt2pt_header_unlock_ack_t ompi_osc_pt2pt_header_unlock_ack_t;
@ -161,8 +163,8 @@ struct ompi_osc_pt2pt_frag_header_t {
ompi_osc_pt2pt_header_base_t base;
uint16_t windx; /* cid of communicator backing window (our window id) */
uint32_t source; /* rank in window of source process */
uint16_t num_ops; /* number of operations in this buffer */
uint16_t pad[3]; /* ensure the fragment header is a multiple of 8 bytes */
int32_t num_ops; /* number of operations in this buffer */
uint32_t pad; /* ensure the fragment header is a multiple of 8 bytes */
};
typedef struct ompi_osc_pt2pt_frag_header_t ompi_osc_pt2pt_frag_header_t;
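
num_ops widens from uint16_t to int32_t so it can be updated with OPAL_THREAD_ADD32, and the padding shrinks to keep the header a multiple of 8 bytes. A hypothetical compile-time check one could add; the layout below is modeled from the struct in the diff, with a 2-byte stand-in for the base header:

#include <assert.h>
#include <stdint.h>

struct frag_header_model {
    uint8_t  base[2];   /* stand-in for ompi_osc_pt2pt_header_base_t (type + flags) */
    uint16_t windx;
    uint32_t source;
    int32_t  num_ops;
    uint32_t pad;
};

static_assert (sizeof (struct frag_header_model) % 8 == 0,
               "frag header must stay a multiple of 8 bytes");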

View file

@ -49,7 +49,6 @@ ompi_osc_pt2pt_free(ompi_win_t *win)
{
int ret = OMPI_SUCCESS;
ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
opal_list_item_t *item;
if (NULL == module) {
return OMPI_SUCCESS;
@ -67,43 +66,38 @@ ompi_osc_pt2pt_free(ompi_win_t *win)
}
/* remove from component information */
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.lock);
OPAL_THREAD_SCOPED_LOCK(&mca_osc_pt2pt_component.lock,
opal_hash_table_remove_value_uint32(&mca_osc_pt2pt_component.modules,
ompi_comm_get_cid(module->comm));
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.lock);
ompi_comm_get_cid(module->comm)));
}
win->w_osc_module = NULL;
OBJ_DESTRUCT(&module->outstanding_locks);
OBJ_DESTRUCT(&module->locks_pending);
OBJ_DESTRUCT(&module->locks_pending_lock);
OBJ_DESTRUCT(&module->acc_lock);
OBJ_DESTRUCT(&module->cond);
OBJ_DESTRUCT(&module->lock);
/* it is erroneous to close a window with active operations on it so we should
* probably produce an error here instead of cleaning up */
while (NULL != (item = opal_list_remove_first (&module->pending_acc))) {
OBJ_RELEASE(item);
}
OPAL_LIST_DESTRUCT(&module->pending_acc);
OPAL_LIST_DESTRUCT(&module->pending_posts);
OPAL_LIST_DESTRUCT(&module->queued_frags);
OBJ_DESTRUCT(&module->queued_frags_lock);
OBJ_DESTRUCT(&module->pending_acc);
while (NULL != (item = opal_list_remove_first (&module->pending_posts))) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&module->pending_posts);
osc_pt2pt_gc_clean ();
osc_pt2pt_gc_clean (module);
OPAL_LIST_DESTRUCT(&module->request_gc);
OPAL_LIST_DESTRUCT(&module->buffer_gc);
OBJ_DESTRUCT(&module->gc_lock);
if (NULL != module->peers) {
free(module->peers);
}
if (NULL != module->passive_eager_send_active) free(module->passive_eager_send_active);
if (NULL != module->passive_incoming_frag_count) free(module->passive_incoming_frag_count);
if (NULL != module->passive_incoming_frag_signal_count) free(module->passive_incoming_frag_signal_count);
if (NULL != module->epoch_outgoing_frag_count) free(module->epoch_outgoing_frag_count);
if (NULL != module->frag_request) {
module->frag_request->req_complete_cb = NULL;
ompi_request_cancel (module->frag_request);

File diff suppressed because it is too large

View file

@ -26,7 +26,7 @@ struct ompi_osc_pt2pt_request_t {
int origin_count;
struct ompi_datatype_t *origin_dt;
ompi_osc_pt2pt_module_t* module;
int outstanding_requests;
int32_t outstanding_requests;
bool internal;
};
typedef struct ompi_osc_pt2pt_request_t ompi_osc_pt2pt_request_t;