openmpi/ompi/mca/osc/rdma/osc_rdma_data_move.c
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2006 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2008 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007-2014 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2009-2011 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "osc_rdma.h"
#include "osc_rdma_header.h"
#include "osc_rdma_data_move.h"
#include "osc_rdma_obj_convert.h"
#include "osc_rdma_frag.h"
#include "osc_rdma_request.h"
#include "opal/threads/condition.h"
#include "opal/threads/mutex.h"
#include "opal/util/arch.h"
#include "opal/util/output.h"
#include "opal/sys/atomic.h"
#include "opal/align.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/pml/base/pml_base_sendreq.h"
#include "ompi/mca/btl/btl.h"
#include "ompi/mca/osc/base/base.h"
#include "ompi/mca/osc/base/osc_base_obj_convert.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/op/op.h"
#include "ompi/memchecker.h"
/**
* struct osc_rdma_accumulate_data_t:
*
* @short Data associated with an in-progress accumulation operation.
*/
struct osc_rdma_accumulate_data_t {
ompi_osc_rdma_module_t* module;
void *target;
void *source;
size_t source_len;
ompi_proc_t *proc;
int count;
ompi_datatype_t *datatype;
ompi_op_t *op;
int request_count;
};
/**
* osc_rdma_pending_acc_t:
*
* @short Keep track of accumulate and cswap operations that are
* waiting on the accumulate lock.
*
* @long Since accumulate operations may take several steps to
* complete we need to hold the accumulate lock until the operation
* is complete. While the lock is held it is possible that additional
* accumulate operations will arrive. This structure keeps track of
* those operations.
*/
struct osc_rdma_pending_acc_t {
opal_list_item_t super;
ompi_osc_rdma_header_t header;
int source;
void *data;
size_t data_len;
ompi_datatype_t *datatype;
};
typedef struct osc_rdma_pending_acc_t osc_rdma_pending_acc_t;
static void osc_rdma_pending_acc_constructor (osc_rdma_pending_acc_t *pending)
{
pending->data = NULL;
pending->datatype = NULL;
}
static void osc_rdma_pending_acc_destructor (osc_rdma_pending_acc_t *pending)
{
if (NULL != pending->data) {
free (pending->data);
}
if (NULL != pending->datatype) {
OBJ_RELEASE(pending->datatype);
}
}
OBJ_CLASS_DECLARATION(osc_rdma_pending_acc_t);
OBJ_CLASS_INSTANCE(osc_rdma_pending_acc_t, opal_list_item_t,
osc_rdma_pending_acc_constructor, osc_rdma_pending_acc_destructor);
/* end osc_rdma_pending_acc_t class */
/**
* datatype_buffer_length:
*
* @short Determine the buffer size needed to hold count elements of datatype.
*
* @param[in] datatype - Element type
* @param[in] count - Element count
*
* @returns buflen Buffer length needed to hold count elements of datatype
*/
static inline int datatype_buffer_length (ompi_datatype_t *datatype, int count)
{
ompi_datatype_t *primitive_datatype = NULL;
uint32_t primitive_count;
size_t buflen;
ompi_osc_base_get_primitive_type_info(datatype, &primitive_datatype, &primitive_count);
primitive_count *= count;
/* figure out how big a buffer we need */
ompi_datatype_type_size(primitive_datatype, &buflen);
return buflen * primitive_count;
}
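/*
* Worked example (illustrative, not compiled): assume a derived datatype
* whose primitive type is MPI_DOUBLE with 10 primitives per element.
* With count = 4:
*
*   primitive_count = 10 * 4;                              // 40 doubles
*   ompi_datatype_type_size (primitive_datatype, &buflen); // buflen = 8
*   // datatype_buffer_length() returns 8 * 40 = 320 bytes
*/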
/**
* ompi_osc_rdma_control_send:
*
* @short send a control message as part of a fragment
*
* @param[in] module - OSC RDMA module
* @param[in] target - Target peer's rank
* @param[in] data - Data to send
* @param[in] len - Length of data
*
* @returns error OMPI error code or OMPI_SUCCESS
*
* @long "send" a control messages. Adds it to the active fragment, so the
* caller will still need to explicitly flush (either to everyone or
* to a target) before this is sent.
*/
int ompi_osc_rdma_control_send (ompi_osc_rdma_module_t *module, int target,
void *data, size_t len)
{
ompi_osc_rdma_frag_t *frag;
char *ptr;
int ret;
OPAL_THREAD_LOCK(&module->lock);
ret = ompi_osc_rdma_frag_alloc(module, target, len, &frag, &ptr);
if (OPAL_LIKELY(OMPI_SUCCESS == ret)) {
memcpy (ptr, data, len);
ret = ompi_osc_rdma_frag_finish(module, frag);
}
OPAL_THREAD_UNLOCK(&module->lock);
return ret;
}
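/*
* Usage sketch (illustrative only; the exact header fields are defined in
* osc_rdma_header.h): queue a complete header on the active fragment for
* a peer. Nothing is on the wire until the fragment is flushed:
*
*   ompi_osc_rdma_header_complete_t complete;
*   complete.base.type = OMPI_OSC_RDMA_HDR_TYPE_COMPLETE;
*   complete.base.flags = 0;
*   ret = ompi_osc_rdma_control_send (module, target, &complete,
*                                     sizeof (complete));
*/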
static int ompi_osc_rdma_control_send_unbuffered_cb (ompi_request_t *request)
{
void *ctx = request->req_complete_cb_data;
ompi_osc_rdma_module_t *module;
/* get module pointer and data */
module = *(ompi_osc_rdma_module_t **)ctx;
/* mark this send as complete */
mark_outgoing_completion (module);
/* free the temporary buffer */
free (ctx);
/* put this request on the garbage collection list */
OPAL_THREAD_LOCK(&module->lock);
opal_list_append (&module->request_gc, (opal_list_item_t *) request);
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_SUCCESS;
}
/**
* ompi_osc_rdma_control_send_unbuffered:
*
* @short Send an unbuffered control message to a peer.
*
* @param[in] module - OSC RDMA module
* @param[in] target - Target rank
* @param[in] data - Data to send
* @param[in] len - Length of data
*
* @long Directly send a control message. This does not allocate a
* fragment, so should only be used when sending other messages would
* be erroneous (such as complete messages, when there may be queued
* transactions from an overlapping post that has already heard back
* from its peer). The buffer specified by data will be available
* when this call returns.
*/
int ompi_osc_rdma_control_send_unbuffered(ompi_osc_rdma_module_t *module,
int target, void *data, size_t len)
{
void *ctx, *data_copy;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc rdma: sending unbuffered fragment to %d", target));
/* allocate a temporary buffer for this send */
ctx = malloc (sizeof(ompi_osc_rdma_module_t*) + len);
if (OPAL_UNLIKELY(NULL == ctx)) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* increment outgoing signal count. this send is not part of a passive epoch
* so it would be erroneous to increment the epoch counters. */
ompi_osc_signal_outgoing (module, MPI_PROC_NULL, 1);
/* store module pointer and data in the buffer */
*(ompi_osc_rdma_module_t**)ctx = module;
data_copy = (ompi_osc_rdma_module_t**)ctx + 1;
memcpy (data_copy, data, len);
return ompi_osc_rdma_isend_w_cb (data_copy, len, MPI_BYTE, target, OSC_RDMA_FRAG_TAG,
module->comm, ompi_osc_rdma_control_send_unbuffered_cb, ctx);
}
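/*
* Layout of the temporary ctx buffer allocated above (sketch):
*
*   +--------------------------+------------------------+
*   | ompi_osc_rdma_module_t * | copy of data (len)     |
*   +--------------------------+------------------------+
*   ^ ctx                      ^ data_copy = ctx + sizeof (void *)
*
* The completion callback reads the module pointer back out of the front
* of the buffer and then frees the whole allocation with a single free().
*/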
/**
* datatype_create:
*
* @short Utility function that creates a new datatype from a packed
* description.
*
* @param[in] module - OSC RDMA module
* @param[in] peer - Peer rank
* @param[out] datatype - New datatype. Must be released with OBJ_RELEASE.
* @param[out] proc - Optional. Proc for peer.
* @param[inout] data - Pointer to a pointer where the description is stored. This
* pointer will be updated to the location after the packed
* description.
*/
static inline int datatype_create (ompi_osc_rdma_module_t *module, int peer, ompi_proc_t **proc, ompi_datatype_t **datatype, void **data)
{
ompi_datatype_t *new_datatype = NULL;
ompi_proc_t *peer_proc;
int ret = OMPI_SUCCESS;
do {
peer_proc = ompi_comm_peer_lookup(module->comm, peer);
if (OPAL_UNLIKELY(NULL == peer_proc)) {
OPAL_OUTPUT_VERBOSE((1, ompi_osc_base_framework.framework_output,
"%d: datatype_create: could not resolve proc pointer for peer %d",
ompi_comm_rank(module->comm),
peer));
ret = OMPI_ERROR;
break;
}
new_datatype = ompi_osc_base_datatype_create(peer_proc, data);
if (OPAL_UNLIKELY(NULL == new_datatype)) {
OPAL_OUTPUT_VERBOSE((1, ompi_osc_base_framework.framework_output,
"%d: datatype_create: could not resolve datatype for peer %d",
ompi_comm_rank(module->comm), peer));
ret = OMPI_ERROR;
}
} while (0);
*datatype = new_datatype;
if (proc) *proc = peer_proc;
return ret;
}
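/*
* Usage sketch (illustrative only, mirroring the process_* handlers
* below): the packed datatype description sits at the front of a message
* payload, and datatype_create() advances the data pointer past it:
*
*   char *data = (char *) (header + 1);  // description, then user data
*   ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
*   // on success, data points at the first payload byte; the datatype
*   // must later be dropped with OBJ_RELEASE(datatype)
*/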
/**
* process_put:
*
* @short Process a put w/ data message
*
* @param[in] module - OSC RDMA module
* @param[in] source - Message source
* @param[in] put_header - Message header + data
*
* @long Process a put message and copy the message data to the specified
* memory region. Note, this function does not handle any bounds
* checking at the moment.
*/
static inline int process_put(ompi_osc_rdma_module_t* module, int source,
ompi_osc_rdma_header_put_t* put_header)
{
char *data = (char*) (put_header + 1);
ompi_proc_t *proc;
struct ompi_datatype_t *datatype;
size_t data_len;
void *target = (unsigned char*) module->baseptr +
((unsigned long) put_header->displacement * module->disp_unit);
int ret;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"%d: process_put: received message from %d",
ompi_comm_rank(module->comm),
source));
ret = datatype_create (module, source, &proc, &datatype, (void **) &data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return ret;
}
data_len = put_header->len - ((uintptr_t) data - (uintptr_t) put_header);
osc_rdma_copy_on_recv (target, data, data_len, proc, put_header->count, datatype);
OBJ_RELEASE(datatype);
return put_header->len;
}
static inline int process_put_long(ompi_osc_rdma_module_t* module, int source,
ompi_osc_rdma_header_put_t* put_header)
{
char *data = (char*) (put_header + 1);
struct ompi_datatype_t *datatype;
void *target = (unsigned char*) module->baseptr +
((unsigned long) put_header->displacement * module->disp_unit);
int ret;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"%d: process_put_long: received message from %d",
ompi_comm_rank(module->comm),
source));
ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return ret;
}
ret = ompi_osc_rdma_component_irecv (module, target,
put_header->count,
datatype, source,
put_header->tag,
module->comm);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
OPAL_OUTPUT_VERBOSE((1, ompi_osc_base_framework.framework_output,
"%d: process_put_long: irecv error: %d",
ompi_comm_rank(module->comm),
ret));
return OMPI_ERROR;
}
OBJ_RELEASE(datatype);
return put_header->len;
}
/**
* osc_rdma_incoming_req_complete:
*
* @short Completion callback for a send/receive associated with an access
* epoch.
*
* @param[in] request - PML request with an OSC RDMA module as the callback data.
*
* @long This function is called when a send or receive associated with an
* access epoch completes. When fired this function will increment the
* passive or active incoming count.
*/
static int osc_rdma_incoming_req_complete (ompi_request_t *request)
{
ompi_osc_rdma_module_t *module = (ompi_osc_rdma_module_t *) request->req_complete_cb_data;
/* we need the peer rank. get it from the pml request */
mca_pml_base_request_t *pml_request = (mca_pml_base_request_t *) request;
int rank = MPI_PROC_NULL;
if (request->req_status.MPI_TAG & 0x01) {
rank = pml_request->req_peer;
}
mark_incoming_completion (module, rank);
/* put this request on the garbage collection list */
OPAL_THREAD_LOCK(&module->lock);
opal_list_append (&module->request_gc, (opal_list_item_t *) request);
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_SUCCESS;
}
/**
* @short Post a send to match the remote receive for a get operation.
*
* @param[in] module - OSC RDMA module
* @param[in] source - Source buffer
* @param[in] count - Number of elements in the source buffer
* @param[in] datatype - Type of source elements.
* @param[in] peer - Remote process that has the receive posted
* @param[in] tag - Tag for the send
*
* @long This function posts a send to match the receive posted as part
* of a get operation. When this send is complete the get is considered
* complete at the target (this process).
*/
static int osc_rdma_get_post_send (ompi_osc_rdma_module_t *module, void *source, int count,
ompi_datatype_t *datatype, int peer, int tag)
{
return ompi_osc_rdma_isend_w_cb (source, count, datatype, peer, tag, module->comm,
osc_rdma_incoming_req_complete, module);
}
/**
* process_get:
*
* @short Process a get message from a remote peer
*
* @param[in] module - OSC RDMA module
* @param[in] target - Peer process
* @param[in] get_header - Incoming message header
*/
static inline int process_get (ompi_osc_rdma_module_t* module, int target,
ompi_osc_rdma_header_get_t* get_header)
{
char *data = (char *) (get_header + 1);
struct ompi_datatype_t *datatype;
void *source = (unsigned char*) module->baseptr +
((unsigned long) get_header->displacement * module->disp_unit);
int ret;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"%d: process_get: received message from %d",
ompi_comm_rank(module->comm),
target));
ret = datatype_create (module, target, NULL, &datatype, (void **) &data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return ret;
}
/* send get data */
ret = osc_rdma_get_post_send (module, source, get_header->count, datatype, target, get_header->tag);
OBJ_RELEASE(datatype);
return OMPI_SUCCESS == ret ? (int) get_header->len : ret;
}
/**
* osc_rdma_accumulate_buffer:
*
* @short Accumulate data into the target buffer.
*
* @param[in] target - Target buffer
* @param[in] source - Source buffer
* @param[in] source_len - Length of source buffer in bytes
* @param[in] proc - Source proc
* @param[in] count - Number of elements in target buffer
* @param[in] datatype - Type of elements in target buffer
* @param[in] op - Operation to be performed
*/
static inline int osc_rdma_accumulate_buffer (void *target, void *source, size_t source_len, ompi_proc_t *proc,
int count, ompi_datatype_t *datatype, ompi_op_t *op)
{
void *buffer = source;
int ret;
assert (NULL != target && NULL != source);
if (op == &ompi_mpi_op_replace.op) {
osc_rdma_copy_on_recv (target, source, source_len, proc, count, datatype);
return OMPI_SUCCESS;
}
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
if (proc->proc_arch != ompi_proc_local()->proc_arch) {
ompi_datatype_t *primitive_datatype = NULL;
uint32_t primitive_count;
size_t buflen;
ompi_osc_base_get_primitive_type_info(datatype, &primitive_datatype, &primitive_count);
primitive_count *= count;
/* figure out how big a buffer we need */
ompi_datatype_type_size(primitive_datatype, &buflen);
buflen *= primitive_count;
buffer = malloc (buflen);
if (OPAL_UNLIKELY(NULL == buffer)) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
osc_rdma_copy_on_recv (buffer, source, source_len, proc, count, datatype);
}
#endif
/* copy the data from the temporary buffer into the user window */
ret = ompi_osc_base_process_op(target, buffer, source_len, datatype,
count, op);
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
if (proc->proc_arch != ompi_proc_local()->proc_arch) {
free(buffer);
}
#endif
return ret;
}
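/*
* Usage sketch (illustrative only; MPI_INT and ompi_mpi_op_sum are the
* predefined handles): apply MPI_SUM over count ints received from proc
* into the window memory at target:
*
*   ret = osc_rdma_accumulate_buffer (target, recv_buf, recv_len, proc,
*                                     count, MPI_INT, &ompi_mpi_op_sum.op);
*
* MPI_REPLACE short-circuits to a straight copy; on heterogeneous builds
* any architecture conversion happens in a temporary buffer first.
*/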
/**
* @short Create an accumulate data object.
*
* @param[in] module - RDMA OSC module
* @param[in] target - Target for the accumulation
* @param[in] source - Source of accumulate data. Must be allocated with malloc/calloc/etc
* @param[in] source_len - Length of the source buffer in bytes
* @param[in] proc - Source proc
* @param[in] count - Number of elements to accumulate
* @param[in] datatype - Datatype to accumulate
* @param[in] op - Operator
* @param[in] request_count - Number of prerequisite requests
* @param[out] acc_data_out - New accumulation data
*
* @long This function is used to create a copy of the data needed to perform an accumulation.
* This data should be provided to ompi_osc_rdma_isend_w_cb or ompi_osc_rdma_irecv_w_cb
* as the ctx parameter with accumulate_cb as the cb parameter.
*/
static int osc_rdma_accumulate_allocate (ompi_osc_rdma_module_t *module, void *target, void *source, size_t source_len,
ompi_proc_t *proc, int count, ompi_datatype_t *datatype, ompi_op_t *op,
int request_count, struct osc_rdma_accumulate_data_t **acc_data_out)
{
struct osc_rdma_accumulate_data_t *acc_data;
acc_data = malloc (sizeof (*acc_data));
if (OPAL_UNLIKELY(NULL == acc_data)) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
acc_data->module = module;
acc_data->target = target;
acc_data->source = source;
acc_data->source_len = source_len;
acc_data->proc = proc;
acc_data->count = count;
acc_data->datatype = datatype;
OBJ_RETAIN(datatype);
acc_data->op = op;
OBJ_RETAIN(op);
acc_data->request_count = request_count;
*acc_data_out = acc_data;
return OMPI_SUCCESS;
}
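/*
* Lifecycle sketch (illustrative only): a long accumulate uses one
* prerequisite request, a long get-accumulate uses two (the get reply
* send plus the accumulate data receive):
*
*   ret = osc_rdma_accumulate_allocate (module, target, buffer, buflen,
*                                       proc, count, datatype, op, 1,
*                                       &acc_data);
*   ret = ompi_osc_rdma_irecv_w_cb (buffer, count, datatype, source, tag,
*                                   module->comm, NULL, accumulate_cb,
*                                   acc_data);
*   // accumulate_cb fires once per request; when its counter reaches 0
*   // it applies the op, drops the accumulate lock, and frees acc_data
*/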
static void osc_rdma_accumulate_free (struct osc_rdma_accumulate_data_t *acc_data)
{
/* the source is always a temporary buffer */
free (acc_data->source);
OBJ_RELEASE(acc_data->datatype);
OBJ_RELEASE(acc_data->op);
free (acc_data);
}
/**
* @short Execute the accumulate once the request counter reaches 0.
*
* @param[in] request - request
*
* The request should be created with ompi_osc_rdma_isend_w_cb or ompi_osc_rdma_irecv_w_cb
* with ctx allocated by osc_rdma_accumulate_allocate. This callback will free the accumulate
* data once the accumulation operation is complete.
*/
static int accumulate_cb (ompi_request_t *request)
{
struct osc_rdma_accumulate_data_t *acc_data = (struct osc_rdma_accumulate_data_t *) request->req_complete_cb_data;
int ret = OMPI_SUCCESS;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"accumulate_cb, request_count = %d", acc_data->request_count));
request->req_complete_cb_data = acc_data->module;
osc_rdma_incoming_req_complete (request);
--acc_data->request_count;
if (0 != acc_data->request_count) {
/* more requests needed before the buffer can be accumulated */
return OMPI_SUCCESS;
}
if (acc_data->source) {
ret = osc_rdma_accumulate_buffer (acc_data->target, acc_data->source, acc_data->source_len,
acc_data->proc, acc_data->count, acc_data->datatype, acc_data->op);
}
/* drop the accumulate lock */
ompi_osc_rdma_accumulate_unlock (acc_data->module);
osc_rdma_accumulate_free (acc_data);
return ret;
}
static int ompi_osc_rdma_acc_op_queue (ompi_osc_rdma_module_t *module, ompi_osc_rdma_header_t *header, int source,
char *data, size_t data_len, ompi_datatype_t *datatype)
{
osc_rdma_pending_acc_t *pending_acc;
pending_acc = OBJ_NEW(osc_rdma_pending_acc_t);
if (OPAL_UNLIKELY(NULL == pending_acc)) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
pending_acc->source = source;
/* save any inline data (eager acc, gacc only) */
pending_acc->data_len = data_len;
if (data_len) {
pending_acc->data = malloc (data_len);
if (OPAL_UNLIKELY(NULL == pending_acc->data)) {
OBJ_RELEASE(pending_acc);
return OMPI_ERR_OUT_OF_RESOURCE;
}
memcpy (pending_acc->data, data, data_len);
}
/* save the datatype */
pending_acc->datatype = datatype;
OBJ_RETAIN(datatype);
/* save the header */
switch (header->base.type) {
case OMPI_OSC_RDMA_HDR_TYPE_ACC:
case OMPI_OSC_RDMA_HDR_TYPE_ACC_LONG:
pending_acc->header.acc = header->acc;
break;
case OMPI_OSC_RDMA_HDR_TYPE_GET_ACC:
case OMPI_OSC_RDMA_HDR_TYPE_GET_ACC_LONG:
pending_acc->header.get_acc = header->get_acc;
break;
case OMPI_OSC_RDMA_HDR_TYPE_CSWAP:
pending_acc->header.cswap = header->cswap;
break;
default:
/* it is a coding error if any other header types are queued this way */
assert (0);
}
/* add to the pending acc queue */
OPAL_THREAD_LOCK(&module->lock);
opal_list_append (&module->pending_acc, &pending_acc->super);
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_SUCCESS;
}
static int replace_cb (ompi_request_t *request)
{
ompi_osc_rdma_module_t *module = (ompi_osc_rdma_module_t *) request->req_complete_cb_data;
/* unlock the accumulate lock */
ompi_osc_rdma_accumulate_unlock (module);
return OMPI_SUCCESS;
}
/**
* ompi_osc_rdma_acc_start:
*
* @short Start an accumulate with data operation.
*
* @param[in] module - OSC RDMA module
* @param[in] source - Source rank
* @param[in] data - Accumulate data
* @param[in] data_len - Length of the accumulate data
* @param[in] datatype - Accumulation datatype
* @param[in] acc_header - Accumulate header
*
* The module's accumulation lock must be held before calling this
* function. It will release the lock when the operation is complete.
*/
static int ompi_osc_rdma_acc_start (ompi_osc_rdma_module_t *module, int source, void *data, size_t data_len,
ompi_datatype_t *datatype, ompi_osc_rdma_header_acc_t *acc_header)
{
void *target = (unsigned char*) module->baseptr +
((unsigned long) acc_header->displacement * module->disp_unit);
struct ompi_op_t *op = ompi_osc_base_op_create(acc_header->op);
ompi_proc_t *proc;
int ret;
proc = ompi_comm_peer_lookup(module->comm, source);
assert (NULL != proc);
ret = osc_rdma_accumulate_buffer (target, data, data_len, proc, acc_header->count,
datatype, op);
OBJ_RELEASE(op);
ompi_osc_rdma_accumulate_unlock (module);
return ret;
}
/**
* ompi_osc_rdma_acc_long_start:
*
* @short Start a long accumulate operation.
*
* @param[in] module - OSC RDMA module
* @param[in] source - Source rank
* @param[in] datatype - Accumulation datatype
* @param[in] acc_header - Accumulate header
*
* The module's accumulation lock must be held before calling this
* function. It will release the lock when the operation is complete.
*/
static int ompi_osc_rdma_acc_long_start (ompi_osc_rdma_module_t *module, int source, ompi_datatype_t *datatype,
ompi_osc_rdma_header_acc_t *acc_header) {
struct osc_rdma_accumulate_data_t *acc_data;
size_t buflen;
void *buffer;
ompi_proc_t *proc;
void *target = (unsigned char*) module->baseptr +
((unsigned long) acc_header->displacement * module->disp_unit);
struct ompi_op_t *op = ompi_osc_base_op_create(acc_header->op);
int ret;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"ompi_osc_rdma_acc_long_start starting..."));
proc = ompi_comm_peer_lookup(module->comm, source);
assert (NULL != proc);
do {
if (op == &ompi_mpi_op_replace.op) {
ret = ompi_osc_rdma_irecv_w_cb (target, acc_header->count, datatype, source,
acc_header->tag, module->comm, NULL,
replace_cb, module);
break;
}
buflen = datatype_buffer_length (datatype, acc_header->count);
/* allocate a temporary buffer to receive the accumulate data */
buffer = malloc (buflen);
if (OPAL_UNLIKELY(NULL == buffer)) {
ret = OMPI_ERR_OUT_OF_RESOURCE;
break;
}
ret = osc_rdma_accumulate_allocate (module, target, buffer, buflen, proc, acc_header->count,
datatype, op, 1, &acc_data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
free (buffer);
break;
}
ret = ompi_osc_rdma_irecv_w_cb (buffer, acc_header->count, datatype, source, acc_header->tag,
module->comm, NULL, accumulate_cb, acc_data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
osc_rdma_accumulate_free (acc_data);
}
} while (0);
OBJ_RELEASE(op);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
ompi_osc_rdma_accumulate_unlock (module);
}
return ret;
}
/**
* ompi_osc_rdma_gacc_start:
*
* @short Start an accumulate with data + get operation.
*
* @param[in] module - OSC RDMA module
* @param[in] source - Source rank
* @param[in] data - Accumulate data. Must be allocated on the heap.
* @param[in] data_len - Length of the accumulate data
* @param[in] datatype - Accumulation datatype
* @param[in] get_acc_header - Accumulate header
*
* The module's accumulation lock must be held before calling this
* function. It will release the lock when the operation is complete.
*/
static int ompi_osc_rdma_gacc_start (ompi_osc_rdma_module_t *module, int source, void *data, size_t data_len,
ompi_datatype_t *datatype, ompi_osc_rdma_header_get_acc_t *get_acc_header)
{
void *target = (unsigned char*) module->baseptr +
((unsigned long) get_acc_header->displacement * module->disp_unit);
struct ompi_op_t *op = ompi_osc_base_op_create(get_acc_header->op);
struct osc_rdma_accumulate_data_t *acc_data;
ompi_proc_t *proc;
int ret;
proc = ompi_comm_peer_lookup(module->comm, source);
assert (NULL != proc);
do {
ret = osc_rdma_accumulate_allocate (module, target, data, data_len, proc, get_acc_header->count,
datatype, op, 1, &acc_data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
break;
}
ret = ompi_osc_rdma_isend_w_cb (target, get_acc_header->count, datatype, source, get_acc_header->tag,
module->comm, accumulate_cb, acc_data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
osc_rdma_accumulate_free (acc_data);
}
} while (0);
OBJ_RELEASE(op);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
ompi_osc_rdma_accumulate_unlock (module);
}
return ret;
}
/**
* ompi_osc_gacc_long_start:
*
* @short Start a long accumulate + get operation.
*
* @param[in] module - OSC RDMA module
* @param[in] source - Source rank
* @param[in] datatype - Accumulation datatype
* @param[in] get_acc_header - Accumulate header
*
* The module's accumulation lock must be held before calling this
* function. It will release the lock when the operation is complete.
*/
static int ompi_osc_gacc_long_start (ompi_osc_rdma_module_t *module, int source, ompi_datatype_t *datatype,
ompi_osc_rdma_header_get_acc_t *get_acc_header)
{
void *target = (unsigned char*) module->baseptr +
((unsigned long) get_acc_header->displacement * module->disp_unit);
struct ompi_op_t *op = ompi_osc_base_op_create(get_acc_header->op);
struct osc_rdma_accumulate_data_t *acc_data;
ompi_request_t *recv_request;
ompi_proc_t *proc;
size_t buflen;
void *buffer;
int ret;
proc = ompi_comm_peer_lookup(module->comm, source);
assert (NULL != proc);
/* allocate a temporary buffer to receive the accumulate data */
buflen = datatype_buffer_length (datatype, get_acc_header->count);
do {
buffer = malloc (buflen);
if (OPAL_UNLIKELY(NULL == buffer)) {
ret = OMPI_ERR_OUT_OF_RESOURCE;
break;
}
ret = osc_rdma_accumulate_allocate (module, target, buffer, buflen, proc, get_acc_header->count,
datatype, op, 2, &acc_data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
break;
}
ret = ompi_osc_rdma_irecv_w_cb (buffer, get_acc_header->count, datatype, source, get_acc_header->tag,
module->comm, &recv_request, accumulate_cb, acc_data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
osc_rdma_accumulate_free (acc_data);
break;
}
ret = ompi_osc_rdma_isend_w_cb (target, get_acc_header->count, datatype, source, get_acc_header->tag,
module->comm, accumulate_cb, acc_data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
/* the send failed; cancel the receive and free the accumulate data */
ompi_request_cancel (recv_request);
osc_rdma_accumulate_free (acc_data);
break;
}
} while (0);
OBJ_RELEASE(op);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
ompi_osc_rdma_accumulate_unlock (module);
}
return ret;
}
/**
* ompi_osc_rdma_cswap_start:
*
* @short Start a compare and swap operation
*
* @param[in] module - OSC RDMA module
* @param[in] source - Source rank
* @param[in] data - Compare and swap data
* @param[in] data_len - Length of the compare and swap data. Must be exactly
* twice the size of the datatype.
* @param[in] datatype - Compare and swap datatype
* @param[in] cswap_header - Compare and swap header
*
* The module's accumulation lock must be held before calling this
* function. It will release the lock when the operation is complete.
*/
static int ompi_osc_rdma_cswap_start (ompi_osc_rdma_module_t *module, int source, void *data, ompi_datatype_t *datatype,
ompi_osc_rdma_header_cswap_t *cswap_header)
{
void *target = (unsigned char*) module->baseptr +
((unsigned long) cswap_header->displacement * module->disp_unit);
void *compare_addr, *origin_addr;
size_t datatype_size;
ompi_proc_t *proc;
int ret;
proc = ompi_comm_peer_lookup(module->comm, source);
assert (NULL != proc);
datatype_size = datatype->super.size;
origin_addr = data;
compare_addr = (void *)((uintptr_t) data + datatype_size);
do {
/* no reason to do a non-blocking send here */
ret = MCA_PML_CALL(send(target, 1, datatype, source, cswap_header->tag, MCA_PML_BASE_SEND_STANDARD,
module->comm));
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
break;
}
/* increment the incoming fragment count so it matches what is expected */
mark_incoming_completion (module, (cswap_header->tag & 0x1) ? source : MPI_PROC_NULL);
if (0 == memcmp (target, compare_addr, datatype_size)) {
osc_rdma_copy_on_recv (target, origin_addr, datatype_size, proc, 1, datatype);
}
} while (0);
ompi_osc_rdma_accumulate_unlock (module);
return ret;
}
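/*
* Layout of the compare-and-swap payload (sketch): the origin (new)
* value is followed immediately by the compare value, each exactly
* datatype_size bytes:
*
*   +--------------------+--------------------+
*   | origin value       | compare value      |
*   +--------------------+--------------------+
*   ^ data               ^ data + datatype_size
*
* The old target value is sent back first; the swap is performed only if
* memcmp (target, compare_addr, datatype_size) == 0.
*/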
/**
* ompi_osc_rdma_progress_pending_acc:
*
* @short Progress one pending accumulation or compare and swap operation.
*
* @param[in] module - OSC RDMA module
*
* @long If the accumulation lock can be acquired, progress one pending
* accumulate or compare and swap operation.
*/
int ompi_osc_rdma_progress_pending_acc (ompi_osc_rdma_module_t *module)
{
osc_rdma_pending_acc_t *pending_acc;
int ret;
/* try to acquire the lock. it will be unlocked when the accumulate or cswap
* operation completes */
if (ompi_osc_rdma_accumulate_trylock (module)) {
return OMPI_SUCCESS;
}
pending_acc = (osc_rdma_pending_acc_t *) opal_list_remove_first (&module->pending_acc);
if (OPAL_UNLIKELY(NULL == pending_acc)) {
/* called without any pending accumulation operations */
ompi_osc_rdma_accumulate_unlock (module);
return OMPI_SUCCESS;
}
switch (pending_acc->header.base.type) {
case OMPI_OSC_RDMA_HDR_TYPE_ACC:
ret = ompi_osc_rdma_acc_start (module, pending_acc->source, pending_acc->data, pending_acc->data_len,
pending_acc->datatype, &pending_acc->header.acc);
free (pending_acc->data);
break;
case OMPI_OSC_RDMA_HDR_TYPE_ACC_LONG:
ret = ompi_osc_rdma_acc_long_start (module, pending_acc->source, pending_acc->datatype,
&pending_acc->header.acc);
break;
case OMPI_OSC_RDMA_HDR_TYPE_GET_ACC:
ret = ompi_osc_rdma_gacc_start (module, pending_acc->source, pending_acc->data,
pending_acc->data_len, pending_acc->datatype,
&pending_acc->header.get_acc);
break;
case OMPI_OSC_RDMA_HDR_TYPE_GET_ACC_LONG:
ret = ompi_osc_gacc_long_start (module, pending_acc->source, pending_acc->datatype,
&pending_acc->header.get_acc);
break;
case OMPI_OSC_RDMA_HDR_TYPE_CSWAP:
ret = ompi_osc_rdma_cswap_start (module, pending_acc->source, pending_acc->data,
pending_acc->datatype, &pending_acc->header.cswap);
break;
default:
ret = OMPI_ERROR;
/* it is a coding error if this point is reached */
assert (0);
}
pending_acc->data = NULL;
OBJ_RELEASE(pending_acc);
return ret;
}
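/*
* Usage sketch (illustrative only): each call starts at most one queued
* operation, and only if the accumulate lock is currently free. A caller
* in the progress path might drain one entry after the lock is dropped:
*
*   if (!opal_list_is_empty (&module->pending_acc)) {
*       (void) ompi_osc_rdma_progress_pending_acc (module);
*   }
*/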
static inline int process_acc (ompi_osc_rdma_module_t *module, int source,
ompi_osc_rdma_header_acc_t *acc_header)
{
char *data = (char *) (acc_header + 1);
struct ompi_datatype_t *datatype;
uint64_t data_len;
int ret;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"%d: process_acc: received message from %d",
ompi_comm_rank(module->comm),
source));
ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return ret;
}
data_len = acc_header->len - ((char*) data - (char*) acc_header);
/* try to acquire the accumulate lock */
if (0 == ompi_osc_rdma_accumulate_trylock (module)) {
ret = ompi_osc_rdma_acc_start (module, source, data, data_len, datatype,
acc_header);
} else {
/* couldn't acquire the accumulate lock so queue up the accumulate operation */
ret = ompi_osc_rdma_acc_op_queue (module, (ompi_osc_rdma_header_t *) acc_header,
source, data, data_len, datatype);
}
/* release the datatype */
OBJ_RELEASE(datatype);
return (OMPI_SUCCESS == ret) ? (int) acc_header->len : ret;
}
static inline int process_acc_long (ompi_osc_rdma_module_t* module, int source,
ompi_osc_rdma_header_acc_t* acc_header)
{
char *data = (char *) (acc_header + 1);
struct ompi_datatype_t *datatype;
int ret;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"%d: process_acc_long: received message from %d",
ompi_comm_rank(module->comm),
source));
ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return ret;
}
if (0 == ompi_osc_rdma_accumulate_trylock (module)) {
ret = ompi_osc_rdma_acc_long_start (module, source, datatype, acc_header);
} else {
/* queue the operation */
ret = ompi_osc_rdma_acc_op_queue (module, (ompi_osc_rdma_header_t *) acc_header, source,
NULL, 0, datatype);
}
/* release the datatype */
OBJ_RELEASE(datatype);
return (OMPI_SUCCESS == ret) ? (int) acc_header->len : ret;
}
static inline int process_get_acc(ompi_osc_rdma_module_t *module, int source,
ompi_osc_rdma_header_get_acc_t *get_acc_header)
{
char *data = (char *) (get_acc_header + 1);
struct ompi_datatype_t *datatype;
void *buffer = NULL;
uint64_t data_len;
int ret;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"%d: process_get_acc: received message from %d",
ompi_comm_rank(module->comm),
source));
ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return ret;
}
data_len = get_acc_header->len - ((char*) data - (char*) get_acc_header);
if (0 == ompi_osc_rdma_accumulate_trylock (module)) {
/* make a copy of the data since the buffer needs to be returned */
if (data_len) {
buffer = malloc (data_len);
if (OPAL_UNLIKELY(NULL == buffer)) {
OBJ_RELEASE(datatype);
return OMPI_ERR_OUT_OF_RESOURCE;
}
memcpy (buffer, data, data_len);
}
ret = ompi_osc_rdma_gacc_start (module, source, buffer, data_len, datatype,
get_acc_header);
} else {
/* queue the operation */
ret = ompi_osc_rdma_acc_op_queue (module, (ompi_osc_rdma_header_t *) get_acc_header,
source, data, data_len, datatype);
}
/* release the datatype */
OBJ_RELEASE(datatype);
return (OMPI_SUCCESS == ret) ? (int) get_acc_header->len : ret;
}
static inline int process_get_acc_long(ompi_osc_rdma_module_t *module, int source,
ompi_osc_rdma_header_get_acc_t *get_acc_header)
{
char *data = (char *) (get_acc_header + 1);
struct ompi_datatype_t *datatype;
int ret;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"%d: process_acc: received message from %d",
ompi_comm_rank(module->comm),
source));
ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return ret;
}
if (0 == ompi_osc_rdma_accumulate_trylock (module)) {
ret = ompi_osc_gacc_long_start (module, source, datatype, get_acc_header);
} else {
/* queue the operation */
ret = ompi_osc_rdma_acc_op_queue (module, (ompi_osc_rdma_header_t *) get_acc_header,
source, NULL, 0, datatype);
}
/* release the datatype */
OBJ_RELEASE(datatype);
return OMPI_SUCCESS == ret ? (int) get_acc_header->len : ret;
}
static inline int process_cswap (ompi_osc_rdma_module_t *module, int source,
ompi_osc_rdma_header_cswap_t *cswap_header)
{
char *data = (char*) (cswap_header + 1);
struct ompi_datatype_t *datatype;
int ret;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"%d: process_cswap: received message from %d",
ompi_comm_rank(module->comm),
source));
ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return ret;
}
if (0 == ompi_osc_rdma_accumulate_trylock (module)) {
ret = ompi_osc_rdma_cswap_start (module, source, data, datatype, cswap_header);
} else {
/* queue the operation */
ret = ompi_osc_rdma_acc_op_queue (module, (ompi_osc_rdma_header_t *) cswap_header, source,
data, 2 * datatype->super.size, datatype);
}
/* Release datatype */
OBJ_RELEASE(datatype);
return (OMPI_SUCCESS == ret) ? (int) cswap_header->len : ret;
}
static inline int process_complete (ompi_osc_rdma_module_t *module, int source,
ompi_osc_rdma_header_complete_t *complete_header)
{
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc rdma: process_complete got complete message from %d", source));
module->num_complete_msgs++;
return sizeof (*complete_header);
}
/* flush and unlock headers cannot be processed from the request callback
* because some btls do not provide re-entrant progress functions. these
* fragments will be progressed by the rdma component's progress function */
static inline int process_flush (ompi_osc_rdma_module_t *module, int source,
ompi_osc_rdma_header_flush_t *flush_header)
{
int ret;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"process_flush header = {.frag_count = %d}", flush_header->frag_count));
/* increase signal count by incoming frags */
module->passive_incoming_frag_signal_count[source] += flush_header->frag_count;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"%d: process_flush: received message from %d. passive_incoming_frag_signal_count = %d, passive_incoming_frag_count = %d",
ompi_comm_rank(module->comm), source, module->passive_incoming_frag_signal_count[source], module->passive_incoming_frag_count[source]));
ret = ompi_osc_rdma_process_flush (module, source, flush_header);
if (OMPI_SUCCESS != ret) {
ompi_osc_rdma_pending_t *pending;
pending = OBJ_NEW(ompi_osc_rdma_pending_t);
pending->module = module;
pending->source = source;
pending->header.flush = *flush_header;
OPAL_THREAD_LOCK(&mca_osc_rdma_component.lock);
opal_list_append (&mca_osc_rdma_component.pending_operations, &pending->super);
OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.lock);
/* we now have to count the current fragment */
module->passive_incoming_frag_signal_count[source]++;
} else {
/* need to account for the current fragment */
module->passive_incoming_frag_count[source] = -1;
}
return sizeof (*flush_header);
}
static inline int process_unlock (ompi_osc_rdma_module_t *module, int source,
ompi_osc_rdma_header_unlock_t *unlock_header)
{
int ret;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"process_unlock header = {.frag_count = %d}", unlock_header->frag_count));
/* increase signal count by incoming frags */
module->passive_incoming_frag_signal_count[source] += unlock_header->frag_count;
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"osc rdma: processing unlock request from %d. frag count = %d, signal_count = %d, processed_count = %d",
source, unlock_header->frag_count, (int) module->passive_incoming_frag_signal_count[source],
(int) module->passive_incoming_frag_count[source]));
ret = ompi_osc_rdma_process_unlock (module, source, unlock_header);
if (OMPI_SUCCESS != ret) {
ompi_osc_rdma_pending_t *pending;
pending = OBJ_NEW(ompi_osc_rdma_pending_t);
pending->module = module;
pending->source = source;
pending->header.unlock = *unlock_header;
OPAL_THREAD_LOCK(&mca_osc_rdma_component.lock);
opal_list_append (&mca_osc_rdma_component.pending_operations, &pending->super);
OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.lock);
/* we now have to count the current fragment */
module->passive_incoming_frag_signal_count[source]++;
} else {
/* need to account for the current fragment */
module->passive_incoming_frag_count[source] = -1;
}
return sizeof (*unlock_header);
}
/*
* Do all the data movement associated with a fragment
*/
static inline int process_frag (ompi_osc_rdma_module_t *module,
ompi_osc_rdma_frag_header_t *frag)
{
ompi_osc_rdma_header_t *header;
int ret;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc rdma: process_frag: from %d, ops %d",
(int) frag->source, (int) frag->num_ops));
header = (ompi_osc_rdma_header_t *) (frag + 1);
for (int i = 0 ; i < frag->num_ops ; ++i) {
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc rdma: process_frag: type 0x%x. offset = %u", header->base.type,
(unsigned) ((uintptr_t)header - (uintptr_t)frag)));
switch (header->base.type) {
case OMPI_OSC_RDMA_HDR_TYPE_PUT:
ret = process_put(module, frag->source, &header->put);
break;
case OMPI_OSC_RDMA_HDR_TYPE_PUT_LONG:
ret = process_put_long(module, frag->source, &header->put);
break;
case OMPI_OSC_RDMA_HDR_TYPE_ACC:
ret = process_acc(module, frag->source, &header->acc);
break;
case OMPI_OSC_RDMA_HDR_TYPE_ACC_LONG:
ret = process_acc_long (module, frag->source, &header->acc);
break;
case OMPI_OSC_RDMA_HDR_TYPE_LOCK_REQ:
ret = ompi_osc_rdma_process_lock(module, frag->source, &header->lock);
if (OPAL_LIKELY(OMPI_SUCCESS == ret)) {
ret = sizeof (header->lock);
}
break;
case OMPI_OSC_RDMA_HDR_TYPE_UNLOCK_REQ:
ret = process_unlock(module, frag->source, &header->unlock);
break;
case OMPI_OSC_RDMA_HDR_TYPE_GET:
ret = process_get (module, frag->source, &header->get);
break;
case OMPI_OSC_RDMA_HDR_TYPE_CSWAP:
ret = process_cswap (module, frag->source, &header->cswap);
break;
case OMPI_OSC_RDMA_HDR_TYPE_GET_ACC:
ret = process_get_acc (module, frag->source, &header->get_acc);
break;
case OMPI_OSC_RDMA_HDR_TYPE_GET_ACC_LONG:
ret = process_get_acc_long (module, frag->source, &header->get_acc);
break;
case OMPI_OSC_RDMA_HDR_TYPE_FLUSH_REQ:
ret = process_flush (module, frag->source, &header->flush);
break;
case OMPI_OSC_RDMA_HDR_TYPE_COMPLETE:
ret = process_complete (module, frag->source, &header->complete);
break;
default:
opal_output(0, "Unsupported fragment type 0x%x\n", header->base.type);
abort(); /* FIX ME */
}
if (ret <= 0) {
opal_output(0, "Error processing fragment: %d", ret);
abort(); /* FIX ME */
}
/* the next header will start on an 8-byte boundary. this is done to ensure
* that the next header and the packed datatype is properly aligned */
header = (ompi_osc_rdma_header_t *) OPAL_ALIGN(((uintptr_t) header + ret), 8, uintptr_t);
}
return OMPI_SUCCESS;
}
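/*
* Alignment example (illustrative): if a header plus its inline payload
* consumed 37 bytes and the current header sits on an 8-byte boundary,
* the next header begins at the next multiple of 8:
*
*   OPAL_ALIGN((uintptr_t) header + 37, 8, uintptr_t) == (uintptr_t) header + 40
*/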
/* dispatch for callback on message completion */
static int ompi_osc_rdma_callback (ompi_request_t *request)
{
ompi_osc_rdma_module_t *module = (ompi_osc_rdma_module_t *) request->req_complete_cb_data;
ompi_osc_rdma_header_base_t *base_header =
(ompi_osc_rdma_header_base_t *) module->incomming_buffer;
size_t incoming_length = request->req_status._ucount;
int source = request->req_status.MPI_SOURCE;
OPAL_THREAD_UNLOCK(&ompi_request_lock);
assert(incoming_length >= sizeof(ompi_osc_rdma_header_base_t));
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"received rdma callback for fragment. source = %d, count = %u, type = 0x%x",
source, (unsigned) incoming_length, base_header->type));
switch (base_header->type) {
case OMPI_OSC_RDMA_HDR_TYPE_FRAG:
process_frag(module, (ompi_osc_rdma_frag_header_t *) base_header);
break;
case OMPI_OSC_RDMA_HDR_TYPE_POST:
(void) osc_rdma_incomming_post (module);
break;
case OMPI_OSC_RDMA_HDR_TYPE_LOCK_ACK:
ompi_osc_rdma_process_lock_ack(module, (ompi_osc_rdma_header_lock_ack_t *) base_header);
break;
case OMPI_OSC_RDMA_HDR_TYPE_FLUSH_ACK:
ompi_osc_rdma_process_flush_ack (module, source, (ompi_osc_rdma_header_flush_ack_t *) base_header);
break;
case OMPI_OSC_RDMA_HDR_TYPE_UNLOCK_ACK:
ompi_osc_rdma_process_unlock_ack (module, source, (ompi_osc_rdma_header_unlock_ack_t *) base_header);
break;
default:
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"received unexpected message of type %x",
(int) base_header->type));
}
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"finished processing incomming messages"));
/* restart the receive request */
OPAL_THREAD_LOCK(&module->lock);
mark_incoming_completion (module, (base_header->flags & OMPI_OSC_RDMA_HDR_FLAG_PASSIVE_TARGET) ?
source : MPI_PROC_NULL);
osc_rdma_request_gc_clean (module);
opal_list_append (&module->request_gc, (opal_list_item_t *) request);
ompi_osc_rdma_frag_start_receive (module);
OPAL_THREAD_UNLOCK(&module->lock);
OPAL_THREAD_LOCK(&ompi_request_lock);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"finished posting receive request"));
return OMPI_SUCCESS;
}
int ompi_osc_rdma_frag_start_receive (ompi_osc_rdma_module_t *module)
{
return ompi_osc_rdma_irecv_w_cb (module->incomming_buffer, mca_osc_rdma_component.buffer_size + sizeof (ompi_osc_rdma_frag_header_t),
MPI_BYTE, OMPI_ANY_SOURCE, OSC_RDMA_FRAG_TAG, module->comm, &module->frag_request,
ompi_osc_rdma_callback, module);
}
int ompi_osc_rdma_component_irecv (ompi_osc_rdma_module_t *module, void *buf,
size_t count, struct ompi_datatype_t *datatype,
int src, int tag, struct ompi_communicator_t *comm)
{
return ompi_osc_rdma_irecv_w_cb (buf, count, datatype, src, tag, comm, NULL,
osc_rdma_incoming_req_complete, module);
}
static int
isend_completion_cb(ompi_request_t *request)
{
ompi_osc_rdma_module_t *module =
(ompi_osc_rdma_module_t*) request->req_complete_cb_data;
OPAL_OUTPUT_VERBOSE((10, ompi_osc_base_framework.framework_output,
"isend_completion_cb called"));
mark_outgoing_completion(module);
/* put this request on the garbage collection list */
OPAL_THREAD_LOCK(&module->lock);
opal_list_append (&module->request_gc, (opal_list_item_t *) request);
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_SUCCESS;
}
int
ompi_osc_rdma_component_isend(ompi_osc_rdma_module_t *module,
void *buf,
size_t count,
struct ompi_datatype_t *datatype,
int dest,
int tag,
struct ompi_communicator_t *comm)
{
return ompi_osc_rdma_isend_w_cb (buf, count, datatype, dest, tag, comm,
isend_completion_cb, module);
}
int ompi_osc_rdma_isend_w_cb (void *ptr, int count, ompi_datatype_t *datatype, int target, int tag,
ompi_communicator_t *comm, ompi_request_complete_fn_t cb, void *ctx)
{
ompi_request_t *request;
int ret;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc rdma: ompi_osc_rdma_isend_w_cb sending %d bytes to %d with tag %d",
count, target, tag));
ret = MCA_PML_CALL(isend_init(ptr, count, datatype, target, tag,
MCA_PML_BASE_SEND_STANDARD, comm, &request));
if (OMPI_SUCCESS != ret) {
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"error sending fragment. ret = %d", ret));
return ret;
}
request->req_complete_cb = cb;
request->req_complete_cb_data = ctx;
ret = MCA_PML_CALL(start(1, &request));
return ret;
}
int ompi_osc_rdma_irecv_w_cb (void *ptr, int count, ompi_datatype_t *datatype, int target, int tag,
ompi_communicator_t *comm, ompi_request_t **request_out,
ompi_request_complete_fn_t cb, void *ctx)
{
ompi_request_t *request;
int ret;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc rdma: ompi_osc_rdma_irecv_w_cb receiving %d bytes from %d with tag %d",
count, target, tag));
ret = MCA_PML_CALL(irecv_init(ptr, count, datatype, target, tag, comm, &request));
if (OMPI_SUCCESS != ret) {
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"error posting receive. ret = %d", ret));
return ret;
}
request->req_complete_cb = cb;
request->req_complete_cb_data = ctx;
if (request_out) {
*request_out = request;
}
ret = MCA_PML_CALL(start(1, &request));
return ret;
}
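/*
* Usage sketch for the *_w_cb helpers (illustrative only; my_cb and my_ctx
* are hypothetical): both helpers build a persistent PML request, attach
* the completion callback, and immediately start the request. The callback
* takes ownership and typically hands the request to the module's
* request_gc list:
*
*   static int my_cb (ompi_request_t *request) {
*       // handle completion, then garbage-collect the request
*       return OMPI_SUCCESS;
*   }
*
*   ret = ompi_osc_rdma_isend_w_cb (buf, count, MPI_BYTE, peer, tag,
*                                   module->comm, my_cb, my_ctx);
*/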