
This commit renames the arithmetic atomic operations in opal to indicate that they return the new value not the old value. This naming differentiates these routines from new functions that return the old value. Signed-off-by: Nathan Hjelm <hjelmn@lanl.gov>
1308 строки
57 KiB
C
1308 строки
57 KiB
C
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
|
|
/*
|
|
* Copyright (c) 2014-2017 Los Alamos National Security, LLC. All rights
|
|
* reserved.
|
|
* Copyright (c) 2016-2017 Research Organization for Information Science
|
|
* and Technology (RIST). All rights reserved.
|
|
* Copyright (c) 2016 Intel, Inc. All rights reserved.
|
|
* $COPYRIGHT$
|
|
*
|
|
* Additional copyrights may follow
|
|
*
|
|
* $HEADER$
|
|
*/
|
|
|
|
#include "osc_rdma_accumulate.h"
|
|
#include "osc_rdma_request.h"
|
|
#include "osc_rdma_comm.h"
|
|
|
|
#include "ompi/mca/osc/base/osc_base_obj_convert.h"
|
|
|
|
enum ompi_osc_rdma_event_type_t {
|
|
OMPI_OSC_RDMA_EVENT_TYPE_PUT,
|
|
};
|
|
|
|
typedef enum ompi_osc_rdma_event_type_t ompi_osc_rdma_event_type_t;
|
|
|
|
struct ompi_osc_rdma_event_t {
|
|
opal_event_t super;
|
|
ompi_osc_rdma_module_t *module;
|
|
struct mca_btl_base_endpoint_t *endpoint;
|
|
void *local_address;
|
|
mca_btl_base_registration_handle_t *local_handle;
|
|
uint64_t remote_address;
|
|
mca_btl_base_registration_handle_t *remote_handle;
|
|
uint64_t length;
|
|
mca_btl_base_rdma_completion_fn_t cbfunc;
|
|
void *cbcontext;
|
|
void *cbdata;
|
|
};
|
|
|
|
typedef struct ompi_osc_rdma_event_t ompi_osc_rdma_event_t;
|
|
|
|
static void *ompi_osc_rdma_event_put (int fd, int flags, void *context)
|
|
{
|
|
ompi_osc_rdma_event_t *event = (ompi_osc_rdma_event_t *) context;
|
|
int ret;
|
|
|
|
ret = event->module->selected_btl->btl_put (event->module->selected_btl, event->endpoint, event->local_address,
|
|
event->remote_address, event->local_handle, event->remote_handle,
|
|
event->length, 0, MCA_BTL_NO_ORDER, event->cbfunc, event->cbcontext,
|
|
event->cbdata);
|
|
if (OPAL_LIKELY(OPAL_SUCCESS == ret)) {
|
|
/* done with this event */
|
|
opal_event_del (&event->super);
|
|
free (event);
|
|
} else {
|
|
/* re-activate the event */
|
|
opal_event_active (&event->super, OPAL_EV_READ, 1);
|
|
}
|
|
|
|
return NULL;
|
|
}
|
|
|
|
static int ompi_osc_rdma_event_queue (ompi_osc_rdma_module_t *module, struct mca_btl_base_endpoint_t *endpoint,
|
|
ompi_osc_rdma_event_type_t event_type, void *local_address, mca_btl_base_registration_handle_t *local_handle,
|
|
uint64_t remote_address, mca_btl_base_registration_handle_t *remote_handle,
|
|
uint64_t length, mca_btl_base_rdma_completion_fn_t cbfunc, void *cbcontext,
|
|
void *cbdata)
|
|
{
|
|
ompi_osc_rdma_event_t *event = malloc (sizeof (*event));
|
|
void *(*event_func) (int, int, void *);
|
|
|
|
if (OPAL_UNLIKELY(NULL == event)) {
|
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
|
}
|
|
|
|
event->module = module;
|
|
event->endpoint = endpoint;
|
|
event->local_address = local_address;
|
|
event->local_handle = local_handle;
|
|
event->remote_address = remote_address;
|
|
event->remote_handle = remote_handle;
|
|
event->length = length;
|
|
event->cbfunc = cbfunc;
|
|
event->cbcontext = cbcontext;
|
|
event->cbdata = cbdata;
|
|
|
|
switch (event_type) {
|
|
case OMPI_OSC_RDMA_EVENT_TYPE_PUT:
|
|
event_func = ompi_osc_rdma_event_put;
|
|
break;
|
|
default:
|
|
opal_output(0, "osc/rdma: cannot queue unknown event type %d", event_type);
|
|
abort ();
|
|
}
|
|
|
|
opal_event_set (opal_sync_event_base, &event->super, -1, OPAL_EV_READ,
|
|
event_func, event);
|
|
opal_event_active (&event->super, OPAL_EV_READ, 1);
|
|
|
|
return OMPI_SUCCESS;
|
|
}
|
|
|
|
|
|
static int ompi_osc_rdma_gacc_local (const void *source_buffer, int source_count, ompi_datatype_t *source_datatype,
|
|
void *result_buffer, int result_count, ompi_datatype_t *result_datatype,
|
|
ompi_osc_rdma_peer_t *peer, uint64_t target_address,
|
|
mca_btl_base_registration_handle_t *target_handle, int target_count,
|
|
ompi_datatype_t *target_datatype, ompi_op_t *op, ompi_osc_rdma_module_t *module,
|
|
ompi_osc_rdma_request_t *request)
|
|
{
|
|
int ret = OMPI_SUCCESS;
|
|
|
|
do {
|
|
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "performing accumulate with local region(s)");
|
|
|
|
if (!ompi_osc_rdma_peer_is_exclusive (peer)) {
|
|
(void) ompi_osc_rdma_lock_acquire_exclusive (module, peer, offsetof (ompi_osc_rdma_state_t, accumulate_lock));
|
|
}
|
|
|
|
if (NULL != result_buffer) {
|
|
/* get accumulate */
|
|
|
|
ret = ompi_datatype_sndrcv ((void *) (intptr_t) target_address, target_count, target_datatype,
|
|
result_buffer, result_count, result_datatype);
|
|
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (&ompi_mpi_op_no_op.op != op) {
|
|
if (&ompi_mpi_op_replace.op != op) {
|
|
ret = ompi_osc_base_sndrcv_op (source_buffer, source_count, source_datatype, (void *) (intptr_t) target_address,
|
|
target_count, target_datatype, op);
|
|
} else {
|
|
ret = ompi_datatype_sndrcv (source_buffer, source_count, source_datatype, (void *) (intptr_t) target_address,
|
|
target_count, target_datatype);
|
|
}
|
|
}
|
|
|
|
if (!ompi_osc_rdma_peer_is_exclusive (peer)) {
|
|
(void) ompi_osc_rdma_lock_release_exclusive (module, peer, offsetof (ompi_osc_rdma_state_t, accumulate_lock));
|
|
}
|
|
} while (0);
|
|
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_ERROR, "local accumulate failed with ompi error code %d", ret);
|
|
return ret;
|
|
}
|
|
|
|
if (request) {
|
|
/* NTH: is it ok to use an ompi error code here? */
|
|
ompi_osc_rdma_request_complete (request, ret);
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
static inline int ompi_osc_rdma_cas_local (const void *source_addr, const void *compare_addr, void *result_addr,
|
|
ompi_datatype_t *datatype, ompi_osc_rdma_peer_t *peer,
|
|
uint64_t target_address, mca_btl_base_registration_handle_t *target_handle,
|
|
ompi_osc_rdma_module_t *module)
|
|
{
|
|
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "performing compare-and-swap with local regions");
|
|
|
|
ompi_osc_rdma_lock_acquire_exclusive (module, peer, offsetof (ompi_osc_rdma_state_t, accumulate_lock));
|
|
|
|
memcpy (result_addr, (void *) (uintptr_t) target_address, datatype->super.size);
|
|
|
|
if (0 == memcmp (compare_addr, result_addr, datatype->super.size)) {
|
|
memcpy ((void *) (uintptr_t) target_address, source_addr, datatype->super.size);
|
|
}
|
|
|
|
ompi_osc_rdma_lock_release_exclusive (module, peer, offsetof (ompi_osc_rdma_state_t, accumulate_lock));
|
|
|
|
return OMPI_SUCCESS;
|
|
}
|
|
|
|
/* completion of an accumulate put */
|
|
static void ompi_osc_rdma_acc_put_complete (struct mca_btl_base_module_t *btl, struct mca_btl_base_endpoint_t *endpoint,
|
|
void *local_address, mca_btl_base_registration_handle_t *local_handle,
|
|
void *context, void *data, int status)
|
|
{
|
|
ompi_osc_rdma_request_t *request = (ompi_osc_rdma_request_t *) context;
|
|
ompi_osc_rdma_sync_t *sync = request->sync;
|
|
ompi_osc_rdma_peer_t *peer = request->peer;
|
|
|
|
OSC_RDMA_VERBOSE(status ? MCA_BASE_VERBOSE_ERROR : MCA_BASE_VERBOSE_TRACE, "remote accumulate (put/get) complete on "
|
|
"sync %p. local address %p. opal status %d", (void *) sync, local_address, status);
|
|
|
|
ompi_osc_rdma_frag_complete (request->frag);
|
|
ompi_osc_rdma_request_complete (request, status);
|
|
|
|
if (!ompi_osc_rdma_peer_is_exclusive (peer)) {
|
|
(void) ompi_osc_rdma_lock_release_exclusive (sync->module, peer, offsetof (ompi_osc_rdma_state_t, accumulate_lock));
|
|
}
|
|
|
|
ompi_osc_rdma_sync_rdma_dec (sync);
|
|
ompi_osc_rdma_peer_clear_flag (peer, OMPI_OSC_RDMA_PEER_ACCUMULATING);
|
|
}
|
|
|
|
/* completion of an accumulate get operation */
|
|
static void ompi_osc_rdma_acc_get_complete (struct mca_btl_base_module_t *btl, struct mca_btl_base_endpoint_t *endpoint,
|
|
void *local_address, mca_btl_base_registration_handle_t *local_handle,
|
|
void *context, void *data, int status)
|
|
{
|
|
ompi_osc_rdma_request_t *request = (ompi_osc_rdma_request_t *) context;
|
|
intptr_t source = (intptr_t) local_address + request->offset;
|
|
ompi_osc_rdma_sync_t *sync = request->sync;
|
|
ompi_osc_rdma_module_t *module = sync->module;
|
|
|
|
assert (OMPI_SUCCESS == status);
|
|
|
|
OSC_RDMA_VERBOSE(status ? MCA_BASE_VERBOSE_ERROR : MCA_BASE_VERBOSE_TRACE, "remote accumulate get complete on sync %p. "
|
|
"status %d. request type %d", (void *) sync, status, request->type);
|
|
|
|
if (OMPI_SUCCESS == status && OMPI_OSC_RDMA_TYPE_GET_ACC == request->type) {
|
|
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "unpacking get accumulate result into user buffer");
|
|
if (NULL == request->result_addr) {
|
|
/* result buffer is not necessarily contiguous. use the opal datatype engine to
|
|
* copy the data over in this case */
|
|
struct iovec iov = {.iov_base = (void *) source, request->len};
|
|
uint32_t iov_count = 1;
|
|
size_t size = request->len;
|
|
|
|
opal_convertor_unpack (&request->convertor, &iov, &iov_count, &size);
|
|
opal_convertor_cleanup (&request->convertor);
|
|
} else {
|
|
/* copy contiguous data to the result buffer */
|
|
ompi_datatype_sndrcv ((void *) source, request->len, MPI_BYTE, request->result_addr,
|
|
request->result_count, request->result_dt);
|
|
}
|
|
|
|
if (&ompi_mpi_op_no_op.op == request->op) {
|
|
/* this is a no-op. nothing more to do except release resources and the accumulate lock */
|
|
ompi_osc_rdma_acc_put_complete (btl, endpoint, local_address, local_handle, context, data, status);
|
|
|
|
return;
|
|
}
|
|
}
|
|
|
|
/* accumulate the data */
|
|
if (&ompi_mpi_op_replace.op != request->op) {
|
|
ompi_op_reduce (request->op, request->origin_addr, (void *) source, request->origin_count, request->origin_dt);
|
|
} else {
|
|
memcpy ((void *) source, request->origin_addr, request->len);
|
|
}
|
|
|
|
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "putting locally accumulated result into target window");
|
|
|
|
/* initiate the put of the accumulated data */
|
|
status = module->selected_btl->btl_put (module->selected_btl, endpoint, (void *) source,
|
|
request->target_address, local_handle,
|
|
(mca_btl_base_registration_handle_t *) request->ctx,
|
|
request->len, 0, MCA_BTL_NO_ORDER, ompi_osc_rdma_acc_put_complete,
|
|
request, NULL);
|
|
if (OPAL_SUCCESS != status) {
|
|
status = ompi_osc_rdma_event_queue (module, endpoint, OMPI_OSC_RDMA_EVENT_TYPE_PUT, (void *) source, local_handle,
|
|
request->target_address, (mca_btl_base_registration_handle_t *) request->ctx,
|
|
request->len, ompi_osc_rdma_acc_put_complete, request, NULL);
|
|
}
|
|
|
|
assert (OPAL_SUCCESS == status);
|
|
}
|
|
|
|
static inline int ompi_osc_rdma_gacc_contig (ompi_osc_rdma_sync_t *sync, const void *source, int source_count, ompi_datatype_t *source_datatype,
|
|
void *result, int result_count, ompi_datatype_t *result_datatype,
|
|
ompi_osc_rdma_peer_t *peer, uint64_t target_address,
|
|
mca_btl_base_registration_handle_t *target_handle, int target_count,
|
|
ompi_datatype_t *target_datatype, ompi_op_t *op, ompi_osc_rdma_request_t *request)
|
|
{
|
|
ompi_osc_rdma_module_t *module = sync->module;
|
|
const size_t btl_alignment_mask = ALIGNMENT_MASK(module->selected_btl->btl_get_alignment);
|
|
unsigned long len = target_count * target_datatype->super.size;
|
|
ompi_osc_rdma_frag_t *frag = NULL;
|
|
unsigned long aligned_len, offset;
|
|
char *ptr = NULL;
|
|
int ret;
|
|
|
|
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "initiating accumulate on contiguous region of %lu bytes to remote address %" PRIx64
|
|
", sync %p", len, target_address, (void *) sync);
|
|
|
|
offset = target_address & btl_alignment_mask;;
|
|
aligned_len = (len + offset + btl_alignment_mask) & ~btl_alignment_mask;
|
|
|
|
ret = ompi_osc_rdma_frag_alloc (module, aligned_len, &frag, &ptr);
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_WARN, "could not allocate a temporary buffer for accumulate");
|
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
|
}
|
|
|
|
OPAL_THREAD_LOCK(&module->lock);
|
|
/* to ensure order wait until the previous accumulate completes */
|
|
while (!ompi_osc_rdma_peer_test_set_flag (peer, OMPI_OSC_RDMA_PEER_ACCUMULATING)) {
|
|
OPAL_THREAD_UNLOCK(&module->lock);
|
|
ompi_osc_rdma_progress (module);
|
|
OPAL_THREAD_LOCK(&module->lock);
|
|
}
|
|
|
|
OPAL_THREAD_UNLOCK(&module->lock);
|
|
|
|
if (!ompi_osc_rdma_peer_is_exclusive (peer)) {
|
|
(void) ompi_osc_rdma_lock_acquire_exclusive (module, peer, offsetof (ompi_osc_rdma_state_t, accumulate_lock));
|
|
}
|
|
|
|
/* set up the request */
|
|
request->frag = frag;
|
|
request->origin_addr = (void *) source;
|
|
request->origin_dt = source_datatype;
|
|
request->origin_count = source_count;
|
|
request->ctx = (void *) target_handle;
|
|
request->result_addr = result;
|
|
request->result_count = result_count;
|
|
request->result_dt = result_datatype;
|
|
request->offset = (ptrdiff_t) target_address & btl_alignment_mask;
|
|
request->target_address = target_address;
|
|
request->len = len;
|
|
request->op = op;
|
|
request->sync = sync;
|
|
|
|
ompi_osc_rdma_sync_rdma_inc (sync);
|
|
|
|
if (&ompi_mpi_op_replace.op != op || OMPI_OSC_RDMA_TYPE_GET_ACC == request->type) {
|
|
/* align the target address */
|
|
target_address = target_address & ~btl_alignment_mask;
|
|
|
|
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "initiating btl get. local: %p (handle %p), remote: 0x%" PRIx64
|
|
" (handle %p)", (void*)ptr, (void *) frag->handle, target_address, (void *) target_handle);
|
|
|
|
ret = module->selected_btl->btl_get (module->selected_btl, peer->data_endpoint, ptr,
|
|
target_address, frag->handle, target_handle, aligned_len,
|
|
0, MCA_BTL_NO_ORDER, ompi_osc_rdma_acc_get_complete,
|
|
request, NULL);
|
|
} else {
|
|
/* copy the put accumulate data */
|
|
memcpy (ptr, source, len);
|
|
|
|
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "initiating btl put. local: %p (handle %p), remote: 0x%" PRIx64
|
|
" (handle %p)", (void*)ptr, (void *) frag->handle, target_address, (void *) target_handle);
|
|
|
|
ret = module->selected_btl->btl_put (module->selected_btl, peer->data_endpoint, ptr,
|
|
target_address, frag->handle, target_handle, len, 0,
|
|
MCA_BTL_NO_ORDER, ompi_osc_rdma_acc_put_complete,
|
|
request, NULL);
|
|
}
|
|
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS == ret)) {
|
|
return OMPI_SUCCESS;
|
|
}
|
|
|
|
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_INFO, "accumulate btl operation failed with opal error code %d", ret);
|
|
|
|
if (!ompi_osc_rdma_peer_is_exclusive (peer)) {
|
|
(void) ompi_osc_rdma_lock_release_exclusive (module, peer, offsetof (ompi_osc_rdma_state_t, accumulate_lock));
|
|
}
|
|
|
|
ompi_osc_rdma_cleanup_rdma (sync, frag, NULL, NULL);
|
|
|
|
return ret;
|
|
}
|
|
|
|
static inline int ompi_osc_rdma_gacc_master (ompi_osc_rdma_sync_t *sync, const void *source_addr, int source_count,
|
|
ompi_datatype_t *source_datatype, void *result_addr, int result_count,
|
|
ompi_datatype_t *result_datatype, ompi_osc_rdma_peer_t *peer, uint64_t target_address,
|
|
mca_btl_base_registration_handle_t *target_handle, int target_count,
|
|
ompi_datatype_t *target_datatype, ompi_op_t *op, ompi_osc_rdma_request_t *request)
|
|
{
|
|
ompi_osc_rdma_module_t *module = sync->module;
|
|
struct iovec source_iovec[OMPI_OSC_RDMA_DECODE_MAX], target_iovec[OMPI_OSC_RDMA_DECODE_MAX];
|
|
const size_t acc_limit = (mca_osc_rdma_component.buffer_size >> 3);
|
|
uint32_t source_primitive_count, target_primitive_count;
|
|
opal_convertor_t source_convertor, target_convertor;
|
|
uint32_t source_iov_count, target_iov_count;
|
|
uint32_t source_iov_index, target_iov_index;
|
|
ompi_datatype_t *source_primitive, *target_primitive;
|
|
/* needed for opal_convertor_raw but not used */
|
|
size_t source_size, target_size;
|
|
ompi_osc_rdma_request_t *subreq;
|
|
size_t result_position;
|
|
ptrdiff_t lb, extent;
|
|
int ret, acc_len;
|
|
bool done;
|
|
|
|
(void) ompi_datatype_get_extent (target_datatype, &lb, &extent);
|
|
target_address += lb;
|
|
|
|
/* fast path for accumulate on built-in types */
|
|
if (OPAL_LIKELY((!source_count || ompi_datatype_is_predefined (source_datatype)) &&
|
|
ompi_datatype_is_predefined (target_datatype) &&
|
|
(!result_count || ompi_datatype_is_predefined (result_datatype)) &&
|
|
(target_datatype->super.size * target_count <= acc_limit))) {
|
|
if (NULL == request) {
|
|
OMPI_OSC_RDMA_REQUEST_ALLOC(module, peer, request);
|
|
request->internal = true;
|
|
}
|
|
|
|
request->type = result_datatype ? OMPI_OSC_RDMA_TYPE_GET_ACC : OMPI_OSC_RDMA_TYPE_ACC;
|
|
|
|
if (source_datatype) {
|
|
(void) ompi_datatype_get_extent (source_datatype, &lb, &extent);
|
|
source_addr = (void *)((intptr_t) source_addr + lb);
|
|
}
|
|
|
|
if (result_datatype) {
|
|
(void) ompi_datatype_get_extent (result_datatype, &lb, &extent);
|
|
result_addr = (void *)((intptr_t) result_addr + lb);
|
|
}
|
|
|
|
ret = ompi_osc_rdma_gacc_contig (sync, source_addr, source_count, source_datatype, result_addr,
|
|
result_count, result_datatype, peer, target_address,
|
|
target_handle, target_count, target_datatype, op,
|
|
request);
|
|
if (OPAL_LIKELY(OMPI_SUCCESS == ret)) {
|
|
return OMPI_SUCCESS;
|
|
}
|
|
|
|
if (source_datatype) {
|
|
/* the convertors will handle the lb */
|
|
(void) ompi_datatype_get_extent (source_datatype, &lb, &extent);
|
|
source_addr = (void *)((intptr_t) source_addr - lb);
|
|
}
|
|
|
|
if (result_datatype) {
|
|
(void) ompi_datatype_get_extent (result_datatype, &lb, &extent);
|
|
result_addr = (void *)((intptr_t) result_addr - lb);
|
|
}
|
|
}
|
|
|
|
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "scheduling accumulate on non-contiguous datatype(s)");
|
|
|
|
/* the convertor will handle lb from here */
|
|
(void) ompi_datatype_get_extent (target_datatype, &lb, &extent);
|
|
target_address -= lb;
|
|
|
|
/* get the primitive datatype info */
|
|
ret = ompi_osc_base_get_primitive_type_info (target_datatype, &target_primitive, &target_primitive_count);
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
/* target datatype is not made up of a single basic datatype */
|
|
return ret;
|
|
}
|
|
|
|
if (source_datatype) {
|
|
ret = ompi_osc_base_get_primitive_type_info (source_datatype, &source_primitive, &source_primitive_count);
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
/* target datatype is not made up of a single basic datatype */
|
|
return ret;
|
|
}
|
|
|
|
if (OPAL_UNLIKELY(source_primitive != target_primitive)) {
|
|
return MPI_ERR_TYPE;
|
|
}
|
|
}
|
|
|
|
/* prepare convertors for the source and target. these convertors will be used to determine the
|
|
* contiguous segments within the source and target. */
|
|
/* the source may be NULL if using MPI_OP_NO_OP with MPI_Get_accumulate */
|
|
if (source_datatype) {
|
|
OBJ_CONSTRUCT(&source_convertor, opal_convertor_t);
|
|
ret = opal_convertor_copy_and_prepare_for_send (ompi_mpi_local_convertor, &source_datatype->super, source_count, source_addr,
|
|
0, &source_convertor);
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
return ret;
|
|
}
|
|
}
|
|
|
|
/* target_datatype can never be NULL */
|
|
OBJ_CONSTRUCT(&target_convertor, opal_convertor_t);
|
|
ret = opal_convertor_copy_and_prepare_for_send (ompi_mpi_local_convertor, &target_datatype->super, target_count,
|
|
(void *) (intptr_t) target_address, 0, &target_convertor);
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
return ret;
|
|
}
|
|
|
|
if (request) {
|
|
/* keep the request from completing until all the transfers have started */
|
|
request->outstanding_requests = 1;
|
|
}
|
|
|
|
target_iov_index = 0;
|
|
target_iov_count = 0;
|
|
result_position = 0;
|
|
|
|
do {
|
|
/* decode segments of the source data */
|
|
source_iov_count = OMPI_OSC_RDMA_DECODE_MAX;
|
|
source_iov_index = 0;
|
|
/* opal_convertor_raw returns done when it has reached the end of the data */
|
|
if (!source_datatype) {
|
|
done = true;
|
|
source_iovec[0].iov_len = (size_t) -1;
|
|
source_iovec[0].iov_base = NULL;
|
|
source_iov_count = 1;
|
|
} else {
|
|
done = opal_convertor_raw (&source_convertor, source_iovec, &source_iov_count, &source_size);
|
|
}
|
|
|
|
/* loop on the target segments until we have exhaused the decoded source data */
|
|
while (source_iov_index != source_iov_count) {
|
|
if (target_iov_index == target_iov_count) {
|
|
/* decode segments of the target buffer */
|
|
target_iov_count = OMPI_OSC_RDMA_DECODE_MAX;
|
|
target_iov_index = 0;
|
|
(void) opal_convertor_raw (&target_convertor, target_iovec, &target_iov_count, &target_size);
|
|
}
|
|
|
|
/* we already checked that the target was large enough. this should be impossible */
|
|
assert (0 != target_iov_count);
|
|
|
|
/* determine how much to put in this operation */
|
|
acc_len = min(target_iovec[target_iov_index].iov_len, source_iovec[source_iov_index].iov_len);
|
|
acc_len = min((size_t) acc_len, acc_limit);
|
|
|
|
/* execute the get */
|
|
OMPI_OSC_RDMA_REQUEST_ALLOC(module, peer, subreq);
|
|
subreq->internal = true;
|
|
subreq->parent_request = request;
|
|
if (request) {
|
|
(void) OPAL_THREAD_ADD_FETCH32 (&request->outstanding_requests, 1);
|
|
}
|
|
|
|
if (result_datatype) {
|
|
/* prepare a convertor for this part of the result */
|
|
opal_convertor_copy_and_prepare_for_recv (ompi_mpi_local_convertor, &result_datatype->super, result_count,
|
|
result_addr, 0, &subreq->convertor);
|
|
opal_convertor_set_position (&subreq->convertor, &result_position);
|
|
subreq->type = OMPI_OSC_RDMA_TYPE_GET_ACC;
|
|
} else {
|
|
subreq->type = OMPI_OSC_RDMA_TYPE_ACC;
|
|
}
|
|
|
|
ret = ompi_osc_rdma_gacc_contig (sync, source_iovec[source_iov_index].iov_base, acc_len / target_primitive->super.size,
|
|
target_primitive, NULL, 0, NULL, peer, (uint64_t) (intptr_t) target_iovec[target_iov_index].iov_base,
|
|
target_handle, acc_len / target_primitive->super.size, target_primitive, op, subreq);
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
if (OPAL_UNLIKELY(OMPI_ERR_OUT_OF_RESOURCE != ret)) {
|
|
/* something bad happened. need to figure out how to handle these errors */
|
|
return ret;
|
|
}
|
|
|
|
/* progress and try again */
|
|
ompi_osc_rdma_progress (module);
|
|
continue;
|
|
}
|
|
|
|
/* adjust io vectors */
|
|
target_iovec[target_iov_index].iov_len -= acc_len;
|
|
source_iovec[source_iov_index].iov_len -= acc_len;
|
|
target_iovec[target_iov_index].iov_base = (void *)((intptr_t) target_iovec[target_iov_index].iov_base + acc_len);
|
|
source_iovec[source_iov_index].iov_base = (void *)((intptr_t) source_iovec[source_iov_index].iov_base + acc_len);
|
|
result_position += acc_len;
|
|
|
|
source_iov_index += !source_datatype || (0 == source_iovec[source_iov_index].iov_len);
|
|
target_iov_index += (0 == target_iovec[target_iov_index].iov_len);
|
|
}
|
|
} while (!done);
|
|
|
|
if (request) {
|
|
/* release our reference so the request can complete */
|
|
(void) OPAL_THREAD_ADD_FETCH32 (&request->outstanding_requests, -1);
|
|
}
|
|
|
|
if (source_datatype) {
|
|
opal_convertor_cleanup (&source_convertor);
|
|
OBJ_DESTRUCT(&source_convertor);
|
|
}
|
|
|
|
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "finished scheduling rdma on non-contiguous datatype(s)");
|
|
|
|
opal_convertor_cleanup (&target_convertor);
|
|
OBJ_DESTRUCT(&target_convertor);
|
|
|
|
return OMPI_SUCCESS;
|
|
}
|
|
|
|
static void ompi_osc_rdma_cas_atomic_complete (struct mca_btl_base_module_t *btl, struct mca_btl_base_endpoint_t *endpoint,
|
|
void *local_address, mca_btl_base_registration_handle_t *local_handle,
|
|
void *context, void *data, int status)
|
|
{
|
|
ompi_osc_rdma_sync_t *sync = (ompi_osc_rdma_sync_t *) context;
|
|
ompi_osc_rdma_frag_t *frag = (ompi_osc_rdma_frag_t *) data;
|
|
void *result_addr = (void *)(intptr_t) ((int64_t *) local_address)[1];
|
|
size_t size = ((int64_t *) local_address)[2];
|
|
|
|
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "atomic compare-and-swap complete. result: 0x%" PRIx64,
|
|
*((int64_t *) local_address));
|
|
|
|
/* copy the result */
|
|
memcpy (result_addr, local_address, size);
|
|
|
|
ompi_osc_rdma_sync_rdma_dec (sync);
|
|
ompi_osc_rdma_frag_complete (frag);
|
|
}
|
|
|
|
static inline int ompi_osc_rdma_cas_atomic (ompi_osc_rdma_sync_t *sync, const void *source_addr, const void *compare_addr,
|
|
void *result_addr, ompi_datatype_t *datatype, ompi_osc_rdma_peer_t *peer,
|
|
uint64_t target_address, mca_btl_base_registration_handle_t *target_handle)
|
|
{
|
|
ompi_osc_rdma_module_t *module = sync->module;
|
|
const size_t size = datatype->super.size;
|
|
ompi_osc_rdma_frag_t *frag = NULL;
|
|
int64_t compare, source;
|
|
int ret, flags;
|
|
char *ptr;
|
|
|
|
if (8 != size && !(4 == size && (MCA_BTL_ATOMIC_SUPPORTS_32BIT & module->selected_btl->btl_flags))) {
|
|
return OMPI_ERR_NOT_SUPPORTED;
|
|
}
|
|
|
|
compare = (8 == size) ? ((int64_t *) compare_addr)[0] : ((int32_t *) compare_addr)[0];
|
|
source = (8 == size) ? ((int64_t *) source_addr)[0] : ((int32_t *) source_addr)[0];
|
|
flags = (4 == size) ? MCA_BTL_ATOMIC_FLAG_32BIT : 0;
|
|
|
|
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "initiating compare-and-swap using %d-bit btl atomics. compare: 0x%"
|
|
PRIx64 ", origin: 0x%" PRIx64, (int) size * 8, *((int64_t *) compare_addr), *((int64_t *) source_addr));
|
|
|
|
ret = ompi_osc_rdma_frag_alloc (module, 24, &frag, &ptr);
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
return ret;
|
|
}
|
|
|
|
/* store the destination and size in the temporary buffer */
|
|
((int64_t *) ptr)[1] = (intptr_t) result_addr;
|
|
((int64_t *) ptr)[2] = size;
|
|
|
|
ompi_osc_rdma_sync_rdma_inc (sync);
|
|
|
|
do {
|
|
ret = module->selected_btl->btl_atomic_cswap (module->selected_btl, peer->data_endpoint, ptr, target_address,
|
|
frag->handle, target_handle, compare, source, flags, MCA_BTL_NO_ORDER,
|
|
ompi_osc_rdma_cas_atomic_complete, sync, frag);
|
|
|
|
ompi_osc_rdma_progress (module);
|
|
} while (OPAL_UNLIKELY(OMPI_ERR_OUT_OF_RESOURCE == ret || OPAL_ERR_TEMP_OUT_OF_RESOURCE == ret));
|
|
|
|
if (OPAL_SUCCESS != ret) {
|
|
ompi_osc_rdma_sync_rdma_dec (sync);
|
|
|
|
if (1 == ret) {
|
|
memcpy (result_addr, ptr, size);
|
|
ret = OMPI_SUCCESS;
|
|
}
|
|
|
|
ompi_osc_rdma_frag_complete (frag);
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
static inline void ompi_osc_rdma_fetch_and_op_atomic_complete (struct mca_btl_base_module_t *btl, struct mca_btl_base_endpoint_t *endpoint,
|
|
void *local_address, mca_btl_base_registration_handle_t *local_handle,
|
|
void *context, void *data, int status)
|
|
{
|
|
ompi_osc_rdma_sync_t *sync = (ompi_osc_rdma_sync_t *) context;
|
|
ompi_osc_rdma_frag_t *frag = (ompi_osc_rdma_frag_t *) data;
|
|
void *result_addr = (void *)(intptr_t) ((int64_t *) local_address)[1];
|
|
ompi_osc_rdma_request_t *req = (ompi_osc_rdma_request_t *) (intptr_t) ((int64_t *) local_address)[2];
|
|
size_t size = ((int64_t *) local_address)[3];
|
|
|
|
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "atomic fetch-and-op complete. result: 0x%" PRIx64,
|
|
*((int64_t *) local_address));
|
|
|
|
/* copy the result */
|
|
if (result_addr) {
|
|
memcpy (result_addr, local_address, size);
|
|
}
|
|
|
|
ompi_osc_rdma_sync_rdma_dec (sync);
|
|
ompi_osc_rdma_frag_complete (frag);
|
|
if (req) {
|
|
ompi_osc_rdma_request_complete (req, status);
|
|
}
|
|
}
|
|
|
|
static int ompi_osc_rdma_op_mapping[OMPI_OP_NUM_OF_TYPES] = {
|
|
[OMPI_OP_MAX] = MCA_BTL_ATOMIC_MAX,
|
|
[OMPI_OP_MIN] = MCA_BTL_ATOMIC_MIN,
|
|
[OMPI_OP_SUM] = MCA_BTL_ATOMIC_ADD,
|
|
[OMPI_OP_BAND] = MCA_BTL_ATOMIC_AND,
|
|
[OMPI_OP_BOR] = MCA_BTL_ATOMIC_OR,
|
|
[OMPI_OP_BXOR] = MCA_BTL_ATOMIC_XOR,
|
|
[OMPI_OP_LAND] = MCA_BTL_ATOMIC_LAND,
|
|
[OMPI_OP_LOR] = MCA_BTL_ATOMIC_LOR,
|
|
[OMPI_OP_LXOR] = MCA_BTL_ATOMIC_LXOR,
|
|
[OMPI_OP_REPLACE] = MCA_BTL_ATOMIC_SWAP,
|
|
};
|
|
|
|
static int ompi_osc_rdma_fetch_and_op_atomic (ompi_osc_rdma_sync_t *sync, const void *origin_addr, void *result_addr, ompi_datatype_t *dt,
|
|
ptrdiff_t extent, ompi_osc_rdma_peer_t *peer, uint64_t target_address,
|
|
mca_btl_base_registration_handle_t *target_handle, ompi_op_t *op, ompi_osc_rdma_request_t *req)
|
|
{
|
|
ompi_osc_rdma_module_t *module = sync->module;
|
|
int32_t atomic_flags = module->selected_btl->btl_atomic_flags;
|
|
ompi_osc_rdma_frag_t *frag = NULL;
|
|
int ret, btl_op, flags;
|
|
char *ptr = NULL;
|
|
int64_t origin;
|
|
|
|
if ((8 != extent && !((MCA_BTL_ATOMIC_SUPPORTS_32BIT & atomic_flags) && 4 == extent)) ||
|
|
(!(OMPI_DATATYPE_FLAG_DATA_INT & dt->super.flags) && !(MCA_BTL_ATOMIC_SUPPORTS_FLOAT & atomic_flags)) ||
|
|
!ompi_op_is_intrinsic (op) || (0 == ompi_osc_rdma_op_mapping[op->op_type])) {
|
|
return OMPI_ERR_NOT_SUPPORTED;
|
|
}
|
|
|
|
flags = (4 == extent) ? MCA_BTL_ATOMIC_FLAG_32BIT : 0;
|
|
if (OMPI_DATATYPE_FLAG_DATA_FLOAT & dt->super.flags) {
|
|
flags |= MCA_BTL_ATOMIC_FLAG_FLOAT;
|
|
}
|
|
|
|
btl_op = ompi_osc_rdma_op_mapping[op->op_type];
|
|
|
|
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "initiating fetch-and-op using %d-bit btl atomics. origin: 0x%" PRIx64,
|
|
(4 == extent) ? 32 : 64, *((int64_t *) origin_addr));
|
|
|
|
ret = ompi_osc_rdma_frag_alloc (module, 32, &frag, &ptr);
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
return ret;
|
|
}
|
|
|
|
origin = (8 == extent) ? ((int64_t *) origin_addr)[0] : ((int32_t *) origin_addr)[0];
|
|
|
|
/* store the destination, request, and extent in the temporary buffer for the callback */
|
|
((int64_t *) ptr)[1] = (intptr_t) result_addr;
|
|
((int64_t *) ptr)[2] = (intptr_t) req;
|
|
((int64_t *) ptr)[3] = extent;
|
|
|
|
ompi_osc_rdma_sync_rdma_inc (sync);
|
|
|
|
do {
|
|
ret = module->selected_btl->btl_atomic_fop (module->selected_btl, peer->data_endpoint, ptr, target_address,
|
|
frag->handle, target_handle, btl_op, origin, flags,
|
|
MCA_BTL_NO_ORDER, ompi_osc_rdma_fetch_and_op_atomic_complete,
|
|
sync, frag);
|
|
|
|
ompi_osc_rdma_progress (module);
|
|
} while (OPAL_UNLIKELY(OMPI_ERR_OUT_OF_RESOURCE == ret || OPAL_ERR_TEMP_OUT_OF_RESOURCE == ret));
|
|
|
|
if (OPAL_SUCCESS != ret) {
|
|
ompi_osc_rdma_sync_rdma_dec (sync);
|
|
|
|
if (OPAL_LIKELY(1 == ret)) {
|
|
memcpy (result_addr, ptr, extent);
|
|
if (req) {
|
|
ompi_osc_rdma_request_complete (req, OMPI_SUCCESS);
|
|
}
|
|
ret = OPAL_SUCCESS;
|
|
}
|
|
|
|
ompi_osc_rdma_frag_complete (frag);
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
static int ompi_osc_rdma_fetch_and_op_cas (ompi_osc_rdma_sync_t *sync, const void *origin_addr, void *result_addr, ompi_datatype_t *dt,
|
|
ptrdiff_t extent, ompi_osc_rdma_peer_t *peer, uint64_t target_address,
|
|
mca_btl_base_registration_handle_t *target_handle, ompi_op_t *op, ompi_osc_rdma_request_t *req)
|
|
{
|
|
ompi_osc_rdma_module_t *module = sync->module;
|
|
ompi_osc_rdma_frag_t *frag = NULL;
|
|
uint64_t address, offset;
|
|
char *ptr = NULL;
|
|
int ret;
|
|
|
|
if (extent > 8) {
|
|
return OMPI_ERR_NOT_SUPPORTED;
|
|
}
|
|
|
|
/* align the address. the user should not call with an unaligned address so don't need to range check here */
|
|
address = target_address & ~7;
|
|
offset = target_address & ~address;
|
|
|
|
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "initiating fetch-and-op using compare-and-swap. origin: 0x%" PRIx64,
|
|
*((int64_t *) origin_addr));
|
|
|
|
ret = ompi_osc_rdma_frag_alloc (module, 16, &frag, &ptr);
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
return ret;
|
|
}
|
|
|
|
/* store the destination in the temporary buffer */
|
|
do {
|
|
volatile bool complete = false;
|
|
|
|
ret = ompi_osc_get_data_blocking (module, peer->data_endpoint, address, target_handle, ptr, 8);
|
|
if (OMPI_SUCCESS != ret) {
|
|
ompi_osc_rdma_frag_complete (frag);
|
|
return ret;
|
|
}
|
|
|
|
((int64_t *) ptr)[1] = ((int64_t *) ptr)[0];
|
|
|
|
if (&ompi_mpi_op_no_op.op == op) {
|
|
memcpy (ptr + offset, origin_addr, extent);
|
|
} else {
|
|
ompi_op_reduce (op, (void *) origin_addr, ptr + offset, 1, dt);
|
|
}
|
|
|
|
do {
|
|
ret = module->selected_btl->btl_atomic_cswap (module->selected_btl, peer->data_endpoint, ptr, address,
|
|
frag->handle, target_handle, ((int64_t *) ptr)[1],
|
|
((int64_t *) ptr)[0], 0, MCA_BTL_NO_ORDER,
|
|
ompi_osc_rdma_atomic_complete, (void *) &complete, NULL);
|
|
|
|
ompi_osc_rdma_progress (module);
|
|
} while (OPAL_UNLIKELY(OPAL_ERR_OUT_OF_RESOURCE == ret || OPAL_ERR_TEMP_OUT_OF_RESOURCE == ret));
|
|
|
|
if (OPAL_UNLIKELY(OPAL_SUCCESS != ret)) {
|
|
break;
|
|
}
|
|
|
|
while (!complete) {
|
|
ompi_osc_rdma_progress (module);
|
|
}
|
|
|
|
if (((int64_t *) ptr)[1] == ((int64_t *) ptr)[0]) {
|
|
break;
|
|
}
|
|
} while (1);
|
|
|
|
if (result_addr) {
|
|
memcpy (result_addr, ptr + 8 + offset, extent);
|
|
}
|
|
|
|
ompi_osc_rdma_frag_complete (frag);
|
|
|
|
return ret;
|
|
}
|
|
|
|
static void ompi_osc_rdma_acc_single_atomic_complete (struct mca_btl_base_module_t *btl, struct mca_btl_base_endpoint_t *endpoint,
|
|
void *local_address, mca_btl_base_registration_handle_t *local_handle,
|
|
void *context, void *data, int status)
|
|
{
|
|
ompi_osc_rdma_sync_t *sync = (ompi_osc_rdma_sync_t *) context;
|
|
ompi_osc_rdma_request_t *req = (ompi_osc_rdma_request_t *) data;
|
|
|
|
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "atomic accumulate complete");
|
|
|
|
ompi_osc_rdma_sync_rdma_dec (sync);
|
|
if (req) {
|
|
ompi_osc_rdma_request_complete (req, status);
|
|
}
|
|
}
|
|
|
|
static int ompi_osc_rdma_acc_single_atomic (ompi_osc_rdma_sync_t *sync, const void *origin_addr, ompi_datatype_t *dt, ptrdiff_t extent,
|
|
ompi_osc_rdma_peer_t *peer, uint64_t target_address, mca_btl_base_registration_handle_t *target_handle,
|
|
ompi_op_t *op, ompi_osc_rdma_request_t *req)
|
|
{
|
|
ompi_osc_rdma_module_t *module = sync->module;
|
|
int32_t atomic_flags = module->selected_btl->btl_atomic_flags;
|
|
int ret, btl_op, flags;
|
|
int64_t origin;
|
|
|
|
if (!(module->selected_btl->btl_flags & MCA_BTL_FLAGS_ATOMIC_OPS)) {
|
|
/* btl put atomics not supported or disabled. fall back on fetch-and-op */
|
|
return ompi_osc_rdma_fetch_and_op_atomic (sync, origin_addr, NULL, dt, extent, peer, target_address, target_handle, op, req);
|
|
}
|
|
|
|
if ((8 != extent && !((MCA_BTL_ATOMIC_SUPPORTS_32BIT & atomic_flags) && 4 == extent)) ||
|
|
(!(OMPI_DATATYPE_FLAG_DATA_INT & dt->super.flags) && !(MCA_BTL_ATOMIC_SUPPORTS_FLOAT & atomic_flags)) ||
|
|
!ompi_op_is_intrinsic (op) || (0 == ompi_osc_rdma_op_mapping[op->op_type])) {
|
|
return OMPI_ERR_NOT_SUPPORTED;
|
|
}
|
|
|
|
origin = (8 == extent) ? ((uint64_t *) origin_addr)[0] : ((uint32_t *) origin_addr)[0];
|
|
|
|
/* set the appropriate flags for this atomic */
|
|
flags = (4 == extent) ? MCA_BTL_ATOMIC_FLAG_32BIT : 0;
|
|
if (OMPI_DATATYPE_FLAG_DATA_FLOAT & dt->super.flags) {
|
|
flags |= MCA_BTL_ATOMIC_FLAG_FLOAT;
|
|
}
|
|
|
|
btl_op = ompi_osc_rdma_op_mapping[op->op_type];
|
|
|
|
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "initiating accumulate using 64-bit btl atomics. origin: 0x%" PRIx64,
|
|
*((int64_t *) origin_addr));
|
|
|
|
ompi_osc_rdma_sync_rdma_inc (sync);
|
|
|
|
do {
|
|
ret = module->selected_btl->btl_atomic_op (module->selected_btl, peer->data_endpoint, target_address,
|
|
target_handle, btl_op, origin, flags, MCA_BTL_NO_ORDER,
|
|
ompi_osc_rdma_acc_single_atomic_complete, sync, req);
|
|
|
|
ompi_osc_rdma_progress (module);
|
|
} while (OPAL_UNLIKELY(OMPI_ERR_OUT_OF_RESOURCE == ret || OPAL_ERR_TEMP_OUT_OF_RESOURCE == ret));
|
|
|
|
if (OPAL_SUCCESS != ret) {
|
|
ompi_osc_rdma_sync_rdma_dec (sync);
|
|
if (1 == ret) {
|
|
if (req) {
|
|
ompi_osc_rdma_request_complete (req, OMPI_SUCCESS);
|
|
}
|
|
ret = OMPI_SUCCESS;
|
|
}
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
/**
|
|
* ompi_osc_rdma_cas_get_complete:
|
|
* Note: This function will not work as is in a heterogeneous environment.
|
|
*/
|
|
static void ompi_osc_rdma_cas_get_complete (struct mca_btl_base_module_t *btl, struct mca_btl_base_endpoint_t *endpoint,
|
|
void *local_address, mca_btl_base_registration_handle_t *local_handle,
|
|
void *context, void *data, int status)
|
|
{
|
|
ompi_osc_rdma_request_t *request = (ompi_osc_rdma_request_t *) context;
|
|
ompi_osc_rdma_sync_t *sync = request->sync;
|
|
ompi_osc_rdma_module_t *module = sync->module;
|
|
intptr_t source = (intptr_t) local_address + request->offset;
|
|
ompi_osc_rdma_frag_t *frag = request->frag;
|
|
ompi_osc_rdma_peer_t *peer = request->peer;
|
|
int ret;
|
|
|
|
OSC_RDMA_VERBOSE(status ? MCA_BASE_VERBOSE_ERROR : MCA_BASE_VERBOSE_TRACE, "remote compare-and-swap get complete on sync %p. "
|
|
"status %d", (void *) sync, status);
|
|
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != status)) {
|
|
return;
|
|
}
|
|
|
|
/* copy data to the user buffer (for gacc) */
|
|
memcpy (request->result_addr, (void *) source, request->len);
|
|
|
|
if (0 == memcmp ((void *) source, request->compare_addr, request->len)) {
|
|
/* the target and compare buffers match. write the source to the target */
|
|
memcpy ((void *) source, request->origin_addr, request->len);
|
|
|
|
ret = module->selected_btl->btl_put (module->selected_btl, peer->data_endpoint, local_address,
|
|
request->target_address, local_handle,
|
|
(mca_btl_base_registration_handle_t *) request->ctx,
|
|
request->len, 0, MCA_BTL_NO_ORDER,
|
|
ompi_osc_rdma_acc_put_complete, request, NULL);
|
|
if (OPAL_UNLIKELY(OPAL_SUCCESS != ret)) {
|
|
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_ERROR, "could not start put to complete accumulate operation. opal return code "
|
|
"%d. queuing operation...", ret);
|
|
|
|
ret = ompi_osc_rdma_event_queue (module, peer->data_endpoint, OMPI_OSC_RDMA_EVENT_TYPE_PUT, local_address, local_handle,
|
|
request->target_address, (mca_btl_base_registration_handle_t *) request->ctx, request->len,
|
|
ompi_osc_rdma_acc_put_complete, request, NULL);
|
|
}
|
|
|
|
return;
|
|
}
|
|
|
|
/* this is a no-op. nothing more to do except release the accumulate lock */
|
|
ompi_osc_rdma_frag_complete (frag);
|
|
|
|
if (!ompi_osc_rdma_peer_is_exclusive (peer)) {
|
|
(void) ompi_osc_rdma_lock_release_exclusive (module, request->peer,
|
|
offsetof (ompi_osc_rdma_state_t, accumulate_lock));
|
|
}
|
|
|
|
/* the request is now complete and the outstanding rdma operation is complete */
|
|
ompi_osc_rdma_request_complete (request, status);
|
|
|
|
ompi_osc_rdma_sync_rdma_dec (sync);
|
|
ompi_osc_rdma_peer_clear_flag (peer, OMPI_OSC_RDMA_PEER_ACCUMULATING);
|
|
}
|
|
|
|
static inline int cas_rdma (ompi_osc_rdma_sync_t *sync, const void *source_addr, const void *compare_addr, void *result_addr,
|
|
ompi_datatype_t *datatype, ompi_osc_rdma_peer_t *peer, uint64_t target_address,
|
|
mca_btl_base_registration_handle_t *target_handle)
|
|
{
|
|
ompi_osc_rdma_module_t *module = sync->module;
|
|
const size_t btl_alignment_mask = ALIGNMENT_MASK(module->selected_btl->btl_get_alignment);
|
|
unsigned long offset, aligned_len, len = datatype->super.size;
|
|
ompi_osc_rdma_frag_t *frag = NULL;
|
|
ompi_osc_rdma_request_t *request;
|
|
char *ptr = NULL;
|
|
int ret;
|
|
|
|
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "initiating compare-and-swap using RMDA on %lu bytes to remote address %" PRIx64
|
|
", sync %p", len, target_address, (void *) sync);
|
|
|
|
OMPI_OSC_RDMA_REQUEST_ALLOC(module, peer, request);
|
|
|
|
request->internal = true;
|
|
request->type = OMPI_OSC_RDMA_TYPE_CSWAP;
|
|
request->sync = sync;
|
|
|
|
OPAL_THREAD_LOCK(&module->lock);
|
|
/* to ensure order wait until the previous accumulate completes */
|
|
while (!ompi_osc_rdma_peer_test_set_flag (peer, OMPI_OSC_RDMA_PEER_ACCUMULATING)) {
|
|
OPAL_THREAD_UNLOCK(&module->lock);
|
|
ompi_osc_rdma_progress (module);
|
|
OPAL_THREAD_LOCK(&module->lock);
|
|
}
|
|
OPAL_THREAD_UNLOCK(&module->lock);
|
|
|
|
offset = target_address & btl_alignment_mask;;
|
|
aligned_len = (len + offset + btl_alignment_mask) & ~btl_alignment_mask;
|
|
|
|
do {
|
|
ret = ompi_osc_rdma_frag_alloc (module, aligned_len, &frag, &ptr);
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS == ret)) {
|
|
break;
|
|
}
|
|
|
|
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_WARN, "could not allocate an rdma fragment for compare-and-swap");
|
|
ompi_osc_rdma_progress (module);
|
|
} while (1);
|
|
|
|
if (!ompi_osc_rdma_peer_is_exclusive (peer)) {
|
|
(void) ompi_osc_rdma_lock_acquire_exclusive (module, peer, offsetof (ompi_osc_rdma_state_t, accumulate_lock));
|
|
}
|
|
|
|
/* set up the request */
|
|
request->frag = frag;
|
|
request->origin_addr = (void *) source_addr;
|
|
request->ctx = (void *) target_handle;
|
|
request->result_addr = result_addr;
|
|
request->compare_addr = compare_addr;
|
|
request->result_dt = datatype;
|
|
request->offset = (ptrdiff_t) offset;
|
|
request->target_address = target_address;
|
|
request->len = len;
|
|
|
|
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "RDMA compare-and-swap initiating btl get");
|
|
|
|
do {
|
|
ret = module->selected_btl->btl_get (module->selected_btl, peer->data_endpoint, ptr,
|
|
target_address, frag->handle, target_handle,
|
|
aligned_len, 0, MCA_BTL_NO_ORDER,
|
|
ompi_osc_rdma_cas_get_complete, request, NULL);
|
|
if (OPAL_LIKELY(OPAL_SUCCESS == ret)) {
|
|
break;
|
|
}
|
|
|
|
if (OPAL_UNLIKELY(OPAL_ERR_OUT_OF_RESOURCE != ret && OPAL_ERR_TEMP_OUT_OF_RESOURCE != ret)) {
|
|
if (!ompi_osc_rdma_peer_is_exclusive (peer)) {
|
|
(void) ompi_osc_rdma_lock_release_exclusive (module, peer, offsetof (ompi_osc_rdma_state_t, accumulate_lock));
|
|
}
|
|
ompi_osc_rdma_frag_complete (frag);
|
|
return ret;
|
|
}
|
|
|
|
ompi_osc_rdma_progress (module);
|
|
} while (1);
|
|
|
|
ompi_osc_rdma_sync_rdma_inc (sync);
|
|
|
|
return OMPI_SUCCESS;
|
|
}
|
|
|
|
|
|
int ompi_osc_rdma_compare_and_swap (const void *origin_addr, const void *compare_addr, void *result_addr,
|
|
ompi_datatype_t *dt, int target_rank, ptrdiff_t target_disp,
|
|
ompi_win_t *win)
|
|
{
|
|
ompi_osc_rdma_module_t *module = GET_MODULE(win);
|
|
ompi_osc_rdma_peer_t *peer;
|
|
mca_btl_base_registration_handle_t *target_handle;
|
|
ompi_osc_rdma_sync_t *sync;
|
|
uint64_t target_address;
|
|
ptrdiff_t true_lb, true_extent;
|
|
int ret;
|
|
|
|
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "cswap: 0x%lx, 0x%lx, 0x%lx, %s, %d, %d, %s",
|
|
(unsigned long) origin_addr, (unsigned long) compare_addr, (unsigned long) result_addr,
|
|
dt->name, target_rank, (int) target_disp, win->w_name);
|
|
|
|
sync = ompi_osc_rdma_module_sync_lookup (module, target_rank, &peer);
|
|
if (OPAL_UNLIKELY(NULL == sync)) {
|
|
return OMPI_ERR_RMA_SYNC;
|
|
}
|
|
|
|
ret = ompi_datatype_get_true_extent(dt, &true_lb, &true_extent);
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
return ret;
|
|
}
|
|
|
|
ret = osc_rdma_get_remote_segment (module, peer, target_disp, true_lb+true_extent, &target_address, &target_handle);
|
|
if (OPAL_UNLIKELY(OPAL_SUCCESS != ret)) {
|
|
return ret;
|
|
}
|
|
|
|
if (win->w_acc_ops <= OMPI_WIN_ACCUMULATE_OPS_SAME_OP) {
|
|
/* the user has indicated that they will only use the same op (or same op and no op)
|
|
* for operations on overlapping memory ranges. that indicates it is safe to go ahead
|
|
* and use network atomic operations. */
|
|
ret = ompi_osc_rdma_cas_atomic (sync, origin_addr, compare_addr, result_addr, dt,
|
|
peer, target_address, target_handle);
|
|
if (OMPI_SUCCESS == ret) {
|
|
return OMPI_SUCCESS;
|
|
}
|
|
}
|
|
|
|
if (ompi_osc_rdma_peer_local_base (peer)) {
|
|
return ompi_osc_rdma_cas_local (origin_addr, compare_addr, result_addr, dt,
|
|
peer, target_address, target_handle, module);
|
|
}
|
|
|
|
return cas_rdma (sync, origin_addr, compare_addr, result_addr, dt, peer, target_address,
|
|
target_handle);
|
|
}
|
|
|
|
|
|
static inline
|
|
int ompi_osc_rdma_rget_accumulate_internal (ompi_osc_rdma_sync_t *sync, const void *origin_addr, int origin_count,
|
|
ompi_datatype_t *origin_datatype, void *result_addr, int result_count,
|
|
ompi_datatype_t *result_datatype, ompi_osc_rdma_peer_t *peer,
|
|
int target_rank, MPI_Aint target_disp, int target_count,
|
|
ompi_datatype_t *target_datatype, ompi_op_t *op,
|
|
ompi_osc_rdma_request_t *request)
|
|
{
|
|
ompi_osc_rdma_module_t *module = sync->module;
|
|
mca_btl_base_registration_handle_t *target_handle;
|
|
uint64_t target_address;
|
|
ptrdiff_t lb, origin_extent, target_span;
|
|
int ret;
|
|
|
|
/* short-circuit case. note that origin_count may be 0 if op is MPI_NO_OP */
|
|
if ((result_addr && 0 == result_count) || 0 == target_count) {
|
|
if (request) {
|
|
ompi_osc_rdma_request_complete (request, MPI_SUCCESS);
|
|
}
|
|
|
|
return OMPI_SUCCESS;
|
|
}
|
|
|
|
target_span = opal_datatype_span(&target_datatype->super, target_count, &lb);
|
|
|
|
// a buffer defined by (buf, count, dt)
|
|
// will have data starting at buf+offset and ending len bytes later:
|
|
ret = osc_rdma_get_remote_segment (module, peer, target_disp, target_span+lb, &target_address, &target_handle);
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
return ret;
|
|
}
|
|
|
|
(void) ompi_datatype_get_extent (origin_datatype, &lb, &origin_extent);
|
|
|
|
if (module->acc_single_intrinsic && origin_extent <= 8) {
|
|
if (module->acc_use_amo && ompi_datatype_is_predefined (origin_datatype)) {
|
|
if (NULL == result_addr) {
|
|
ret = ompi_osc_rdma_acc_single_atomic (sync, origin_addr, origin_datatype, origin_extent, peer, target_address,
|
|
target_handle, op, request);
|
|
} else {
|
|
ret = ompi_osc_rdma_fetch_and_op_atomic (sync, origin_addr, result_addr, origin_datatype, origin_extent, peer, target_address,
|
|
target_handle, op, request);
|
|
}
|
|
|
|
if (OMPI_SUCCESS == ret) {
|
|
return OMPI_SUCCESS;
|
|
}
|
|
}
|
|
|
|
ret = ompi_osc_rdma_fetch_and_op_cas (sync, origin_addr, result_addr, origin_datatype, origin_extent, peer, target_address,
|
|
target_handle, op, request);
|
|
if (OMPI_SUCCESS == ret) {
|
|
return OMPI_SUCCESS;
|
|
}
|
|
}
|
|
|
|
if (ompi_osc_rdma_peer_local_base (peer)) {
|
|
/* local/self optimization */
|
|
return ompi_osc_rdma_gacc_local (origin_addr, origin_count, origin_datatype, result_addr, result_count,
|
|
result_datatype, peer, target_address, target_handle, target_count,
|
|
target_datatype, op, module, request);
|
|
}
|
|
|
|
return ompi_osc_rdma_gacc_master (sync, origin_addr, origin_count, origin_datatype, result_addr, result_count,
|
|
result_datatype, peer, target_address, target_handle, target_count,
|
|
target_datatype, op, request);
|
|
}
|
|
|
|
int ompi_osc_rdma_get_accumulate (const void *origin_addr, int origin_count, ompi_datatype_t *origin_datatype,
|
|
void *result_addr, int result_count, ompi_datatype_t *result_datatype,
|
|
int target_rank, MPI_Aint target_disp, int target_count, ompi_datatype_t *target_datatype,
|
|
ompi_op_t *op, ompi_win_t *win)
|
|
{
|
|
ompi_osc_rdma_module_t *module = GET_MODULE(win);
|
|
ompi_osc_rdma_peer_t *peer;
|
|
ompi_osc_rdma_sync_t *sync;
|
|
|
|
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "get_acc: 0x%lx, %d, %s, 0x%lx, %d, %s, %d, 0x%lx, %d, %s, %s, %s",
|
|
(unsigned long) origin_addr, origin_count, origin_datatype->name,
|
|
(unsigned long) result_addr, result_count, result_datatype->name, target_rank,
|
|
(unsigned long) target_disp, target_count, target_datatype->name, op->o_name,
|
|
win->w_name);
|
|
|
|
sync = ompi_osc_rdma_module_sync_lookup (module, target_rank, &peer);
|
|
if (OPAL_UNLIKELY(NULL == sync)) {
|
|
return OMPI_ERR_RMA_SYNC;
|
|
}
|
|
|
|
return ompi_osc_rdma_rget_accumulate_internal (sync, origin_addr, origin_count, origin_datatype,
|
|
result_addr, result_count, result_datatype,
|
|
peer, target_rank, target_disp, target_count,
|
|
target_datatype, op, NULL);
|
|
}
|
|
|
|
|
|
int ompi_osc_rdma_rget_accumulate (const void *origin_addr, int origin_count, ompi_datatype_t *origin_datatype,
|
|
void *result_addr, int result_count, ompi_datatype_t *result_datatype,
|
|
int target_rank, MPI_Aint target_disp, int target_count, ompi_datatype_t *target_datatype,
|
|
ompi_op_t *op, ompi_win_t *win, ompi_request_t **request)
|
|
{
|
|
ompi_osc_rdma_module_t *module = GET_MODULE(win);
|
|
ompi_osc_rdma_peer_t *peer;
|
|
ompi_osc_rdma_request_t *rdma_request;
|
|
ompi_osc_rdma_sync_t *sync;
|
|
int ret;
|
|
|
|
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "rget_acc: 0x%lx, %d, %s, 0x%lx, %d, %s, %d, 0x%lx, %d, %s, %s, %s",
|
|
(unsigned long) origin_addr, origin_count, origin_datatype->name,
|
|
(unsigned long) result_addr, result_count, result_datatype->name, target_rank,
|
|
(unsigned long) target_disp, target_count, target_datatype->name, op->o_name,
|
|
win->w_name);
|
|
|
|
sync = ompi_osc_rdma_module_sync_lookup (module, target_rank, &peer);
|
|
if (OPAL_UNLIKELY(NULL == sync)) {
|
|
return OMPI_ERR_RMA_SYNC;
|
|
}
|
|
|
|
OMPI_OSC_RDMA_REQUEST_ALLOC(module, peer, rdma_request);
|
|
|
|
ret = ompi_osc_rdma_rget_accumulate_internal (sync, origin_addr, origin_count, origin_datatype, result_addr,
|
|
result_count, result_datatype, peer, target_rank, target_disp,
|
|
target_count, target_datatype, op, rdma_request);
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
OMPI_OSC_RDMA_REQUEST_RETURN(rdma_request);
|
|
return ret;
|
|
}
|
|
|
|
*request = &rdma_request->super;
|
|
|
|
return OMPI_SUCCESS;
|
|
}
|
|
|
|
int ompi_osc_rdma_raccumulate (const void *origin_addr, int origin_count, ompi_datatype_t *origin_datatype, int target_rank,
|
|
ptrdiff_t target_disp, int target_count, ompi_datatype_t *target_datatype, ompi_op_t *op,
|
|
ompi_win_t *win, ompi_request_t **request)
|
|
{
|
|
ompi_osc_rdma_module_t *module = GET_MODULE(win);
|
|
ompi_osc_rdma_peer_t *peer;
|
|
ompi_osc_rdma_request_t *rdma_request;
|
|
ompi_osc_rdma_sync_t *sync;
|
|
int ret;
|
|
|
|
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "racc: 0x%lx, %d, %s, %d, 0x%lx, %d, %s, %s, %s",
|
|
(unsigned long) origin_addr, origin_count, origin_datatype->name, target_rank,
|
|
(unsigned long) target_disp, target_count, target_datatype->name, op->o_name, win->w_name);
|
|
|
|
sync = ompi_osc_rdma_module_sync_lookup (module, target_rank, &peer);
|
|
if (OPAL_UNLIKELY(NULL == sync)) {
|
|
return OMPI_ERR_RMA_SYNC;
|
|
}
|
|
|
|
OMPI_OSC_RDMA_REQUEST_ALLOC(module, peer, rdma_request);
|
|
|
|
ret = ompi_osc_rdma_rget_accumulate_internal (sync, origin_addr, origin_count, origin_datatype, NULL, 0,
|
|
NULL, peer, target_rank, target_disp, target_count, target_datatype,
|
|
op, rdma_request);
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
OMPI_OSC_RDMA_REQUEST_RETURN(rdma_request);
|
|
return ret;
|
|
}
|
|
|
|
*request = &rdma_request->super;
|
|
|
|
return OMPI_SUCCESS;
|
|
}
|
|
|
|
int ompi_osc_rdma_accumulate (const void *origin_addr, int origin_count, ompi_datatype_t *origin_datatype, int target_rank,
|
|
ptrdiff_t target_disp, int target_count, ompi_datatype_t *target_datatype, ompi_op_t *op,
|
|
ompi_win_t *win)
|
|
{
|
|
ompi_osc_rdma_module_t *module = GET_MODULE(win);
|
|
ompi_osc_rdma_peer_t *peer;
|
|
ompi_osc_rdma_sync_t *sync;
|
|
|
|
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "acc: 0x%lx, %d, %s, %d, 0x%lx, %d, %s, %s, %s",
|
|
(unsigned long) origin_addr, origin_count, origin_datatype->name, target_rank,
|
|
(unsigned long) target_disp, target_count, target_datatype->name, op->o_name, win->w_name);
|
|
|
|
sync = ompi_osc_rdma_module_sync_lookup (module, target_rank, &peer);
|
|
if (OPAL_UNLIKELY(NULL == sync)) {
|
|
return OMPI_ERR_RMA_SYNC;
|
|
}
|
|
|
|
return ompi_osc_rdma_rget_accumulate_internal (sync, origin_addr, origin_count, origin_datatype, NULL, 0,
|
|
NULL, peer, target_rank, target_disp, target_count, target_datatype,
|
|
op, NULL);
|
|
}
|
|
|
|
|
|
int ompi_osc_rdma_fetch_and_op (const void *origin_addr, void *result_addr, ompi_datatype_t *dt, int target_rank,
|
|
ptrdiff_t target_disp, ompi_op_t *op, ompi_win_t *win)
|
|
{
|
|
ompi_osc_rdma_module_t *module = GET_MODULE(win);
|
|
ompi_osc_rdma_peer_t *peer;
|
|
ompi_osc_rdma_sync_t *sync;
|
|
|
|
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "fop: %p, %s, %d, %lu, %s, %s", result_addr, dt->name,
|
|
target_rank, (unsigned long) target_disp, op->o_name, win->w_name);
|
|
|
|
sync = ompi_osc_rdma_module_sync_lookup (module, target_rank, &peer);
|
|
if (OPAL_UNLIKELY(NULL == sync)) {
|
|
return OMPI_ERR_RMA_SYNC;
|
|
}
|
|
|
|
return ompi_osc_rdma_rget_accumulate_internal (sync, origin_addr, 1, dt, result_addr, 1, dt, peer,
|
|
target_rank, target_disp, 1, dt, op, NULL);
|
|
}
|