diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt.h b/ompi/mca/osc/pt2pt/osc_pt2pt.h
index 1c088dc117..70b6fab7b8 100644
--- a/ompi/mca/osc/pt2pt/osc_pt2pt.h
+++ b/ompi/mca/osc/pt2pt/osc_pt2pt.h
@@ -59,7 +59,7 @@ struct ompi_osc_pt2pt_component_t {
     int module_count;
 
     /** free list of ompi_osc_pt2pt_frag_t structures */
-    opal_free_list_t frags;
+    ompi_free_list_t frags;
 
     /** Free list of requests */
     ompi_free_list_t requests;
@@ -67,17 +67,14 @@ struct ompi_osc_pt2pt_component_t {
     /** PT2PT component buffer size */
     unsigned int buffer_size;
 
+    /** Lock for pending_operations */
+    opal_mutex_t pending_operations_lock;
+
     /** List of operations that need to be processed */
     opal_list_t pending_operations;
 
     /** Is the progress function enabled? */
     bool progress_enable;
-
-    /** List of requests that need to be freed */
-    opal_list_t request_gc;
-
-    /** List of buffers that need to be freed */
-    opal_list_t buffer_gc;
 };
 typedef struct ompi_osc_pt2pt_component_t ompi_osc_pt2pt_component_t;
 
@@ -89,7 +86,9 @@ struct ompi_osc_pt2pt_peer_t {
     /** Number of acks pending. New requests can not be sent out if there are
      * acks pending (to fulfill the ordering constraints of accumulate) */
     uint32_t num_acks_pending;
+    int32_t passive_incoming_frag_count;
     bool access_epoch;
+    bool eager_send_active;
 };
 typedef struct ompi_osc_pt2pt_peer_t ompi_osc_pt2pt_peer_t;
 
@@ -133,6 +132,9 @@ struct ompi_osc_pt2pt_module_t {
        peer.  Not in peer data to make fence more manageable. */
     uint32_t *epoch_outgoing_frag_count;
 
+    /** Lock for queued_frags */
+    opal_mutex_t queued_frags_lock;
+
     /** List of full communication buffers queued to be sent.  Should
        be maintained in order (at least in per-target order). */
     opal_list_t queued_frags;
@@ -152,9 +154,6 @@ struct ompi_osc_pt2pt_module_t {
     /* Next incoming buffer count at which we want a signal on cond */
     uint32_t active_incoming_frag_signal_count;
 
-    uint32_t *passive_incoming_frag_count;
-    uint32_t *passive_incoming_frag_signal_count;
-
     /* Number of flush ack requests sent since beginning of time */
     uint64_t flush_ack_requested_count;
     /* Number of flush ack replies received since beginning of
@@ -171,8 +170,6 @@ struct ompi_osc_pt2pt_module_t {
     /** Indicates the window is in an all access epoch (fence, lock_all) */
     bool all_access_epoch;
 
-    bool *passive_eager_send_active;
-
     /* ********************* PWSC data ************************ */
     struct ompi_group_t *pw_group;
     struct ompi_group_t *sc_group;
@@ -189,9 +186,11 @@ struct ompi_osc_pt2pt_module_t {
     /** Status of the local window lock.  One of 0 (unlocked),
        MPI_LOCK_EXCLUSIVE, or MPI_LOCK_SHARED.
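        (Note: after this patch the field is updated atomically with
        OPAL_THREAD_ADD32 and appears to act as a counter: negative while
        an exclusive lock is held, positive while shared locks are held.)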
      */
-    int lock_status;
-    /** number of peers who hold a shared lock on the local window */
-    int32_t shared_count;
+    int32_t lock_status;
+
+    /** lock for locks_pending list */
+    opal_mutex_t locks_pending_lock;
+
     /** target side list of lock requests we couldn't satisfy yet */
     opal_list_t locks_pending;
 
@@ -210,6 +209,15 @@ struct ompi_osc_pt2pt_module_t {
     /* enforce pscw matching */
     /** list of unmatched post messages */
     opal_list_t pending_posts;
+
+    /** Lock for garbage collection lists */
+    opal_mutex_t gc_lock;
+
+    /** List of requests that need to be freed */
+    opal_list_t request_gc;
+
+    /** List of buffers that need to be freed */
+    opal_list_t buffer_gc;
 };
 typedef struct ompi_osc_pt2pt_module_t ompi_osc_pt2pt_module_t;
 OMPI_MODULE_DECLSPEC extern ompi_osc_pt2pt_component_t mca_osc_pt2pt_component;
@@ -431,11 +439,12 @@ static inline void mark_incoming_completion (ompi_osc_pt2pt_module_t *module, in
             opal_condition_broadcast(&module->cond);
         }
     } else {
+        ompi_osc_pt2pt_peer_t *peer = module->peers + source;
         OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                              "mark_incoming_completion marking passive incoming complete. source = %d, count = %d",
-                             source, (int) module->passive_incoming_frag_count[source] + 1));
-        OPAL_THREAD_ADD32((int32_t *) (module->passive_incoming_frag_count + source), 1);
-        if (module->passive_incoming_frag_count[source] >= module->passive_incoming_frag_signal_count[source]) {
+                             source, (int) peer->passive_incoming_frag_count + 1));
+        OPAL_THREAD_ADD32((int32_t *) &peer->passive_incoming_frag_count, 1);
+        if (0 == peer->passive_incoming_frag_count) {
             opal_condition_broadcast(&module->cond);
         }
     }
@@ -576,36 +585,42 @@ static inline void osc_pt2pt_copy_for_send (void *target, size_t target_len, voi
  * and buffers on the module's garbage collection lists and release them
  * at a later time.
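 * Note: osc_pt2pt_gc_clean drops gc_lock around each ompi_request_free
 * call; freeing a request may trigger completion callbacks that call
 * osc_pt2pt_gc_add_request, which takes the same lock and would
 * otherwise deadlock.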
 */
-static inline void osc_pt2pt_gc_clean (void)
+static inline void osc_pt2pt_gc_clean (ompi_osc_pt2pt_module_t *module)
 {
     ompi_request_t *request;
     opal_list_item_t *item;
 
-    OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.lock);
+    OPAL_THREAD_LOCK(&module->gc_lock);
 
-    while (NULL != (request = (ompi_request_t *) opal_list_remove_first (&mca_osc_pt2pt_component.request_gc))) {
+    while (NULL != (request = (ompi_request_t *) opal_list_remove_first (&module->request_gc))) {
+        OPAL_THREAD_UNLOCK(&module->gc_lock);
         ompi_request_free (&request);
+        OPAL_THREAD_LOCK(&module->gc_lock);
     }
 
-    while (NULL != (item = opal_list_remove_first (&mca_osc_pt2pt_component.buffer_gc))) {
+    while (NULL != (item = opal_list_remove_first (&module->buffer_gc))) {
         OBJ_RELEASE(item);
     }
 
-    OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.lock);
+    OPAL_THREAD_UNLOCK(&module->gc_lock);
 }
 
-static inline void osc_pt2pt_gc_add_request (ompi_request_t *request)
+static inline void osc_pt2pt_gc_add_request (ompi_osc_pt2pt_module_t *module, ompi_request_t *request)
 {
-    OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.lock);
-    opal_list_append (&mca_osc_pt2pt_component.request_gc, (opal_list_item_t *) request);
-    OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.lock);
+    OPAL_THREAD_SCOPED_LOCK(&module->gc_lock,
+                            opal_list_append (&module->request_gc, (opal_list_item_t *) request));
 }
 
-static inline void osc_pt2pt_gc_add_buffer (opal_list_item_t *buffer)
+static inline void osc_pt2pt_gc_add_buffer (ompi_osc_pt2pt_module_t *module, opal_list_item_t *buffer)
 {
-    OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.lock);
-    opal_list_append (&mca_osc_pt2pt_component.buffer_gc, buffer);
-    OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.lock);
+    OPAL_THREAD_SCOPED_LOCK(&module->gc_lock,
+                            opal_list_append (&module->buffer_gc, buffer));
+}
+
+static inline void osc_pt2pt_add_pending (ompi_osc_pt2pt_pending_t *pending)
+{
+    OPAL_THREAD_SCOPED_LOCK(&mca_osc_pt2pt_component.pending_operations_lock,
+                            opal_list_append (&mca_osc_pt2pt_component.pending_operations, &pending->super));
 }
 
 #define OSC_PT2PT_FRAG_TAG 0x10000
diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c b/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c
index 1f0fcda4b7..38806ab8e5 100644
--- a/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c
+++ b/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c
@@ -122,16 +122,16 @@ ompi_osc_pt2pt_fence(int assert, ompi_win_t *win)
     /* short-circuit the noprecede case */
     if (0 != (assert & MPI_MODE_NOPRECEDE)) {
-        ret = module->comm->c_coll.coll_barrier(module->comm,
-                                                module->comm->c_coll.coll_barrier_module);
         OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                              "osc pt2pt: fence end (short circuit)"));
         return ret;
     }
 
-    /* try to start all the requests. */
+    /* try to start all requests.
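+     * Note that frag_flush_all is called here without module->lock held;
+     * the queued fragment list it drains is protected by the new
+     * queued_frags_lock instead.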
+     */
     ret = ompi_osc_pt2pt_frag_flush_all(module);
-    if (OMPI_SUCCESS != ret) goto cleanup;
+    if (OMPI_SUCCESS != ret) {
+        return ret;
+    }
 
     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                          "osc pt2pt: fence done sending"));
@@ -141,10 +141,12 @@ ompi_osc_pt2pt_fence(int assert, ompi_win_t *win)
                                                           &incoming_reqs, 1, MPI_UINT32_T, MPI_SUM,
                                                           module->comm,
                                                           module->comm->c_coll.coll_reduce_scatter_block_module);
-    if (OMPI_SUCCESS != ret) goto cleanup;
+    if (OMPI_SUCCESS != ret) {
+        OPAL_THREAD_UNLOCK(&module->lock);
+        return ret;
+    }
 
     OPAL_THREAD_LOCK(&module->lock);
-
     bzero(module->epoch_outgoing_frag_count,
           sizeof(uint32_t) * ompi_comm_size(module->comm));
 
@@ -161,22 +163,21 @@ ompi_osc_pt2pt_fence(int assert, ompi_win_t *win)
         opal_condition_wait(&module->cond, &module->lock);
     }
 
-    ret = OMPI_SUCCESS;
+    module->active_incoming_frag_signal_count = 0;
 
     if (assert & MPI_MODE_NOSUCCEED) {
         /* as specified in MPI-3 p 438 3-5 the fence can end an epoch. it isn't explicitly
         * stated that MPI_MODE_NOSUCCEED ends the epoch but it is a safe assumption. */
         module->active_eager_send_active = false;
         module->all_access_epoch = false;
-    }
+    }
+
+    opal_condition_broadcast (&module->cond);
+    OPAL_THREAD_UNLOCK(&module->lock);
 
- cleanup:
     OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
                          "osc pt2pt: fence end: %d", ret));
 
-    OPAL_THREAD_UNLOCK(&module->lock);
-
-    return ret;
+    return OMPI_SUCCESS;
 }
 
@@ -226,9 +227,13 @@ ompi_osc_pt2pt_start(ompi_group_t *group,
         ompi_proc_t *pending_proc = ompi_comm_peer_lookup (module->comm, pending_post->rank);
 
         if (group_contains_proc (module->sc_group, pending_proc)) {
+            ompi_osc_pt2pt_peer_t *peer = module->peers + pending_post->rank;
+
             OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                                  "Consumed unexpected post message from %d",
                                  pending_post->rank));
             ++module->num_post_msgs;
+            peer->eager_send_active = true;
+
             opal_list_remove_item (&module->pending_posts, &pending_post->super);
             OBJ_RELEASE(pending_post);
         }
@@ -291,6 +296,7 @@ ompi_osc_pt2pt_complete(ompi_win_t *win)
                              "waiting for post messages. num_post_msgs = %d", module->num_post_msgs));
         opal_condition_wait(&module->cond, &module->lock);
     }
+    OPAL_THREAD_UNLOCK(&module->lock);
 
     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                          "ompi_osc_pt2pt_complete sending complete message"));
@@ -302,7 +308,6 @@ ompi_osc_pt2pt_complete(ompi_win_t *win)
        At the same time, clean out the outgoing count for the next
        round. */
-    OPAL_THREAD_UNLOCK(&module->lock);
     for (i = 0 ; i < ompi_group_size(module->sc_group) ; ++i) {
         if (my_rank == ranks[i]) {
             /* shortcut for self */
@@ -325,15 +330,17 @@ ompi_osc_pt2pt_complete(ompi_win_t *win)
                                           sizeof(ompi_osc_pt2pt_header_complete_t));
         if (OMPI_SUCCESS != ret) goto cleanup;
     }
-    OPAL_THREAD_LOCK(&module->lock);
 
     /* start all requests */
     ret = ompi_osc_pt2pt_frag_flush_all(module);
     if (OMPI_SUCCESS != ret) goto cleanup;
 
+    OPAL_THREAD_LOCK(&module->lock);
     /* zero the fragment counts here to ensure they are zeroed */
     for (i = 0 ; i < ompi_group_size(module->sc_group) ; ++i) {
+        peer = module->peers + ranks[i];
         module->epoch_outgoing_frag_count[ranks[i]] = 0;
+        peer->eager_send_active = false;
     }
 
     /* wait for outgoing requests to complete.  Don't wait for incoming, as
@@ -347,7 +354,7 @@ ompi_osc_pt2pt_complete(ompi_win_t *win)
     module->sc_group = NULL;
 
     /* unlock here, as group cleanup can take a while...
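      * (This is the final release of module->lock in this function; the
      * cleanup path below no longer unlocks it.)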
      */
-    OPAL_THREAD_UNLOCK(&(module->lock));
+    OPAL_THREAD_UNLOCK(&module->lock);
 
     /* phase 2 cleanup group */
     ompi_group_decrement_proc_count(group);
@@ -362,8 +369,6 @@ ompi_osc_pt2pt_complete(ompi_win_t *win)
  cleanup:
     if (NULL != ranks) free(ranks);
 
-    OPAL_THREAD_UNLOCK(&(module->lock));
-
     return ret;
 }
 
@@ -504,7 +509,6 @@ ompi_osc_pt2pt_test(ompi_win_t *win,
         module->active_incoming_frag_count != module->active_incoming_frag_signal_count) {
         *flag = 0;
         ret = OMPI_SUCCESS;
-        goto cleanup;
     } else {
         *flag = 1;
 
@@ -519,7 +523,6 @@ ompi_osc_pt2pt_test(ompi_win_t *win,
         return OMPI_SUCCESS;
     }
 
- cleanup:
     OPAL_THREAD_UNLOCK(&(module->lock));
 
     return ret;
@@ -528,6 +531,7 @@ ompi_osc_pt2pt_test(ompi_win_t *win,
 int osc_pt2pt_incoming_post (ompi_osc_pt2pt_module_t *module, int source)
 {
     ompi_proc_t *source_proc = ompi_comm_peer_lookup (module->comm, source);
+    ompi_osc_pt2pt_peer_t *peer = module->peers + source;
 
     OPAL_THREAD_LOCK(&module->lock);
 
@@ -547,6 +551,9 @@ int osc_pt2pt_incoming_post (ompi_osc_pt2pt_module_t *module, int source)
         return OMPI_SUCCESS;
     }
 
+    assert (!peer->eager_send_active);
+    peer->eager_send_active = true;
+
     module->num_post_msgs++;
     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                          "received post message. num_post_msgs = %d", module->num_post_msgs));
diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c b/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c
index fa0ba22e9a..e915cace46 100644
--- a/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c
+++ b/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c
@@ -49,14 +49,12 @@ static int ompi_osc_pt2pt_req_comm_complete (ompi_request_t *request)
 
     mark_outgoing_completion (module);
 
-    OPAL_THREAD_LOCK(&ompi_request_lock);
-    if (0 == --pt2pt_request->outstanding_requests) {
+    if (0 == OPAL_THREAD_ADD32(&pt2pt_request->outstanding_requests, -1)) {
         ompi_osc_pt2pt_request_complete (pt2pt_request, request->req_status.MPI_ERROR);
     }
-    OPAL_THREAD_UNLOCK(&ompi_request_lock);
 
     /* put this request on the garbage collection list */
-    osc_pt2pt_gc_add_request (request);
+    osc_pt2pt_gc_add_request (module, request);
 
     return OMPI_SUCCESS;
 }
@@ -64,11 +62,19 @@ static int ompi_osc_pt2pt_req_comm_complete (ompi_request_t *request)
 static int ompi_osc_pt2pt_dt_send_complete (ompi_request_t *request)
 {
     ompi_datatype_t *datatype = (ompi_datatype_t *) request->req_complete_cb_data;
+    ompi_osc_pt2pt_module_t *module = NULL;
 
     OBJ_RELEASE(datatype);
 
+    OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.lock);
+    opal_hash_table_get_value_uint32(&mca_osc_pt2pt_component.modules,
+                                     ompi_comm_get_cid(request->req_mpi_object.comm),
+                                     (void **) &module);
+    OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.lock);
+    assert (NULL != module);
+
     /* put this request on the garbage collection list */
-    osc_pt2pt_gc_add_request (request);
+    osc_pt2pt_gc_add_request (module, request);
 
     return OMPI_SUCCESS;
 }
@@ -327,8 +333,6 @@ static inline int ompi_osc_pt2pt_put_w_req (void *origin_addr, int origin_count,
     payload_len = origin_dt->super.size * origin_count;
 
     frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + ddt_len + payload_len;
-    OPAL_THREAD_LOCK(&module->lock);
-
     ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr);
     if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
         frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + ddt_len;
@@ -338,7 +342,6 @@ static inline int ompi_osc_pt2pt_put_w_req (void *origin_addr, int origin_count,
             frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + 8;
             ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr);
             if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
-                OPAL_THREAD_UNLOCK(&module->lock);
                 return OMPI_ERR_OUT_OF_RESOURCE;
             }
 
@@ -352,15 +355,15 @@ static inline int ompi_osc_pt2pt_put_w_req (void *origin_addr, int origin_count,
     /* flush will be called at the end of this function. make sure the post message has
      * arrived. */
     if ((is_long_msg || request) && module->sc_group) {
+        OPAL_THREAD_LOCK(&module->lock);
         while (0 != module->num_post_msgs) {
             OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                                  "waiting for post messages. num_post_msgs = %d", module->num_post_msgs));
             opal_condition_wait(&module->cond, &module->lock);
         }
+        OPAL_THREAD_UNLOCK(&module->lock);
     }
 
-    OPAL_THREAD_UNLOCK(&module->lock);
-
     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                          "osc pt2pt: put long protocol: %d, large datatype: %d",
                          (int) is_long_msg, (int) is_long_datatype));
@@ -433,15 +436,12 @@ static inline int ompi_osc_pt2pt_put_w_req (void *origin_addr, int origin_count,
         header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_VALID;
     }
 
-    OPAL_THREAD_LOCK(&module->lock);
-
     ret = ompi_osc_pt2pt_frag_finish(module, frag);
 
     if (request || is_long_msg) {
         /* need to flush now in case the caller decides to wait on the request */
         ompi_osc_pt2pt_frag_flush_target (module, target);
     }
-    OPAL_THREAD_UNLOCK(&module->lock);
 
     return ret;
 }
@@ -512,8 +512,6 @@ ompi_osc_pt2pt_accumulate_w_req (void *origin_addr, int origin_count,
     ddt_len = ompi_datatype_pack_description_length(target_dt);
     payload_len = origin_dt->super.size * origin_count;
 
-    OPAL_THREAD_LOCK(&module->lock);
-
     frag_len = sizeof(*header) + ddt_len + payload_len;
     ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr);
     if (OMPI_SUCCESS != ret) {
@@ -524,7 +522,6 @@ ompi_osc_pt2pt_accumulate_w_req (void *origin_addr, int origin_count,
             frag_len = sizeof(*header) + 8;
             ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr);
             if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
-                OPAL_THREAD_UNLOCK(&module->lock);
                 return OMPI_ERR_OUT_OF_RESOURCE;
             }
 
@@ -538,15 +535,15 @@ ompi_osc_pt2pt_accumulate_w_req (void *origin_addr, int origin_count,
     /* flush will be called at the end of this function. make sure the post message has
      * arrived. */
     if ((is_long_msg || request) && module->sc_group) {
+        OPAL_THREAD_LOCK(&module->lock);
         while (0 != module->num_post_msgs) {
             OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                                  "waiting for post messages. num_post_msgs = %d", module->num_post_msgs));
             opal_condition_wait(&module->cond, &module->lock);
         }
+        OPAL_THREAD_UNLOCK(&module->lock);
     }
 
-    OPAL_THREAD_UNLOCK(&module->lock);
-
     header = (ompi_osc_pt2pt_header_acc_t*) ptr;
     header->base.flags = 0;
     header->len = frag_len;
@@ -623,8 +620,6 @@ ompi_osc_pt2pt_accumulate_w_req (void *origin_addr, int origin_count,
         header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_VALID;
     }
 
-    OPAL_THREAD_LOCK(&module->lock);
-
     ret = ompi_osc_pt2pt_frag_finish(module, frag);
 
     if (is_long_msg || request) {
@@ -632,8 +627,6 @@ ompi_osc_pt2pt_accumulate_w_req (void *origin_addr, int origin_count,
         ompi_osc_pt2pt_frag_flush_target (module, target);
     }
 
-    OPAL_THREAD_UNLOCK(&module->lock);
-
     return ret;
 }
 
@@ -700,12 +693,9 @@ int ompi_osc_pt2pt_compare_and_swap (void *origin_addr, void *compare_addr,
     /* we need to send both the origin and compare buffers */
     payload_len = dt->super.size * 2;
 
-    OPAL_THREAD_LOCK(&module->lock);
-
     frag_len = sizeof(ompi_osc_pt2pt_header_cswap_t) + ddt_len + payload_len;
     ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr);
     if (OMPI_SUCCESS != ret) {
-        OPAL_THREAD_UNLOCK(&module->lock);
         return OMPI_ERR_OUT_OF_RESOURCE;
     }
 
@@ -733,12 +723,10 @@ int ompi_osc_pt2pt_compare_and_swap (void *origin_addr, void *compare_addr,
     ret = ompi_osc_pt2pt_irecv_w_cb (result_addr, 1, dt, target, tag, module->comm,
                                      NULL, ompi_osc_pt2pt_req_comm_complete, request);
     if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
-        OPAL_THREAD_UNLOCK(&module->lock);
         return ret;
     }
 
     ret = ompi_osc_pt2pt_frag_finish(module, frag);
-    OPAL_THREAD_UNLOCK(&module->lock);
 
     return ret;
 }
@@ -857,8 +845,6 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co
      * must fit in a single frag */
     ddt_len = ompi_datatype_pack_description_length(target_dt);
 
-    OPAL_THREAD_LOCK(&module->lock);
-
     frag_len = sizeof(ompi_osc_pt2pt_header_get_t) + ddt_len;
     ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr);
     if (OMPI_SUCCESS != ret) {
@@ -866,7 +852,6 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co
         frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + 8;
         ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr);
         if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
-            OPAL_THREAD_UNLOCK(&module->lock);
             return OMPI_ERR_OUT_OF_RESOURCE;
         }
 
@@ -888,8 +873,6 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co
         }
     }
 
-    OPAL_THREAD_UNLOCK(&module->lock);
-
     header = (ompi_osc_pt2pt_header_get_t*) ptr;
     header->base.type = OMPI_OSC_PT2PT_HDR_TYPE_GET;
     header->base.flags = 0;
@@ -936,14 +919,12 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co
         *request = &pt2pt_request->super;
     }
 
-    OPAL_THREAD_LOCK(&module->lock);
     ret = ompi_osc_pt2pt_frag_finish(module, frag);
 
     if (!release_req) {
         /* need to flush now in case the caller decides to wait on the request */
         ompi_osc_pt2pt_frag_flush_target (module, target);
     }
-    OPAL_THREAD_UNLOCK(&module->lock);
 
     return ret;
 }
@@ -1087,8 +1068,6 @@ int ompi_osc_pt2pt_rget_accumulate_internal (void *origin_addr, int origin_count
         payload_len = 0;
     }
 
-    OPAL_THREAD_LOCK(&module->lock);
-
     frag_len = sizeof(*header) + ddt_len + payload_len;
     ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr);
     if (OMPI_SUCCESS != ret) {
@@ -1099,7 +1078,6 @@ int ompi_osc_pt2pt_rget_accumulate_internal (void *origin_addr, int origin_count
         frag_len = sizeof(*header) + 8;
         ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr);
         if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
-            OPAL_THREAD_UNLOCK(&module->lock);
             return OMPI_ERR_OUT_OF_RESOURCE;
         }
 
@@ -1121,15 +1099,15 @@ int ompi_osc_pt2pt_rget_accumulate_internal (void *origin_addr, int origin_count
     /* flush will be called at the end of this function. make sure the post message has
      * arrived. */
     if (!release_req && module->sc_group) {
+        OPAL_THREAD_LOCK(&module->lock);
         while (0 != module->num_post_msgs) {
             OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                                  "waiting for post messages. num_post_msgs = %d", module->num_post_msgs));
             opal_condition_wait(&module->cond, &module->lock);
         }
+        OPAL_THREAD_UNLOCK(&module->lock);
     }
 
-    OPAL_THREAD_UNLOCK(&module->lock);
-
     header = (ompi_osc_pt2pt_header_acc_t *) ptr;
     header->base.flags = 0;
     header->len = frag_len;
@@ -1191,14 +1169,12 @@ int ompi_osc_pt2pt_rget_accumulate_internal (void *origin_addr, int origin_count
         *request = (ompi_request_t *) pt2pt_request;
     }
 
-    OPAL_THREAD_LOCK(&module->lock);
     ret = ompi_osc_pt2pt_frag_finish(module, frag);
 
     if (!release_req) {
         /* need to flush now in case the caller decides to wait on the request */
         ompi_osc_pt2pt_frag_flush_target (module, target_rank);
     }
-    OPAL_THREAD_UNLOCK(&module->lock);
 
     return ret;
 }
diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_component.c b/ompi/mca/osc/pt2pt/osc_pt2pt_component.c
index 4a30290e98..351a5f2190 100644
--- a/ompi/mca/osc/pt2pt/osc_pt2pt_component.c
+++ b/ompi/mca/osc/pt2pt/osc_pt2pt_component.c
@@ -200,14 +200,15 @@ component_register(void)
 
 static int component_progress (void)
 {
+    int count = opal_list_get_size (&mca_osc_pt2pt_component.pending_operations);
     ompi_osc_pt2pt_pending_t *pending, *next;
 
-    if (0 == opal_list_get_size (&mca_osc_pt2pt_component.pending_operations)) {
+    if (0 == count) {
         return 0;
     }
 
     /* process one incoming request */
-    OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.lock);
+    OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.pending_operations_lock);
     OPAL_LIST_FOREACH_SAFE(pending, next, &mca_osc_pt2pt_component.pending_operations, ompi_osc_pt2pt_pending_t) {
         int ret;
 
@@ -231,7 +232,7 @@ static int component_progress (void)
             OBJ_RELEASE(pending);
         }
     }
-    OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.lock);
+    OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.pending_operations_lock);
 
     return 1;
 }
@@ -244,8 +245,7 @@ component_init(bool enable_progress_threads,
 
     OBJ_CONSTRUCT(&mca_osc_pt2pt_component.lock, opal_mutex_t);
     OBJ_CONSTRUCT(&mca_osc_pt2pt_component.pending_operations, opal_list_t);
-    OBJ_CONSTRUCT(&mca_osc_pt2pt_component.request_gc, opal_list_t);
-    OBJ_CONSTRUCT(&mca_osc_pt2pt_component.buffer_gc, opal_list_t);
+    OBJ_CONSTRUCT(&mca_osc_pt2pt_component.pending_operations_lock, opal_mutex_t);
 
     OBJ_CONSTRUCT(&mca_osc_pt2pt_component.modules, opal_hash_table_t);
 
@@ -254,11 +254,13 @@ component_init(bool enable_progress_threads,
     mca_osc_pt2pt_component.progress_enable = false;
     mca_osc_pt2pt_component.module_count = 0;
 
-    OBJ_CONSTRUCT(&mca_osc_pt2pt_component.frags, opal_free_list_t);
-    ret = opal_free_list_init(&mca_osc_pt2pt_component.frags,
-                              sizeof(ompi_osc_pt2pt_frag_t),
-                              OBJ_CLASS(ompi_osc_pt2pt_frag_t),
-                              1, -1, 1);
+    OBJ_CONSTRUCT(&mca_osc_pt2pt_component.frags, ompi_free_list_t);
+    ret = ompi_free_list_init_new (&mca_osc_pt2pt_component.frags,
+                                   sizeof(ompi_osc_pt2pt_frag_t), 8,
+                                   OBJ_CLASS(ompi_osc_pt2pt_frag_t),
+                                   mca_osc_pt2pt_component.buffer_size +
+                                   sizeof (ompi_osc_pt2pt_frag_header_t),
+                                   8, 1, -1, 1, 0);
     if (OMPI_SUCCESS != ret) {
         opal_output_verbose(1, ompi_osc_base_framework.framework_output,
                             "%s:%d: ompi_free_list_init failed: %d",
@@ -303,8 +305,7 @@ component_finalize(void)
     OBJ_DESTRUCT(&mca_osc_pt2pt_component.lock);
     OBJ_DESTRUCT(&mca_osc_pt2pt_component.requests);
     OBJ_DESTRUCT(&mca_osc_pt2pt_component.pending_operations);
-    OBJ_DESTRUCT(&mca_osc_pt2pt_component.request_gc);
-    OBJ_DESTRUCT(&mca_osc_pt2pt_component.buffer_gc);
+    OBJ_DESTRUCT(&mca_osc_pt2pt_component.pending_operations_lock);
 
     return OMPI_SUCCESS;
 }
@@ -329,17 +330,11 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit
     ompi_osc_pt2pt_module_t *module = NULL;
     int ret;
     char *name;
-    bool no_locks = false;
 
     /* We don't support shared windows; that's for the sm onesided component */
     if (MPI_WIN_FLAVOR_SHARED == flavor) return OMPI_ERR_NOT_SUPPORTED;
 
-    if (check_config_value_bool("no_locks", info)) {
-        no_locks = true;
-        ompi_osc_pt2pt_no_locks = true;
-    }
-
     /* create module structure with all fields initialized to zero */
     module = (ompi_osc_pt2pt_module_t*)
         calloc(1, sizeof(ompi_osc_pt2pt_module_t));
@@ -354,10 +349,15 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit
     OBJ_CONSTRUCT(&module->cond, opal_condition_t);
     OBJ_CONSTRUCT(&module->acc_lock, opal_mutex_t);
     OBJ_CONSTRUCT(&module->queued_frags, opal_list_t);
+    OBJ_CONSTRUCT(&module->queued_frags_lock, opal_mutex_t);
     OBJ_CONSTRUCT(&module->locks_pending, opal_list_t);
+    OBJ_CONSTRUCT(&module->locks_pending_lock, opal_mutex_t);
    OBJ_CONSTRUCT(&module->outstanding_locks, opal_list_t);
     OBJ_CONSTRUCT(&module->pending_acc, opal_list_t);
     OBJ_CONSTRUCT(&module->pending_posts, opal_list_t);
+    OBJ_CONSTRUCT(&module->request_gc, opal_list_t);
+    OBJ_CONSTRUCT(&module->buffer_gc, opal_list_t);
+    OBJ_CONSTRUCT(&module->gc_lock, opal_mutex_t);
 
     /* options */
     /* FIX ME: should actually check this value... */
@@ -405,20 +405,6 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit
         goto cleanup;
     }
 
-    if (!no_locks) {
-        module->passive_incoming_frag_count = calloc(ompi_comm_size(comm), sizeof(uint32_t));
-        if (NULL == module->passive_incoming_frag_count) {
-            ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
-            goto cleanup;
-        }
-
-        module->passive_incoming_frag_signal_count = calloc(ompi_comm_size(comm), sizeof(uint32_t));
-        if (NULL == module->passive_incoming_frag_signal_count) {
-            ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
-            goto cleanup;
-        }
-    }
-
     /* the statement below (from Brian) does not seem correct so disable active target on the
      * window.
      * if this ends up being incorrect please revert this one change */
     module->active_eager_send_active = false;
@@ -429,14 +415,6 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit
     module->active_eager_send_active = true;
 #endif
 
-    if (!no_locks) {
-        module->passive_eager_send_active = malloc(sizeof(bool) * ompi_comm_size(comm));
-        if (NULL == module->passive_eager_send_active) {
-            ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
-            goto cleanup;
-        }
-    }
-
     /* lock data */
     if (check_config_value_bool("no_locks", info)) {
         win->w_flags |= OMPI_WIN_NO_LOCKS;
diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c b/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c
index d717d76ce1..e682b854a5 100644
--- a/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c
+++ b/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c
@@ -26,7 +26,6 @@
 #include "osc_pt2pt.h"
 #include "osc_pt2pt_header.h"
 #include "osc_pt2pt_data_move.h"
-#include "osc_pt2pt_obj_convert.h"
 #include "osc_pt2pt_frag.h"
 #include "osc_pt2pt_request.h"
 
@@ -225,8 +224,6 @@ int ompi_osc_pt2pt_control_send (ompi_osc_pt2pt_module_t *module, int target,
     char *ptr;
     int ret;
 
-    OPAL_THREAD_LOCK(&module->lock);
-
     ret = ompi_osc_pt2pt_frag_alloc(module, target, len, &frag, &ptr);
     if (OPAL_LIKELY(OMPI_SUCCESS == ret)) {
         memcpy (ptr, data, len);
@@ -234,8 +231,6 @@ int ompi_osc_pt2pt_control_send (ompi_osc_pt2pt_module_t *module, int target,
         ret = ompi_osc_pt2pt_frag_finish(module, frag);
     }
 
-    OPAL_THREAD_UNLOCK(&module->lock);
-
     return ret;
 }
 
@@ -254,7 +249,7 @@ static int ompi_osc_pt2pt_control_send_unbuffered_cb (ompi_request_t *request)
     free (ctx);
 
     /* put this request on the garbage collection list */
-    osc_pt2pt_gc_add_request (request);
+    osc_pt2pt_gc_add_request (module, request);
 
     return OMPI_SUCCESS;
 }
@@ -453,7 +448,7 @@ static int osc_pt2pt_incoming_req_complete (ompi_request_t *request)
     mark_incoming_completion (module, rank);
 
     /* put this request on the garbage collection list */
-    osc_pt2pt_gc_add_request (request);
+    osc_pt2pt_gc_add_request (module, request);
 
     return OMPI_SUCCESS;
 }
@@ -476,7 +471,7 @@ static int osc_pt2pt_get_post_send_cb (ompi_request_t *request)
     mark_incoming_completion (module, rank);
 
     /* put this request on the garbage collection list */
-    osc_pt2pt_gc_add_request (request);
+    osc_pt2pt_gc_add_request (module, request);
 
     return OMPI_SUCCESS;
 }
@@ -683,8 +678,7 @@ static int accumulate_cb (ompi_request_t *request)
 
     mark_incoming_completion (module, rank);
 
-    OPAL_THREAD_LOCK(&module->lock);
-    if (0 == --acc_data->request_count) {
+    if (0 == OPAL_THREAD_ADD32(&acc_data->request_count, -1)) {
         /* no more requests needed before the buffer can be accumulated */
 
         if (acc_data->source) {
@@ -695,13 +689,11 @@ static int accumulate_cb (ompi_request_t *request)
         /* drop the accumulate lock */
         ompi_osc_pt2pt_accumulate_unlock (module);
 
-        osc_pt2pt_gc_add_buffer (&acc_data->super);
+        osc_pt2pt_gc_add_buffer (module, &acc_data->super);
     }
 
     /* put this request on the garbage collection list */
-    osc_pt2pt_gc_add_request (request);
-
-    OPAL_THREAD_UNLOCK(&module->lock);
+    osc_pt2pt_gc_add_request (module, request);
 
     return ret;
 }
@@ -710,6 +702,7 @@ static int ompi_osc_pt2pt_acc_op_queue (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_header_t *header, int source,
                                         char *data, size_t data_len, ompi_datatype_t *datatype)
 {
+    ompi_osc_pt2pt_peer_t *peer = module->peers + source;
     osc_pt2pt_pending_acc_t *pending_acc;
 
     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
@@ -720,11 +713,9 @@ static int ompi_osc_pt2pt_acc_op_queue (ompi_osc_pt2pt_module_t *module, ompi_os
         return OMPI_ERR_OUT_OF_RESOURCE;
     }
 
-    if (!ompi_osc_pt2pt_no_locks) {
-        /* NTH: ensure we don't leave wait/process_flush/etc until this
-         * accumulate operation is complete. */
-        module->passive_incoming_frag_signal_count[source]++;
-    }
+    /* NTH: ensure we don't leave wait/process_flush/etc until this
+     * accumulate operation is complete. */
+    OPAL_THREAD_ADD32(&peer->passive_incoming_frag_count, -1);
 
     pending_acc->source = source;
 
@@ -757,9 +748,7 @@ static int ompi_osc_pt2pt_acc_op_queue (ompi_osc_pt2pt_module_t *module, ompi_os
     }
 
     /* add to the pending acc queue */
-    OPAL_THREAD_LOCK(&module->lock);
-    opal_list_append (&module->pending_acc, &pending_acc->super);
-    OPAL_THREAD_UNLOCK(&module->lock);
+    OPAL_THREAD_SCOPED_LOCK(&module->lock, opal_list_append (&module->pending_acc, &pending_acc->super));
 
     return OMPI_SUCCESS;
 }
@@ -776,7 +765,7 @@ static int replace_cb (ompi_request_t *request)
     mark_incoming_completion (module, rank);
 
     /* put this request on the garbage collection list */
-    osc_pt2pt_gc_add_request (request);
+    osc_pt2pt_gc_add_request (module, request);
 
     /* unlock the accumulate lock */
     ompi_osc_pt2pt_accumulate_unlock (module);
@@ -1143,10 +1132,8 @@ int ompi_osc_pt2pt_progress_pending_acc (ompi_osc_pt2pt_module_t *module)
         assert (0);
     }
 
-    if (!ompi_osc_pt2pt_no_locks) {
-        /* signal that an operation is complete */
-        mark_incoming_completion (module, pending_acc->source);
-    }
+    /* signal that an operation is complete */
+    mark_incoming_completion (module, pending_acc->source);
 
     pending_acc->data = NULL;
     OBJ_RELEASE(pending_acc);
@@ -1340,18 +1327,15 @@ static inline int process_complete (ompi_osc_pt2pt_module_t *module, int source,
                          source, complete_header->frag_count, module->active_incoming_frag_signal_count,
                          module->active_incoming_frag_count));
 
-    OPAL_THREAD_LOCK(&module->lock);
-
     /* the current fragment is not part of the frag_count so we need to add it here */
-    module->active_incoming_frag_signal_count += complete_header->frag_count + 1;
-    module->num_complete_msgs++;
+    OPAL_THREAD_ADD32((int32_t *) &module->active_incoming_frag_signal_count,
+                      complete_header->frag_count + 1);
 
-    if (0 == module->num_complete_msgs) {
+
+    if (0 == OPAL_THREAD_ADD32((int32_t *) &module->num_complete_msgs, 1)) {
         opal_condition_broadcast (&module->cond);
     }
 
-    OPAL_THREAD_UNLOCK(&module->lock);
-
     return sizeof (*complete_header);
 }
 
@@ -1361,17 +1345,18 @@ static inline int process_complete (ompi_osc_pt2pt_module_t *module, int source,
 static inline int process_flush (ompi_osc_pt2pt_module_t *module, int source,
                                  ompi_osc_pt2pt_header_flush_t *flush_header)
 {
+    ompi_osc_pt2pt_peer_t *peer = module->peers + source;
     int ret;
 
     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                          "process_flush header = {.frag_count = %d}", flush_header->frag_count));
 
     /* decrement the passive frag count by the number of expected incoming
     * fragments; it is incremented back to zero as they arrive */
-    module->passive_incoming_frag_signal_count[source] += flush_header->frag_count;
+    OPAL_THREAD_ADD32(&peer->passive_incoming_frag_count, -(int32_t) flush_header->frag_count);
 
     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
-                         "%d: process_flush: received message from %d. passive_incoming_frag_signal_count = %d, passive_incoming_frag_count = %d",
-                         ompi_comm_rank(module->comm), source, module->passive_incoming_frag_signal_count[source], module->passive_incoming_frag_count[source]));
+                         "%d: process_flush: received message from %d. passive_incoming_frag_count = %d",
+                         ompi_comm_rank(module->comm), source, peer->passive_incoming_frag_count));
 
     ret = ompi_osc_pt2pt_process_flush (module, source, flush_header);
     if (OMPI_SUCCESS != ret) {
@@ -1382,35 +1367,30 @@ static inline int process_flush (ompi_osc_pt2pt_module_t *module, int source,
         pending->source = source;
         pending->header.flush = *flush_header;
 
-        OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.lock);
-        opal_list_append (&mca_osc_pt2pt_component.pending_operations, &pending->super);
-        OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.lock);
-
-        /* we now have to count the current fragment */
-        module->passive_incoming_frag_signal_count[source]++;
-    } else {
-        /* need to account for the current fragment */
-        module->passive_incoming_frag_count[source] = -1;
+        osc_pt2pt_add_pending (pending);
     }
 
+    /* account for the current fragment; the incoming signal will increment
+     * this counter back toward zero */
+    OPAL_THREAD_ADD32(&peer->passive_incoming_frag_count, -1);
+
     return sizeof (*flush_header);
 }
 
 static inline int process_unlock (ompi_osc_pt2pt_module_t *module, int source,
                                   ompi_osc_pt2pt_header_unlock_t *unlock_header)
 {
+    ompi_osc_pt2pt_peer_t *peer = module->peers + source;
     int ret;
 
     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                          "process_unlock header = {.frag_count = %d}", unlock_header->frag_count));
 
     /* decrement the passive frag count by the number of expected incoming
     * fragments; it is incremented back to zero as they arrive */
-    module->passive_incoming_frag_signal_count[source] += unlock_header->frag_count;
+    OPAL_THREAD_ADD32(&peer->passive_incoming_frag_count, -(int32_t) unlock_header->frag_count);
 
     OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
-                         "osc pt2pt: processing unlock request from %d. frag count = %d, signal_count = %d, processed_count = %d",
-                         source, unlock_header->frag_count, (int) module->passive_incoming_frag_signal_count[source],
-                         (int) module->passive_incoming_frag_count[source]));
+                         "osc pt2pt: processing unlock request from %d. frag count = %d, processed_count = %d",
+                         source, unlock_header->frag_count, (int) peer->passive_incoming_frag_count));
 
     ret = ompi_osc_pt2pt_process_unlock (module, source, unlock_header);
     if (OMPI_SUCCESS != ret) {
@@ -1421,17 +1401,12 @@ static inline int process_unlock (ompi_osc_pt2pt_module_t *module, int source,
         pending->source = source;
         pending->header.unlock = *unlock_header;
 
-        OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.lock);
-        opal_list_append (&mca_osc_pt2pt_component.pending_operations, &pending->super);
-        OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.lock);
-
-        /* we now have to count the current fragment */
-        module->passive_incoming_frag_signal_count[source]++;
-    } else {
-        /* need to account for the current fragment */
-        module->passive_incoming_frag_count[source] = -1;
+        osc_pt2pt_add_pending (pending);
     }
 
+    /* account for the current fragment; the incoming signal will increment
+     * this counter back toward zero */
+    OPAL_THREAD_ADD32(&peer->passive_incoming_frag_count, -1);
+
     return sizeof (*unlock_header);
 }
 
@@ -1463,10 +1438,10 @@ static int process_large_datatype_request_cb (ompi_request_t *request)
     }
 
     /* put this request on the garbage collection list */
-    osc_pt2pt_gc_add_request (request);
+    osc_pt2pt_gc_add_request (module, request);
 
     /* free the datatype buffer */
-    osc_pt2pt_gc_add_buffer (&ddt_buffer->super);
+    osc_pt2pt_gc_add_buffer (module, &ddt_buffer->super);
 
     return OMPI_SUCCESS;
 }
@@ -1679,10 +1654,6 @@ static int ompi_osc_pt2pt_callback (ompi_request_t *request)
 
     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                          "finished processing incoming messages"));
-
-    /* restart the receive request */
-    OPAL_THREAD_LOCK(&module->lock);
-
     /* post messages come unbuffered and should NOT increment the incoming completion
      * counters */
     if (OMPI_OSC_PT2PT_HDR_TYPE_POST != base_header->type) {
@@ -1690,14 +1661,12 @@ static int ompi_osc_pt2pt_callback (ompi_request_t *request)
                                   source : MPI_PROC_NULL);
     }
 
-    osc_pt2pt_gc_clean ();
+    osc_pt2pt_gc_clean (module);
 
     /* put this request on the garbage collection list */
-    osc_pt2pt_gc_add_request (request);
+    osc_pt2pt_gc_add_request (module, request);
 
     ompi_osc_pt2pt_frag_start_receive (module);
 
-    OPAL_THREAD_UNLOCK(&module->lock);
-
     OPAL_THREAD_LOCK(&ompi_request_lock);
 
     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
@@ -1734,20 +1703,15 @@ isend_completion_cb(ompi_request_t *request)
     mark_outgoing_completion(module);
 
     /* put this request on the garbage collection list */
-    osc_pt2pt_gc_add_request (request);
+    osc_pt2pt_gc_add_request (module, request);
 
     return OMPI_SUCCESS;
 }
 
-int
-ompi_osc_pt2pt_component_isend(ompi_osc_pt2pt_module_t *module,
-                               void *buf,
-                               size_t count,
-                               struct ompi_datatype_t *datatype,
-                               int dest,
-                               int tag,
-                               struct ompi_communicator_t *comm)
+int ompi_osc_pt2pt_component_isend (ompi_osc_pt2pt_module_t *module, void *buf,
+                                    size_t count, struct ompi_datatype_t *datatype,
+                                    int dest, int tag, struct ompi_communicator_t *comm)
 {
     return ompi_osc_pt2pt_isend_w_cb (buf, count, datatype, dest, tag, comm,
                                       isend_completion_cb, module);
diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_frag.c b/ompi/mca/osc/pt2pt/osc_pt2pt_frag.c
index 2f6787fc18..329d948bcc 100644
--- a/ompi/mca/osc/pt2pt/osc_pt2pt_frag.c
+++ b/ompi/mca/osc/pt2pt/osc_pt2pt_frag.c
@@ -21,18 +21,11 @@
 #include "osc_pt2pt_data_move.h"
 
 static void ompi_osc_pt2pt_frag_constructor (ompi_osc_pt2pt_frag_t *frag){
-    frag->buffer = malloc (mca_osc_pt2pt_component.buffer_size + sizeof (ompi_osc_pt2pt_frag_header_t));
-    assert (frag->buffer);
+    frag->buffer = frag->super.ptr;
 }
 
-static void ompi_osc_pt2pt_frag_destructor (ompi_osc_pt2pt_frag_t *frag) {
-    if (NULL != frag->buffer) {
-        free (frag->buffer);
-    }
-}
-
-OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_frag_t, opal_list_item_t,
-                   ompi_osc_pt2pt_frag_constructor, ompi_osc_pt2pt_frag_destructor);
+OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_frag_t, ompi_free_list_item_t,
+                   ompi_osc_pt2pt_frag_constructor, NULL);
 
 static int frag_send_cb (ompi_request_t *request)
 {
@@ -45,11 +38,11 @@ static int frag_send_cb (ompi_request_t *request)
                          frag->target, (void *) frag, (void *) request));
 
     mark_outgoing_completion(module);
-    OPAL_FREE_LIST_RETURN(&mca_osc_pt2pt_component.frags, &frag->super);
+    OMPI_FREE_LIST_RETURN_MT(&mca_osc_pt2pt_component.frags, &frag->super);
 
     /* put this request on the garbage collection list */
-    osc_pt2pt_gc_add_request (request);
+    osc_pt2pt_gc_add_request (module, request);
 
     return OMPI_SUCCESS;
 }
@@ -77,12 +70,12 @@ frag_send(ompi_osc_pt2pt_module_t *module,
 
 int
 ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module,
-                           ompi_osc_pt2pt_frag_t *frag)
+                          ompi_osc_pt2pt_frag_t *frag)
 {
+    ompi_osc_pt2pt_peer_t *peer = module->peers + frag->target;
     int ret;
 
-    assert(0 == frag->pending);
-    assert(module->peers[frag->target].active_frag != frag);
+    assert(0 == frag->pending && peer->active_frag != frag);
 
     /* we need to signal now that a frag is outgoing to ensure the count sent
      * with the unlock message is correct */
@@ -90,16 +83,10 @@ ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module,
 
     /* if eager sends are not active, can't send yet, so buffer and
        get out... */
-    if (module->passive_target_access_epoch) {
-        if (!module->passive_eager_send_active[frag->target]) {
-            opal_list_append(&module->queued_frags, &frag->super);
-            return OMPI_SUCCESS;
-        }
-    } else {
-        if (!module->active_eager_send_active) {
-            opal_list_append(&module->queued_frags, &frag->super);
-            return OMPI_SUCCESS;
-        }
+    if (!(peer->eager_send_active || module->all_access_epoch)) {
+        OPAL_THREAD_SCOPED_LOCK(&module->queued_frags_lock,
+                                opal_list_append(&module->queued_frags, (opal_list_item_t *) frag));
+        return OMPI_SUCCESS;
     }
 
     ret = frag_send(module, frag);
@@ -113,43 +100,50 @@ ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module,
 int
 ompi_osc_pt2pt_frag_flush_target(ompi_osc_pt2pt_module_t *module, int target)
 {
+    ompi_osc_pt2pt_frag_t *next, *frag = module->peers[target].active_frag;
     int ret = OMPI_SUCCESS;
 
     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                          "osc pt2pt: frag flush target begin"));
 
     /* flush the active frag */
-    if (NULL != module->peers[target].active_frag) {
-        ompi_osc_pt2pt_frag_t *frag = module->peers[target].active_frag;
-
-        if (0 != frag->pending) {
-            /* communication going on while synchronizing; this is a bug */
+    if (NULL != frag) {
+        if (1 != frag->pending) {
+            /* communication going on while synchronizing; this is an RMA usage bug */
            return OMPI_ERR_RMA_SYNC;
        }
 
-        module->peers[target].active_frag = NULL;
-
-        ret = ompi_osc_pt2pt_frag_start(module, frag);
-        if (OMPI_SUCCESS != ret) return ret;
+        if (opal_atomic_cmpset_ptr (&module->peers[target].active_frag, frag, NULL)) {
+            OPAL_THREAD_ADD32(&frag->pending, -1);
+            ret = ompi_osc_pt2pt_frag_start(module, frag);
+            if (OMPI_SUCCESS != ret) {
+                return ret;
+            }
+        }
     }
 
     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                          "osc pt2pt: frag flush target finished active frag"));
 
     /* walk through the pending list and send */
-    ompi_osc_pt2pt_frag_t *frag, *next;
-    OPAL_LIST_FOREACH_SAFE(frag, next, &module->queued_frags, ompi_osc_pt2pt_frag_t) {
-        if (frag->target == target) {
-            opal_list_remove_item(&module->queued_frags, &frag->super);
-            ret = frag_send(module, frag);
-            if (OMPI_SUCCESS != ret) return ret;
+    OPAL_THREAD_LOCK(&module->queued_frags_lock);
+    if (opal_list_get_size (&module->queued_frags)) {
+        OPAL_LIST_FOREACH_SAFE(frag, next, &module->queued_frags, ompi_osc_pt2pt_frag_t) {
+            if (frag->target == target) {
+                opal_list_remove_item(&module->queued_frags, (opal_list_item_t *) frag);
+                ret = frag_send(module, frag);
+                if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
+                    break;
+                }
+            }
        }
    }
+    OPAL_THREAD_UNLOCK(&module->queued_frags_lock);
 
     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                          "osc pt2pt: frag flush target finished"));
 
-    return OMPI_SUCCESS;
+    return ret;
 }
 
@@ -165,18 +159,24 @@ ompi_osc_pt2pt_frag_flush_all(ompi_osc_pt2pt_module_t *module)
 
     /* flush the active frag */
     for (i = 0 ; i < ompi_comm_size(module->comm) ; ++i) {
-        if (NULL != module->peers[i].active_frag) {
-            ompi_osc_pt2pt_frag_t *frag = module->peers[i].active_frag;
+        ompi_osc_pt2pt_frag_t *frag = module->peers[i].active_frag;
 
-            if (0 != frag->pending) {
+        if (NULL != frag) {
+            if (1 != frag->pending) {
+                OPAL_THREAD_UNLOCK(&module->lock);
                 /* communication going on while synchronizing; this is a bug */
                 return OMPI_ERR_RMA_SYNC;
             }
 
-            module->peers[i].active_frag = NULL;
+            if (!opal_atomic_cmpset_ptr (&module->peers[i].active_frag, frag, NULL)) {
+                continue;
+            }
 
+            OPAL_THREAD_ADD32(&frag->pending, -1);
             ret = ompi_osc_pt2pt_frag_start(module, frag);
-            if (OMPI_SUCCESS != ret) return ret;
+            if (OMPI_SUCCESS != ret) {
+                return ret;
+            }
         }
     }
 
@@ -184,18 +184,20 @@ ompi_osc_pt2pt_frag_flush_all(ompi_osc_pt2pt_module_t *module)
                          "osc pt2pt: frag flush all finished active frag"));
 
     /* try to start all the queued frags */
-    OPAL_LIST_FOREACH_SAFE(frag, next, &module->queued_frags, ompi_osc_pt2pt_frag_t) {
-        opal_list_remove_item(&module->queued_frags, &frag->super);
-        ret = frag_send(module, frag);
-        if (OMPI_SUCCESS != ret) {
-            OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
-                                 "osc pt2pt: failure for frag send: %d", ret));
-            return ret;
+    OPAL_THREAD_LOCK(&module->queued_frags_lock);
+    if (opal_list_get_size (&module->queued_frags)) {
+        OPAL_LIST_FOREACH_SAFE(frag, next, &module->queued_frags, ompi_osc_pt2pt_frag_t) {
+            opal_list_remove_item(&module->queued_frags, (opal_list_item_t *) frag);
+            ret = frag_send(module, frag);
+            if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
+                break;
+            }
         }
     }
+    OPAL_THREAD_UNLOCK(&module->queued_frags_lock);
 
     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                          "osc pt2pt: frag flush all done"));
 
-    return OMPI_SUCCESS;
+    return ret;
 }
diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h b/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h
index a90ef42ac7..7ce966f215 100644
--- a/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h
+++ b/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h
@@ -21,7 +21,7 @@
 
 /** Communication buffer for packing messages */
 struct ompi_osc_pt2pt_frag_t {
-    opal_list_item_t super;
+    ompi_free_list_item_t super;
     /* target rank of buffer */
     int target;
     unsigned char *buffer;
@@ -33,7 +33,7 @@
     char *top;
 
     /* Number of operations which have started writing into the frag, but not yet completed doing so */
-    int pending;
+    int32_t pending;
     ompi_osc_pt2pt_frag_header_t *header;
     ompi_osc_pt2pt_module_t *module;
 };
 
@@ -64,22 +64,29 @@ static inline int ompi_osc_pt2pt_frag_alloc(ompi_osc_pt2pt_module_t *module, int
         return OMPI_ERR_OUT_OF_RESOURCE;
     }
 
+    OPAL_THREAD_LOCK(&module->lock);
     if (NULL == curr || curr->remain_len < request_len) {
-        opal_free_list_item_t *item;
+        ompi_free_list_item_t *item = NULL;
 
         if (NULL != curr) {
             curr->remain_len = 0;
+            module->peers[target].active_frag = NULL;
+            opal_atomic_mb ();
+
             /* If there's something pending, the pending finish will
                start the buffer.  Otherwise, we need to start it now. */
-            if (0 == curr->pending) {
-                module->peers[target].active_frag = NULL;
+            if (0 == OPAL_THREAD_ADD32(&curr->pending, -1)) {
                 ret = ompi_osc_pt2pt_frag_start(module, curr);
+                if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
+                    /* release module->lock on the error path as well */
+                    OPAL_THREAD_UNLOCK(&module->lock);
+                    return ret;
+                }
             }
         }
 
-        OPAL_FREE_LIST_GET(&mca_osc_pt2pt_component.frags,
-                           item, ret);
-        if (OMPI_SUCCESS != ret) return ret;
+        OMPI_FREE_LIST_GET_MT(&mca_osc_pt2pt_component.frags, item);
+        if (OPAL_UNLIKELY(NULL == item)) {
+            /* release module->lock on the error path as well */
+            OPAL_THREAD_UNLOCK(&module->lock);
+            return OMPI_ERR_OUT_OF_RESOURCE;
+        }
         curr = module->peers[target].active_frag =
             (ompi_osc_pt2pt_frag_t*) item;
 
@@ -89,7 +96,7 @@ static inline int ompi_osc_pt2pt_frag_alloc(ompi_osc_pt2pt_module_t *module, int
         curr->top = (char*) (curr->header + 1);
         curr->remain_len = mca_osc_pt2pt_component.buffer_size;
         curr->module = module;
-        curr->pending = 0;
+        curr->pending = 1;
 
         curr->header->base.type = OMPI_OSC_PT2PT_HDR_TYPE_FRAG;
         curr->header->base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID;
@@ -101,6 +108,7 @@ static inline int ompi_osc_pt2pt_frag_alloc(ompi_osc_pt2pt_module_t *module, int
         curr->header->windx = ompi_comm_get_cid(module->comm);
 
         if (curr->remain_len < request_len) {
+            OPAL_THREAD_UNLOCK(&module->lock);
            return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
        }
    }
@@ -110,8 +118,10 @@ static inline int ompi_osc_pt2pt_frag_alloc(ompi_osc_pt2pt_module_t *module, int
     curr->top += request_len;
     curr->remain_len -= request_len;
 
-    curr->pending++;
-    curr->header->num_ops++;
+    OPAL_THREAD_UNLOCK(&module->lock);
+
+    OPAL_THREAD_ADD32(&curr->pending, 1);
+    OPAL_THREAD_ADD32(&curr->header->num_ops, 1);
 
     return OMPI_SUCCESS;
 }
@@ -123,12 +133,7 @@ static inline int ompi_osc_pt2pt_frag_alloc(ompi_osc_pt2pt_module_t *module, int
 static inline int ompi_osc_pt2pt_frag_finish(ompi_osc_pt2pt_module_t *module,
                                              ompi_osc_pt2pt_frag_t* buffer)
 {
-    if (0 == --buffer->pending && 0 == buffer->remain_len) {
-        if (OPAL_LIKELY(buffer == module->peers[buffer->target].active_frag)) {
-            /* this is the active fragment.
-             * need to set the current fragment to null
-             * or it will be started multiple times */
-            module->peers[buffer->target].active_frag = NULL;
-        }
+    if (0 == OPAL_THREAD_ADD32(&buffer->pending, -1)) {
         return ompi_osc_pt2pt_frag_start(module, buffer);
     }
diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_header.h b/ompi/mca/osc/pt2pt/osc_pt2pt_header.h
index 4fc0a2f7bb..51d20ccd25 100644
--- a/ompi/mca/osc/pt2pt/osc_pt2pt_header.h
+++ b/ompi/mca/osc/pt2pt/osc_pt2pt_header.h
@@ -120,7 +120,7 @@ typedef struct ompi_osc_pt2pt_header_post_t ompi_osc_pt2pt_header_post_t;
 struct ompi_osc_pt2pt_header_lock_t {
     ompi_osc_pt2pt_header_base_t base;
     int32_t lock_type;
-    uint64_t serial_number;
+    uint64_t lock_ptr;
 };
 typedef struct ompi_osc_pt2pt_header_lock_t ompi_osc_pt2pt_header_lock_t;
 
@@ -128,7 +128,7 @@ struct ompi_osc_pt2pt_header_lock_ack_t {
     ompi_osc_pt2pt_header_base_t base;
     uint16_t windx;
     uint32_t source;
-    uint64_t serial_number;
+    uint64_t lock_ptr;
 };
 typedef struct ompi_osc_pt2pt_header_lock_ack_t ompi_osc_pt2pt_header_lock_ack_t;
 
@@ -136,11 +136,13 @@ struct ompi_osc_pt2pt_header_unlock_t {
     ompi_osc_pt2pt_header_base_t base;
     int32_t lock_type;
     uint32_t frag_count;
+    uint64_t lock_ptr;
 };
 typedef struct ompi_osc_pt2pt_header_unlock_t ompi_osc_pt2pt_header_unlock_t;
 
 struct ompi_osc_pt2pt_header_unlock_ack_t {
     ompi_osc_pt2pt_header_base_t base;
+    uint64_t lock_ptr;
 };
 typedef struct ompi_osc_pt2pt_header_unlock_ack_t ompi_osc_pt2pt_header_unlock_ack_t;
 
@@ -161,8 +163,8 @@ struct ompi_osc_pt2pt_frag_header_t {
     ompi_osc_pt2pt_header_base_t base;
     uint16_t windx;      /* cid of communicator backing window (our window id) */
     uint32_t source;     /* rank in window of source process */
-    uint16_t num_ops;    /* number of operations in this buffer */
-    uint16_t pad[3];     /* ensure the fragment header is a multiple of 8 bytes */
+    int32_t num_ops;     /* number of operations in this buffer */
+    uint32_t pad;        /* ensure the fragment header is a multiple of 8 bytes */
 };
 typedef struct ompi_osc_pt2pt_frag_header_t ompi_osc_pt2pt_frag_header_t;
diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_module.c b/ompi/mca/osc/pt2pt/osc_pt2pt_module.c
index a8038304d7..03c6cfc277 100644
--- a/ompi/mca/osc/pt2pt/osc_pt2pt_module.c
+++ b/ompi/mca/osc/pt2pt/osc_pt2pt_module.c
@@ -49,7 +49,6 @@ ompi_osc_pt2pt_free(ompi_win_t *win)
 {
     int ret = OMPI_SUCCESS;
     ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
-    opal_list_item_t *item;
 
     if (NULL == module) {
         return OMPI_SUCCESS;
@@ -67,43 +66,38 @@ ompi_osc_pt2pt_free(ompi_win_t *win)
     }
 
     /* remove from component information */
-    OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.lock);
-    opal_hash_table_remove_value_uint32(&mca_osc_pt2pt_component.modules,
-                                        ompi_comm_get_cid(module->comm));
-    OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.lock);
+    OPAL_THREAD_SCOPED_LOCK(&mca_osc_pt2pt_component.lock,
+                            opal_hash_table_remove_value_uint32(&mca_osc_pt2pt_component.modules,
+                                                                ompi_comm_get_cid(module->comm)));
     }
 
     win->w_osc_module = NULL;
 
     OBJ_DESTRUCT(&module->outstanding_locks);
     OBJ_DESTRUCT(&module->locks_pending);
+    OBJ_DESTRUCT(&module->locks_pending_lock);
     OBJ_DESTRUCT(&module->acc_lock);
     OBJ_DESTRUCT(&module->cond);
     OBJ_DESTRUCT(&module->lock);
 
     /* it is erroneous to close a window with active operations on it so we should
      * probably produce an error here instead of cleaning up */
-    while (NULL != (item = opal_list_remove_first (&module->pending_acc))) {
-        OBJ_RELEASE(item);
-    }
+    OPAL_LIST_DESTRUCT(&module->pending_acc);
+    OPAL_LIST_DESTRUCT(&module->pending_posts);
+    OPAL_LIST_DESTRUCT(&module->queued_frags);
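+    /* queued_frags should be empty at this point; the list is destroyed
+     * before its lock since no other thread can still be queuing
+     * fragments on a window that is being freed */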
+    OBJ_DESTRUCT(&module->queued_frags_lock);
 
-    OBJ_DESTRUCT(&module->pending_acc);
-
-    while (NULL != (item = opal_list_remove_first (&module->pending_posts))) {
-        OBJ_RELEASE(item);
-    }
-
-    OBJ_DESTRUCT(&module->pending_posts);
-
-    osc_pt2pt_gc_clean ();
+    osc_pt2pt_gc_clean (module);
+    OPAL_LIST_DESTRUCT(&module->request_gc);
+    OPAL_LIST_DESTRUCT(&module->buffer_gc);
+    OBJ_DESTRUCT(&module->gc_lock);
 
     if (NULL != module->peers) {
         free(module->peers);
     }
 
-    if (NULL != module->passive_eager_send_active) free(module->passive_eager_send_active);
-    if (NULL != module->passive_incoming_frag_count) free(module->passive_incoming_frag_count);
-    if (NULL != module->passive_incoming_frag_signal_count) free(module->passive_incoming_frag_signal_count);
+    if (NULL != module->epoch_outgoing_frag_count) free(module->epoch_outgoing_frag_count);
+
     if (NULL != module->frag_request) {
         module->frag_request->req_complete_cb = NULL;
         ompi_request_cancel (module->frag_request);
diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c b/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c
index a27553598f..fa96a908e3 100644
--- a/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c
+++ b/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c
@@ -33,12 +33,15 @@
 #include "ompi/mca/osc/base/base.h"
 #include "opal/include/opal_stdint.h"
 
+static bool ompi_osc_pt2pt_lock_try_acquire (ompi_osc_pt2pt_module_t* module, int source, int lock_type,
+                                             uint64_t serial_number);
+
 /* target-side tracking of a lock request */
 struct ompi_osc_pt2pt_pending_lock_t {
     opal_list_item_t super;
     int peer;
     int lock_type;
-    uint64_t serial_number;
+    uint64_t lock_ptr;
 };
 typedef struct ompi_osc_pt2pt_pending_lock_t ompi_osc_pt2pt_pending_lock_t;
 OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_pending_lock_t, opal_list_item_t,
@@ -49,6 +52,8 @@ OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_pending_lock_t, opal_list_item_t,
                    NULL, NULL);
 
 struct ompi_osc_pt2pt_outstanding_lock_t {
     opal_list_item_t super;
     int target;
+    int assert;
+    bool flushing;
     int32_t lock_acks_received;
     int32_t unlock_acks_received;
     int32_t flush_acks_received;
@@ -60,8 +65,10 @@ OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_outstanding_lock_t, opal_list_item_t,
                    NULL, NULL);
 
 static int ompi_osc_activate_next_lock (ompi_osc_pt2pt_module_t *module);
-static inline int queue_lock (ompi_osc_pt2pt_module_t *module, int requestor,
-                              int lock_type, uint64_t serial_number);
+static inline int queue_lock (ompi_osc_pt2pt_module_t *module, int requestor, int lock_type, uint64_t lock_ptr);
+static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_outstanding_lock_t *lock,
+                                      int target);
+
 
 /**
  * Find the first outstanding lock to a target.
@@ -75,54 +82,64 @@ static inline int queue_lock (ompi_osc_pt2pt_module_t *module, int requestor,
 * looking for a lock that matches target. The caller must hold the
 * module lock.
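 * (With this patch that requirement applies to find_outstanding_lock_st
 * below; find_outstanding_lock itself acquires and releases the module
 * lock internally.)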
 */
+static inline ompi_osc_pt2pt_outstanding_lock_t *find_outstanding_lock_st (ompi_osc_pt2pt_module_t *module, int target)
+{
+    ompi_osc_pt2pt_outstanding_lock_t *outstanding_lock, *lock = NULL;
+
+    OPAL_LIST_FOREACH(outstanding_lock, &module->outstanding_locks, ompi_osc_pt2pt_outstanding_lock_t) {
+        if (outstanding_lock->target == target) {
+            lock = outstanding_lock;
+            break;
+        }
+    }
+
+    return lock;
+}
+
 static inline ompi_osc_pt2pt_outstanding_lock_t *find_outstanding_lock (ompi_osc_pt2pt_module_t *module, int target)
 {
     ompi_osc_pt2pt_outstanding_lock_t *lock;
 
-    OPAL_LIST_FOREACH(lock, &module->outstanding_locks, ompi_osc_pt2pt_outstanding_lock_t) {
-        if (lock->target == target) {
-            return lock;
-        }
-    }
+    OPAL_THREAD_LOCK(&module->lock);
+    lock = find_outstanding_lock_st (module, target);
+    OPAL_THREAD_UNLOCK(&module->lock);
 
-    return NULL;
+    return lock;
 }
 
 static inline ompi_osc_pt2pt_outstanding_lock_t *find_outstanding_lock_by_serial (ompi_osc_pt2pt_module_t *module, uint64_t serial_number)
 {
-    ompi_osc_pt2pt_outstanding_lock_t *lock;
+    ompi_osc_pt2pt_outstanding_lock_t *outstanding_lock, *lock = NULL;
 
-    OPAL_LIST_FOREACH(lock, &module->outstanding_locks, ompi_osc_pt2pt_outstanding_lock_t) {
-        if (lock->serial_number == serial_number) {
-            return lock;
+    OPAL_THREAD_LOCK(&module->lock);
+    OPAL_LIST_FOREACH(outstanding_lock, &module->outstanding_locks, ompi_osc_pt2pt_outstanding_lock_t) {
+        if (outstanding_lock->serial_number == serial_number) {
+            lock = outstanding_lock;
+            break;
         }
     }
+    OPAL_THREAD_UNLOCK(&module->lock);
 
-    return NULL;
+    return lock;
 }
 
 static inline int ompi_osc_pt2pt_lock_self (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_outstanding_lock_t *lock)
 {
     const int my_rank = ompi_comm_rank (module->comm);
+    bool acquired = false;
 
-    if ((MPI_LOCK_SHARED == lock->type && MPI_LOCK_EXCLUSIVE != module->lock_status) ||
-        (MPI_LOCK_EXCLUSIVE == lock->type && 0 == module->lock_status)) {
-        /* we can aquire the lock immediately */
-        module->lock_status = lock->type;
-        if (MPI_LOCK_SHARED == lock->type) {
-            module->shared_count++;
-        }
-
-        lock->lock_acks_received++;
-    } else {
+    acquired = ompi_osc_pt2pt_lock_try_acquire (module, my_rank, lock->type, (uint64_t) (uintptr_t) lock);
+    if (!acquired) {
         /* queue the lock */
-        queue_lock (module, my_rank, lock->type, lock->serial_number);
-    }
+        queue_lock (module, my_rank, lock->type, (uint64_t) (uintptr_t) lock);
 
-    /* If locking local, can't be non-blocking according to the
-       standard.  We need to wait for the ack here. */
-    while (0 == lock->lock_acks_received) {
-        opal_condition_wait(&module->cond, &module->lock);
+        /* If locking local, can't be non-blocking according to the
+           standard.  We need to wait for the ack here. */
+        OPAL_THREAD_LOCK(&module->lock);
+        while (0 == lock->lock_acks_received) {
+            opal_condition_wait(&module->cond, &module->lock);
+        }
+        OPAL_THREAD_UNLOCK(&module->lock);
     }
 
     OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
@@ -133,15 +150,20 @@ static inline int ompi_osc_pt2pt_lock_self (ompi_osc_pt2pt_module_t *module, omp
 
 static inline void ompi_osc_pt2pt_unlock_self (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_outstanding_lock_t *lock)
 {
-    if (!(MPI_LOCK_SHARED == lock->type && 0 == --module->shared_count)) {
-        module->lock_status = 0;
+    OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
+                         "ompi_osc_pt2pt_unlock_self: unlocking myself. lock state = %d", module->lock_status));
+
+    if (MPI_LOCK_EXCLUSIVE == lock->type) {
+        OPAL_THREAD_ADD32(&module->lock_status, 1);
+        ompi_osc_activate_next_lock (module);
+    } else if (0 == OPAL_THREAD_ADD32(&module->lock_status, -1)) {
         ompi_osc_activate_next_lock (module);
     }
 
     /* need to ensure we make progress */
     opal_progress();
 
-    lock->unlock_acks_received++;
+    OPAL_THREAD_ADD32(&lock->unlock_acks_received, 1);
 }
 
 static inline int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_outstanding_lock_t *lock)
@@ -153,7 +175,7 @@ static inline int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, i
     lock_req.base.type = OMPI_OSC_PT2PT_HDR_TYPE_LOCK_REQ;
     lock_req.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID | OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET;
     lock_req.lock_type = lock->type;
-    lock_req.serial_number = lock->serial_number;
+    lock_req.lock_ptr = (uint64_t) (uintptr_t) lock;
 
     ret = ompi_osc_pt2pt_control_send (module, target, &lock_req, sizeof (lock_req));
     if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
@@ -169,44 +191,81 @@ static inline int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, i
 static inline int ompi_osc_pt2pt_unlock_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_outstanding_lock_t *lock)
 {
     ompi_osc_pt2pt_header_unlock_t unlock_req;
+    int32_t frag_count = opal_atomic_swap_32 ((int32_t *) module->epoch_outgoing_frag_count + target, -1);
 
     unlock_req.base.type = OMPI_OSC_PT2PT_HDR_TYPE_UNLOCK_REQ;
     unlock_req.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID | OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET;
-    unlock_req.frag_count = module->epoch_outgoing_frag_count[target];
+    unlock_req.frag_count = frag_count;
     unlock_req.lock_type = lock->type;
+    unlock_req.lock_ptr = (uint64_t) (uintptr_t) lock;
 
     /* send control message with unlock request and count */
     return ompi_osc_pt2pt_control_send (module, target, &unlock_req, sizeof (unlock_req));
 }
 
+static int ompi_osc_pt2pt_lock_internal_execute (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_outstanding_lock_t *lock)
+{
+    int my_rank = ompi_comm_rank (module->comm);
+    int target = lock->target;
+    int assert = lock->assert;
+    int ret;
+
+    if (0 == (assert & MPI_MODE_NOCHECK)) {
+        if (my_rank != target && target != -1) {
+            ret = ompi_osc_pt2pt_lock_remote (module, target, lock);
+        } else {
+            ret = ompi_osc_pt2pt_lock_self (module, lock);
+        }
 
-int ompi_osc_pt2pt_lock(int lock_type, int target, int assert, ompi_win_t *win)
+        if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
+            /* return */
+            return ret;
+        }
+
+        if (-1 == target) {
+            for (int i = 0 ; i < ompi_comm_size(module->comm) ; ++i) {
+                if (my_rank == i) {
+                    continue;
+                }
+
+                ret = ompi_osc_pt2pt_lock_remote (module, i, lock);
+                if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
+                    return ret;
+                }
+            }
+
+        }
+    } else {
+        if (-1 == target) {
+            lock->lock_acks_received = ompi_comm_size(module->comm);
+        } else {
+            lock->lock_acks_received = 1;
+        }
+    }
+
+    return OMPI_SUCCESS;
+}
+
+static int ompi_osc_pt2pt_lock_internal (int lock_type, int target, int assert, ompi_win_t *win)
 {
     ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
     ompi_osc_pt2pt_outstanding_lock_t *lock;
-    ompi_osc_pt2pt_peer_t *peer = module->peers + target;
+    ompi_osc_pt2pt_peer_t *peer = NULL;
     int ret = OMPI_SUCCESS;
 
+    if (-1 != target) {
+        peer = module->peers + target;
+    }
+
     /* Check if no_locks is set. TODO: we also need to track whether we are in an
      * active target epoch. Fence can make this tricky to track.
*/ - if (NULL == module->passive_eager_send_active || module->sc_group) { + if (module->sc_group) { return OMPI_ERR_RMA_SYNC; } - assert(module->epoch_outgoing_frag_count[target] == 0); - OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "osc pt2pt: lock %d %d", target, lock_type)); - /* delay all eager sends until we've heard back.. */ - OPAL_THREAD_LOCK(&module->lock); - module->passive_eager_send_active[target] = false; - module->passive_target_access_epoch = true; - - /* when the lock ack returns we will be in an access epoch with this peer */ - peer->access_epoch = true; - /* create lock item */ lock = OBJ_NEW(ompi_osc_pt2pt_outstanding_lock_t); if (OPAL_UNLIKELY(NULL == lock)) { @@ -216,241 +275,169 @@ int ompi_osc_pt2pt_lock(int lock_type, int target, int assert, ompi_win_t *win) lock->target = target; lock->lock_acks_received = 0; lock->unlock_acks_received = 0; - lock->serial_number = module->lock_serial_number++; + lock->serial_number = OPAL_THREAD_ADD64((int64_t *) &module->lock_serial_number, 1); lock->type = lock_type; - opal_list_append(&module->outstanding_locks, &lock->super); + lock->assert = assert; - if (0 == (assert & MPI_MODE_NOCHECK)) { - if (ompi_comm_rank (module->comm) != target) { - ret = ompi_osc_pt2pt_lock_remote (module, target, lock); - } else { - ret = ompi_osc_pt2pt_lock_self (module, lock); - } + /* delay all eager sends until we've heard back.. */ + OPAL_THREAD_LOCK(&module->lock); - if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { - goto exit_error; - } - } else { - lock->lock_acks_received = 1; + /* check for conflicting lock */ + if (find_outstanding_lock_st (module, target)) { + OBJ_RELEASE(lock); + OPAL_THREAD_UNLOCK(&module->lock); + return OMPI_ERR_RMA_CONFLICT; } + /* when the lock ack returns we will be in an access epoch with this peer/all peers (target = -1) */ + if (-1 == target) { + module->all_access_epoch = true; + } else { + peer->access_epoch = true; + } + + module->passive_target_access_epoch = true; + + opal_list_append(&module->outstanding_locks, &lock->super); OPAL_THREAD_UNLOCK(&module->lock); - return OMPI_SUCCESS; + ret = ompi_osc_pt2pt_lock_internal_execute (module, lock); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { + OPAL_THREAD_SCOPED_LOCK(&module->lock, + opal_list_remove_item(&module->outstanding_locks, &lock->super)); + OBJ_RELEASE(lock); + } -exit_error: - - OPAL_THREAD_UNLOCK(&module->lock); - opal_list_remove_item(&module->outstanding_locks, &lock->super); - OBJ_RELEASE(lock); - - /* return */ return ret; } - -int ompi_osc_pt2pt_unlock(int target, ompi_win_t *win) +static int ompi_osc_pt2pt_unlock_internal (int target, ompi_win_t *win) { ompi_osc_pt2pt_module_t *module = GET_MODULE(win); ompi_osc_pt2pt_outstanding_lock_t *lock = NULL; - ompi_osc_pt2pt_peer_t *peer = module->peers + target; + int my_rank = ompi_comm_rank (module->comm); + ompi_osc_pt2pt_peer_t *peer = NULL; + int lock_acks_expected; int ret = OMPI_SUCCESS; - OPAL_THREAD_LOCK(&module->lock); + if (-1 != target) { + lock_acks_expected = 1; + peer = module->peers + target; + } else { + lock_acks_expected = ompi_comm_size (module->comm); + } - lock = find_outstanding_lock (module, target); + OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, + "ompi_osc_pt2pt_unlock_internal: unlocking target %d", target)); + + OPAL_THREAD_LOCK(&module->lock); + lock = find_outstanding_lock_st (module, target); if (OPAL_UNLIKELY(NULL == lock)) { OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_unlock: target %d is not 
locked in window %s", target, win->w_name)); - OPAL_THREAD_LOCK(&module->lock); + OPAL_THREAD_UNLOCK(&module->lock); return OMPI_ERR_RMA_SYNC; } - if (ompi_comm_rank (module->comm) != target) { + opal_list_remove_item (&module->outstanding_locks, &lock->super); + + /* wait until ack has arrived from target */ + while (lock->lock_acks_received != lock_acks_expected) { + opal_condition_wait(&module->cond, &module->lock); + } + OPAL_THREAD_UNLOCK(&module->lock); + + if (lock->assert & MPI_MODE_NOCHECK) { + /* flush intstead */ + ompi_osc_pt2pt_flush_lock (module, lock, target); + } else if (my_rank != target) { OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "osc pt2pt: unlock %d, lock_acks_received = %d", target, lock->lock_acks_received)); - /* wait until ack has arrived from target */ - while (0 == lock->lock_acks_received) { - opal_condition_wait(&module->cond, &module->lock); - } + if (-1 == target) { + /* send unlock messages to all of my peers */ + for (int i = 0 ; i < ompi_comm_size(module->comm) ; ++i) { + if (my_rank == i) { + continue; + } - ret = ompi_osc_pt2pt_unlock_remote (module, target, lock); - if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { - goto cleanup; + ret = ompi_osc_pt2pt_unlock_remote (module, i, lock); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { + return ret; + } + } + + } else { + ret = ompi_osc_pt2pt_unlock_remote (module, target, lock); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { + return ret; + } } /* start all sendreqs to target */ - ret = ompi_osc_pt2pt_frag_flush_target(module, target); - if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { - goto cleanup; + if (-1 == target) { + ret = ompi_osc_pt2pt_frag_flush_all (module); + } else { + ret = ompi_osc_pt2pt_frag_flush_target(module, target); } - /* wait for all the requests and the unlock ack (meaning remote completion) */ - while (module->outgoing_frag_count != module->outgoing_frag_signal_count || - 0 == lock->unlock_acks_received) { + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { + return ret; + } + + /* wait for unlock acks. 
this signals remote completion of fragments */ + OPAL_THREAD_LOCK(&module->lock); + while (lock->unlock_acks_received != lock_acks_expected) { opal_condition_wait(&module->cond, &module->lock); } + OPAL_THREAD_UNLOCK(&module->lock); OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_unlock: unlock of %d complete", target)); - } else { + } + + if ((target == my_rank || target == -1) && !(lock->assert & MPI_MODE_NOCHECK)) { ompi_osc_pt2pt_unlock_self (module, lock); } - module->passive_eager_send_active[target] = false; - module->epoch_outgoing_frag_count[target] = 0; - module->passive_target_access_epoch = false; - - peer->access_epoch = false; - - /* delete the lock */ - opal_list_remove_item (&module->outstanding_locks, &lock->super); - OBJ_RELEASE(lock); - - cleanup: + OPAL_THREAD_LOCK(&module->lock); + if (-1 != target) { + peer->access_epoch = false; + module->passive_target_access_epoch = false; + } else { + module->passive_target_access_epoch = false; + module->all_access_epoch = false; + } OPAL_THREAD_UNLOCK(&module->lock); + OBJ_RELEASE(lock); + return ret; } +int ompi_osc_pt2pt_lock(int lock_type, int target, int assert, ompi_win_t *win) +{ + assert(target >= 0); + + return ompi_osc_pt2pt_lock_internal (lock_type, target, assert, win); +} + +int ompi_osc_pt2pt_unlock (int target, struct ompi_win_t *win) +{ + return ompi_osc_pt2pt_unlock_internal (target, win); +} int ompi_osc_pt2pt_lock_all(int assert, struct ompi_win_t *win) { - ompi_osc_pt2pt_module_t *module = GET_MODULE(win); - int ret, my_rank = ompi_comm_rank (module->comm); - ompi_osc_pt2pt_outstanding_lock_t *lock; - - /* Check if no_locks is set. TODO: we also need to track whether we are in an active - * target epoch. Fence can make this tricky to track. */ - if (NULL == module->passive_eager_send_active) { - return OMPI_ERR_RMA_SYNC; - } - - /* delay all eager sends until we've heard back.. 
*/ - OPAL_THREAD_LOCK(&module->lock); - for (int i = 0 ; i < ompi_comm_size(module->comm) ; ++i) { - module->passive_eager_send_active[i] = false; - } - module->passive_target_access_epoch = true; - module->all_access_epoch = true; - - /* create lock item */ - lock = OBJ_NEW(ompi_osc_pt2pt_outstanding_lock_t); - lock->target = -1; - lock->lock_acks_received = 0; - lock->unlock_acks_received = 0; - lock->serial_number = module->lock_serial_number++; - lock->type = MPI_LOCK_SHARED; - opal_list_append(&module->outstanding_locks, &lock->super); - - /* if nocheck is not specified, send a lock request to everyone - and wait for the local response */ - if (0 != (assert & MPI_MODE_NOCHECK)) { - ret = ompi_osc_pt2pt_lock_self (module, lock); - if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { - goto exit_error; - } - - for (int i = 0 ; i < ompi_comm_size(module->comm) ; ++i) { - if (my_rank == i) { - continue; - } - - ret = ompi_osc_pt2pt_lock_remote (module, i, lock); - if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { - opal_list_remove_item(&module->outstanding_locks, &lock->super); - } - } - } else { - lock->lock_acks_received = ompi_comm_size(module->comm); - } - - OPAL_THREAD_UNLOCK(&module->lock); - - return OMPI_SUCCESS; - - exit_error: - - OPAL_THREAD_UNLOCK(&module->lock); - opal_list_remove_item(&module->outstanding_locks, &lock->super); - OBJ_RELEASE(lock); - - /* return */ - return ret; + return ompi_osc_pt2pt_lock_internal (MPI_LOCK_SHARED, -1, assert, win); } int ompi_osc_pt2pt_unlock_all (struct ompi_win_t *win) { - ompi_osc_pt2pt_module_t *module = GET_MODULE(win); - int my_rank = ompi_comm_rank (module->comm); - ompi_osc_pt2pt_outstanding_lock_t *lock; - int ret; - - OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, - "ompi_osc_pt2pt_unlock_all entering...")); - - OPAL_THREAD_LOCK(&module->lock); - - lock = find_outstanding_lock (module, -1); - if (OPAL_UNLIKELY(NULL == lock)) { - OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, - "ompi_osc_pt2pt_unlock_all: not locked in window %s", - win->w_name)); - OPAL_THREAD_LOCK(&module->lock); - return OMPI_ERR_RMA_SYNC; - } - - /* wait for lock acks */ - while (ompi_comm_size(module->comm) != lock->lock_acks_received) { - opal_condition_wait(&module->cond, &module->lock); - } - - /* send unlock messages to all of my peers */ - for (int i = 0 ; i < ompi_comm_size(module->comm) ; ++i) { - if (my_rank == i) { - continue; - } - - ret = ompi_osc_pt2pt_unlock_remote (module, i, lock); - if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { - goto cleanup; - } - } - - /* unlock myself */ - ompi_osc_pt2pt_unlock_self (module, lock); - - /* start all sendreqs to target */ - ret = ompi_osc_pt2pt_frag_flush_all(module); - if (OMPI_SUCCESS != ret) goto cleanup; - - /* wait for all the requests and the unlock ack (meaning remote completion) */ - while (module->outgoing_frag_count != module->outgoing_frag_signal_count || - ompi_comm_size(module->comm) != lock->unlock_acks_received) { - opal_condition_wait(&module->cond, &module->lock); - } - - /* reset all fragment counters */ - memset (module->epoch_outgoing_frag_count, 0, ompi_comm_size(module->comm) * sizeof (module->epoch_outgoing_frag_count[0])); - memset (module->passive_eager_send_active, 0, ompi_comm_size(module->comm) * sizeof (module->passive_eager_send_active[0])); - - opal_list_remove_item (&module->outstanding_locks, &lock->super); - OBJ_RELEASE(lock); - - module->passive_target_access_epoch = false; - module->all_access_epoch = false; - - OPAL_OUTPUT_VERBOSE((50, 
ompi_osc_base_framework.framework_output, - "ompi_osc_pt2pt_unlock_all complete")); - - cleanup: - OPAL_THREAD_UNLOCK(&module->lock); - - return ret; + return ompi_osc_pt2pt_unlock_internal (-1, win); } @@ -461,7 +448,7 @@ int ompi_osc_pt2pt_sync (struct ompi_win_t *win) } static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_outstanding_lock_t *lock, - int target) + int target) { ompi_osc_pt2pt_header_flush_t flush_req; int peer_count, ret, flush_count; @@ -475,11 +462,14 @@ static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_ /* wait until ack has arrived from target, since we need to be able to eager send before we can transfer all the data... */ - while (peer_count > lock->lock_acks_received) { + OPAL_THREAD_LOCK(&module->lock); + while (peer_count > lock->lock_acks_received && lock->flushing) { opal_condition_wait(&module->cond, &module->lock); } lock->flush_acks_received = 0; + lock->flushing = true; + OPAL_THREAD_UNLOCK(&module->lock); flush_req.base.type = OMPI_OSC_PT2PT_HDR_TYPE_FLUSH_REQ; flush_req.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID | OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET; @@ -493,7 +483,7 @@ static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_ continue; } - flush_req.frag_count = module->epoch_outgoing_frag_count[i]; + flush_req.frag_count = opal_atomic_swap_32 ((int32_t *) module->epoch_outgoing_frag_count + i, -1); /* send control message with flush request and count */ ret = ompi_osc_pt2pt_control_send (module, i, &flush_req, sizeof (flush_req)); @@ -508,7 +498,7 @@ static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_ } } } else { - flush_req.frag_count = module->epoch_outgoing_frag_count[target]; + flush_req.frag_count = opal_atomic_swap_32 ((int32_t *) module->epoch_outgoing_frag_count + target, -1); flush_count = 1; /* send control message with flush request and count */ ret = ompi_osc_pt2pt_control_send (module, target, &flush_req, sizeof (flush_req)); @@ -524,16 +514,15 @@ static int ompi_osc_pt2pt_flush_lock (ompi_osc_pt2pt_module_t *module, ompi_osc_ } /* wait for all the requests and the flush ack (meaning remote completion) */ - while (module->outgoing_frag_count != module->outgoing_frag_signal_count || - flush_count != lock->flush_acks_received) { + OPAL_THREAD_LOCK(&module->lock); + while (flush_count != lock->flush_acks_received) { opal_condition_wait(&module->cond, &module->lock); } - if (-1 == target) { - memset (module->epoch_outgoing_frag_count, 0, peer_count * sizeof (module->epoch_outgoing_frag_count[0])); - } else { - module->epoch_outgoing_frag_count[target] = 0; - } + lock->flushing = false; + opal_condition_broadcast(&module->cond); + + OPAL_THREAD_UNLOCK(&module->lock); return OMPI_SUCCESS; } @@ -560,8 +549,6 @@ int ompi_osc_pt2pt_flush (int target, struct ompi_win_t *win) return OMPI_SUCCESS; } - OPAL_THREAD_LOCK(&module->lock); - lock = find_outstanding_lock (module, target); if (NULL == lock) { lock = find_outstanding_lock (module, -1); @@ -570,14 +557,11 @@ int ompi_osc_pt2pt_flush (int target, struct ompi_win_t *win) OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_flush: target %d is not locked in window %s", target, win->w_name)); - OPAL_THREAD_LOCK(&module->lock); - return OMPI_ERR_RMA_SYNC; + ret = OMPI_ERR_RMA_SYNC; + } else { + ret = ompi_osc_pt2pt_flush_lock (module, lock, target); } - ret = ompi_osc_pt2pt_flush_lock (module, lock, target); - - OPAL_THREAD_UNLOCK(&module->lock); - 
return ret; } @@ -589,19 +573,14 @@ int ompi_osc_pt2pt_flush_all (struct ompi_win_t *win) int ret = OMPI_SUCCESS; /* flush is only allowed from within a passive target epoch */ - if (!module->passive_target_access_epoch) { - return OMPI_ERR_RMA_SYNC; - } - - if (OPAL_UNLIKELY(0 == opal_list_get_size (&module->outstanding_locks))) { + if (OPAL_UNLIKELY(!module->passive_target_access_epoch || + 0 == opal_list_get_size (&module->outstanding_locks))) { OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_flush_all: no targets are locked in window %s", win->w_name)); return OMPI_ERR_RMA_SYNC; } - OPAL_THREAD_LOCK(&module->lock); - OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_flush_all entering...")); @@ -616,8 +595,6 @@ int ompi_osc_pt2pt_flush_all (struct ompi_win_t *win) OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_flush_all complete")); - OPAL_THREAD_UNLOCK(&module->lock); - return ret; } @@ -632,20 +609,19 @@ int ompi_osc_pt2pt_flush_local (int target, struct ompi_win_t *win) return OMPI_ERR_RMA_SYNC; } - OPAL_THREAD_LOCK(&module->lock); - ret = ompi_osc_pt2pt_frag_flush_target(module, target); - if (OMPI_SUCCESS != ret) goto cleanup; + if (OMPI_SUCCESS != ret) { + return ret; + } /* wait for all the requests */ + OPAL_THREAD_LOCK(&module->lock); while (module->outgoing_frag_count != module->outgoing_frag_signal_count) { opal_condition_wait(&module->cond, &module->lock); } - - cleanup: OPAL_THREAD_UNLOCK(&module->lock); - return ret; + return OMPI_SUCCESS; } @@ -659,26 +635,25 @@ int ompi_osc_pt2pt_flush_local_all (struct ompi_win_t *win) return OMPI_ERR_RMA_SYNC; } - OPAL_THREAD_LOCK(&module->lock); - ret = ompi_osc_pt2pt_frag_flush_all(module); - if (OMPI_SUCCESS != ret) goto cleanup; + if (OMPI_SUCCESS != ret) { + return ret; + } /* wait for all the requests */ + OPAL_THREAD_LOCK(&module->lock); while (module->outgoing_frag_count != module->outgoing_frag_signal_count) { opal_condition_wait(&module->cond, &module->lock); } - - cleanup: OPAL_THREAD_UNLOCK(&module->lock); - return ret; + return OMPI_SUCCESS; } /* target side operation to acknowledge to initiator side that the lock is now held by the initiator */ static inline int activate_lock (ompi_osc_pt2pt_module_t *module, int requestor, - uint64_t serial_number) + uint64_t lock_ptr) { ompi_osc_pt2pt_outstanding_lock_t *lock; @@ -689,7 +664,7 @@ static inline int activate_lock (ompi_osc_pt2pt_module_t *module, int requestor, lock_ack.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID; lock_ack.source = ompi_comm_rank(module->comm); lock_ack.windx = ompi_comm_get_cid(module->comm); - lock_ack.serial_number = serial_number; + lock_ack.lock_ptr = lock_ptr; OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "osc pt2pt: sending lock to %d", requestor)); @@ -703,16 +678,13 @@ static inline int activate_lock (ompi_osc_pt2pt_module_t *module, int requestor, OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "osc pt2pt: releasing local lock")); - lock = find_outstanding_lock (module, requestor); - if (NULL == lock) { - lock = find_outstanding_lock (module, -1); - if (OPAL_UNLIKELY(NULL == lock)) { - OPAL_OUTPUT_VERBOSE((5, ompi_osc_base_framework.framework_output, - "lock could not be located")); - } + lock = (ompi_osc_pt2pt_outstanding_lock_t *) (uintptr_t) lock_ptr; + if (OPAL_UNLIKELY(NULL == lock)) { + OPAL_OUTPUT_VERBOSE((5, ompi_osc_base_framework.framework_output, + "lock could not be located")); } - 
lock->lock_acks_received++; + OPAL_THREAD_ADD32(&lock->lock_acks_received, 1); opal_condition_broadcast (&module->cond); return OMPI_SUCCESS; @@ -722,7 +694,7 @@ static inline int activate_lock (ompi_osc_pt2pt_module_t *module, int requestor, /* target side operation to create a pending lock request for a lock request that could not be satisfied */ static inline int queue_lock (ompi_osc_pt2pt_module_t *module, int requestor, - int lock_type, uint64_t serial_number) + int lock_type, uint64_t lock_ptr) { ompi_osc_pt2pt_pending_lock_t *pending = OBJ_NEW(ompi_osc_pt2pt_pending_lock_t); @@ -732,56 +704,67 @@ static inline int queue_lock (ompi_osc_pt2pt_module_t *module, int requestor, pending->peer = requestor; pending->lock_type = lock_type; - pending->serial_number = serial_number; + pending->lock_ptr = lock_ptr; OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "osc pt2pt: queueing lock request from %d", requestor)); - opal_list_append(&module->locks_pending, &pending->super); + OPAL_THREAD_SCOPED_LOCK(&module->locks_pending_lock, opal_list_append(&module->locks_pending, &pending->super)); return OMPI_SUCCESS; } +static bool ompi_osc_pt2pt_lock_try_acquire (ompi_osc_pt2pt_module_t* module, int source, int lock_type, uint64_t lock_ptr) +{ + bool queue = false; + + if (MPI_LOCK_SHARED == lock_type) { + int32_t lock_status = module->lock_status; + + do { + if (lock_status < 0) { + queue = true; + break; + } + + if (opal_atomic_cmpset_32 (&module->lock_status, lock_status, lock_status + 1)) { + break; + } + + lock_status = module->lock_status; + } while (1); + } else { + queue = !opal_atomic_cmpset_32 (&module->lock_status, 0, -1); + } + + if (queue) { + return false; + } + + activate_lock(module, source, lock_ptr); + + /* activated the lock */ + return true; +} + static int ompi_osc_activate_next_lock (ompi_osc_pt2pt_module_t *module) { /* release any other pending locks we can */ ompi_osc_pt2pt_pending_lock_t *pending_lock, *next; int ret = OMPI_SUCCESS; + OPAL_THREAD_LOCK(&module->locks_pending_lock); OPAL_LIST_FOREACH_SAFE(pending_lock, next, &module->locks_pending, ompi_osc_pt2pt_pending_lock_t) { - if (MPI_LOCK_SHARED == pending_lock->lock_type) { - OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, - "ompi_osc_activate_next_lock: release pending lock of type MPI_LOCK_SHARED to peer %d\n", - pending_lock->peer)); - /* acquire shared lock */ - module->lock_status = MPI_LOCK_SHARED; - module->shared_count++; - ret = activate_lock(module, pending_lock->peer, pending_lock->serial_number); - - opal_list_remove_item (&module->locks_pending, &pending_lock->super); - OBJ_RELEASE(pending_lock); - } else { - if (0 == module->lock_status) { - OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, - "ompi_osc_activate_next_lock: release pending lock of type MPI_LOCK_EXCLUSIVE to peer %d\n", - pending_lock->peer)); - /* acquire exclusive lock */ - module->lock_status = MPI_LOCK_EXCLUSIVE; - ret = activate_lock(module, pending_lock->peer, pending_lock->serial_number); - opal_list_remove_item (&module->locks_pending, &pending_lock->super); - OBJ_RELEASE(pending_lock); - } - /* if the lock was acquired (ie, status was 0), then - we're done. 
If the lock was not acquired, we're - also done, because all the shared locks have to - finish first */ + bool acquired = ompi_osc_pt2pt_lock_try_acquire (module, pending_lock->peer, pending_lock->lock_type, + pending_lock->lock_ptr); + if (!acquired) { break; } - if (OMPI_SUCCESS != ret) { - break; - } + opal_list_remove_item (&module->locks_pending, &pending_lock->super); + OBJ_RELEASE(pending_lock); } + OPAL_THREAD_UNLOCK(&module->locks_pending_lock); return ret; } @@ -793,34 +776,19 @@ static int ompi_osc_activate_next_lock (ompi_osc_pt2pt_module_t *module) { int ompi_osc_pt2pt_process_lock (ompi_osc_pt2pt_module_t* module, int source, ompi_osc_pt2pt_header_lock_t* lock_header) { - int ret; + bool acquired; OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, - "ompi_osc_pt2pt_process_lock: processing lock request from %d. current lock state = %d, shared_count = %d", - source, module->lock_status, module->shared_count)); + "ompi_osc_pt2pt_process_lock: processing lock request from %d. current lock state = %d", + source, module->lock_status)); - if (MPI_LOCK_SHARED == lock_header->lock_type) { - if (module->lock_status != MPI_LOCK_EXCLUSIVE) { - /* acquire shared lock */ - module->lock_status = MPI_LOCK_SHARED; - module->shared_count++; - ret = activate_lock(module, source, lock_header->serial_number); - } else { - /* lock not available, queue */ - ret = queue_lock(module, source, lock_header->lock_type, lock_header->serial_number); - } - } else { - if (0 == module->lock_status) { - /* acquire exclusive lock */ - module->lock_status = MPI_LOCK_EXCLUSIVE; - ret = activate_lock(module, source, lock_header->serial_number); - } else { - /* lock not available, queue */ - ret = queue_lock(module, source, lock_header->lock_type, lock_header->serial_number); - } + acquired = ompi_osc_pt2pt_lock_try_acquire (module, source, lock_header->lock_type, lock_header->lock_ptr); + + if (!acquired) { + queue_lock(module, source, lock_header->lock_type, lock_header->lock_ptr); } - return ret; + return OMPI_SUCCESS; } @@ -829,23 +797,21 @@ int ompi_osc_pt2pt_process_lock (ompi_osc_pt2pt_module_t* module, int source, void ompi_osc_pt2pt_process_lock_ack (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_header_lock_ack_t *lock_ack_header) { - ompi_osc_pt2pt_outstanding_lock_t *lock, *next; + ompi_osc_pt2pt_peer_t *peer = module->peers + lock_ack_header->source; + ompi_osc_pt2pt_outstanding_lock_t *lock; - OPAL_LIST_FOREACH_SAFE(lock, next, &module->outstanding_locks, ompi_osc_pt2pt_outstanding_lock_t) { - if (lock->serial_number == lock_ack_header->serial_number) { + OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, + "ompi_osc_pt2pt_process_unlock_ack: processing lock ack from %d for lock %" PRIu64, + lock_ack_header->source, lock_ack_header->lock_ptr)); - OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, - "osc pt2pt: lock ack %d", lock_ack_header->source)); + lock = (ompi_osc_pt2pt_outstanding_lock_t *) (uintptr_t) lock_ack_header->lock_ptr; + assert (NULL != lock); - lock->lock_acks_received++; - module->passive_eager_send_active[lock_ack_header->source] = true; - return; - } - } + /* no need to hold the lock to set this */ + peer->eager_send_active = true; + OPAL_THREAD_ADD32(&lock->lock_acks_received, 1); - opal_output(ompi_osc_base_framework.framework_output, - "osc pt2pt: lock ack %d, %ld for unfindable lock request", - lock_ack_header->source, (unsigned long) lock_ack_header->serial_number); + opal_condition_broadcast(&module->cond); } void 
ompi_osc_pt2pt_process_flush_ack (ompi_osc_pt2pt_module_t *module, int source, @@ -860,13 +826,15 @@ void ompi_osc_pt2pt_process_flush_ack (ompi_osc_pt2pt_module_t *module, int sour lock = find_outstanding_lock_by_serial (module, flush_ack_header->serial_number); assert (NULL != lock); - lock->flush_acks_received++; + OPAL_THREAD_ADD32(&lock->flush_acks_received, 1); opal_condition_broadcast(&module->cond); } void ompi_osc_pt2pt_process_unlock_ack (ompi_osc_pt2pt_module_t *module, int source, - ompi_osc_pt2pt_header_unlock_ack_t *unlock_ack_header) { + ompi_osc_pt2pt_header_unlock_ack_t *unlock_ack_header) +{ + ompi_osc_pt2pt_peer_t *peer = module->peers + source; ompi_osc_pt2pt_outstanding_lock_t *lock; OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, @@ -874,13 +842,14 @@ void ompi_osc_pt2pt_process_unlock_ack (ompi_osc_pt2pt_module_t *module, int sou source)); /* NTH: need to verify that this will work as expected */ - lock = find_outstanding_lock (module, source); - if (NULL == lock) { - lock = find_outstanding_lock(module, -1); - assert (NULL != lock); - } + lock = (ompi_osc_pt2pt_outstanding_lock_t *) (intptr_t) unlock_ack_header->lock_ptr; + assert (NULL != lock); - lock->unlock_acks_received++; + peer->eager_send_active = false; + + if (0 == OPAL_THREAD_ADD32(&lock->unlock_acks_received, 1)) { + opal_condition_broadcast(&module->cond); + } } /** @@ -890,7 +859,7 @@ void ompi_osc_pt2pt_process_unlock_ack (ompi_osc_pt2pt_module_t *module, int sou * @param[in] source - Source rank * @param[in] unlock_header - Incoming unlock header * - * This functions is the target-side functio for handling an unlock + * This functions is the target-side function for handling an unlock * request. Once all pending operations from the target are complete * this functions sends an unlock acknowledgement then attempts to * active a pending lock if the lock becomes free. 
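Note on the serial_number -> lock_ptr change running through the hunks above: the initiator now encodes the address of its own ompi_osc_pt2pt_outstanding_lock_t into the 64-bit lock_ptr header field, the target treats the value as opaque and echoes it back in each ack, and the initiator recovers its bookkeeping structure with a cast instead of walking outstanding_locks. A minimal sketch of the pattern under illustrative names (osc_lock_t and osc_hdr_t are stand-ins, not the real OMPI types):

#include <stdint.h>
#include <assert.h>

typedef struct osc_lock_t {
    int target;                  /* peer this lock refers to, or -1 for all */
    int32_t lock_acks_received;  /* bumped as acks carry the pointer back */
} osc_lock_t;

typedef struct osc_hdr_t {
    uint64_t lock_ptr;           /* opaque to the target; echoed back verbatim */
} osc_hdr_t;

/* initiator -> target: stash the pointer in the request header */
static void encode_lock_ptr (osc_hdr_t *hdr, osc_lock_t *lock)
{
    hdr->lock_ptr = (uint64_t) (uintptr_t) lock;
}

/* target -> initiator ack: recover the lock with no list search */
static osc_lock_t *decode_lock_ptr (const osc_hdr_t *hdr)
{
    osc_lock_t *lock = (osc_lock_t *) (uintptr_t) hdr->lock_ptr;
    assert (NULL != lock);
    return lock;
}

This is only safe because the ack is consumed by the same process that allocated the lock; the target never dereferences the value, and the lock object must stay alive until the last ack arrives, which is why unlock_internal above defers OBJ_RELEASE until after the ack wait.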
@@ -899,40 +868,34 @@ int ompi_osc_pt2pt_process_unlock (ompi_osc_pt2pt_module_t *module, int source,
                                    ompi_osc_pt2pt_header_unlock_t *unlock_header)
 {
     ompi_osc_pt2pt_header_unlock_ack_t unlock_ack;
+    ompi_osc_pt2pt_peer_t *peer = module->peers + source;
     int ret;
 
     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
-                         "ompi_osc_pt2pt_process_unlock entering (finished %d/%d)...",
-                         module->passive_incoming_frag_count[source],
-                         module->passive_incoming_frag_signal_count[source]));
+                         "ompi_osc_pt2pt_process_unlock entering (passive_incoming_frag_count: %d)...",
+                         peer->passive_incoming_frag_count));
 
     /* we cannot block when processing an incoming request */
-    if (module->passive_incoming_frag_signal_count[source] !=
-        module->passive_incoming_frag_count[source]) {
+    if (0 != peer->passive_incoming_frag_count) {
         return OMPI_ERR_WOULD_BLOCK;
     }
 
     unlock_ack.base.type = OMPI_OSC_PT2PT_HDR_TYPE_UNLOCK_ACK;
     unlock_ack.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID;
+    unlock_ack.lock_ptr = unlock_header->lock_ptr;
 
     ret = ompi_osc_pt2pt_control_send_unbuffered (module, source, &unlock_ack, sizeof (unlock_ack));
     if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
         return ret;
     }
 
-    module->passive_incoming_frag_signal_count[source] = 0;
-    module->passive_incoming_frag_count[source] = 0;
-
-    OPAL_THREAD_LOCK(&module->lock);
-
-    if (unlock_header->lock_type == MPI_LOCK_EXCLUSIVE || 0 == --module->shared_count) {
-        module->lock_status = 0;
-
+    if (-1 == module->lock_status) {
+        OPAL_THREAD_ADD32(&module->lock_status, 1);
+        ompi_osc_activate_next_lock (module);
+    } else if (0 == OPAL_THREAD_ADD32(&module->lock_status, -1)) {
         ompi_osc_activate_next_lock (module);
     }
 
-    OPAL_THREAD_UNLOCK(&module->lock);
-
     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                          "osc pt2pt: finished processing unlock fragment"));
@@ -942,22 +905,18 @@ int ompi_osc_pt2pt_process_unlock (ompi_osc_pt2pt_module_t *module, int source,
 int ompi_osc_pt2pt_process_flush (ompi_osc_pt2pt_module_t *module, int source,
                                   ompi_osc_pt2pt_header_flush_t *flush_header)
 {
+    ompi_osc_pt2pt_peer_t *peer = module->peers + source;
     ompi_osc_pt2pt_header_flush_ack_t flush_ack;
 
     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
-                         "ompi_osc_pt2pt_process_flush entering (finished %d/%d)...",
-                         module->passive_incoming_frag_count[source],
-                         module->passive_incoming_frag_signal_count[source]));
+                         "ompi_osc_pt2pt_process_flush entering (passive_incoming_frag_count: %d)...",
+                         peer->passive_incoming_frag_count));
 
     /* we cannot block when processing an incoming request */
-    if (module->passive_incoming_frag_signal_count[source] !=
-        module->passive_incoming_frag_count[source]) {
+    if (0 != peer->passive_incoming_frag_count) {
         return OMPI_ERR_WOULD_BLOCK;
     }
 
-    module->passive_incoming_frag_signal_count[source] = 0;
-    module->passive_incoming_frag_count[source] = 0;
-
     flush_ack.base.type = OMPI_OSC_PT2PT_HDR_TYPE_FLUSH_ACK;
     flush_ack.base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID;
     flush_ack.serial_number = flush_header->serial_number;
diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_request.h b/ompi/mca/osc/pt2pt/osc_pt2pt_request.h
index 10cf043b9a..25b0cba6bf 100644
--- a/ompi/mca/osc/pt2pt/osc_pt2pt_request.h
+++ b/ompi/mca/osc/pt2pt/osc_pt2pt_request.h
@@ -26,7 +26,7 @@ struct ompi_osc_pt2pt_request_t {
     int origin_count;
     struct ompi_datatype_t *origin_dt;
     ompi_osc_pt2pt_module_t* module;
-    int outstanding_requests;
+    int32_t outstanding_requests;
     bool internal;
 };
 typedef struct ompi_osc_pt2pt_request_t ompi_osc_pt2pt_request_t;
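The lock_status changes scattered through this patch all implement one encoding: 0 means the local window is unlocked, -1 means an exclusive holder, and n > 0 means n shared holders, updated lock-free with 32-bit compare-and-swap (hence int32_t changes such as outstanding_requests above). A self-contained sketch of the scheme, using the GCC __sync builtins as a stand-in for opal_atomic_cmpset_32; this illustrates the technique and is not the actual OMPI implementation:

#include <stdbool.h>
#include <stdint.h>

/* 0 = unlocked, -1 = exclusive holder, n > 0 = n shared holders */
static bool lock_try_acquire (volatile int32_t *lock_status, bool exclusive)
{
    if (exclusive) {
        /* exclusive acquire only succeeds on the 0 -> -1 transition */
        return __sync_bool_compare_and_swap (lock_status, 0, -1);
    }

    for (;;) {
        int32_t status = *lock_status;
        if (status < 0) {
            return false;  /* exclusively held; caller queues the request */
        }
        /* shared acquire: bump the holder count if nothing changed meanwhile */
        if (__sync_bool_compare_and_swap (lock_status, status, status + 1)) {
            return true;
        }
        /* lost a race with another shared acquire/release; retry */
    }
}

static int32_t lock_release (volatile int32_t *lock_status, bool exclusive)
{
    /* exclusive release: -1 -> 0; shared release: n -> n - 1.
     * Returns the new value; 0 means a queued lock may now be activated. */
    return __sync_add_and_fetch (lock_status, exclusive ? 1 : -1);
}

A release that returns 0 corresponds to the OPAL_THREAD_ADD32 checks in ompi_osc_pt2pt_unlock_self and ompi_osc_pt2pt_process_unlock above, which call ompi_osc_activate_next_lock at exactly that point.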