552c9ca5a0
WHAT: Open our low-level communication infrastructure by moving all necessary components (btl/rcache/allocator/mpool) down in OPAL. All the components required for inter-process communications are currently deeply integrated in the OMPI layer. Several groups/institutions have expressed interest in having a more generic communication infrastructure, without all the OMPI layer dependencies. This communication layer should be made available at a different software level, available to all layers in the Open MPI software stack. As an example, our ORTE layer could replace the current OOB and instead use the BTL directly, gaining access to more reactive network interfaces than TCP. Similarly, external software libraries could take advantage of our highly optimized AM (active message) communication layer for their own purposes. UTK, with support from Sandia, developed a version of Open MPI where the entire communication infrastructure has been moved down to OPAL (btl/rcache/allocator/mpool). Most of the moved components have been updated to match the new schema, with few exceptions (mainly BTLs where I have no way of compiling/testing them). Thus, the completion of this RFC is tied to being able to complete this move for all BTLs. For this we need help from the rest of the Open MPI community, especially those supporting some of the BTLs. A non-exhaustive list of BTLs that qualify here is: mx, portals4, scif, udapl, ugni, usnic. This commit was SVN r32317.
1802 строки
62 KiB
C
1802 строки
62 KiB
C
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
|
|
/*
|
|
* Copyright (c) 2004-2005 The Trustees of Indiana University.
|
|
* All rights reserved.
|
|
* Copyright (c) 2004-2006 The Trustees of the University of Tennessee.
|
|
* All rights reserved.
|
|
* Copyright (c) 2004-2008 High Performance Computing Center Stuttgart,
|
|
* University of Stuttgart. All rights reserved.
|
|
* Copyright (c) 2004-2005 The Regents of the University of California.
|
|
* All rights reserved.
|
|
* Copyright (c) 2007-2014 Los Alamos National Security, LLC. All rights
|
|
* reserved.
|
|
* Copyright (c) 2009-2011 Oracle and/or its affiliates. All rights reserved.
|
|
* Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved.
|
|
* $COPYRIGHT$
|
|
*
|
|
* Additional copyrights may follow
|
|
*
|
|
* $HEADER$
|
|
*/
|
|
|
|
#include "ompi_config.h"
|
|
|
|
#include "osc_rdma.h"
|
|
#include "osc_rdma_header.h"
|
|
#include "osc_rdma_data_move.h"
|
|
#include "osc_rdma_obj_convert.h"
|
|
#include "osc_rdma_frag.h"
|
|
#include "osc_rdma_request.h"
|
|
|
|
#include "opal/threads/condition.h"
|
|
#include "opal/threads/mutex.h"
|
|
#include "opal/util/arch.h"
|
|
#include "opal/util/output.h"
|
|
#include "opal/sys/atomic.h"
|
|
#include "opal/align.h"
|
|
#include "opal/mca/btl/btl.h"
|
|
|
|
#include "ompi/mca/pml/pml.h"
|
|
#include "ompi/mca/pml/base/pml_base_sendreq.h"
|
|
#include "opal/mca/btl/btl.h"
|
|
#include "ompi/mca/osc/base/base.h"
|
|
#include "ompi/mca/osc/base/osc_base_obj_convert.h"
|
|
#include "ompi/datatype/ompi_datatype.h"
|
|
#include "ompi/op/op.h"
|
|
#include "ompi/memchecker.h"
|
|
|
|
/**
|
|
* struct osc_rdma_accumulate_data_t:
|
|
*
|
|
* @brief Data associated with an in-progress accumulation operation.
|
|
*/
|
|
struct osc_rdma_accumulate_data_t {
|
|
opal_list_item_t super;
|
|
ompi_osc_rdma_module_t* module;
|
|
void *target;
|
|
void *source;
|
|
size_t source_len;
|
|
ompi_proc_t *proc;
|
|
int count;
|
|
int peer;
|
|
ompi_datatype_t *datatype;
|
|
ompi_op_t *op;
|
|
int request_count;
|
|
};
|
|
typedef struct osc_rdma_accumulate_data_t osc_rdma_accumulate_data_t;
|
|
|
|
/* Constructor: start with no owned resources so the destructor can
 * tell which members were actually filled in. */
static void osc_rdma_accumulate_data_constructor (osc_rdma_accumulate_data_t *acc_data)
{
    acc_data->op = NULL;
    acc_data->datatype = NULL;
    acc_data->source = NULL;
}
|
|
|
|
/* Destructor: free the source buffer and drop the references held on
 * the datatype and the op. Members that were never set are skipped. */
static void osc_rdma_accumulate_data_destructor (osc_rdma_accumulate_data_t *acc_data)
{
    /* the source buffer is always alloc'd */
    if (NULL != acc_data->source) {
        free (acc_data->source);
    }

    if (NULL != acc_data->datatype) {
        OBJ_RELEASE(acc_data->datatype);
    }

    if (NULL != acc_data->op) {
        OBJ_RELEASE(acc_data->op);
    }
}
|
|
|
|
/* Class registration: osc_rdma_accumulate_data_t derives from
 * opal_list_item_t and uses the constructor/destructor defined above. */
OBJ_CLASS_DECLARATION(osc_rdma_accumulate_data_t);
OBJ_CLASS_INSTANCE(osc_rdma_accumulate_data_t,
                   opal_list_item_t,
                   osc_rdma_accumulate_data_constructor,
                   osc_rdma_accumulate_data_destructor);
|
|
|
|
/**
|
|
* osc_rdma_pending_acc_t:
|
|
*
|
|
* @brief Keep track of accumulate and cswap operations that are
|
|
* waiting on the accumulate lock.
|
|
*
|
|
* Since accumulate operations may take several steps to
|
|
* complete we need to lock the accumulate lock until the operation
|
|
* is complete. While the lock is held it is possible that additional
|
|
* accumulate operations will arrive. This structure keep track of
|
|
* those operations.
|
|
*/
|
|
struct osc_rdma_pending_acc_t {
|
|
opal_list_item_t super;
|
|
ompi_osc_rdma_header_t header;
|
|
int source;
|
|
void *data;
|
|
size_t data_len;
|
|
ompi_datatype_t *datatype;
|
|
};
|
|
typedef struct osc_rdma_pending_acc_t osc_rdma_pending_acc_t;
|
|
|
|
/* Constructor: a new pending operation owns neither a data copy nor a
 * datatype reference yet. */
static void osc_rdma_pending_acc_constructor (osc_rdma_pending_acc_t *pending)
{
    pending->datatype = NULL;
    pending->data = NULL;
}
|
|
|
|
/* Destructor: free the saved inline data and drop the datatype
 * reference, if either was set. */
static void osc_rdma_pending_acc_destructor (osc_rdma_pending_acc_t *pending)
{
    if (pending->data) {
        free (pending->data);
    }

    if (pending->datatype) {
        OBJ_RELEASE(pending->datatype);
    }
}
|
|
|
|
/* Class registration: osc_rdma_pending_acc_t derives from
 * opal_list_item_t. */
OBJ_CLASS_DECLARATION(osc_rdma_pending_acc_t);
OBJ_CLASS_INSTANCE(osc_rdma_pending_acc_t,
                   opal_list_item_t,
                   osc_rdma_pending_acc_constructor,
                   osc_rdma_pending_acc_destructor);
/* end osc_rdma_pending_acc_t class */
|
|
|
|
/**
|
|
* @brief Class for large datatype descriptions
|
|
*
|
|
* This class is used to keep track of buffers for large datatype desctiotions
|
|
* (datatypes that do not fit in an eager fragment). The structure is designed
|
|
* to take advantage of the small datatype description code path.
|
|
*/
|
|
struct ompi_osc_rdma_ddt_buffer_t {
|
|
/** allows this class to be stored in the buffer garbage collection
|
|
* list */
|
|
opal_list_item_t super;
|
|
|
|
/** OSC RDMA module */
|
|
ompi_osc_rdma_module_t *module;
|
|
/** source of this header */
|
|
int source;
|
|
/** header + datatype data */
|
|
ompi_osc_rdma_header_t *header;
|
|
};
|
|
typedef struct ompi_osc_rdma_ddt_buffer_t ompi_osc_rdma_ddt_buffer_t;
|
|
|
|
/* Constructor: no header buffer is attached to a new object. */
static void ompi_osc_rdma_ddt_buffer_constructor (ompi_osc_rdma_ddt_buffer_t *ddt_buffer)
{
    ddt_buffer->header = NULL;
}
|
|
|
|
/* Destructor: free the header buffer (if any) and clear the pointer. */
static void ompi_osc_rdma_ddt_buffer_destructor (ompi_osc_rdma_ddt_buffer_t *ddt_buffer)
{
    if (NULL == ddt_buffer->header) {
        return;
    }

    free (ddt_buffer->header);
    ddt_buffer->header = NULL;
}
|
|
|
|
/* Class registration: ompi_osc_rdma_ddt_buffer_t derives from
 * opal_list_item_t. */
OBJ_CLASS_DECLARATION(ompi_osc_rdma_ddt_buffer_t);
OBJ_CLASS_INSTANCE(ompi_osc_rdma_ddt_buffer_t,
                   opal_list_item_t,
                   ompi_osc_rdma_ddt_buffer_constructor,
                   ompi_osc_rdma_ddt_buffer_destructor);
/* end ompi_osc_rdma_ddt_buffer_t class */
|
|
|
|
/**
|
|
* datatype_buffer_length:
|
|
*
|
|
* @brief Determine the buffer size needed to hold count elements of datatype.
|
|
*
|
|
* @param[in] datatype - Element type
|
|
* @param[in] count - Element count
|
|
*
|
|
* @returns buflen Buffer length needed to hold count elements of datatype
|
|
*/
|
|
static inline int datatype_buffer_length (ompi_datatype_t *datatype, int count)
|
|
{
|
|
ompi_datatype_t *primitive_datatype = NULL;
|
|
uint32_t primitive_count;
|
|
size_t buflen;
|
|
|
|
ompi_osc_base_get_primitive_type_info(datatype, &primitive_datatype, &primitive_count);
|
|
primitive_count *= count;
|
|
|
|
/* figure out how big a buffer we need */
|
|
ompi_datatype_type_size(primitive_datatype, &buflen);
|
|
|
|
return buflen * primitive_count;
|
|
}
|
|
|
|
/**
|
|
* ompi_osc_rdma_control_send:
|
|
*
|
|
* @brief send a control message as part of a fragment
|
|
*
|
|
* @param[in] module - OSC RDMA module
|
|
* @param[in] target - Target peer's rank
|
|
* @param[in] data - Data to send
|
|
* @param[in] len - Length of data
|
|
*
|
|
* @returns error OMPI error code or OMPI_SUCCESS
|
|
*
|
|
* "send" a control messages. Adds it to the active fragment, so the
|
|
* caller will still need to explicitly flush (either to everyone or
|
|
* to a target) before this is sent.
|
|
*/
|
|
int ompi_osc_rdma_control_send (ompi_osc_rdma_module_t *module, int target,
|
|
void *data, size_t len)
|
|
{
|
|
ompi_osc_rdma_frag_t *frag;
|
|
char *ptr;
|
|
int ret;
|
|
|
|
OPAL_THREAD_LOCK(&module->lock);
|
|
|
|
ret = ompi_osc_rdma_frag_alloc(module, target, len, &frag, &ptr);
|
|
if (OPAL_LIKELY(OMPI_SUCCESS == ret)) {
|
|
memcpy (ptr, data, len);
|
|
|
|
ret = ompi_osc_rdma_frag_finish(module, frag);
|
|
}
|
|
|
|
OPAL_THREAD_UNLOCK(&module->lock);
|
|
|
|
return ret;
|
|
}
|
|
|
|
/* Completion callback for ompi_osc_rdma_control_send_unbuffered. The
 * callback data is the temporary send buffer, whose first
 * pointer-sized slot stores the module pointer. */
static int ompi_osc_rdma_control_send_unbuffered_cb (ompi_request_t *request)
{
    ompi_osc_rdma_module_t **send_ctx = (ompi_osc_rdma_module_t **) request->req_complete_cb_data;
    /* read the module pointer before the buffer goes away */
    ompi_osc_rdma_module_t *module = *send_ctx;

    /* mark this send as complete */
    mark_outgoing_completion (module);

    /* free the temporary buffer */
    free (send_ctx);

    /* put this request on the garbage collection list */
    osc_rdma_gc_add_request (request);

    return OMPI_SUCCESS;
}
|
|
|
|
/**
|
|
* ompi_osc_rdma_control_send_unbuffered:
|
|
*
|
|
* @brief Send an unbuffered control message to a peer.
|
|
*
|
|
* @param[in] module - OSC RDMA module
|
|
* @param[in] target - Target rank
|
|
* @param[in] data - Data to send
|
|
* @param[in] len - Length of data
|
|
*
|
|
* Directly send a control message. This does not allocate a
|
|
* fragment, so should only be used when sending other messages would
|
|
* be erroneous (such as complete messages, when there may be queued
|
|
* transactions from an overlapping post that has already heard back
|
|
* from its peer). The buffer specified by data will be available
|
|
* when this call returns.
|
|
*/
|
|
int ompi_osc_rdma_control_send_unbuffered(ompi_osc_rdma_module_t *module,
|
|
int target, void *data, size_t len)
|
|
{
|
|
void *ctx, *data_copy;
|
|
|
|
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
|
"osc rdma: sending unbuffered fragment to %d", target));
|
|
|
|
/* allocate a temporary buffer for this send */
|
|
ctx = malloc (sizeof(ompi_osc_rdma_module_t*) + len);
|
|
if (OPAL_UNLIKELY(NULL == ctx)) {
|
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
|
}
|
|
|
|
/* increment outgoing signal count. this send is not part of a passive epoch
|
|
* so there it would be erroneous to increment the epoch counters. */
|
|
ompi_osc_signal_outgoing (module, MPI_PROC_NULL, 1);
|
|
|
|
/* store module pointer and data in the buffer */
|
|
*(ompi_osc_rdma_module_t**)ctx = module;
|
|
data_copy = (ompi_osc_rdma_module_t**)ctx + 1;
|
|
memcpy (data_copy, data, len);
|
|
|
|
return ompi_osc_rdma_isend_w_cb (data_copy, len, MPI_BYTE, target, OSC_RDMA_FRAG_TAG,
|
|
module->comm, ompi_osc_rdma_control_send_unbuffered_cb, ctx);
|
|
}
|
|
|
|
/**
|
|
* datatype_create:
|
|
*
|
|
* @brief Utility function that creates a new datatype from a packed
|
|
* description.
|
|
*
|
|
* @param[in] module - OSC RDMA module
|
|
* @param[in] peer - Peer rank
|
|
* @param[out] datatype - New datatype. Must be released with OBJ_RELEASE.
|
|
* @param[out] proc - Optional. Proc for peer.
|
|
* @param[inout] data - Pointer to a pointer where the description is stored. This
|
|
* pointer will be updated to the location after the packed
|
|
* description.
|
|
*/
|
|
static inline int datatype_create (ompi_osc_rdma_module_t *module, int peer, ompi_proc_t **proc, ompi_datatype_t **datatype, void **data)
|
|
{
|
|
ompi_datatype_t *new_datatype = NULL;
|
|
ompi_proc_t *peer_proc;
|
|
int ret = OMPI_SUCCESS;
|
|
|
|
do {
|
|
peer_proc = ompi_comm_peer_lookup(module->comm, peer);
|
|
if (OPAL_UNLIKELY(NULL == peer_proc)) {
|
|
OPAL_OUTPUT_VERBOSE((1, ompi_osc_base_framework.framework_output,
|
|
"%d: datatype_create: could not resolve proc pointer for peer %d",
|
|
ompi_comm_rank(module->comm),
|
|
peer));
|
|
ret = OMPI_ERROR;
|
|
break;
|
|
}
|
|
|
|
new_datatype = ompi_osc_base_datatype_create(peer_proc, data);
|
|
if (OPAL_UNLIKELY(NULL == new_datatype)) {
|
|
OPAL_OUTPUT_VERBOSE((1, ompi_osc_base_framework.framework_output,
|
|
"%d: datatype_create: could not resolve datatype for peer %d",
|
|
ompi_comm_rank(module->comm), peer));
|
|
ret = OMPI_ERROR;
|
|
}
|
|
} while (0);
|
|
|
|
*datatype = new_datatype;
|
|
if (proc) *proc = peer_proc;
|
|
|
|
return ret;
|
|
}
|
|
|
|
/**
|
|
* process_put:
|
|
*
|
|
* @shoer Process a put w/ data message
|
|
*
|
|
* @param[in] module - OSC RDMA module
|
|
* @param[in] source - Message source
|
|
* @param[in] put_header - Message header + data
|
|
*
|
|
* Process a put message and copy the message data to the specified
|
|
* memory region. Note, this function does not handle any bounds
|
|
* checking at the moment.
|
|
*/
|
|
static inline int process_put(ompi_osc_rdma_module_t* module, int source,
|
|
ompi_osc_rdma_header_put_t* put_header)
|
|
{
|
|
char *data = (char*) (put_header + 1);
|
|
ompi_proc_t *proc;
|
|
struct ompi_datatype_t *datatype;
|
|
size_t data_len;
|
|
void *target = (unsigned char*) module->baseptr +
|
|
((unsigned long) put_header->displacement * module->disp_unit);
|
|
int ret;
|
|
|
|
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
|
"%d: process_put: received message from %d",
|
|
ompi_comm_rank(module->comm),
|
|
source));
|
|
|
|
ret = datatype_create (module, source, &proc, &datatype, (void **) &data);
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
return ret;
|
|
}
|
|
|
|
data_len = put_header->len - ((uintptr_t) data - (uintptr_t) put_header);
|
|
|
|
osc_rdma_copy_on_recv (target, data, data_len, proc, put_header->count, datatype);
|
|
|
|
OBJ_RELEASE(datatype);
|
|
|
|
return put_header->len;
|
|
}
|
|
|
|
static inline int process_put_long(ompi_osc_rdma_module_t* module, int source,
|
|
ompi_osc_rdma_header_put_t* put_header)
|
|
{
|
|
char *data = (char*) (put_header + 1);
|
|
struct ompi_datatype_t *datatype;
|
|
void *target = (unsigned char*) module->baseptr +
|
|
((unsigned long) put_header->displacement * module->disp_unit);
|
|
int ret;
|
|
|
|
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
|
"%d: process_put_long: received message from %d",
|
|
ompi_comm_rank(module->comm),
|
|
source));
|
|
|
|
ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
return ret;
|
|
}
|
|
|
|
ret = ompi_osc_rdma_component_irecv (module, target,
|
|
put_header->count,
|
|
datatype, source,
|
|
put_header->tag,
|
|
module->comm);
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
OPAL_OUTPUT_VERBOSE((1, ompi_osc_base_framework.framework_output,
|
|
"%d: process_put_long: irecv error: %d",
|
|
ompi_comm_rank(module->comm),
|
|
ret));
|
|
return OMPI_ERROR;
|
|
}
|
|
|
|
OBJ_RELEASE(datatype);
|
|
|
|
return put_header->len;
|
|
}
|
|
|
|
/**
|
|
* osc_rdma_incoming_req_complete:
|
|
*
|
|
* @brief Completion callback for a receive associate with an access
|
|
* epoch.
|
|
*
|
|
* @param[in] request - PML request with an OSC RMDA module as the callback data.
|
|
*
|
|
* This function is called when a send or recieve associated with an
|
|
* access epoch completes. When fired this function will increment the
|
|
* passive or active incoming count.
|
|
*/
|
|
static int osc_rdma_incoming_req_complete (ompi_request_t *request)
|
|
{
|
|
ompi_osc_rdma_module_t *module = (ompi_osc_rdma_module_t *) request->req_complete_cb_data;
|
|
int rank = MPI_PROC_NULL;
|
|
|
|
if (request->req_status.MPI_TAG & 0x01) {
|
|
rank = request->req_status.MPI_SOURCE;
|
|
}
|
|
|
|
mark_incoming_completion (module, rank);
|
|
|
|
/* put this request on the garbage colletion list */
|
|
osc_rdma_gc_add_request (request);
|
|
|
|
return OMPI_SUCCESS;
|
|
}
|
|
|
|
struct osc_rdma_get_post_send_cb_data_t {
|
|
ompi_osc_rdma_module_t *module;
|
|
int peer;
|
|
};
|
|
|
|
/* Completion callback for the send posted by osc_rdma_get_post_send.
 * Frees the callback data and records the completion. */
static int osc_rdma_get_post_send_cb (ompi_request_t *request)
{
    struct osc_rdma_get_post_send_cb_data_t *cb_data =
        (struct osc_rdma_get_post_send_cb_data_t *) request->req_complete_cb_data;
    /* copy what is needed out of the callback data before freeing it */
    ompi_osc_rdma_module_t *module = cb_data->module;
    const int peer = cb_data->peer;

    free (cb_data);

    /* mark this as a completed "incoming" request */
    mark_incoming_completion (module, peer);

    /* put this request on the garbage collection list */
    osc_rdma_gc_add_request (request);

    return OMPI_SUCCESS;
}
|
|
|
|
/**
|
|
* @brief Post a send to match the remote receive for a get operation.
|
|
*
|
|
* @param[in] module - OSC RDMA module
|
|
* @param[in] source - Source buffer
|
|
* @param[in] count - Number of elements in the source buffer
|
|
* @param[in] datatype - Type of source elements.
|
|
* @param[in] peer - Remote process that has the receive posted
|
|
* @param[in] tag - Tag for the send
|
|
*
|
|
* This function posts a send to match the receive posted as part
|
|
* of a get operation. When this send is complete the get is considered
|
|
* complete at the target (this process).
|
|
*/
|
|
static int osc_rdma_get_post_send (ompi_osc_rdma_module_t *module, void *source, int count,
|
|
ompi_datatype_t *datatype, int peer, int tag)
|
|
{
|
|
struct osc_rdma_get_post_send_cb_data_t *data;
|
|
|
|
data = malloc (sizeof (*data));
|
|
if (OPAL_UNLIKELY(NULL == data)) {
|
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
|
}
|
|
|
|
data->module = module;
|
|
/* for incoming completion we need to know the peer (MPI_PROC_NULL if this is
|
|
* in an active target epoch) */
|
|
data->peer = (tag & 0x1) ? peer : MPI_PROC_NULL;
|
|
|
|
return ompi_osc_rdma_isend_w_cb (source, count, datatype, peer, tag, module->comm,
|
|
osc_rdma_get_post_send_cb, (void *) data);
|
|
}
|
|
|
|
/**
|
|
* process_get:
|
|
*
|
|
* @brief Process a get message from a remote peer
|
|
*
|
|
* @param[in] module - OSC RDMA module
|
|
* @param[in] target - Peer process
|
|
* @param[in] get_header - Incoming message header
|
|
*/
|
|
static inline int process_get (ompi_osc_rdma_module_t* module, int target,
|
|
ompi_osc_rdma_header_get_t* get_header)
|
|
{
|
|
char *data = (char *) (get_header + 1);
|
|
struct ompi_datatype_t *datatype;
|
|
void *source = (unsigned char*) module->baseptr +
|
|
((unsigned long) get_header->displacement * module->disp_unit);
|
|
int ret;
|
|
|
|
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
|
"%d: process_get: received message from %d",
|
|
ompi_comm_rank(module->comm),
|
|
target));
|
|
|
|
ret = datatype_create (module, target, NULL, &datatype, (void **) &data);
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
return ret;
|
|
}
|
|
|
|
/* send get data */
|
|
ret = osc_rdma_get_post_send (module, source, get_header->count, datatype, target, get_header->tag);
|
|
|
|
OBJ_RELEASE(datatype);
|
|
|
|
return OMPI_SUCCESS == ret ? (int) get_header->len : ret;
|
|
}
|
|
|
|
/**
|
|
* osc_rdma_accumulate_buffer:
|
|
*
|
|
* @brief Accumulate data into the target buffer.
|
|
*
|
|
* @param[in] target - Target buffer
|
|
* @param[in] source - Source buffer
|
|
* @param[in] source_len - Length of source buffer in bytes
|
|
* @param[in] proc - Source proc
|
|
* @param[in] count - Number of elements in target buffer
|
|
* @param[in] datatype - Type of elements in target buffer
|
|
* @param[in] op - Operation to be performed
|
|
*/
|
|
static inline int osc_rdma_accumulate_buffer (void *target, void *source, size_t source_len, ompi_proc_t *proc,
|
|
int count, ompi_datatype_t *datatype, ompi_op_t *op)
|
|
{
|
|
void *buffer = source;
|
|
int ret;
|
|
|
|
assert (NULL != target && NULL != source);
|
|
|
|
if (op == &ompi_mpi_op_replace.op) {
|
|
osc_rdma_copy_on_recv (target, source, source_len, proc, count, datatype);
|
|
return OMPI_SUCCESS;
|
|
}
|
|
|
|
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
|
|
if (proc->proc_arch != ompi_proc_local()->proc_arch) {
|
|
ompi_datatype_t *primitive_datatype = NULL;
|
|
uint32_t primitive_count;
|
|
size_t buflen;
|
|
|
|
ompi_osc_base_get_primitive_type_info(datatype, &primitive_datatype, &primitive_count);
|
|
primitive_count *= count;
|
|
|
|
/* figure out how big a buffer we need */
|
|
ompi_datatype_type_size(primitive_datatype, &buflen);
|
|
buflen *= primitive_count;
|
|
|
|
buffer = malloc (buflen);
|
|
if (OPAL_UNLIKELY(NULL == buffer)) {
|
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
|
}
|
|
|
|
osc_rdma_copy_on_recv (buffer, source, source_len, proc, count, datatype);
|
|
}
|
|
#endif
|
|
|
|
/* copy the data from the temporary buffer into the user window */
|
|
ret = ompi_osc_base_process_op(target, buffer, source_len, datatype,
|
|
count, op);
|
|
|
|
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
|
|
if (proc->proc_arch != ompi_proc_local()->proc_arch) {
|
|
free(buffer);
|
|
}
|
|
#endif
|
|
|
|
return ret;
|
|
}
|
|
|
|
/**
|
|
* @brief Create an accumulate data object.
|
|
*
|
|
* @param[in] module - RDMA OSC module
|
|
* @param[in] target - Target for the accumulation
|
|
* @param[in] source - Source of accumulate data. Must be allocated with malloc/calloc/etc
|
|
* @param[in] source_len - Length of the source buffer in bytes
|
|
* @param[in] proc - Source proc
|
|
* @param[in] count - Number of elements to accumulate
|
|
* @param[in] datatype - Datatype to accumulate
|
|
* @oaram[in] op - Operator
|
|
* @param[in] request_count - Number of prerequisite requests
|
|
* @param[out] acc_data_out - New accumulation data
|
|
*
|
|
* This function is used to create a copy of the data needed to perform an accumulation.
|
|
* This data should be provided to ompi_osc_rdma_isend_w_cb or ompi_osc_rdma_irecv_w_cb
|
|
* as the ctx parameter with accumulate_cb as the cb parameter.
|
|
*/
|
|
static int osc_rdma_accumulate_allocate (ompi_osc_rdma_module_t *module, int peer, void *target, void *source, size_t source_len,
|
|
ompi_proc_t *proc, int count, ompi_datatype_t *datatype, ompi_op_t *op,
|
|
int request_count, osc_rdma_accumulate_data_t **acc_data_out)
|
|
{
|
|
osc_rdma_accumulate_data_t *acc_data;
|
|
|
|
acc_data = OBJ_NEW(osc_rdma_accumulate_data_t);
|
|
if (OPAL_UNLIKELY(NULL == acc_data)) {
|
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
|
}
|
|
|
|
acc_data->module = module;
|
|
acc_data->peer = peer;
|
|
acc_data->target = target;
|
|
acc_data->source = source;
|
|
acc_data->source_len = source_len;
|
|
acc_data->proc = proc;
|
|
acc_data->count = count;
|
|
acc_data->datatype = datatype;
|
|
OBJ_RETAIN(datatype);
|
|
acc_data->op = op;
|
|
OBJ_RETAIN(op);
|
|
acc_data->request_count = request_count;
|
|
|
|
*acc_data_out = acc_data;
|
|
|
|
return OMPI_SUCCESS;
|
|
}
|
|
|
|
/**
|
|
* @brief Execute the accumulate once the request counter reaches 0.
|
|
*
|
|
* @param[in] request - request
|
|
*
|
|
* The request should be created with ompi_osc_rdma_isend_w_cb or ompi_osc_rdma_irecv_w_cb
|
|
* with ctx allocated by osc_rdma_accumulate_allocate. This callback will free the accumulate
|
|
* data once the accumulation operation is complete.
|
|
*/
|
|
static int accumulate_cb (ompi_request_t *request)
|
|
{
|
|
struct osc_rdma_accumulate_data_t *acc_data = (struct osc_rdma_accumulate_data_t *) request->req_complete_cb_data;
|
|
ompi_osc_rdma_module_t *module = acc_data->module;
|
|
int rank = MPI_PROC_NULL;
|
|
int ret = OMPI_SUCCESS;
|
|
|
|
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
|
"accumulate_cb, request_count = %d", acc_data->request_count));
|
|
|
|
if (request->req_status.MPI_TAG & 0x01) {
|
|
rank = acc_data->peer;
|
|
}
|
|
|
|
mark_incoming_completion (module, rank);
|
|
|
|
OPAL_THREAD_LOCK(&module->lock);
|
|
if (0 == --acc_data->request_count) {
|
|
/* no more requests needed before the buffer can be accumulated */
|
|
|
|
if (acc_data->source) {
|
|
ret = osc_rdma_accumulate_buffer (acc_data->target, acc_data->source, acc_data->source_len,
|
|
acc_data->proc, acc_data->count, acc_data->datatype, acc_data->op);
|
|
}
|
|
|
|
/* drop the accumulate lock */
|
|
ompi_osc_rdma_accumulate_unlock (module);
|
|
|
|
osc_rdma_gc_add_buffer (&acc_data->super);
|
|
}
|
|
|
|
/* put this request on the garbage colletion list */
|
|
osc_rdma_gc_add_request (request);
|
|
|
|
OPAL_THREAD_UNLOCK(&module->lock);
|
|
|
|
return ret;
|
|
}
|
|
|
|
|
|
/**
 * ompi_osc_rdma_acc_op_queue:
 *
 * @brief Queue an accumulate or cswap operation on the module's
 *        pending accumulate list.
 *
 * @param[in] module   - OSC RDMA module
 * @param[in] header   - Header of the operation (acc, acc long, get acc,
 *                       get acc long or cswap)
 * @param[in] source   - Rank the operation came from
 * @param[in] data     - Inline data (may be unused when data_len is 0)
 * @param[in] data_len - Length of the inline data in bytes
 * @param[in] datatype - Datatype of the operation (retained by the queue entry)
 *
 * @returns OMPI_SUCCESS or OMPI_ERR_OUT_OF_RESOURCE
 *
 * The header, datatype and a private copy of the inline data are saved
 * in a new osc_rdma_pending_acc_t which is appended to
 * module->pending_acc. On failure nothing is queued and no counter is
 * modified.
 */
static int ompi_osc_rdma_acc_op_queue (ompi_osc_rdma_module_t *module, ompi_osc_rdma_header_t *header, int source,
                                       char *data, size_t data_len, ompi_datatype_t *datatype)
{
    osc_rdma_pending_acc_t *pending_acc;

    /* messages in this file are prefixed with the local rank */
    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "%d: queuing accumulate operation", ompi_comm_rank (module->comm)));

    pending_acc = OBJ_NEW(osc_rdma_pending_acc_t);
    if (OPAL_UNLIKELY(NULL == pending_acc)) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    pending_acc->source = source;

    /* save any inline data (eager acc, gacc only). this is done before
     * the signal count is touched so an allocation failure leaves
     * nothing to undo. */
    pending_acc->data_len = data_len;

    if (data_len) {
        pending_acc->data = malloc (data_len);
        if (OPAL_UNLIKELY(NULL == pending_acc->data)) {
            /* the destructor copes with the unset data and datatype */
            OBJ_RELEASE(pending_acc);
            return OMPI_ERR_OUT_OF_RESOURCE;
        }

        memcpy (pending_acc->data, data, data_len);
    }

    if (!ompi_osc_rdma_no_locks) {
        /* NTH: ensure we don't leave wait/process_flush/etc until this
         * accumulate operation is complete. */
        module->passive_incoming_frag_signal_count[source]++;
    }

    /* save the datatype */
    pending_acc->datatype = datatype;
    OBJ_RETAIN(datatype);

    /* save the header */
    switch (header->base.type) {
    case OMPI_OSC_RDMA_HDR_TYPE_ACC:
    case OMPI_OSC_RDMA_HDR_TYPE_ACC_LONG:
    case OMPI_OSC_RDMA_HDR_TYPE_GET_ACC:
    case OMPI_OSC_RDMA_HDR_TYPE_GET_ACC_LONG:
        pending_acc->header.acc = header->acc;
        break;
    case OMPI_OSC_RDMA_HDR_TYPE_CSWAP:
        pending_acc->header.cswap = header->cswap;
        break;
    default:
        /* it is a coding error if any other header types are queued this way */
        assert (0);
    }

    /* add to the pending acc queue */
    OPAL_THREAD_LOCK(&module->lock);
    opal_list_append (&module->pending_acc, &pending_acc->super);
    OPAL_THREAD_UNLOCK(&module->lock);

    return OMPI_SUCCESS;
}
|
|
|
|
/* Completion callback for the receive posted by a long accumulate that
 * uses the replace op. Records the incoming completion, queues the
 * request for garbage collection and releases the accumulate lock. The
 * callback data is the OSC RDMA module. */
static int replace_cb (ompi_request_t *request)
{
    ompi_osc_rdma_module_t *module = (ompi_osc_rdma_module_t *) request->req_complete_cb_data;
    /* the low tag bit selects whether the completion is reported for
     * the source rank or for MPI_PROC_NULL */
    const int rank = (request->req_status.MPI_TAG & 0x01) ?
        request->req_status.MPI_SOURCE : MPI_PROC_NULL;

    mark_incoming_completion (module, rank);

    /* put this request on the garbage collection list */
    osc_rdma_gc_add_request (request);

    /* unlock the accumulate lock */
    ompi_osc_rdma_accumulate_unlock (module);

    return OMPI_SUCCESS;
}
|
|
|
|
/**
|
|
* ompi_osc_rdma_acc_start:
|
|
*
|
|
* @brief Start an accumulate with data operation.
|
|
*
|
|
* @param[in] module - OSC RDMA module
|
|
* @param[in] source - Source rank
|
|
* @param[in] data - Accumulate data
|
|
* @param[in] data_len - Length of the accumulate data
|
|
* @param[in] datatype - Accumulation datatype
|
|
* @param[in] acc_header - Accumulate header
|
|
*
|
|
* The module's accumulation lock must be held before calling this
|
|
* function. It will release the lock when the operation is complete.
|
|
*/
|
|
static int ompi_osc_rdma_acc_start (ompi_osc_rdma_module_t *module, int source, void *data, size_t data_len,
|
|
ompi_datatype_t *datatype, ompi_osc_rdma_header_acc_t *acc_header)
|
|
{
|
|
void *target = (unsigned char*) module->baseptr +
|
|
((unsigned long) acc_header->displacement * module->disp_unit);
|
|
struct ompi_op_t *op = ompi_osc_base_op_create(acc_header->op);
|
|
ompi_proc_t *proc;
|
|
int ret;
|
|
|
|
proc = ompi_comm_peer_lookup(module->comm, source);
|
|
assert (NULL != proc);
|
|
|
|
ret = osc_rdma_accumulate_buffer (target, data, data_len, proc, acc_header->count,
|
|
datatype, op);
|
|
|
|
OBJ_RELEASE(op);
|
|
|
|
ompi_osc_rdma_accumulate_unlock (module);
|
|
|
|
return ret;
|
|
}
|
|
|
|
/**
|
|
* ompi_osc_rdma_acc_start:
|
|
*
|
|
* @brief Start a long accumulate operation.
|
|
*
|
|
* @param[in] module - OSC RDMA module
|
|
* @param[in] source - Source rank
|
|
* @param[in] datatype - Accumulation datatype
|
|
* @param[in] acc_header - Accumulate header
|
|
*
|
|
* The module's accumulation lock must be held before calling this
|
|
* function. It will release the lock when the operation is complete.
|
|
*/
|
|
static int ompi_osc_rdma_acc_long_start (ompi_osc_rdma_module_t *module, int source, ompi_datatype_t *datatype,
|
|
ompi_osc_rdma_header_acc_t *acc_header) {
|
|
struct osc_rdma_accumulate_data_t *acc_data;
|
|
size_t buflen;
|
|
void *buffer;
|
|
ompi_proc_t *proc;
|
|
void *target = (unsigned char*) module->baseptr +
|
|
((unsigned long) acc_header->displacement * module->disp_unit);
|
|
struct ompi_op_t *op = ompi_osc_base_op_create(acc_header->op);
|
|
ompi_datatype_t *primitive_datatype;
|
|
uint32_t primitive_count;
|
|
int ret;
|
|
|
|
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
|
"ompi_osc_rdma_acc_long_start starting..."));
|
|
|
|
proc = ompi_comm_peer_lookup(module->comm, source);
|
|
assert (NULL != proc);
|
|
|
|
do {
|
|
if (op == &ompi_mpi_op_replace.op) {
|
|
ret = ompi_osc_rdma_irecv_w_cb (target, acc_header->count, datatype, source,
|
|
acc_header->tag, module->comm, NULL,
|
|
replace_cb, module);
|
|
break;
|
|
}
|
|
|
|
ret = ompi_osc_base_get_primitive_type_info (datatype, &primitive_datatype, &primitive_count);
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
break;
|
|
}
|
|
|
|
primitive_count *= acc_header->count;
|
|
|
|
buflen = datatype_buffer_length (datatype, acc_header->count);
|
|
|
|
/* allocate a temporary buffer to receive the accumulate data */
|
|
buffer = malloc (buflen);
|
|
if (OPAL_UNLIKELY(NULL == buffer)) {
|
|
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
|
break;
|
|
}
|
|
|
|
ret = osc_rdma_accumulate_allocate (module, source, target, buffer, buflen, proc, acc_header->count,
|
|
datatype, op, 1, &acc_data);
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
free (buffer);
|
|
break;
|
|
}
|
|
|
|
ret = ompi_osc_rdma_irecv_w_cb (buffer, primitive_count, primitive_datatype, source,
|
|
acc_header->tag, module->comm, NULL, accumulate_cb, acc_data);
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
OBJ_RELEASE(acc_data);
|
|
}
|
|
} while (0);
|
|
|
|
OBJ_RELEASE(op);
|
|
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
ompi_osc_rdma_accumulate_unlock (module);
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
/**
|
|
* ompi_osc_rdma_gacc_start:
|
|
*
|
|
* @brief Start a accumulate with data + get operation.
|
|
*
|
|
* @param[in] module - OSC RDMA module
|
|
* @param[in] source - Source rank
|
|
* @param[in] data - Accumulate data. Must be allocated on the heap.
|
|
* @param[in] data_len - Length of the accumulate data
|
|
* @param[in] datatype - Accumulation datatype
|
|
* @param[in] get_acc_header - Accumulate header
|
|
*
|
|
* The module's accumulation lock must be held before calling this
|
|
* function. It will release the lock when the operation is complete.
|
|
*/
|
|
static int ompi_osc_rdma_gacc_start (ompi_osc_rdma_module_t *module, int source, void *data, size_t data_len,
|
|
ompi_datatype_t *datatype, ompi_osc_rdma_header_acc_t *acc_header)
|
|
{
|
|
void *target = (unsigned char*) module->baseptr +
|
|
((unsigned long) acc_header->displacement * module->disp_unit);
|
|
struct ompi_op_t *op = ompi_osc_base_op_create(acc_header->op);
|
|
struct osc_rdma_accumulate_data_t *acc_data;
|
|
ompi_proc_t *proc;
|
|
int ret;
|
|
|
|
proc = ompi_comm_peer_lookup(module->comm, source);
|
|
assert (NULL != proc);
|
|
|
|
do {
|
|
ret = osc_rdma_accumulate_allocate (module, source, target, data, data_len, proc, acc_header->count,
|
|
datatype, op, 1, &acc_data);
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
break;
|
|
}
|
|
|
|
ret = ompi_osc_rdma_isend_w_cb (target, acc_header->count, datatype, source, acc_header->tag,
|
|
module->comm, accumulate_cb, acc_data);
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
OBJ_RELEASE(acc_data);
|
|
}
|
|
} while (0);
|
|
|
|
OBJ_RELEASE(op);
|
|
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
ompi_osc_rdma_accumulate_unlock (module);
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
/**
|
|
* ompi_osc_rdma_gacc_long_start:
|
|
*
|
|
* @brief Start a long accumulate + get operation.
|
|
*
|
|
* @param[in] module - OSC RDMA module
|
|
* @param[in] source - Source rank
|
|
* @param[in] datatype - Accumulation datatype
|
|
* @param[in] acc_header - Accumulate header
|
|
*
|
|
* The module's accumulation lock must be held before calling this
|
|
* function. It will release the lock when the operation is complete.
|
|
*/
|
|
static int ompi_osc_gacc_long_start (ompi_osc_rdma_module_t *module, int source, ompi_datatype_t *datatype,
|
|
ompi_osc_rdma_header_acc_t *acc_header)
|
|
{
|
|
void *target = (unsigned char*) module->baseptr +
|
|
((unsigned long) acc_header->displacement * module->disp_unit);
|
|
struct ompi_op_t *op = ompi_osc_base_op_create(acc_header->op);
|
|
struct osc_rdma_accumulate_data_t *acc_data;
|
|
ompi_datatype_t *primitive_datatype;
|
|
ompi_request_t *recv_request;
|
|
uint32_t primitive_count;
|
|
ompi_proc_t *proc;
|
|
size_t buflen;
|
|
void *buffer;
|
|
int ret;
|
|
|
|
proc = ompi_comm_peer_lookup(module->comm, source);
|
|
assert (NULL != proc);
|
|
|
|
/* allocate a temporary buffer to receive the accumulate data */
|
|
buflen = datatype_buffer_length (datatype, acc_header->count);
|
|
|
|
do {
|
|
buffer = malloc (buflen);
|
|
if (OPAL_UNLIKELY(NULL == buffer)) {
|
|
ret = OMPI_ERR_OUT_OF_RESOURCE;
|
|
break;
|
|
}
|
|
|
|
ret = ompi_osc_base_get_primitive_type_info (datatype, &primitive_datatype, &primitive_count);
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
break;
|
|
}
|
|
|
|
primitive_count *= acc_header->count;
|
|
|
|
ret = osc_rdma_accumulate_allocate (module, source, target, buffer, buflen, proc, acc_header->count,
|
|
datatype, op, 2, &acc_data);
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
break;
|
|
}
|
|
|
|
ret = ompi_osc_rdma_irecv_w_cb (buffer, acc_header->count, datatype, source, acc_header->tag,
|
|
module->comm, &recv_request, accumulate_cb, acc_data);
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
OBJ_RELEASE(acc_data);
|
|
break;
|
|
}
|
|
|
|
ret = ompi_osc_rdma_isend_w_cb (target, primitive_count, primitive_datatype, source, acc_header->tag,
|
|
module->comm, accumulate_cb, acc_data);
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
/* cancel the receive and free the accumulate data */
|
|
ompi_request_cancel (recv_request);
|
|
OBJ_RELEASE(acc_data);
|
|
break;
|
|
}
|
|
} while (0);
|
|
|
|
OBJ_RELEASE(op);
|
|
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
ompi_osc_rdma_accumulate_unlock (module);
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
/**
|
|
* ompi_osc_rdma_cswap_start:
|
|
*
|
|
* @brief Start a compare and swap operation
|
|
*
|
|
* @param[in] module - OSC RDMA module
|
|
* @param[in] source - Source rank
|
|
* @param[in] data - Compare and swap data
|
|
* @param[in] data_len - Length of the compare and swap data. Must be exactly
|
|
* twice the size of the datatype.
|
|
* @param[in] datatype - Compare and swap datatype
|
|
* @param[in] cswap_header - Compare and swap header
|
|
*
|
|
* The module's accumulation lock must be held before calling this
|
|
* function. It will release the lock when the operation is complete.
|
|
*/
|
|
static int ompi_osc_rdma_cswap_start (ompi_osc_rdma_module_t *module, int source, void *data, ompi_datatype_t *datatype,
|
|
ompi_osc_rdma_header_cswap_t *cswap_header)
|
|
{
|
|
void *target = (unsigned char*) module->baseptr +
|
|
((unsigned long) cswap_header->displacement * module->disp_unit);
|
|
void *compare_addr, *origin_addr;
|
|
size_t datatype_size;
|
|
ompi_proc_t *proc;
|
|
int ret;
|
|
|
|
proc = ompi_comm_peer_lookup(module->comm, source);
|
|
assert (NULL != proc);
|
|
|
|
datatype_size = datatype->super.size;
|
|
|
|
origin_addr = data;
|
|
compare_addr = (void *)((uintptr_t) data + datatype_size);
|
|
|
|
do {
|
|
/* no reason to do a non-blocking send here */
|
|
ret = MCA_PML_CALL(send(target, 1, datatype, source, cswap_header->tag, MCA_PML_BASE_SEND_STANDARD,
|
|
module->comm));
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
break;
|
|
}
|
|
|
|
/* increment the incoming fragment count so it matches what is expected */
|
|
mark_incoming_completion (module, (cswap_header->tag & 0x1) ? source : MPI_PROC_NULL);
|
|
|
|
if (0 == memcmp (target, compare_addr, datatype_size)) {
|
|
osc_rdma_copy_on_recv (target, origin_addr, datatype_size, proc, 1, datatype);
|
|
}
|
|
} while (0);
|
|
|
|
ompi_osc_rdma_accumulate_unlock (module);
|
|
|
|
return ret;
|
|
}
|
|
|
|
/**
|
|
* ompi_osc_rdma_progress_pending_acc:
|
|
*
|
|
* @brief Progress one pending accumulation or compare and swap operation.
|
|
*
|
|
* @param[in] module - OSC RDMA module
|
|
*
|
|
* If the accumulation lock can be aquired progress one pending
|
|
* accumulate or compare and swap operation.
|
|
*/
|
|
int ompi_osc_rdma_progress_pending_acc (ompi_osc_rdma_module_t *module)
|
|
{
|
|
osc_rdma_pending_acc_t *pending_acc;
|
|
int ret;
|
|
|
|
/* try to aquire the lock. it will be unlocked when the accumulate or cswap
|
|
* operation completes */
|
|
if (ompi_osc_rdma_accumulate_trylock (module)) {
|
|
return OMPI_SUCCESS;
|
|
}
|
|
|
|
pending_acc = (osc_rdma_pending_acc_t *) opal_list_remove_first (&module->pending_acc);
|
|
if (OPAL_UNLIKELY(NULL == pending_acc)) {
|
|
/* called without any pending accumulation operations */
|
|
ompi_osc_rdma_accumulate_unlock (module);
|
|
return OMPI_SUCCESS;
|
|
}
|
|
|
|
switch (pending_acc->header.base.type) {
|
|
case OMPI_OSC_RDMA_HDR_TYPE_ACC:
|
|
ret = ompi_osc_rdma_acc_start (module, pending_acc->source, pending_acc->data, pending_acc->data_len,
|
|
pending_acc->datatype, &pending_acc->header.acc);
|
|
free (pending_acc->data);
|
|
break;
|
|
case OMPI_OSC_RDMA_HDR_TYPE_ACC_LONG:
|
|
ret = ompi_osc_rdma_acc_long_start (module, pending_acc->source, pending_acc->datatype,
|
|
&pending_acc->header.acc);
|
|
break;
|
|
case OMPI_OSC_RDMA_HDR_TYPE_GET_ACC:
|
|
ret = ompi_osc_rdma_gacc_start (module, pending_acc->source, pending_acc->data,
|
|
pending_acc->data_len, pending_acc->datatype,
|
|
&pending_acc->header.acc);
|
|
break;
|
|
case OMPI_OSC_RDMA_HDR_TYPE_GET_ACC_LONG:
|
|
ret = ompi_osc_gacc_long_start (module, pending_acc->source, pending_acc->datatype,
|
|
&pending_acc->header.acc);
|
|
break;
|
|
case OMPI_OSC_RDMA_HDR_TYPE_CSWAP:
|
|
ret = ompi_osc_rdma_cswap_start (module, pending_acc->source, pending_acc->data,
|
|
pending_acc->datatype, &pending_acc->header.cswap);
|
|
break;
|
|
default:
|
|
ret = OMPI_ERROR;
|
|
/* it is a coding error if this point is reached */
|
|
assert (0);
|
|
}
|
|
|
|
if (!ompi_osc_rdma_no_locks) {
|
|
/* signal that an operation is complete */
|
|
mark_incoming_completion (module, pending_acc->source);
|
|
}
|
|
|
|
pending_acc->data = NULL;
|
|
OBJ_RELEASE(pending_acc);
|
|
|
|
return ret;
|
|
}
|
|
|
|
static inline int process_acc (ompi_osc_rdma_module_t *module, int source,
|
|
ompi_osc_rdma_header_acc_t *acc_header)
|
|
{
|
|
char *data = (char *) (acc_header + 1);
|
|
struct ompi_datatype_t *datatype;
|
|
uint64_t data_len;
|
|
int ret;
|
|
|
|
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
|
"%d: process_acc: received message from %d",
|
|
ompi_comm_rank(module->comm),
|
|
source));
|
|
|
|
ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
return ret;
|
|
}
|
|
|
|
data_len = acc_header->len - ((char*) data - (char*) acc_header);
|
|
|
|
/* try to aquire the accumulate lock */
|
|
if (0 == ompi_osc_rdma_accumulate_trylock (module)) {
|
|
ret = ompi_osc_rdma_acc_start (module, source, data, data_len, datatype,
|
|
acc_header);
|
|
} else {
|
|
/* couldn't aquire the accumulate lock so queue up the accumulate operation */
|
|
ret = ompi_osc_rdma_acc_op_queue (module, (ompi_osc_rdma_header_t *) acc_header,
|
|
source, data, data_len, datatype);
|
|
}
|
|
|
|
/* Release datatype & op */
|
|
OBJ_RELEASE(datatype);
|
|
|
|
return (OMPI_SUCCESS == ret) ? (int) acc_header->len : ret;
|
|
}
|
|
|
|
static inline int process_acc_long (ompi_osc_rdma_module_t* module, int source,
|
|
ompi_osc_rdma_header_acc_t* acc_header)
|
|
{
|
|
char *data = (char *) (acc_header + 1);
|
|
struct ompi_datatype_t *datatype;
|
|
int ret;
|
|
|
|
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
|
"%d: process_acc_long: received message from %d",
|
|
ompi_comm_rank(module->comm),
|
|
source));
|
|
|
|
ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
return ret;
|
|
}
|
|
|
|
if (0 == ompi_osc_rdma_accumulate_trylock (module)) {
|
|
ret = ompi_osc_rdma_acc_long_start (module, source, datatype, acc_header);
|
|
} else {
|
|
/* queue the operation */
|
|
ret = ompi_osc_rdma_acc_op_queue (module, (ompi_osc_rdma_header_t *) acc_header, source,
|
|
NULL, 0, datatype);
|
|
}
|
|
|
|
/* Release datatype & op */
|
|
OBJ_RELEASE(datatype);
|
|
|
|
return (OMPI_SUCCESS == ret) ? (int) acc_header->len : ret;
|
|
}
|
|
|
|
static inline int process_get_acc(ompi_osc_rdma_module_t *module, int source,
|
|
ompi_osc_rdma_header_acc_t *acc_header)
|
|
{
|
|
char *data = (char *) (acc_header + 1);
|
|
struct ompi_datatype_t *datatype;
|
|
void *buffer = NULL;
|
|
uint64_t data_len;
|
|
int ret;
|
|
|
|
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
|
"%d: process_get_acc: received message from %d",
|
|
ompi_comm_rank(module->comm),
|
|
source));
|
|
|
|
ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
return ret;
|
|
}
|
|
|
|
data_len = acc_header->len - ((char*) data - (char*) acc_header);
|
|
|
|
if (0 == ompi_osc_rdma_accumulate_trylock (module)) {
|
|
/* make a copy of the data since the buffer needs to be returned */
|
|
if (data_len) {
|
|
buffer = malloc (data_len);
|
|
if (OPAL_UNLIKELY(NULL == buffer)) {
|
|
OBJ_RELEASE(datatype);
|
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
|
}
|
|
|
|
memcpy (buffer, data, data_len);
|
|
}
|
|
|
|
ret = ompi_osc_rdma_gacc_start (module, source, buffer, data_len, datatype,
|
|
acc_header);
|
|
} else {
|
|
/* queue the operation */
|
|
ret = ompi_osc_rdma_acc_op_queue (module, (ompi_osc_rdma_header_t *) acc_header,
|
|
source, data, data_len, datatype);
|
|
}
|
|
|
|
/* Release datatype & op */
|
|
OBJ_RELEASE(datatype);
|
|
|
|
return (OMPI_SUCCESS == ret) ? (int) acc_header->len : ret;
|
|
}
|
|
|
|
static inline int process_get_acc_long(ompi_osc_rdma_module_t *module, int source,
|
|
ompi_osc_rdma_header_acc_t *acc_header)
|
|
{
|
|
char *data = (char *) (acc_header + 1);
|
|
struct ompi_datatype_t *datatype;
|
|
int ret;
|
|
|
|
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
|
"%d: process_acc: received message from %d",
|
|
ompi_comm_rank(module->comm),
|
|
source));
|
|
|
|
ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
return ret;
|
|
}
|
|
|
|
if (0 == ompi_osc_rdma_accumulate_trylock (module)) {
|
|
ret = ompi_osc_gacc_long_start (module, source, datatype, acc_header);
|
|
} else {
|
|
/* queue the operation */
|
|
ret = ompi_osc_rdma_acc_op_queue (module, (ompi_osc_rdma_header_t *) acc_header,
|
|
source, NULL, 0, datatype);
|
|
}
|
|
|
|
/* Release datatype & op */
|
|
OBJ_RELEASE(datatype);
|
|
|
|
return OMPI_SUCCESS == ret ? (int) acc_header->len : ret;
|
|
}
|
|
|
|
|
|
static inline int process_cswap (ompi_osc_rdma_module_t *module, int source,
|
|
ompi_osc_rdma_header_cswap_t *cswap_header)
|
|
{
|
|
char *data = (char*) (cswap_header + 1);
|
|
struct ompi_datatype_t *datatype;
|
|
int ret;
|
|
|
|
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
|
"%d: process_cswap: received message from %d",
|
|
ompi_comm_rank(module->comm),
|
|
source));
|
|
|
|
ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
return ret;
|
|
}
|
|
|
|
if (0 == ompi_osc_rdma_accumulate_trylock (module)) {
|
|
ret = ompi_osc_rdma_cswap_start (module, source, data, datatype, cswap_header);
|
|
} else {
|
|
/* queue the operation */
|
|
ret = ompi_osc_rdma_acc_op_queue (module, (ompi_osc_rdma_header_t *) cswap_header, source,
|
|
data, 2 * datatype->super.size, datatype);
|
|
}
|
|
|
|
/* Release datatype */
|
|
OBJ_RELEASE(datatype);
|
|
|
|
return (OMPI_SUCCESS == ret) ? (int) cswap_header->len : ret;
|
|
}
|
|
|
|
static inline int process_complete (ompi_osc_rdma_module_t *module, int source,
|
|
ompi_osc_rdma_header_complete_t *complete_header)
|
|
{
|
|
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
|
"osc rdma: process_complete got complete message from %d. expected fragment count %d",
|
|
source, complete_header->frag_count));
|
|
|
|
OPAL_THREAD_LOCK(&module->lock);
|
|
|
|
/* the current fragment is not part of the frag_count so we need to add it here */
|
|
module->active_incoming_frag_signal_count += complete_header->frag_count + 1;
|
|
module->num_complete_msgs++;
|
|
|
|
if (0 == module->num_complete_msgs) {
|
|
opal_condition_broadcast (&module->cond);
|
|
}
|
|
|
|
OPAL_THREAD_UNLOCK(&module->lock);
|
|
|
|
return sizeof (*complete_header);
|
|
}
|
|
|
|
/* flush and unlock headers cannot be processed from the request callback
|
|
* because some btls do not provide re-entrant progress functions. these
|
|
* fragment will be progressed by the rdma component's progress function */
|
|
static inline int process_flush (ompi_osc_rdma_module_t *module, int source,
|
|
ompi_osc_rdma_header_flush_t *flush_header)
|
|
{
|
|
int ret;
|
|
|
|
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
|
"process_flush header = {.frag_count = %d}", flush_header->frag_count));
|
|
|
|
/* increase signal count by incoming frags */
|
|
module->passive_incoming_frag_signal_count[source] += flush_header->frag_count;
|
|
|
|
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
|
"%d: process_flush: received message from %d. passive_incoming_frag_signal_count = %d, passive_incoming_frag_count = %d",
|
|
ompi_comm_rank(module->comm), source, module->passive_incoming_frag_signal_count[source], module->passive_incoming_frag_count[source]));
|
|
|
|
ret = ompi_osc_rdma_process_flush (module, source, flush_header);
|
|
if (OMPI_SUCCESS != ret) {
|
|
ompi_osc_rdma_pending_t *pending;
|
|
|
|
pending = OBJ_NEW(ompi_osc_rdma_pending_t);
|
|
pending->module = module;
|
|
pending->source = source;
|
|
pending->header.flush = *flush_header;
|
|
|
|
OPAL_THREAD_LOCK(&mca_osc_rdma_component.lock);
|
|
opal_list_append (&mca_osc_rdma_component.pending_operations, &pending->super);
|
|
OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.lock);
|
|
|
|
/* we now have to count the current fragment */
|
|
module->passive_incoming_frag_signal_count[source]++;
|
|
} else {
|
|
/* need to account for the current fragment */
|
|
module->passive_incoming_frag_count[source] = -1;
|
|
}
|
|
|
|
return sizeof (*flush_header);
|
|
}
|
|
|
|
static inline int process_unlock (ompi_osc_rdma_module_t *module, int source,
|
|
ompi_osc_rdma_header_unlock_t *unlock_header)
|
|
{
|
|
int ret;
|
|
|
|
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
|
"process_unlock header = {.frag_count = %d}", unlock_header->frag_count));
|
|
|
|
/* increase signal count by incoming frags */
|
|
module->passive_incoming_frag_signal_count[source] += unlock_header->frag_count;
|
|
|
|
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
|
|
"osc rdma: processing unlock request from %d. frag count = %d, signal_count = %d, processed_count = %d",
|
|
source, unlock_header->frag_count, (int) module->passive_incoming_frag_signal_count[source],
|
|
(int) module->passive_incoming_frag_count[source]));
|
|
|
|
ret = ompi_osc_rdma_process_unlock (module, source, unlock_header);
|
|
if (OMPI_SUCCESS != ret) {
|
|
ompi_osc_rdma_pending_t *pending;
|
|
|
|
pending = OBJ_NEW(ompi_osc_rdma_pending_t);
|
|
pending->module = module;
|
|
pending->source = source;
|
|
pending->header.unlock = *unlock_header;
|
|
|
|
OPAL_THREAD_LOCK(&mca_osc_rdma_component.lock);
|
|
opal_list_append (&mca_osc_rdma_component.pending_operations, &pending->super);
|
|
OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.lock);
|
|
|
|
/* we now have to count the current fragment */
|
|
module->passive_incoming_frag_signal_count[source]++;
|
|
} else {
|
|
/* need to account for the current fragment */
|
|
module->passive_incoming_frag_count[source] = -1;
|
|
}
|
|
|
|
return sizeof (*unlock_header);
|
|
}
|
|
|
|
/**
 * Completion callback for the receive posted by
 * process_large_datatype_request.
 *
 * Dispatches the saved header to the matching process_* function (results
 * are ignored) and then puts both the request and the datatype buffer on the
 * garbage collection lists.
 *
 * @returns OMPI_SUCCESS, or OMPI_ERROR for a header type that should never
 *          reach this callback
 */
static int process_large_datatype_request_cb (ompi_request_t *request)
{
    ompi_osc_rdma_ddt_buffer_t *saved = (ompi_osc_rdma_ddt_buffer_t *) request->req_complete_cb_data;
    ompi_osc_rdma_header_t *hdr = saved->header;
    ompi_osc_rdma_module_t *module = saved->module;
    int peer = saved->source;

    /* process the request */
    switch (hdr->base.type) {
    case OMPI_OSC_RDMA_HDR_TYPE_PUT_LONG:
        (void) process_put_long (module, peer, &hdr->put);
        break;
    case OMPI_OSC_RDMA_HDR_TYPE_GET:
        (void) process_get (module, peer, &hdr->get);
        break;
    case OMPI_OSC_RDMA_HDR_TYPE_ACC_LONG:
        (void) process_acc_long (module, peer, &hdr->acc);
        break;
    case OMPI_OSC_RDMA_HDR_TYPE_GET_ACC_LONG:
        (void) process_get_acc_long (module, peer, &hdr->acc);
        break;
    default:
        /* developer error */
        assert (0);
        return OMPI_ERROR;
    }

    /* put this request on the garbage collection list */
    osc_rdma_gc_add_request (request);

    /* free the datatype buffer */
    osc_rdma_gc_add_buffer (&saved->super);

    return OMPI_SUCCESS;
}
|
|
|
|
/**
|
|
* @short process a request with a large datatype
|
|
*
|
|
* @param[in] module - OSC RDMA module
|
|
* @param[in] source - header source
|
|
* @param[in] header - header to process
|
|
*
|
|
* It is possible to construct datatypes whos description is too large
|
|
* to fit in an OSC RDMA fragment. In this case the remote side posts
|
|
* a send of the datatype description. This function posts the matching
|
|
* receive and processes the header on completion.
|
|
*/
|
|
static int process_large_datatype_request (ompi_osc_rdma_module_t *module, int source, ompi_osc_rdma_header_t *header)
|
|
{
|
|
ompi_osc_rdma_ddt_buffer_t *ddt_buffer;
|
|
int header_len, tag, ret;
|
|
uint64_t ddt_len;
|
|
|
|
/* determine the header size and receive tag */
|
|
switch (header->base.type) {
|
|
case OMPI_OSC_RDMA_HDR_TYPE_PUT_LONG:
|
|
header_len = sizeof (header->put);
|
|
tag = header->put.tag;
|
|
break;
|
|
case OMPI_OSC_RDMA_HDR_TYPE_GET:
|
|
header_len = sizeof (header->get);
|
|
tag = header->get.tag;
|
|
break;
|
|
case OMPI_OSC_RDMA_HDR_TYPE_ACC_LONG:
|
|
header_len = sizeof (header->acc);
|
|
tag = header->acc.tag;
|
|
break;
|
|
case OMPI_OSC_RDMA_HDR_TYPE_GET_ACC_LONG:
|
|
header_len = sizeof (header->acc);
|
|
tag = header->acc.tag;
|
|
break;
|
|
default:
|
|
/* developer error */
|
|
opal_output (0, "Unsupported header/flag combination");
|
|
return OMPI_ERROR;
|
|
}
|
|
|
|
ddt_len = *((uint64_t *)((uintptr_t) header + header_len));
|
|
|
|
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
|
|
"process_large_datatype_request: processing fragment with type %d. ddt_len %lu",
|
|
header->base.type, (unsigned long) ddt_len));
|
|
|
|
ddt_buffer = OBJ_NEW(ompi_osc_rdma_ddt_buffer_t);
|
|
if (OPAL_UNLIKELY(NULL == ddt_buffer)) {
|
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
|
}
|
|
|
|
ddt_buffer->module = module;
|
|
ddt_buffer->source = source;
|
|
|
|
ddt_buffer->header = malloc (ddt_len + header_len);
|
|
if (OPAL_UNLIKELY(NULL == ddt_buffer->header)) {
|
|
OBJ_RELEASE(ddt_buffer);
|
|
return OMPI_ERR_OUT_OF_RESOURCE;
|
|
}
|
|
|
|
memcpy (ddt_buffer->header, header, header_len);
|
|
|
|
ret = ompi_osc_rdma_irecv_w_cb ((void *)((uintptr_t) ddt_buffer->header + header_len),
|
|
ddt_len, MPI_BYTE, source, tag, module->comm, NULL,
|
|
process_large_datatype_request_cb, ddt_buffer);
|
|
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
|
|
OBJ_RELEASE(ddt_buffer);
|
|
return ret;
|
|
}
|
|
|
|
return header_len + 8;
|
|
}
|
|
|
|
/*
|
|
* Do all the data movement associated with a fragment
|
|
*/
|
|
static inline int process_frag (ompi_osc_rdma_module_t *module,
|
|
ompi_osc_rdma_frag_header_t *frag)
|
|
{
|
|
ompi_osc_rdma_header_t *header;
|
|
int ret;
|
|
|
|
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
|
"osc rdma: process_frag: from %d, ops %d",
|
|
(int) frag->source, (int) frag->num_ops));
|
|
|
|
header = (ompi_osc_rdma_header_t *) (frag + 1);
|
|
|
|
for (int i = 0 ; i < frag->num_ops ; ++i) {
|
|
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
|
"osc rdma: process_frag: type 0x%x. flag 0x%x. offset %u",
|
|
header->base.type, (unsigned) ((uintptr_t)header - (uintptr_t)frag),
|
|
header->base.flags));
|
|
|
|
if (OPAL_LIKELY(!(header->base.flags & OMPI_OSC_RDMA_HDR_FLAG_LARGE_DATATYPE))) {
|
|
switch (header->base.type) {
|
|
case OMPI_OSC_RDMA_HDR_TYPE_PUT:
|
|
ret = process_put(module, frag->source, &header->put);
|
|
break;
|
|
case OMPI_OSC_RDMA_HDR_TYPE_PUT_LONG:
|
|
ret = process_put_long(module, frag->source, &header->put);
|
|
break;
|
|
|
|
case OMPI_OSC_RDMA_HDR_TYPE_ACC:
|
|
ret = process_acc(module, frag->source, &header->acc);
|
|
break;
|
|
case OMPI_OSC_RDMA_HDR_TYPE_ACC_LONG:
|
|
ret = process_acc_long (module, frag->source, &header->acc);
|
|
break;
|
|
|
|
case OMPI_OSC_RDMA_HDR_TYPE_LOCK_REQ:
|
|
ret = ompi_osc_rdma_process_lock(module, frag->source, &header->lock);
|
|
if (OPAL_LIKELY(OMPI_SUCCESS == ret)) {
|
|
ret = sizeof (header->lock);
|
|
}
|
|
break;
|
|
case OMPI_OSC_RDMA_HDR_TYPE_UNLOCK_REQ:
|
|
ret = process_unlock(module, frag->source, &header->unlock);
|
|
break;
|
|
|
|
case OMPI_OSC_RDMA_HDR_TYPE_GET:
|
|
ret = process_get (module, frag->source, &header->get);
|
|
break;
|
|
|
|
case OMPI_OSC_RDMA_HDR_TYPE_CSWAP:
|
|
ret = process_cswap (module, frag->source, &header->cswap);
|
|
break;
|
|
|
|
case OMPI_OSC_RDMA_HDR_TYPE_GET_ACC:
|
|
ret = process_get_acc (module, frag->source, &header->acc);
|
|
break;
|
|
|
|
case OMPI_OSC_RDMA_HDR_TYPE_GET_ACC_LONG:
|
|
ret = process_get_acc_long (module, frag->source, &header->acc);
|
|
break;
|
|
|
|
case OMPI_OSC_RDMA_HDR_TYPE_FLUSH_REQ:
|
|
ret = process_flush (module, frag->source, &header->flush);
|
|
break;
|
|
|
|
case OMPI_OSC_RDMA_HDR_TYPE_COMPLETE:
|
|
ret = process_complete (module, frag->source, &header->complete);
|
|
break;
|
|
|
|
default:
|
|
opal_output(0, "Unsupported fragment type 0x%x\n", header->base.type);
|
|
abort(); /* FIX ME */
|
|
}
|
|
} else {
|
|
ret = process_large_datatype_request (module, frag->source, header);
|
|
}
|
|
|
|
if (ret <= 0) {
|
|
opal_output(0, "Error processing fragment: %d", ret);
|
|
abort(); /* FIX ME */
|
|
}
|
|
|
|
/* the next header will start on an 8-byte boundary. this is done to ensure
|
|
* that the next header and the packed datatype is properly aligned */
|
|
header = (ompi_osc_rdma_header_t *) OPAL_ALIGN(((uintptr_t) header + ret), 8, uintptr_t);
|
|
}
|
|
|
|
return OMPI_SUCCESS;
|
|
}
|
|
|
|
/* dispatch for callback on message completion */
|
|
static int ompi_osc_rdma_callback (ompi_request_t *request)
|
|
{
|
|
ompi_osc_rdma_module_t *module = (ompi_osc_rdma_module_t *) request->req_complete_cb_data;
|
|
ompi_osc_rdma_header_base_t *base_header =
|
|
(ompi_osc_rdma_header_base_t *) module->incoming_buffer;
|
|
size_t incoming_length = request->req_status._ucount;
|
|
int source = request->req_status.MPI_SOURCE;
|
|
|
|
OPAL_THREAD_UNLOCK(&ompi_request_lock);
|
|
|
|
assert(incoming_length >= sizeof(ompi_osc_rdma_header_base_t));
|
|
|
|
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
|
"received rdma callback for fragment. source = %d, count = %u, type = 0x%x",
|
|
source, (unsigned) incoming_length, base_header->type));
|
|
|
|
switch (base_header->type) {
|
|
case OMPI_OSC_RDMA_HDR_TYPE_FRAG:
|
|
process_frag(module, (ompi_osc_rdma_frag_header_t *) base_header);
|
|
break;
|
|
case OMPI_OSC_RDMA_HDR_TYPE_POST:
|
|
(void) osc_rdma_incoming_post (module, source);
|
|
break;
|
|
case OMPI_OSC_RDMA_HDR_TYPE_LOCK_ACK:
|
|
ompi_osc_rdma_process_lock_ack(module, (ompi_osc_rdma_header_lock_ack_t *) base_header);
|
|
break;
|
|
case OMPI_OSC_RDMA_HDR_TYPE_FLUSH_ACK:
|
|
ompi_osc_rdma_process_flush_ack (module, source, (ompi_osc_rdma_header_flush_ack_t *) base_header);
|
|
break;
|
|
case OMPI_OSC_RDMA_HDR_TYPE_UNLOCK_ACK:
|
|
ompi_osc_rdma_process_unlock_ack (module, source, (ompi_osc_rdma_header_unlock_ack_t *) base_header);
|
|
break;
|
|
default:
|
|
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
|
"received unexpected message of type %x",
|
|
(int) base_header->type));
|
|
}
|
|
|
|
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
|
"finished processing incoming messages"));
|
|
|
|
/* restart the receive request */
|
|
OPAL_THREAD_LOCK(&module->lock);
|
|
|
|
mark_incoming_completion (module, (base_header->flags & OMPI_OSC_RDMA_HDR_FLAG_PASSIVE_TARGET) ?
|
|
source : MPI_PROC_NULL);
|
|
|
|
osc_rdma_gc_clean ();
|
|
|
|
/* put this request on the garbage colletion list */
|
|
osc_rdma_gc_add_request (request);
|
|
ompi_osc_rdma_frag_start_receive (module);
|
|
|
|
OPAL_THREAD_UNLOCK(&module->lock);
|
|
|
|
OPAL_THREAD_LOCK(&ompi_request_lock);
|
|
|
|
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
|
"finished posting receive request"));
|
|
|
|
return OMPI_SUCCESS;
|
|
}
|
|
|
|
/**
 * Post the receive for the next incoming fragment.
 *
 * The receive lands in module->incoming_buffer, accepts any source on the
 * fragment tag and completes through ompi_osc_rdma_callback. The request is
 * stored in module->frag_request.
 *
 * @returns the result of posting the receive
 */
int ompi_osc_rdma_frag_start_receive (ompi_osc_rdma_module_t *module)
{
    /* largest fragment: configured buffer size plus the fragment header */
    const int max_frag_bytes = mca_osc_rdma_component.buffer_size + sizeof (ompi_osc_rdma_frag_header_t);

    return ompi_osc_rdma_irecv_w_cb (module->incoming_buffer, max_frag_bytes, MPI_BYTE,
                                     OMPI_ANY_SOURCE, OSC_RDMA_FRAG_TAG, module->comm,
                                     &module->frag_request, ompi_osc_rdma_callback, module);
}
|
|
|
|
/**
 * Post a non-blocking receive that completes through
 * osc_rdma_incoming_req_complete with the module as callback data.
 *
 * The request handle is not handed back to the caller.
 *
 * @returns the result of posting the receive
 */
int ompi_osc_rdma_component_irecv (ompi_osc_rdma_module_t *module, void *buf,
                                   size_t count, struct ompi_datatype_t *datatype,
                                   int src, int tag, struct ompi_communicator_t *comm)
{
    int rc;

    rc = ompi_osc_rdma_irecv_w_cb (buf, count, datatype, src, tag, comm,
                                   NULL, osc_rdma_incoming_req_complete, module);

    return rc;
}
|
|
|
|
|
|
static int
|
|
isend_completion_cb(ompi_request_t *request)
|
|
{
|
|
ompi_osc_rdma_module_t *module =
|
|
(ompi_osc_rdma_module_t*) request->req_complete_cb_data;
|
|
|
|
OPAL_OUTPUT_VERBOSE((10, ompi_osc_base_framework.framework_output,
|
|
"isend_completion_cb called"));
|
|
|
|
mark_outgoing_completion(module);
|
|
|
|
/* put this request on the garbage colletion list */
|
|
osc_rdma_gc_add_request (request);
|
|
|
|
return OMPI_SUCCESS;
|
|
}
|
|
|
|
|
|
int
|
|
ompi_osc_rdma_component_isend(ompi_osc_rdma_module_t *module,
|
|
void *buf,
|
|
size_t count,
|
|
struct ompi_datatype_t *datatype,
|
|
int dest,
|
|
int tag,
|
|
struct ompi_communicator_t *comm)
|
|
{
|
|
return ompi_osc_rdma_isend_w_cb (buf, count, datatype, dest, tag, comm,
|
|
isend_completion_cb, module);
|
|
}
|
|
|
|
int ompi_osc_rdma_isend_w_cb (void *ptr, int count, ompi_datatype_t *datatype, int target, int tag,
|
|
ompi_communicator_t *comm, ompi_request_complete_fn_t cb, void *ctx)
|
|
{
|
|
ompi_request_t *request;
|
|
int ret;
|
|
|
|
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
|
"osc rdma: ompi_osc_rdma_isend_w_cb sending %d bytes to %d with tag %d",
|
|
count, target, tag));
|
|
|
|
ret = MCA_PML_CALL(isend_init(ptr, count, datatype, target, tag,
|
|
MCA_PML_BASE_SEND_STANDARD, comm, &request));
|
|
if (OMPI_SUCCESS != ret) {
|
|
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
|
"error sending fragment. ret = %d", ret));
|
|
return ret;
|
|
}
|
|
|
|
request->req_complete_cb = cb;
|
|
request->req_complete_cb_data = ctx;
|
|
|
|
ret = MCA_PML_CALL(start(1, &request));
|
|
|
|
return ret;
|
|
}
|
|
|
|
int ompi_osc_rdma_irecv_w_cb (void *ptr, int count, ompi_datatype_t *datatype, int target, int tag,
|
|
ompi_communicator_t *comm, ompi_request_t **request_out,
|
|
ompi_request_complete_fn_t cb, void *ctx)
|
|
{
|
|
ompi_request_t *request;
|
|
int ret;
|
|
|
|
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
|
"osc rdma: ompi_osc_rdma_irecv_w_cb receiving %d bytes from %d with tag %d",
|
|
count, target, tag));
|
|
|
|
ret = MCA_PML_CALL(irecv_init(ptr, count, datatype, target, tag, comm, &request));
|
|
if (OMPI_SUCCESS != ret) {
|
|
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
|
"error posting receive. ret = %d", ret));
|
|
return ret;
|
|
}
|
|
|
|
request->req_complete_cb = cb;
|
|
request->req_complete_cb_data = ctx;
|
|
if (request_out) {
|
|
*request_out = request;
|
|
}
|
|
|
|
ret = MCA_PML_CALL(start(1, &request));
|
|
|
|
return ret;
|
|
}
|