From 5ac95df9dcb3277a9581917ec6f1ce41aa2044e0 Mon Sep 17 00:00:00 2001 From: KAWASHIMA Takahiro Date: Mon, 11 Apr 2016 19:05:20 +0900 Subject: [PATCH] osc/pt2pt: use two distinct "namespaces" for tags - revised Before this commit, a same PML tag may be used for distinct communications for long messages. For example, consider a condition where rank A calls ```MPI_PUT``` targeting rank B and rank B calls ```MPI_GET``` targeting rank A simultaneously. A PML tag for the ```MPI_PUT``` is acquired on rank A and is used for the long-message communication from rank A to rank B. A PML tag for the ```MPI_GET``` is acquired on rank B and is used for the long-message communication from rank A to rank B. These two tags may become a same value because they are managed independently on each rank. This will cause a data corruption. This commit separates the tag used in a single RMA communication call, one for communication from an origin to a target, and one for communication from a target to an origin. A "base" tag is acquired using ```get_tag``` function and PML tag is caluculated from the base tag by ```tag_to_target``` and ```tag_to_origin``` function. --- ompi/mca/osc/pt2pt/osc_pt2pt.h | 36 +++++++++++++++++++++--- ompi/mca/osc/pt2pt/osc_pt2pt_comm.c | 20 ++++++------- ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c | 18 ++++++------ 3 files changed, 51 insertions(+), 23 deletions(-) diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt.h b/ompi/mca/osc/pt2pt/osc_pt2pt.h index bb49cdbd28..ac100438e5 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt.h +++ b/ompi/mca/osc/pt2pt/osc_pt2pt.h @@ -636,23 +636,51 @@ static inline void osc_pt2pt_add_pending (ompi_osc_pt2pt_pending_t *pending) /** * get_tag: * - * @short Get a send/recv tag for large memory operations. + * @short Get a send/recv base tag for large memory operations. * * @param[in] module - OSC PT2PT module * - * @long This function aquires a 16-bit tag for use with large memory operations. The + * @long This function acquires a 16-bit tag for use with large memory operations. The * tag will be odd or even depending on if this is in a passive target access - * or not. + * or not. An actual tag that will be passed to PML send/recv function is given + * by tag_to_target or tag_to_origin function depending on the communication + * direction. */ static inline int get_tag(ompi_osc_pt2pt_module_t *module) { /* the LSB of the tag is used be the receiver to determine if the message is a passive or active target (ie, where to mark completion). */ - int32_t tmp = OPAL_THREAD_ADD32((volatile int32_t *) &module->tag_counter, 2); + int32_t tmp = OPAL_THREAD_ADD32((volatile int32_t *) &module->tag_counter, 4); return (tmp & OSC_PT2PT_FRAG_MASK) | !!(module->passive_target_access_epoch); } +/** + * tag_to_target: + * + * @short Get a tag used for PML send/recv communication from an origin to a target. + * + * @param[in] tag - base tag given by get_tag function. + */ +static inline int tag_to_target(int tag) +{ + /* (returned_tag >> 1) & 0x1 == 0 */ + return tag + 0; +} + +/** + * tag_to_origin: + * + * @short Get a tag used for PML send/recv communication from a target to an origin. + * + * @param[in] tag - base tag given by get_tag function. + */ +static inline int tag_to_origin(int tag) +{ + /* (returned_tag >> 1) & 0x1 == 1 */ + return tag + 2; +} + /** * ompi_osc_pt2pt_accumulate_lock: * diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c b/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c index 335f9eeccc..6ff6a9a3e4 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c @@ -364,7 +364,7 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_ OBJ_RETAIN(target_dt); ret = ompi_osc_pt2pt_isend_w_cb ((void *) packed_ddt, ddt_len, MPI_BYTE, target, - tag, module->comm, ompi_osc_pt2pt_dt_send_complete, + tag_to_target(tag), module->comm, ompi_osc_pt2pt_dt_send_complete, target_dt); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { break; @@ -394,7 +394,7 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_ header->tag = tag; osc_pt2pt_hton(header, proc); - ret = ompi_osc_pt2pt_data_isend (module,origin_addr, origin_count, origin_dt, target, tag, + ret = ompi_osc_pt2pt_data_isend (module,origin_addr, origin_count, origin_dt, target, tag_to_target(tag), request); } } while (0); @@ -520,7 +520,7 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count, OBJ_RETAIN(target_dt); ret = ompi_osc_pt2pt_isend_w_cb ((void *) packed_ddt, ddt_len, MPI_BYTE, target, - tag, module->comm, ompi_osc_pt2pt_dt_send_complete, + tag_to_target(tag), module->comm, ompi_osc_pt2pt_dt_send_complete, target_dt); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { break; @@ -553,7 +553,7 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count, OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "acc: starting long accumulate with tag %d", tag)); - ret = ompi_osc_pt2pt_data_isend (module, origin_addr, origin_count, origin_dt, target, tag, + ret = ompi_osc_pt2pt_data_isend (module, origin_addr, origin_count, origin_dt, target, tag_to_target(tag), request); } } while (0); @@ -663,7 +663,7 @@ int ompi_osc_pt2pt_compare_and_swap (const void *origin_addr, const void *compar osc_pt2pt_copy_for_send (ptr, dt->super.size, compare_addr, proc, 1, dt); request->outstanding_requests = 1; - ret = ompi_osc_pt2pt_irecv_w_cb (result_addr, 1, dt, target, tag, module->comm, + ret = ompi_osc_pt2pt_irecv_w_cb (result_addr, 1, dt, target, tag_to_origin(tag), module->comm, NULL, ompi_osc_pt2pt_req_comm_complete, request); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { return ret; @@ -828,7 +828,7 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co OBJ_RETAIN(target_dt); ret = ompi_osc_pt2pt_isend_w_cb ((void *) packed_ddt, ddt_len, MPI_BYTE, target, - tag, module->comm, ompi_osc_pt2pt_dt_send_complete, + tag_to_target(tag), module->comm, ompi_osc_pt2pt_dt_send_complete, target_dt); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { break; @@ -843,7 +843,7 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co /* TODO -- store the request somewhere so we can cancel it on error */ pt2pt_request->outstanding_requests = 1; - ret = ompi_osc_pt2pt_irecv_w_cb (origin_addr, origin_count, origin_dt, target, tag, + ret = ompi_osc_pt2pt_irecv_w_cb (origin_addr, origin_count, origin_dt, target, tag_to_origin(tag), module->comm, NULL, ompi_osc_pt2pt_req_comm_complete, pt2pt_request); } while (0); @@ -1046,7 +1046,7 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin OBJ_RETAIN(target_datatype); ret = ompi_osc_pt2pt_isend_w_cb ((void *) packed_ddt, ddt_len, MPI_BYTE, target_rank, - tag, module->comm, ompi_osc_pt2pt_dt_send_complete, + tag_to_target(tag), module->comm, ompi_osc_pt2pt_dt_send_complete, target_datatype); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { break; @@ -1059,7 +1059,7 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin ptr += ddt_len; } - ret = ompi_osc_pt2pt_irecv_w_cb (result_addr, result_count, result_datatype, target_rank, tag, + ret = ompi_osc_pt2pt_irecv_w_cb (result_addr, result_count, result_datatype, target_rank, tag_to_origin(tag), module->comm, NULL, ompi_osc_pt2pt_req_comm_complete, pt2pt_request); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { break; @@ -1078,7 +1078,7 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin osc_pt2pt_hton(header, proc); ret = ompi_osc_pt2pt_isend_w_cb (origin_addr, origin_count, origin_datatype, target_rank, - tag, module->comm, ompi_osc_pt2pt_req_comm_complete, pt2pt_request); + tag_to_target(tag), module->comm, ompi_osc_pt2pt_req_comm_complete, pt2pt_request); } } while (0); diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c b/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c index dd8152e428..98149c5aef 100644 --- a/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c @@ -398,7 +398,7 @@ static inline int process_put_long(ompi_osc_pt2pt_module_t* module, int source, ret = ompi_osc_pt2pt_component_irecv (module, target, put_header->count, datatype, source, - put_header->tag, + tag_to_target(put_header->tag), module->comm); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { OPAL_OUTPUT_VERBOSE((1, ompi_osc_base_framework.framework_output, @@ -534,7 +534,7 @@ static inline int process_get (ompi_osc_pt2pt_module_t* module, int target, } /* send get data */ - ret = osc_pt2pt_get_post_send (module, source, get_header->count, datatype, target, get_header->tag); + ret = osc_pt2pt_get_post_send (module, source, get_header->count, datatype, target, tag_to_origin(get_header->tag)); OBJ_RELEASE(datatype); @@ -848,7 +848,7 @@ static int ompi_osc_pt2pt_acc_long_start (ompi_osc_pt2pt_module_t *module, int s do { if (op == &ompi_mpi_op_replace.op) { ret = ompi_osc_pt2pt_irecv_w_cb (target, acc_header->count, datatype, source, - acc_header->tag, module->comm, NULL, + tag_to_target(acc_header->tag), module->comm, NULL, replace_cb, module); break; } @@ -877,7 +877,7 @@ static int ompi_osc_pt2pt_acc_long_start (ompi_osc_pt2pt_module_t *module, int s } ret = ompi_osc_pt2pt_irecv_w_cb (buffer, primitive_count, primitive_datatype, source, - acc_header->tag, module->comm, NULL, accumulate_cb, acc_data); + tag_to_target(acc_header->tag), module->comm, NULL, accumulate_cb, acc_data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { OBJ_RELEASE(acc_data); } @@ -925,7 +925,7 @@ static int ompi_osc_pt2pt_gacc_start (ompi_osc_pt2pt_module_t *module, int sourc break; } - ret = ompi_osc_pt2pt_isend_w_cb (target, acc_header->count, datatype, source, acc_header->tag, + ret = ompi_osc_pt2pt_isend_w_cb (target, acc_header->count, datatype, source, tag_to_origin(acc_header->tag), module->comm, accumulate_cb, acc_data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { OBJ_RELEASE(acc_data); @@ -994,14 +994,14 @@ static int ompi_osc_gacc_long_start (ompi_osc_pt2pt_module_t *module, int source break; } - ret = ompi_osc_pt2pt_irecv_w_cb (buffer, acc_header->count, datatype, source, acc_header->tag, + ret = ompi_osc_pt2pt_irecv_w_cb (buffer, acc_header->count, datatype, source, tag_to_target(acc_header->tag), module->comm, &recv_request, accumulate_cb, acc_data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { OBJ_RELEASE(acc_data); break; } - ret = ompi_osc_pt2pt_isend_w_cb (target, primitive_count, primitive_datatype, source, acc_header->tag, + ret = ompi_osc_pt2pt_isend_w_cb (target, primitive_count, primitive_datatype, source, tag_to_origin(acc_header->tag), module->comm, accumulate_cb, acc_data); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { /* cancel the receive and free the accumulate data */ @@ -1054,7 +1054,7 @@ static int ompi_osc_pt2pt_cswap_start (ompi_osc_pt2pt_module_t *module, int sour do { /* no reason to do a non-blocking send here */ - ret = MCA_PML_CALL(send(target, 1, datatype, source, cswap_header->tag, MCA_PML_BASE_SEND_STANDARD, + ret = MCA_PML_CALL(send(target, 1, datatype, source, tag_to_origin(cswap_header->tag), MCA_PML_BASE_SEND_STANDARD, module->comm)); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { break; @@ -1503,7 +1503,7 @@ static int process_large_datatype_request (ompi_osc_pt2pt_module_t *module, int memcpy (ddt_buffer->header, header, header_len); ret = ompi_osc_pt2pt_irecv_w_cb ((void *)((uintptr_t) ddt_buffer->header + header_len), - ddt_len, MPI_BYTE, source, tag, module->comm, NULL, + ddt_len, MPI_BYTE, source, tag_to_target(tag), module->comm, NULL, process_large_datatype_request_cb, ddt_buffer); if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { OBJ_RELEASE(ddt_buffer);