1
1

osc/pt2pt: use two distinct "namespaces" for tags - revised

Before this commit, a same PML tag may be used for distinct
communications for long messages. For example, consider a condition
where rank A calls ```MPI_PUT``` targeting rank B and rank B calls
```MPI_GET``` targeting rank A simultaneously.
A PML tag for the ```MPI_PUT``` is acquired on rank A and is used
for the long-message communication from rank A to rank B.
A PML tag for the ```MPI_GET``` is acquired on rank B and is used
for the long-message communication from rank A to rank B.
These two tags may become a same value because they are managed
independently on each rank. This will cause a data corruption.

This commit separates the tag used in a single RMA communication
call, one for communication from an origin to a target, and one
for communication from a target to an origin. A "base" tag
is acquired using ```get_tag``` function and PML tag is caluculated
from the base tag by ```tag_to_target``` and ```tag_to_origin```
function.
Этот коммит содержится в:
KAWASHIMA Takahiro 2016-04-11 19:05:20 +09:00
родитель 3576ecafa7
Коммит 5ac95df9dc
3 изменённых файлов: 51 добавлений и 23 удалений

Просмотреть файл

@ -636,23 +636,51 @@ static inline void osc_pt2pt_add_pending (ompi_osc_pt2pt_pending_t *pending)
/**
* get_tag:
*
* @short Get a send/recv tag for large memory operations.
* @short Get a send/recv base tag for large memory operations.
*
* @param[in] module - OSC PT2PT module
*
* @long This function aquires a 16-bit tag for use with large memory operations. The
* @long This function acquires a 16-bit tag for use with large memory operations. The
* tag will be odd or even depending on if this is in a passive target access
* or not.
* or not. An actual tag that will be passed to PML send/recv function is given
* by tag_to_target or tag_to_origin function depending on the communication
* direction.
*/
static inline int get_tag(ompi_osc_pt2pt_module_t *module)
{
/* the LSB of the tag is used be the receiver to determine if the
message is a passive or active target (ie, where to mark
completion). */
int32_t tmp = OPAL_THREAD_ADD32((volatile int32_t *) &module->tag_counter, 2);
int32_t tmp = OPAL_THREAD_ADD32((volatile int32_t *) &module->tag_counter, 4);
return (tmp & OSC_PT2PT_FRAG_MASK) | !!(module->passive_target_access_epoch);
}
/**
* tag_to_target:
*
* @short Get a tag used for PML send/recv communication from an origin to a target.
*
* @param[in] tag - base tag given by get_tag function.
*/
static inline int tag_to_target(int tag)
{
/* (returned_tag >> 1) & 0x1 == 0 */
return tag + 0;
}
/**
* tag_to_origin:
*
* @short Get a tag used for PML send/recv communication from a target to an origin.
*
* @param[in] tag - base tag given by get_tag function.
*/
static inline int tag_to_origin(int tag)
{
/* (returned_tag >> 1) & 0x1 == 1 */
return tag + 2;
}
/**
* ompi_osc_pt2pt_accumulate_lock:
*

Просмотреть файл

@ -364,7 +364,7 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_
OBJ_RETAIN(target_dt);
ret = ompi_osc_pt2pt_isend_w_cb ((void *) packed_ddt, ddt_len, MPI_BYTE, target,
tag, module->comm, ompi_osc_pt2pt_dt_send_complete,
tag_to_target(tag), module->comm, ompi_osc_pt2pt_dt_send_complete,
target_dt);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
break;
@ -394,7 +394,7 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_
header->tag = tag;
osc_pt2pt_hton(header, proc);
ret = ompi_osc_pt2pt_data_isend (module,origin_addr, origin_count, origin_dt, target, tag,
ret = ompi_osc_pt2pt_data_isend (module,origin_addr, origin_count, origin_dt, target, tag_to_target(tag),
request);
}
} while (0);
@ -520,7 +520,7 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count,
OBJ_RETAIN(target_dt);
ret = ompi_osc_pt2pt_isend_w_cb ((void *) packed_ddt, ddt_len, MPI_BYTE, target,
tag, module->comm, ompi_osc_pt2pt_dt_send_complete,
tag_to_target(tag), module->comm, ompi_osc_pt2pt_dt_send_complete,
target_dt);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
break;
@ -553,7 +553,7 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count,
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"acc: starting long accumulate with tag %d", tag));
ret = ompi_osc_pt2pt_data_isend (module, origin_addr, origin_count, origin_dt, target, tag,
ret = ompi_osc_pt2pt_data_isend (module, origin_addr, origin_count, origin_dt, target, tag_to_target(tag),
request);
}
} while (0);
@ -663,7 +663,7 @@ int ompi_osc_pt2pt_compare_and_swap (const void *origin_addr, const void *compar
osc_pt2pt_copy_for_send (ptr, dt->super.size, compare_addr, proc, 1, dt);
request->outstanding_requests = 1;
ret = ompi_osc_pt2pt_irecv_w_cb (result_addr, 1, dt, target, tag, module->comm,
ret = ompi_osc_pt2pt_irecv_w_cb (result_addr, 1, dt, target, tag_to_origin(tag), module->comm,
NULL, ompi_osc_pt2pt_req_comm_complete, request);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return ret;
@ -828,7 +828,7 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co
OBJ_RETAIN(target_dt);
ret = ompi_osc_pt2pt_isend_w_cb ((void *) packed_ddt, ddt_len, MPI_BYTE, target,
tag, module->comm, ompi_osc_pt2pt_dt_send_complete,
tag_to_target(tag), module->comm, ompi_osc_pt2pt_dt_send_complete,
target_dt);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
break;
@ -843,7 +843,7 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co
/* TODO -- store the request somewhere so we can cancel it on error */
pt2pt_request->outstanding_requests = 1;
ret = ompi_osc_pt2pt_irecv_w_cb (origin_addr, origin_count, origin_dt, target, tag,
ret = ompi_osc_pt2pt_irecv_w_cb (origin_addr, origin_count, origin_dt, target, tag_to_origin(tag),
module->comm, NULL, ompi_osc_pt2pt_req_comm_complete, pt2pt_request);
} while (0);
@ -1046,7 +1046,7 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin
OBJ_RETAIN(target_datatype);
ret = ompi_osc_pt2pt_isend_w_cb ((void *) packed_ddt, ddt_len, MPI_BYTE, target_rank,
tag, module->comm, ompi_osc_pt2pt_dt_send_complete,
tag_to_target(tag), module->comm, ompi_osc_pt2pt_dt_send_complete,
target_datatype);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
break;
@ -1059,7 +1059,7 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin
ptr += ddt_len;
}
ret = ompi_osc_pt2pt_irecv_w_cb (result_addr, result_count, result_datatype, target_rank, tag,
ret = ompi_osc_pt2pt_irecv_w_cb (result_addr, result_count, result_datatype, target_rank, tag_to_origin(tag),
module->comm, NULL, ompi_osc_pt2pt_req_comm_complete, pt2pt_request);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
break;
@ -1078,7 +1078,7 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin
osc_pt2pt_hton(header, proc);
ret = ompi_osc_pt2pt_isend_w_cb (origin_addr, origin_count, origin_datatype, target_rank,
tag, module->comm, ompi_osc_pt2pt_req_comm_complete, pt2pt_request);
tag_to_target(tag), module->comm, ompi_osc_pt2pt_req_comm_complete, pt2pt_request);
}
} while (0);

Просмотреть файл

@ -398,7 +398,7 @@ static inline int process_put_long(ompi_osc_pt2pt_module_t* module, int source,
ret = ompi_osc_pt2pt_component_irecv (module, target,
put_header->count,
datatype, source,
put_header->tag,
tag_to_target(put_header->tag),
module->comm);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
OPAL_OUTPUT_VERBOSE((1, ompi_osc_base_framework.framework_output,
@ -534,7 +534,7 @@ static inline int process_get (ompi_osc_pt2pt_module_t* module, int target,
}
/* send get data */
ret = osc_pt2pt_get_post_send (module, source, get_header->count, datatype, target, get_header->tag);
ret = osc_pt2pt_get_post_send (module, source, get_header->count, datatype, target, tag_to_origin(get_header->tag));
OBJ_RELEASE(datatype);
@ -848,7 +848,7 @@ static int ompi_osc_pt2pt_acc_long_start (ompi_osc_pt2pt_module_t *module, int s
do {
if (op == &ompi_mpi_op_replace.op) {
ret = ompi_osc_pt2pt_irecv_w_cb (target, acc_header->count, datatype, source,
acc_header->tag, module->comm, NULL,
tag_to_target(acc_header->tag), module->comm, NULL,
replace_cb, module);
break;
}
@ -877,7 +877,7 @@ static int ompi_osc_pt2pt_acc_long_start (ompi_osc_pt2pt_module_t *module, int s
}
ret = ompi_osc_pt2pt_irecv_w_cb (buffer, primitive_count, primitive_datatype, source,
acc_header->tag, module->comm, NULL, accumulate_cb, acc_data);
tag_to_target(acc_header->tag), module->comm, NULL, accumulate_cb, acc_data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
OBJ_RELEASE(acc_data);
}
@ -925,7 +925,7 @@ static int ompi_osc_pt2pt_gacc_start (ompi_osc_pt2pt_module_t *module, int sourc
break;
}
ret = ompi_osc_pt2pt_isend_w_cb (target, acc_header->count, datatype, source, acc_header->tag,
ret = ompi_osc_pt2pt_isend_w_cb (target, acc_header->count, datatype, source, tag_to_origin(acc_header->tag),
module->comm, accumulate_cb, acc_data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
OBJ_RELEASE(acc_data);
@ -994,14 +994,14 @@ static int ompi_osc_gacc_long_start (ompi_osc_pt2pt_module_t *module, int source
break;
}
ret = ompi_osc_pt2pt_irecv_w_cb (buffer, acc_header->count, datatype, source, acc_header->tag,
ret = ompi_osc_pt2pt_irecv_w_cb (buffer, acc_header->count, datatype, source, tag_to_target(acc_header->tag),
module->comm, &recv_request, accumulate_cb, acc_data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
OBJ_RELEASE(acc_data);
break;
}
ret = ompi_osc_pt2pt_isend_w_cb (target, primitive_count, primitive_datatype, source, acc_header->tag,
ret = ompi_osc_pt2pt_isend_w_cb (target, primitive_count, primitive_datatype, source, tag_to_origin(acc_header->tag),
module->comm, accumulate_cb, acc_data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
/* cancel the receive and free the accumulate data */
@ -1054,7 +1054,7 @@ static int ompi_osc_pt2pt_cswap_start (ompi_osc_pt2pt_module_t *module, int sour
do {
/* no reason to do a non-blocking send here */
ret = MCA_PML_CALL(send(target, 1, datatype, source, cswap_header->tag, MCA_PML_BASE_SEND_STANDARD,
ret = MCA_PML_CALL(send(target, 1, datatype, source, tag_to_origin(cswap_header->tag), MCA_PML_BASE_SEND_STANDARD,
module->comm));
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
break;
@ -1503,7 +1503,7 @@ static int process_large_datatype_request (ompi_osc_pt2pt_module_t *module, int
memcpy (ddt_buffer->header, header, header_len);
ret = ompi_osc_pt2pt_irecv_w_cb ((void *)((uintptr_t) ddt_buffer->header + header_len),
ddt_len, MPI_BYTE, source, tag, module->comm, NULL,
ddt_len, MPI_BYTE, source, tag_to_target(tag), module->comm, NULL,
process_large_datatype_request_cb, ddt_buffer);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
OBJ_RELEASE(ddt_buffer);