1
1
openmpi/ompi/mca/osc/rdma/osc_rdma_data_move.c
Nathan Hjelm 29e00f9161 osc/rdma: fix issues with mpi_leave_pinned when using rdma capable btls
It seems we can't release accumulate buffers in completion callbacks
because the btls don't release registration resources until after the
callback has fired. The fix is to keep track of the unused buffers and
free them later. This should resolve issues when running IMB-EXT and
IMB-RMA.

cmr=v1.7.5:reviewer=jsquyres

This commit was SVN r31029.
2014-03-12 14:39:03 +00:00

1562 строки
55 KiB
C

/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2006 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2008 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007-2014 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2009-2011 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "osc_rdma.h"
#include "osc_rdma_header.h"
#include "osc_rdma_data_move.h"
#include "osc_rdma_obj_convert.h"
#include "osc_rdma_frag.h"
#include "osc_rdma_request.h"
#include "opal/threads/condition.h"
#include "opal/threads/mutex.h"
#include "opal/util/arch.h"
#include "opal/util/output.h"
#include "opal/sys/atomic.h"
#include "opal/align.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/pml/base/pml_base_sendreq.h"
#include "ompi/mca/btl/btl.h"
#include "ompi/mca/osc/base/base.h"
#include "ompi/mca/osc/base/osc_base_obj_convert.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/op/op.h"
#include "ompi/memchecker.h"
/**
 * struct osc_rdma_accumulate_data_t:
 *
 * @short State carried by an accumulate operation that is still in flight.
 *
 * @long Objects of this type are created by osc_rdma_accumulate_allocate
 *       and passed to accumulate_cb as the request callback data. The
 *       destructor frees the source buffer and drops the datatype and op
 *       references.
 */
typedef struct osc_rdma_accumulate_data_t {
    opal_list_item_t super;         /* parent object; lets the item be placed on an opal list */
    ompi_osc_rdma_module_t* module; /* module the operation belongs to */
    void *target;                   /* destination of the accumulation */
    void *source;                   /* heap buffer holding the incoming data (freed by the destructor) */
    size_t source_len;              /* length of the source buffer in bytes */
    ompi_proc_t *proc;              /* proc of the peer that supplied the data */
    int count;                      /* number of elements to accumulate */
    int peer;                       /* rank of the peer */
    ompi_datatype_t *datatype;      /* element datatype (retained) */
    ompi_op_t *op;                  /* accumulate operator (retained) */
    int request_count;              /* requests that must still complete before accumulating */
} osc_rdma_accumulate_data_t;
/* Constructor: clear every member the destructor inspects so that a freshly
 * created object can always be destroyed safely. */
static void osc_rdma_accumulate_data_constructor (osc_rdma_accumulate_data_t *acc_data)
{
    acc_data->op       = NULL;
    acc_data->datatype = NULL;
    acc_data->source   = NULL;
}
/* Destructor: give back the source buffer and drop the references held on
 * the datatype and the op (when they were set). */
static void osc_rdma_accumulate_data_destructor (osc_rdma_accumulate_data_t *acc_data)
{
    /* the source buffer is always heap allocated; free (NULL) is a no-op */
    free (acc_data->source);

    if (NULL != acc_data->datatype) {
        OBJ_RELEASE(acc_data->datatype);
    }

    if (NULL != acc_data->op) {
        OBJ_RELEASE(acc_data->op);
    }
}
/* Declare and instantiate the class for osc_rdma_accumulate_data_t: its parent
 * is opal_list_item_t and it uses osc_rdma_accumulate_data_constructor and
 * osc_rdma_accumulate_data_destructor. */
OBJ_CLASS_DECLARATION(osc_rdma_accumulate_data_t);
OBJ_CLASS_INSTANCE(osc_rdma_accumulate_data_t, opal_list_item_t, osc_rdma_accumulate_data_constructor,
osc_rdma_accumulate_data_destructor);
/**
 * osc_rdma_pending_acc_t:
 *
 * @short Keeps track of accumulate and cswap operations that are
 *        waiting on the accumulate lock.
 *
 * @long Since accumulate operations may take several steps to
 *       complete, the accumulate lock is held until the operation
 *       is complete. While the lock is held additional accumulate
 *       operations may arrive. This structure keeps track of those
 *       operations until they can be started.
 */
typedef struct osc_rdma_pending_acc_t {
    opal_list_item_t super;         /* parent object; lets the item be placed on an opal list */
    ompi_osc_rdma_header_t header;  /* saved copy of the acc, get_acc or cswap header */
    int source;                     /* rank the operation came from */
    void *data;                     /* heap copy of any inline data, or NULL */
    size_t data_len;                /* length of data in bytes */
    ompi_datatype_t *datatype;      /* datatype of the operation (retained) */
} osc_rdma_pending_acc_t;
/* Constructor: start with no saved data and no datatype so the destructor
 * has nothing to release unless these members are filled in later. */
static void osc_rdma_pending_acc_constructor (osc_rdma_pending_acc_t *pending)
{
    pending->datatype = NULL;
    pending->data     = NULL;
}
/* Destructor: free the saved inline data (if any) and drop the datatype
 * reference (if one is held). */
static void osc_rdma_pending_acc_destructor (osc_rdma_pending_acc_t *pending)
{
    /* free (NULL) is a no-op, so no guard is required */
    free (pending->data);

    if (pending->datatype) {
        OBJ_RELEASE(pending->datatype);
    }
}
/* Declare and instantiate the class for osc_rdma_pending_acc_t: its parent is
 * opal_list_item_t and it uses osc_rdma_pending_acc_constructor and
 * osc_rdma_pending_acc_destructor. */
OBJ_CLASS_DECLARATION(osc_rdma_pending_acc_t);
OBJ_CLASS_INSTANCE(osc_rdma_pending_acc_t, opal_list_item_t,
osc_rdma_pending_acc_constructor, osc_rdma_pending_acc_destructor);
/* end osc_rdma_pending_acc_t class */
/**
 * datatype_buffer_length:
 *
 * @short Determine the buffer size needed to hold count elements of datatype.
 *
 * @param[in] datatype - Element type
 * @param[in] count    - Element count
 *
 * @returns The size of the primitive type reported for datatype multiplied
 *          by the total number of primitive elements.
 *
 * NOTE(review): the primitive element total is kept in a uint32_t and the
 * byte length is returned as an int, so very large requests would wrap or
 * truncate -- confirm callers never approach those limits.
 */
static inline int datatype_buffer_length (ompi_datatype_t *datatype, int count)
{
    ompi_datatype_t *primitive = NULL;
    uint32_t nprimitives;
    size_t primitive_size;

    /* look up the primitive type and how many of them make up one element */
    ompi_osc_base_get_primitive_type_info(datatype, &primitive, &nprimitives);

    /* size in bytes of one primitive element */
    ompi_datatype_type_size(primitive, &primitive_size);

    /* scale the per-element primitive count by the element count */
    nprimitives *= count;

    return primitive_size * nprimitives;
}
/**
 * ompi_osc_rdma_control_send:
 *
 * @short Send a control message as part of a fragment.
 *
 * @param[in] module - OSC RDMA module
 * @param[in] target - Target peer's rank
 * @param[in] data   - Data to send
 * @param[in] len    - Length of data
 *
 * @returns error OMPI error code or OMPI_SUCCESS
 *
 * @long "Sends" a control message by copying it into the active fragment.
 *       The caller still needs to explicitly flush (either to everyone or
 *       to the target) before the message actually goes out. The module
 *       lock is held while the fragment is allocated, filled and finished.
 */
int ompi_osc_rdma_control_send (ompi_osc_rdma_module_t *module, int target,
                                void *data, size_t len)
{
    ompi_osc_rdma_frag_t *frag;
    char *dest;
    int rc;

    OPAL_THREAD_LOCK(&module->lock);

    /* reserve len bytes in a fragment bound for the target */
    rc = ompi_osc_rdma_frag_alloc(module, target, len, &frag, &dest);
    if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
        OPAL_THREAD_UNLOCK(&module->lock);
        return rc;
    }

    memcpy (dest, data, len);
    rc = ompi_osc_rdma_frag_finish(module, frag);

    OPAL_THREAD_UNLOCK(&module->lock);

    return rc;
}
/**
 * ompi_osc_rdma_control_send_unbuffered_cb:
 *
 * @short Completion callback for an unbuffered control send.
 *
 * @param[in] request - Request whose callback data is the temporary buffer
 *                      built by ompi_osc_rdma_control_send_unbuffered
 *                      (module pointer first, then the message copy).
 *
 * @returns OMPI_SUCCESS
 */
static int ompi_osc_rdma_control_send_unbuffered_cb (ompi_request_t *request)
{
    void *send_ctx = request->req_complete_cb_data;

    /* the module pointer is stored at the very start of the buffer */
    ompi_osc_rdma_module_t *module = ((ompi_osc_rdma_module_t **) send_ctx)[0];

    /* this send is now complete */
    mark_outgoing_completion (module);

    /* the temporary buffer is no longer needed */
    free (send_ctx);

    /* hand the request to the garbage collection list */
    OPAL_THREAD_LOCK(&module->lock);
    opal_list_append (&module->request_gc, (opal_list_item_t *) request);
    OPAL_THREAD_UNLOCK(&module->lock);

    return OMPI_SUCCESS;
}
/**
 * ompi_osc_rdma_control_send_unbuffered:
 *
 * @short Send an unbuffered control message to a peer.
 *
 * @param[in] module - OSC RDMA module
 * @param[in] target - Target rank
 * @param[in] data   - Data to send
 * @param[in] len    - Length of data
 *
 * @returns OMPI_ERR_OUT_OF_RESOURCE if the temporary buffer cannot be
 *          allocated, otherwise the result of posting the send.
 *
 * @long Directly send a control message. This does not allocate a
 *       fragment, so it should only be used when sending other messages
 *       would be erroneous (such as complete messages, when there may be
 *       queued transactions from an overlapping post that has already heard
 *       back from its peer). The message is copied, so the buffer specified
 *       by data is available again when this call returns.
 *
 * NOTE(review): if posting the send fails, the temporary buffer and the
 * outgoing signal count are not rolled back here -- confirm whether the
 * completion callback still runs in that case.
 */
int ompi_osc_rdma_control_send_unbuffered(ompi_osc_rdma_module_t *module,
                                          int target, void *data, size_t len)
{
    ompi_osc_rdma_module_t **ctx;
    void *payload;

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "osc rdma: sending unbuffered fragment to %d", target));

    /* temporary buffer layout: [module pointer][copy of the message] */
    ctx = malloc (sizeof(ompi_osc_rdma_module_t*) + len);
    if (OPAL_UNLIKELY(NULL == ctx)) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    /* increment the outgoing signal count. this send is not part of a passive
     * epoch, so it would be erroneous to increment the epoch counters. */
    ompi_osc_signal_outgoing (module, MPI_PROC_NULL, 1);

    /* fill in the module pointer followed by the message */
    ctx[0] = module;
    payload = &ctx[1];
    memcpy (payload, data, len);

    return ompi_osc_rdma_isend_w_cb (payload, len, MPI_BYTE, target, OSC_RDMA_FRAG_TAG,
                                     module->comm, ompi_osc_rdma_control_send_unbuffered_cb, ctx);
}
/**
 * datatype_create:
 *
 * @short Utility function that creates a new datatype from a packed
 *        description.
 *
 * @param[in]    module   - OSC RDMA module
 * @param[in]    peer     - Peer rank
 * @param[out]   proc     - Optional (may be NULL). Proc for peer.
 * @param[out]   datatype - New datatype, or NULL on failure. Must be
 *                          released with OBJ_RELEASE.
 * @param[inout] data     - Pointer to a pointer where the description is
 *                          stored. Passed on to ompi_osc_base_datatype_create,
 *                          which is expected to advance it past the packed
 *                          description.
 *
 * @returns OMPI_SUCCESS, or OMPI_ERROR if either the peer's proc or the
 *          datatype could not be resolved.
 */
static inline int datatype_create (ompi_osc_rdma_module_t *module, int peer, ompi_proc_t **proc, ompi_datatype_t **datatype, void **data)
{
    ompi_datatype_t *unpacked = NULL;
    ompi_proc_t *peer_proc;
    int rc = OMPI_SUCCESS;

    peer_proc = ompi_comm_peer_lookup(module->comm, peer);
    if (OPAL_UNLIKELY(NULL == peer_proc)) {
        OPAL_OUTPUT_VERBOSE((1, ompi_osc_base_framework.framework_output,
                             "%d: datatype_create: could not resolve proc pointer for peer %d",
                             ompi_comm_rank(module->comm),
                             peer));
        rc = OMPI_ERROR;
    } else {
        unpacked = ompi_osc_base_datatype_create(peer_proc, data);
        if (OPAL_UNLIKELY(NULL == unpacked)) {
            OPAL_OUTPUT_VERBOSE((1, ompi_osc_base_framework.framework_output,
                                 "%d: datatype_create: could not resolve datatype for peer %d",
                                 ompi_comm_rank(module->comm), peer));
            rc = OMPI_ERROR;
        }
    }

    /* the outputs are always written, even on failure */
    *datatype = unpacked;
    if (NULL != proc) {
        *proc = peer_proc;
    }

    return rc;
}
/**
 * process_put:
 *
 * @short Process a put w/ data message
 *
 * @param[in] module     - OSC RDMA module
 * @param[in] source     - Message source
 * @param[in] put_header - Message header + data
 *
 * @returns The message length (put_header->len) on success, or the error
 *          returned by datatype_create.
 *
 * @long Process a put message and copy the message data to the specified
 *       memory region. Note, this function does not handle any bounds
 *       checking at the moment.
 */
static inline int process_put(ompi_osc_rdma_module_t* module, int source,
                              ompi_osc_rdma_header_put_t* put_header)
{
    /* the packed datatype description starts right after the header */
    char *payload = (char*) (put_header + 1);
    void *target = (unsigned char*) module->baseptr +
        ((unsigned long) put_header->displacement * module->disp_unit);
    struct ompi_datatype_t *datatype;
    ompi_proc_t *proc;
    uintptr_t consumed;
    size_t payload_len;
    int rc;

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "%d: process_put: received message from %d",
                         ompi_comm_rank(module->comm),
                         source));

    rc = datatype_create (module, source, &proc, &datatype, (void **) &payload);
    if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
        return rc;
    }

    /* whatever follows the header and datatype description is put data */
    consumed = (uintptr_t) payload - (uintptr_t) put_header;
    payload_len = put_header->len - consumed;

    osc_rdma_copy_on_recv (target, payload, payload_len, proc, put_header->count, datatype);

    OBJ_RELEASE(datatype);

    return put_header->len;
}
/**
 * process_put_long:
 *
 * @short Process a long put message (header only; the data follows in a
 *        separate message).
 *
 * @param[in] module     - OSC RDMA module
 * @param[in] source     - Message source
 * @param[in] put_header - Message header + packed datatype description
 *
 * @returns The message length (put_header->len) on success, the error from
 *          datatype_create, or OMPI_ERROR if the receive could not be posted.
 *
 * @long Posts a receive directly into the window for the data that the
 *       source sends with the tag carried in the header. The datatype
 *       reference obtained from datatype_create is dropped on every path
 *       once the receive has been attempted.
 */
static inline int process_put_long(ompi_osc_rdma_module_t* module, int source,
                                   ompi_osc_rdma_header_put_t* put_header)
{
    char *data = (char*) (put_header + 1);
    struct ompi_datatype_t *datatype;
    void *target = (unsigned char*) module->baseptr +
        ((unsigned long) put_header->displacement * module->disp_unit);
    int ret;

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "%d: process_put_long: received message from %d",
                         ompi_comm_rank(module->comm),
                         source));

    ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
    if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
        return ret;
    }

    ret = ompi_osc_rdma_component_irecv (module, target,
                                         put_header->count,
                                         datatype, source,
                                         put_header->tag,
                                         module->comm);

    /* release our reference whether or not the receive was posted; returning
     * early on error without doing so would leak the datatype */
    OBJ_RELEASE(datatype);

    if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
        OPAL_OUTPUT_VERBOSE((1, ompi_osc_base_framework.framework_output,
                             "%d: process_put_long: irecv error: %d",
                             ompi_comm_rank(module->comm),
                             ret));
        return OMPI_ERROR;
    }

    return put_header->len;
}
/**
 * osc_rdma_incomming_req_omplete:
 *
 * @short Completion callback for a send/receive associated with an access
 *        epoch.
 *
 * @param[in] request - PML request with an OSC RDMA module as the callback data.
 *
 * @returns OMPI_SUCCESS
 *
 * @long This function is called when a send or receive associated with an
 *       access epoch completes. It marks one incoming completion, passing
 *       the request's source rank when the low bit of the tag is set and
 *       MPI_PROC_NULL otherwise, and then queues the request for garbage
 *       collection.
 */
static int osc_rdma_incomming_req_omplete (ompi_request_t *request)
{
    ompi_osc_rdma_module_t *module = (ompi_osc_rdma_module_t *) request->req_complete_cb_data;

    /* the peer rank, when needed, comes from the pml request status */
    int rank = (request->req_status.MPI_TAG & 0x01) ? request->req_status.MPI_SOURCE
                                                    : MPI_PROC_NULL;

    mark_incoming_completion (module, rank);

    /* put this request on the garbage collection list */
    OPAL_THREAD_LOCK(&module->lock);
    opal_list_append (&module->request_gc, (opal_list_item_t *) request);
    OPAL_THREAD_UNLOCK(&module->lock);

    return OMPI_SUCCESS;
}
/**
 * osc_rdma_get_post_send:
 *
 * @short Post a send to match the remote receive for a get operation.
 *
 * @param[in] module   - OSC RDMA module
 * @param[in] source   - Source buffer
 * @param[in] count    - Number of elements in the source buffer
 * @param[in] datatype - Type of source elements.
 * @param[in] peer     - Remote process that has the receive posted
 * @param[in] tag      - Tag for the send
 *
 * @returns The result of posting the send.
 *
 * @long This function posts a send to match the receive posted as part
 *       of a get operation. When this send is complete the get is considered
 *       complete at the target (this process); completion is reported
 *       through osc_rdma_incomming_req_omplete.
 */
static int osc_rdma_get_post_send (ompi_osc_rdma_module_t *module, void *source, int count,
                                   ompi_datatype_t *datatype, int peer, int tag)
{
    int rc;

    rc = ompi_osc_rdma_isend_w_cb (source, count, datatype, peer, tag, module->comm,
                                   osc_rdma_incomming_req_omplete, module);

    return rc;
}
/**
 * process_get:
 *
 * @short Process a get message from a remote peer
 *
 * @param[in] module     - OSC RDMA module
 * @param[in] target     - Peer process
 * @param[in] get_header - Incoming message header
 *
 * @returns The message length (get_header->len) on success, otherwise the
 *          error from datatype_create or from posting the send.
 */
static inline int process_get (ompi_osc_rdma_module_t* module, int target,
                               ompi_osc_rdma_header_get_t* get_header)
{
    /* the packed datatype description follows the header */
    char *description = (char *) (get_header + 1);
    void *window_data = (unsigned char*) module->baseptr +
        ((unsigned long) get_header->displacement * module->disp_unit);
    struct ompi_datatype_t *datatype;
    int rc;

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "%d: process_get: received message from %d",
                         ompi_comm_rank(module->comm),
                         target));

    rc = datatype_create (module, target, NULL, &datatype, (void **) &description);
    if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
        return rc;
    }

    /* send the requested window data back to the peer */
    rc = osc_rdma_get_post_send (module, window_data, get_header->count, datatype, target, get_header->tag);

    OBJ_RELEASE(datatype);

    if (OMPI_SUCCESS != rc) {
        return rc;
    }

    return (int) get_header->len;
}
/**
 * osc_rdma_accumulate_buffer:
 *
 * @short Accumulate data into the target buffer.
 *
 * @param[in] target     - Target buffer
 * @param[in] source     - Source buffer
 * @param[in] source_len - Length of source buffer in bytes
 * @param[in] proc       - Source proc
 * @param[in] count      - Number of elements in target buffer
 * @param[in] datatype   - Type of elements in target buffer
 * @param[in] op         - Operation to be performed
 *
 * @returns OMPI_SUCCESS for a replace, OMPI_ERR_OUT_OF_RESOURCE if a
 *          conversion buffer cannot be allocated, otherwise the result of
 *          ompi_osc_base_process_op.
 *
 * @long A replace is a plain copy-on-receive into the target. For any other
 *       op, when heterogeneous support is enabled and the source proc's
 *       architecture differs from the local one, the data is first copied
 *       into a temporary buffer with osc_rdma_copy_on_recv before the op is
 *       applied.
 */
static inline int osc_rdma_accumulate_buffer (void *target, void *source, size_t source_len, ompi_proc_t *proc,
                                              int count, ompi_datatype_t *datatype, ompi_op_t *op)
{
    void *operand = source;
    int rc;

    assert (NULL != target && NULL != source);

    if (&ompi_mpi_op_replace.op == op) {
        osc_rdma_copy_on_recv (target, source, source_len, proc, count, datatype);
        return OMPI_SUCCESS;
    }

#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
    if (proc->proc_arch != ompi_proc_local()->proc_arch) {
        ompi_datatype_t *primitive = NULL;
        uint32_t nprimitives;
        size_t tmp_len;

        ompi_osc_base_get_primitive_type_info(datatype, &primitive, &nprimitives);
        nprimitives *= count;

        /* work out how large the temporary buffer has to be */
        ompi_datatype_type_size(primitive, &tmp_len);
        tmp_len *= nprimitives;

        operand = malloc (tmp_len);
        if (OPAL_UNLIKELY(NULL == operand)) {
            return OMPI_ERR_OUT_OF_RESOURCE;
        }

        osc_rdma_copy_on_recv (operand, source, source_len, proc, count, datatype);
    }
#endif

    /* apply the op, combining the operand buffer into the user window */
    rc = ompi_osc_base_process_op(target, operand, source_len, datatype,
                                  count, op);

#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
    /* operand only differs from source when a temporary buffer was allocated */
    if (operand != source) {
        free(operand);
    }
#endif

    return rc;
}
/**
 * osc_rdma_accumulate_allocate:
 *
 * @short Create an accumulate data object.
 *
 * @param[in]  module        - RDMA OSC module
 * @param[in]  peer          - Rank of the peer the data comes from
 * @param[in]  target        - Target for the accumulation
 * @param[in]  source        - Source of accumulate data. Must be allocated with malloc/calloc/etc
 * @param[in]  source_len    - Length of the source buffer in bytes
 * @param[in]  proc          - Source proc
 * @param[in]  count         - Number of elements to accumulate
 * @param[in]  datatype      - Datatype to accumulate
 * @param[in]  op            - Operator
 * @param[in]  request_count - Number of prerequisite requests
 * @param[out] acc_data_out  - New accumulation data
 *
 * @returns OMPI_SUCCESS, or OMPI_ERR_OUT_OF_RESOURCE if the object could not
 *          be created (in which case acc_data_out is not written).
 *
 * @long This function is used to record everything needed to perform an
 *       accumulation later. The result should be provided to
 *       ompi_osc_rdma_isend_w_cb or ompi_osc_rdma_irecv_w_cb as the ctx
 *       parameter with accumulate_cb as the cb parameter. The datatype and
 *       op are retained; the source buffer is freed when the object is
 *       destroyed.
 */
static int osc_rdma_accumulate_allocate (ompi_osc_rdma_module_t *module, int peer, void *target, void *source, size_t source_len,
                                         ompi_proc_t *proc, int count, ompi_datatype_t *datatype, ompi_op_t *op,
                                         int request_count, osc_rdma_accumulate_data_t **acc_data_out)
{
    osc_rdma_accumulate_data_t *acc = OBJ_NEW(osc_rdma_accumulate_data_t);

    if (OPAL_UNLIKELY(NULL == acc)) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    /* where the operation belongs and who it came from */
    acc->module = module;
    acc->peer   = peer;
    acc->proc   = proc;

    /* buffers involved */
    acc->target     = target;
    acc->source     = source;
    acc->source_len = source_len;
    acc->count      = count;

    /* hold references on the datatype and op for the object's lifetime */
    OBJ_RETAIN(datatype);
    acc->datatype = datatype;
    OBJ_RETAIN(op);
    acc->op = op;

    acc->request_count = request_count;

    *acc_data_out = acc;

    return OMPI_SUCCESS;
}
/**
 * accumulate_cb:
 *
 * @short Execute the accumulate once the request counter reaches 0.
 *
 * @param[in] request - request
 *
 * @returns OMPI_SUCCESS, or the result of osc_rdma_accumulate_buffer when
 *          the accumulation is performed.
 *
 * @long The request should be created with ompi_osc_rdma_isend_w_cb or
 *       ompi_osc_rdma_irecv_w_cb with ctx allocated by
 *       osc_rdma_accumulate_allocate. Each invocation marks one incoming
 *       completion and decrements the request counter. When the counter
 *       reaches zero the accumulation is applied (if there is a source
 *       buffer), the accumulate lock is dropped and the accumulate data is
 *       placed on the module's buffer_gc list rather than being freed here.
 *       The request itself is always placed on the request_gc list.
 */
static int accumulate_cb (ompi_request_t *request)
{
    struct osc_rdma_accumulate_data_t *acc = (struct osc_rdma_accumulate_data_t *) request->req_complete_cb_data;
    ompi_osc_rdma_module_t *module = acc->module;
    int rc = OMPI_SUCCESS;
    int rank;

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "accumulate_cb, request_count = %d", acc->request_count));

    /* the peer rank is only reported when the low bit of the tag is set */
    rank = (request->req_status.MPI_TAG & 0x01) ? acc->peer : MPI_PROC_NULL;
    mark_incoming_completion (module, rank);

    OPAL_THREAD_LOCK(&module->lock);

    acc->request_count -= 1;
    if (0 == acc->request_count) {
        /* every prerequisite request has finished: accumulate now */
        if (NULL != acc->source) {
            rc = osc_rdma_accumulate_buffer (acc->target, acc->source, acc->source_len,
                                             acc->proc, acc->count, acc->datatype, acc->op);
        }

        /* drop the accumulate lock */
        ompi_osc_rdma_accumulate_unlock (module);

        /* defer destruction of the accumulate data */
        opal_list_append (&module->buffer_gc, &acc->super);
    }

    opal_list_append (&module->request_gc, (opal_list_item_t *) request);

    OPAL_THREAD_UNLOCK(&module->lock);

    return rc;
}
/**
 * ompi_osc_rdma_acc_op_queue:
 *
 * @short Queue an accumulate, get-accumulate or cswap operation that could
 *        not acquire the accumulate lock.
 *
 * @param[in] module   - OSC RDMA module
 * @param[in] header   - Header of the operation (acc, acc long, get acc,
 *                       get acc long or cswap)
 * @param[in] source   - Rank the operation came from
 * @param[in] data     - Inline data to save, if any
 * @param[in] data_len - Length of the inline data in bytes (0 for none)
 * @param[in] datatype - Datatype of the operation (a reference is retained)
 *
 * @returns OMPI_SUCCESS, or OMPI_ERR_OUT_OF_RESOURCE if either the pending
 *          object or the copy of the inline data cannot be allocated.
 *
 * @long The inline data is copied, so the caller's buffer is not referenced
 *       after this function returns. The operation is appended to the
 *       module's pending_acc list under the module lock.
 */
static int ompi_osc_rdma_acc_op_queue (ompi_osc_rdma_module_t *module, ompi_osc_rdma_header_t *header, int source,
                                       char *data, size_t data_len, ompi_datatype_t *datatype)
{
    osc_rdma_pending_acc_t *pending_acc;

    pending_acc = OBJ_NEW(osc_rdma_pending_acc_t);
    if (OPAL_UNLIKELY(NULL == pending_acc)) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    pending_acc->source = source;

    /* save any inline data (eager acc, gacc only) */
    pending_acc->data_len = data_len;

    if (data_len) {
        pending_acc->data = malloc (data_len);
        if (OPAL_UNLIKELY(NULL == pending_acc->data)) {
            /* nothing else is attached to the object yet, so releasing it
             * is all the cleanup that is needed */
            OBJ_RELEASE(pending_acc);
            return OMPI_ERR_OUT_OF_RESOURCE;
        }

        memcpy (pending_acc->data, data, data_len);
    }

    /* save the datatype */
    pending_acc->datatype = datatype;
    OBJ_RETAIN(datatype);

    /* save the header */
    switch (header->base.type) {
    case OMPI_OSC_RDMA_HDR_TYPE_ACC:
    case OMPI_OSC_RDMA_HDR_TYPE_ACC_LONG:
        pending_acc->header.acc = header->acc;
        break;
    case OMPI_OSC_RDMA_HDR_TYPE_GET_ACC:
    case OMPI_OSC_RDMA_HDR_TYPE_GET_ACC_LONG:
        pending_acc->header.get_acc = header->get_acc;
        break;
    case OMPI_OSC_RDMA_HDR_TYPE_CSWAP:
        pending_acc->header.cswap = header->cswap;
        break;
    default:
        /* it is a coding error if any other header types are queued this way */
        assert (0);
    }

    /* add to the pending acc queue */
    OPAL_THREAD_LOCK(&module->lock);
    opal_list_append (&module->pending_acc, &pending_acc->super);
    OPAL_THREAD_UNLOCK(&module->lock);

    return OMPI_SUCCESS;
}
/**
 * replace_cb:
 *
 * @short Completion callback for the receive posted by a long accumulate
 *        whose op is replace.
 *
 * @param[in] request - Request with an OSC RDMA module as the callback data.
 *
 * @returns OMPI_SUCCESS
 *
 * NOTE(review): unlike accumulate_cb, this callback neither marks an incoming
 * completion nor places the request on the request_gc list -- confirm that
 * this is intentional.
 */
static int replace_cb (ompi_request_t *request)
{
    /* the receive wrote straight into the window, so all that remains is to
     * drop the accumulate lock */
    ompi_osc_rdma_accumulate_unlock ((ompi_osc_rdma_module_t *) request->req_complete_cb_data);

    return OMPI_SUCCESS;
}
/**
 * ompi_osc_rdma_acc_start:
 *
 * @short Start an accumulate with data operation.
 *
 * @param[in] module     - OSC RDMA module
 * @param[in] source     - Source rank
 * @param[in] data       - Accumulate data
 * @param[in] data_len   - Length of the accumulate data
 * @param[in] datatype   - Accumulation datatype
 * @param[in] acc_header - Accumulate header
 *
 * @returns The result of osc_rdma_accumulate_buffer.
 *
 * @long The module's accumulation lock must be held before calling this
 *       function. The accumulation is performed synchronously and the lock
 *       is released before returning. The data buffer is not retained.
 */
static int ompi_osc_rdma_acc_start (ompi_osc_rdma_module_t *module, int source, void *data, size_t data_len,
                                    ompi_datatype_t *datatype, ompi_osc_rdma_header_acc_t *acc_header)
{
    unsigned long offset = (unsigned long) acc_header->displacement * module->disp_unit;
    void *target = (unsigned char*) module->baseptr + offset;
    struct ompi_op_t *op = ompi_osc_base_op_create(acc_header->op);
    ompi_proc_t *proc = ompi_comm_peer_lookup(module->comm, source);
    int rc;

    assert (NULL != proc);

    rc = osc_rdma_accumulate_buffer (target, data, data_len, proc, acc_header->count,
                                     datatype, op);

    /* the op reference and the accumulate lock are no longer needed */
    OBJ_RELEASE(op);
    ompi_osc_rdma_accumulate_unlock (module);

    return rc;
}
/**
 * ompi_osc_rdma_acc_long_start:
 *
 * @short Start a long accumulate operation.
 *
 * @param[in] module     - OSC RDMA module
 * @param[in] source     - Source rank
 * @param[in] datatype   - Accumulation datatype
 * @param[in] acc_header - Accumulate header
 *
 * @returns OMPI_SUCCESS if the receive was posted, otherwise an OMPI error.
 *
 * @long The module's accumulation lock must be held before calling this
 *       function. For a replace op the data is received directly into the
 *       window and replace_cb releases the lock. For any other op the data
 *       is received into a temporary buffer and accumulate_cb applies the op
 *       and releases the lock. If the operation cannot be started the lock
 *       is released here.
 */
static int ompi_osc_rdma_acc_long_start (ompi_osc_rdma_module_t *module, int source, ompi_datatype_t *datatype,
                                         ompi_osc_rdma_header_acc_t *acc_header) {
    struct osc_rdma_accumulate_data_t *acc_data;
    size_t tmp_len;
    void *tmp;
    ompi_proc_t *proc;
    void *target = (unsigned char*) module->baseptr +
        ((unsigned long) acc_header->displacement * module->disp_unit);
    struct ompi_op_t *op = ompi_osc_base_op_create(acc_header->op);
    int rc;

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "ompi_osc_rdma_acc_long_start starting..."));

    proc = ompi_comm_peer_lookup(module->comm, source);
    assert (NULL != proc);

    if (op == &ompi_mpi_op_replace.op) {
        /* replace: receive straight into the window */
        rc = ompi_osc_rdma_irecv_w_cb (target, acc_header->count, datatype, source,
                                       acc_header->tag, module->comm, NULL,
                                       replace_cb, module);
        goto done;
    }

    /* allocate a temporary buffer to receive the accumulate data */
    tmp_len = datatype_buffer_length (datatype, acc_header->count);
    tmp = malloc (tmp_len);
    if (OPAL_UNLIKELY(NULL == tmp)) {
        rc = OMPI_ERR_OUT_OF_RESOURCE;
        goto done;
    }

    rc = osc_rdma_accumulate_allocate (module, source, target, tmp, tmp_len, proc, acc_header->count,
                                       datatype, op, 1, &acc_data);
    if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
        /* no accumulate data object owns the buffer */
        free (tmp);
        goto done;
    }

    rc = ompi_osc_rdma_irecv_w_cb (tmp, acc_header->count, datatype, source, acc_header->tag,
                                   module->comm, NULL, accumulate_cb, acc_data);
    if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
        /* destroying the accumulate data also frees the buffer */
        OBJ_RELEASE(acc_data);
    }

 done:
    OBJ_RELEASE(op);

    if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
        /* no callback will run, so drop the accumulate lock here */
        ompi_osc_rdma_accumulate_unlock (module);
    }

    return rc;
}
/**
 * ompi_osc_rdma_gacc_start:
 *
 * @short Start an accumulate with data + get operation.
 *
 * @param[in] module         - OSC RDMA module
 * @param[in] source         - Source rank
 * @param[in] data           - Accumulate data. Must be allocated on the heap
 *                             (or NULL when there is no data); this function
 *                             takes ownership of it.
 * @param[in] data_len       - Length of the accumulate data
 * @param[in] datatype       - Accumulation datatype
 * @param[in] get_acc_header - Accumulate header
 *
 * @returns OMPI_SUCCESS if the send was posted, otherwise an OMPI error.
 *
 * @long The module's accumulation lock must be held before calling this
 *       function. The current target contents are sent back to the source;
 *       when that send completes accumulate_cb applies the accumulation and
 *       releases the lock. If the operation cannot be started the lock is
 *       released here. On every path the data buffer is either owned by the
 *       accumulate data object or freed, so callers must not free it.
 */
static int ompi_osc_rdma_gacc_start (ompi_osc_rdma_module_t *module, int source, void *data, size_t data_len,
                                     ompi_datatype_t *datatype, ompi_osc_rdma_header_get_acc_t *get_acc_header)
{
    void *target = (unsigned char*) module->baseptr +
        ((unsigned long) get_acc_header->displacement * module->disp_unit);
    struct ompi_op_t *op = ompi_osc_base_op_create(get_acc_header->op);
    struct osc_rdma_accumulate_data_t *acc_data;
    ompi_proc_t *proc;
    int ret;

    proc = ompi_comm_peer_lookup(module->comm, source);
    assert (NULL != proc);

    do {
        ret = osc_rdma_accumulate_allocate (module, source, target, data, data_len, proc, get_acc_header->count,
                                            datatype, op, 1, &acc_data);
        if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
            /* no accumulate data object took ownership of the buffer, and
             * callers do not free it themselves, so free it here */
            free (data);
            break;
        }

        ret = ompi_osc_rdma_isend_w_cb (target, get_acc_header->count, datatype, source, get_acc_header->tag,
                                        module->comm, accumulate_cb, acc_data);
        if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
            /* destroying the accumulate data also frees the data buffer */
            OBJ_RELEASE(acc_data);
        }
    } while (0);

    OBJ_RELEASE(op);

    if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
        /* accumulate_cb will not run, so drop the accumulate lock here */
        ompi_osc_rdma_accumulate_unlock (module);
    }

    return ret;
}
/**
 * ompi_osc_gacc_long_start:
 *
 * @short Start a long accumulate + get operation.
 *
 * @param[in] module         - OSC RDMA module
 * @param[in] source         - Source rank
 * @param[in] datatype       - Accumulation datatype
 * @param[in] get_acc_header - Accumulate header
 *
 * @returns OMPI_SUCCESS if both the receive and the send were posted,
 *          otherwise an OMPI error.
 *
 * @long The module's accumulation lock must be held before calling this
 *       function. A receive for the accumulate data and a send of the
 *       current target contents are both posted with accumulate_cb; the
 *       accumulation runs, and the lock is released, once both have
 *       completed (request count of 2). If the operation cannot be started
 *       the lock is released here.
 */
static int ompi_osc_gacc_long_start (ompi_osc_rdma_module_t *module, int source, ompi_datatype_t *datatype,
                                     ompi_osc_rdma_header_get_acc_t *get_acc_header)
{
    void *target = (unsigned char*) module->baseptr +
        ((unsigned long) get_acc_header->displacement * module->disp_unit);
    struct ompi_op_t *op = ompi_osc_base_op_create(get_acc_header->op);
    struct osc_rdma_accumulate_data_t *acc_data;
    ompi_request_t *recv_request;
    ompi_proc_t *proc;
    size_t buflen;
    void *buffer;
    int ret;

    proc = ompi_comm_peer_lookup(module->comm, source);
    assert (NULL != proc);

    /* allocate a temporary buffer to receive the accumulate data */
    buflen = datatype_buffer_length (datatype, get_acc_header->count);

    do {
        buffer = malloc (buflen);
        if (OPAL_UNLIKELY(NULL == buffer)) {
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            break;
        }

        ret = osc_rdma_accumulate_allocate (module, source, target, buffer, buflen, proc, get_acc_header->count,
                                            datatype, op, 2, &acc_data);
        if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
            /* no accumulate data object owns the buffer, so free it here */
            free (buffer);
            break;
        }

        ret = ompi_osc_rdma_irecv_w_cb (buffer, get_acc_header->count, datatype, source, get_acc_header->tag,
                                        module->comm, &recv_request, accumulate_cb, acc_data);
        if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
            /* destroying the accumulate data also frees the buffer */
            OBJ_RELEASE(acc_data);
            break;
        }

        ret = ompi_osc_rdma_isend_w_cb (target, get_acc_header->count, datatype, source, get_acc_header->tag,
                                        module->comm, accumulate_cb, acc_data);
        if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
            /* only on failure: cancel the receive and free the accumulate
             * data. doing this after a successful send would tear down an
             * operation that is still in flight. */
            ompi_request_cancel (recv_request);
            OBJ_RELEASE(acc_data);
            break;
        }
    } while (0);

    OBJ_RELEASE(op);

    if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
        /* accumulate_cb will not complete the operation, so drop the lock here */
        ompi_osc_rdma_accumulate_unlock (module);
    }

    return ret;
}
/**
 * ompi_osc_rdma_cswap_start:
 *
 * @short Start a compare and swap operation
 *
 * @param[in] module       - OSC RDMA module
 * @param[in] source       - Source rank
 * @param[in] data         - Compare and swap data: the origin value followed
 *                           immediately by the compare value, each one
 *                           datatype element in size.
 * @param[in] datatype     - Compare and swap datatype
 * @param[in] cswap_header - Compare and swap header
 *
 * @returns The result of the blocking send of the current target value.
 *
 * @long The module's accumulation lock must be held before calling this
 *       function. The current target value is sent back to the source
 *       first; if that succeeds an incoming completion is marked and the
 *       origin value is copied into the target when the target compares
 *       equal to the compare value. The lock is always released before
 *       returning. The data buffer is not freed by this function.
 */
static int ompi_osc_rdma_cswap_start (ompi_osc_rdma_module_t *module, int source, void *data, ompi_datatype_t *datatype,
                                      ompi_osc_rdma_header_cswap_t *cswap_header)
{
    void *target = (unsigned char*) module->baseptr +
        ((unsigned long) cswap_header->displacement * module->disp_unit);
    size_t elem_size;
    void *swap_value, *expected;
    ompi_proc_t *proc;
    int rc;

    proc = ompi_comm_peer_lookup(module->comm, source);
    assert (NULL != proc);

    elem_size = datatype->super.size;

    /* data layout: [origin value][compare value] */
    swap_value = data;
    expected = (char *) data + elem_size;

    /* no reason to do a non-blocking send here */
    rc = MCA_PML_CALL(send(target, 1, datatype, source, cswap_header->tag, MCA_PML_BASE_SEND_STANDARD,
                           module->comm));
    if (OPAL_LIKELY(OMPI_SUCCESS == rc)) {
        /* increment the incoming fragment count so it matches what is expected */
        mark_incoming_completion (module, (cswap_header->tag & 0x1) ? source : MPI_PROC_NULL);

        if (0 == memcmp (target, expected, elem_size)) {
            osc_rdma_copy_on_recv (target, swap_value, elem_size, proc, 1, datatype);
        }
    }

    ompi_osc_rdma_accumulate_unlock (module);

    return rc;
}
/**
 * ompi_osc_rdma_progress_pending_acc:
 *
 * @short Progress one pending accumulation or compare and swap operation.
 *
 * @param[in] module - OSC RDMA module
 *
 * @returns OMPI_SUCCESS if the lock could not be acquired or nothing was
 *          pending, otherwise the result of starting the operation.
 *
 * @long If the accumulation lock can be acquired, remove the first pending
 *       accumulate or compare and swap operation from the module's
 *       pending_acc list and start it. The started operation is responsible
 *       for releasing the lock. The saved inline data is freed together
 *       with the pending object, except for get-accumulate operations where
 *       ownership of the buffer passes to ompi_osc_rdma_gacc_start.
 */
int ompi_osc_rdma_progress_pending_acc (ompi_osc_rdma_module_t *module)
{
    osc_rdma_pending_acc_t *pending_acc;
    int ret;

    /* try to acquire the lock. it will be unlocked when the accumulate or cswap
     * operation completes */
    if (ompi_osc_rdma_accumulate_trylock (module)) {
        return OMPI_SUCCESS;
    }

    pending_acc = (osc_rdma_pending_acc_t *) opal_list_remove_first (&module->pending_acc);
    if (OPAL_UNLIKELY(NULL == pending_acc)) {
        /* called without any pending accumulation operations */
        ompi_osc_rdma_accumulate_unlock (module);
        return OMPI_SUCCESS;
    }

    switch (pending_acc->header.base.type) {
    case OMPI_OSC_RDMA_HDR_TYPE_ACC:
        /* the data is only used during the call; the destructor frees it */
        ret = ompi_osc_rdma_acc_start (module, pending_acc->source, pending_acc->data, pending_acc->data_len,
                                       pending_acc->datatype, &pending_acc->header.acc);
        break;
    case OMPI_OSC_RDMA_HDR_TYPE_ACC_LONG:
        ret = ompi_osc_rdma_acc_long_start (module, pending_acc->source, pending_acc->datatype,
                                            &pending_acc->header.acc);
        break;
    case OMPI_OSC_RDMA_HDR_TYPE_GET_ACC:
        ret = ompi_osc_rdma_gacc_start (module, pending_acc->source, pending_acc->data,
                                        pending_acc->data_len, pending_acc->datatype,
                                        &pending_acc->header.get_acc);
        /* the data buffer now belongs to the get-accumulate operation, so
         * the destructor must not free it */
        pending_acc->data = NULL;
        break;
    case OMPI_OSC_RDMA_HDR_TYPE_GET_ACC_LONG:
        ret = ompi_osc_gacc_long_start (module, pending_acc->source, pending_acc->datatype,
                                        &pending_acc->header.get_acc);
        break;
    case OMPI_OSC_RDMA_HDR_TYPE_CSWAP:
        /* cswap only reads the data during the call; the destructor frees it
         * (clearing the pointer here would leak the buffer) */
        ret = ompi_osc_rdma_cswap_start (module, pending_acc->source, pending_acc->data,
                                         pending_acc->datatype, &pending_acc->header.cswap);
        break;
    default:
        ret = OMPI_ERROR;
        /* it is a coding error if this point is reached */
        assert (0);
    }

    /* frees any data still attached and drops the datatype reference */
    OBJ_RELEASE(pending_acc);

    return ret;
}
/**
 * process_acc:
 *
 * @short Process an accumulate w/ data message.
 *
 * @param[in] module     - OSC RDMA module
 * @param[in] source     - Message source
 * @param[in] acc_header - Message header + packed datatype + data
 *
 * @returns The message length (acc_header->len) on success, otherwise an
 *          OMPI error.
 *
 * @long If the accumulate lock can be taken the accumulation is started
 *       immediately; otherwise the operation (including a copy of its data)
 *       is queued until the lock becomes available.
 */
static inline int process_acc (ompi_osc_rdma_module_t *module, int source,
                               ompi_osc_rdma_header_acc_t *acc_header)
{
    char *payload = (char *) (acc_header + 1);
    struct ompi_datatype_t *datatype;
    uint64_t payload_len;
    int rc;

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "%d: process_acc: received message from %d",
                         ompi_comm_rank(module->comm),
                         source));

    rc = datatype_create (module, source, NULL, &datatype, (void **) &payload);
    if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
        return rc;
    }

    /* everything after the header and datatype description is data */
    payload_len = acc_header->len - ((char*) payload - (char*) acc_header);

    if (0 != ompi_osc_rdma_accumulate_trylock (module)) {
        /* couldn't acquire the accumulate lock so queue up the accumulate operation */
        rc = ompi_osc_rdma_acc_op_queue (module, (ompi_osc_rdma_header_t *) acc_header,
                                         source, payload, payload_len, datatype);
    } else {
        /* lock acquired: accumulate right away */
        rc = ompi_osc_rdma_acc_start (module, source, payload, payload_len, datatype,
                                      acc_header);
    }

    /* drop the datatype reference obtained from datatype_create */
    OBJ_RELEASE(datatype);

    if (OMPI_SUCCESS != rc) {
        return rc;
    }

    return (int) acc_header->len;
}
/**
 * process_acc_long:
 *
 * @short Process a long accumulate message (header only; the data follows
 *        in a separate message).
 *
 * @param[in] module     - OSC RDMA module
 * @param[in] source     - Message source
 * @param[in] acc_header - Message header + packed datatype
 *
 * @returns The message length (acc_header->len) on success, otherwise an
 *          OMPI error.
 *
 * @long If the accumulate lock can be taken the long accumulate is started
 *       immediately; otherwise the operation is queued (without data) until
 *       the lock becomes available.
 */
static inline int process_acc_long (ompi_osc_rdma_module_t* module, int source,
                                    ompi_osc_rdma_header_acc_t* acc_header)
{
    char *description = (char *) (acc_header + 1);
    struct ompi_datatype_t *datatype;
    int rc;

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "%d: process_acc_long: received message from %d",
                         ompi_comm_rank(module->comm),
                         source));

    rc = datatype_create (module, source, NULL, &datatype, (void **) &description);
    if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
        return rc;
    }

    if (0 != ompi_osc_rdma_accumulate_trylock (module)) {
        /* lock is busy: queue the operation */
        rc = ompi_osc_rdma_acc_op_queue (module, (ompi_osc_rdma_header_t *) acc_header, source,
                                         NULL, 0, datatype);
    } else {
        rc = ompi_osc_rdma_acc_long_start (module, source, datatype, acc_header);
    }

    /* drop the datatype reference obtained from datatype_create */
    OBJ_RELEASE(datatype);

    if (OMPI_SUCCESS != rc) {
        return rc;
    }

    return (int) acc_header->len;
}
/**
 * Handle a short get-accumulate request whose operand data is carried in the
 * incoming fragment.
 *
 * When the accumulate lock can be taken immediately the operand data is
 * duplicated into a heap buffer before the operation is started; otherwise
 * the operation is queued with a pointer into the fragment.
 *
 * @param module          osc/rdma module the request belongs to
 * @param source          rank the request was received from
 * @param get_acc_header  get-accumulate header as found in the fragment
 *
 * @return get_acc_header->len on success, OMPI_ERR_OUT_OF_RESOURCE if the
 *         copy cannot be allocated, otherwise the error code of the failing
 *         step
 */
static inline int process_get_acc(ompi_osc_rdma_module_t *module, int source,
                                  ompi_osc_rdma_header_get_acc_t *get_acc_header)
{
    char *payload = (char *) &get_acc_header[1];
    struct ompi_datatype_t *datatype;
    void *payload_copy = NULL;
    uint64_t payload_len;
    int rc;

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "%d: process_get_acc: received message from %d",
                         ompi_comm_rank(module->comm),
                         source));

    rc = datatype_create (module, source, NULL, &datatype, (void **) &payload);
    if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
        return rc;
    }

    /* operand bytes = total operation length minus everything before the payload */
    payload_len = get_acc_header->len - (payload - (char *) get_acc_header);

    if (0 != ompi_osc_rdma_accumulate_trylock (module)) {
        /* accumulate lock is busy: queue the operation */
        rc = ompi_osc_rdma_acc_op_queue (module, (ompi_osc_rdma_header_t *) get_acc_header,
                                         source, payload, payload_len, datatype);
    } else {
        /* make a copy of the data since the buffer needs to be returned */
        if (payload_len) {
            payload_copy = malloc (payload_len);
            if (OPAL_UNLIKELY(NULL == payload_copy)) {
                /* NOTE(review): this return happens after the accumulate
                 * trylock succeeded and nothing in this function releases
                 * it -- confirm whether the lock must be dropped here */
                OBJ_RELEASE(datatype);
                return OMPI_ERR_OUT_OF_RESOURCE;
            }

            memcpy (payload_copy, payload, payload_len);
        }

        /* payload_copy stays NULL when there is no operand data */
        rc = ompi_osc_rdma_gacc_start (module, source, payload_copy, payload_len, datatype,
                                       get_acc_header);
    }

    /* drop the reference obtained from datatype_create */
    OBJ_RELEASE(datatype);

    if (OMPI_SUCCESS != rc) {
        return rc;
    }

    return (int) get_acc_header->len;
}
/**
 * Handle a long get-accumulate request. No operand data is handed on from
 * the fragment: the start routine receives only the header and the queue
 * routine is given a NULL buffer of length 0.
 *
 * @param module          osc/rdma module the request belongs to
 * @param source          rank the request was received from
 * @param get_acc_header  get-accumulate header as found in the fragment
 *
 * @return get_acc_header->len on success, otherwise the error code of the
 *         failing step
 */
static inline int process_get_acc_long(ompi_osc_rdma_module_t *module, int source,
                                       ompi_osc_rdma_header_get_acc_t *get_acc_header)
{
    char *data = (char *) (get_acc_header + 1);
    struct ompi_datatype_t *datatype;
    int ret;

    /* identify this handler in the trace (the message used to claim it came
     * from process_acc) */
    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "%d: process_get_acc_long: received message from %d",
                         ompi_comm_rank(module->comm),
                         source));

    ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
    if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
        return ret;
    }

    if (0 == ompi_osc_rdma_accumulate_trylock (module)) {
        ret = ompi_osc_gacc_long_start (module, source, datatype, get_acc_header);
    } else {
        /* accumulate lock is busy: queue the operation without any payload */
        ret = ompi_osc_rdma_acc_op_queue (module, (ompi_osc_rdma_header_t *) get_acc_header,
                                          source, NULL, 0, datatype);
    }

    /* drop the reference obtained from datatype_create */
    OBJ_RELEASE(datatype);

    return (OMPI_SUCCESS == ret) ? (int) get_acc_header->len : ret;
}
/**
 * Handle a compare-and-swap request carried in the incoming fragment.
 *
 * @param module        osc/rdma module the request belongs to
 * @param source        rank the request was received from
 * @param cswap_header  compare-and-swap header as found in the fragment
 *
 * @return cswap_header->len on success, otherwise the error code of the
 *         failing step
 */
static inline int process_cswap (ompi_osc_rdma_module_t *module, int source,
                                 ompi_osc_rdma_header_cswap_t *cswap_header)
{
    char *operands = (char *) &cswap_header[1];
    struct ompi_datatype_t *datatype;
    int rc;

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "%d: process_cswap: received message from %d",
                         ompi_comm_rank(module->comm),
                         source));

    rc = datatype_create (module, source, NULL, &datatype, (void **) &operands);
    if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
        return rc;
    }

    if (0 != ompi_osc_rdma_accumulate_trylock (module)) {
        /* accumulate lock is busy: queue the operation; two elements of the
         * datatype are handed over as operand data */
        rc = ompi_osc_rdma_acc_op_queue (module, (ompi_osc_rdma_header_t *) cswap_header, source,
                                         operands, 2 * datatype->super.size, datatype);
    } else {
        rc = ompi_osc_rdma_cswap_start (module, source, operands, datatype, cswap_header);
    }

    /* drop the reference obtained from datatype_create */
    OBJ_RELEASE(datatype);

    if (OMPI_SUCCESS != rc) {
        return rc;
    }

    return (int) cswap_header->len;
}
/**
 * Record the arrival of a complete message.
 *
 * @param module           osc/rdma module the message belongs to
 * @param source           rank the message was received from (only traced)
 * @param complete_header  complete header as found in the fragment
 *
 * @return size of the complete header in bytes
 */
static inline int process_complete (ompi_osc_rdma_module_t *module, int source,
                                    ompi_osc_rdma_header_complete_t *complete_header)
{
    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "osc rdma: process_complete got complete message from %d", source));

    /* one more complete message seen on this module */
    ++module->num_complete_msgs;

    return sizeof (*complete_header);
}
/* flush and unlock headers cannot always be processed from the request
 * callback because some btls do not provide re-entrant progress functions.
 * requests that cannot be handled immediately are placed on the component's
 * pending_operations list so the rdma component's progress function can
 * process them later. */

/**
 * Handle a flush request found in an incoming fragment.
 *
 * @param module        osc/rdma module the request belongs to
 * @param source        rank the request was received from
 * @param flush_header  flush header as found in the fragment
 *
 * @return size of the flush header in bytes, or OMPI_ERR_OUT_OF_RESOURCE if
 *         a deferred request could not be allocated
 */
static inline int process_flush (ompi_osc_rdma_module_t *module, int source,
                                 ompi_osc_rdma_header_flush_t *flush_header)
{
    int ret;

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "process_flush header = {.frag_count = %d}", flush_header->frag_count));

    /* increase signal count by incoming frags */
    module->passive_incoming_frag_signal_count[source] += flush_header->frag_count;

    /* the counters are cast to int to match the %d conversions, as
     * process_unlock does */
    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "%d: process_flush: received message from %d. passive_incoming_frag_signal_count = %d, passive_incoming_frag_count = %d",
                         ompi_comm_rank(module->comm), source,
                         (int) module->passive_incoming_frag_signal_count[source],
                         (int) module->passive_incoming_frag_count[source]));

    ret = ompi_osc_rdma_process_flush (module, source, flush_header);
    if (OMPI_SUCCESS != ret) {
        /* the flush could not be handled now: keep a copy of the header on
         * the component-wide pending list */
        ompi_osc_rdma_pending_t *pending;

        pending = OBJ_NEW(ompi_osc_rdma_pending_t);
        if (OPAL_UNLIKELY(NULL == pending)) {
            /* report the allocation failure instead of dereferencing NULL */
            return OMPI_ERR_OUT_OF_RESOURCE;
        }

        pending->module = module;
        pending->source = source;
        pending->header.flush = *flush_header;

        OPAL_THREAD_LOCK(&mca_osc_rdma_component.lock);
        opal_list_append (&mca_osc_rdma_component.pending_operations, &pending->super);
        OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.lock);

        /* we now have to count the current fragment */
        module->passive_incoming_frag_signal_count[source]++;
    } else {
        /* need to account for the current fragment: the counter is set to -1
         * (presumably so that the completion accounting done for this
         * fragment afterwards brings it back to 0 -- confirm against
         * mark_incoming_completion) */
        module->passive_incoming_frag_count[source] = -1;
    }

    return sizeof (*flush_header);
}
/**
 * Handle an unlock request found in an incoming fragment.
 *
 * If the request cannot be handled immediately a copy of the header is put on
 * the component's pending_operations list.
 *
 * @param module         osc/rdma module the request belongs to
 * @param source         rank the request was received from
 * @param unlock_header  unlock header as found in the fragment
 *
 * @return size of the unlock header in bytes, or OMPI_ERR_OUT_OF_RESOURCE if
 *         a deferred request could not be allocated
 */
static inline int process_unlock (ompi_osc_rdma_module_t *module, int source,
                                  ompi_osc_rdma_header_unlock_t *unlock_header)
{
    int ret;

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "process_unlock header = {.frag_count = %d}", unlock_header->frag_count));

    /* increase signal count by incoming frags */
    module->passive_incoming_frag_signal_count[source] += unlock_header->frag_count;

    OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
                         "osc rdma: processing unlock request from %d. frag count = %d, signal_count = %d, processed_count = %d",
                         source, unlock_header->frag_count, (int) module->passive_incoming_frag_signal_count[source],
                         (int) module->passive_incoming_frag_count[source]));

    ret = ompi_osc_rdma_process_unlock (module, source, unlock_header);
    if (OMPI_SUCCESS != ret) {
        /* the unlock could not be handled now: keep a copy of the header on
         * the component-wide pending list */
        ompi_osc_rdma_pending_t *pending;

        pending = OBJ_NEW(ompi_osc_rdma_pending_t);
        if (OPAL_UNLIKELY(NULL == pending)) {
            /* report the allocation failure instead of dereferencing NULL */
            return OMPI_ERR_OUT_OF_RESOURCE;
        }

        pending->module = module;
        pending->source = source;
        pending->header.unlock = *unlock_header;

        OPAL_THREAD_LOCK(&mca_osc_rdma_component.lock);
        opal_list_append (&mca_osc_rdma_component.pending_operations, &pending->super);
        OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.lock);

        /* we now have to count the current fragment */
        module->passive_incoming_frag_signal_count[source]++;
    } else {
        /* need to account for the current fragment: the counter is set to -1
         * (presumably so that the completion accounting done for this
         * fragment afterwards brings it back to 0 -- confirm against
         * mark_incoming_completion) */
        module->passive_incoming_frag_count[source] = -1;
    }

    return sizeof (*unlock_header);
}
/*
 * Do all the data movement associated with a fragment.
 *
 * The fragment header is followed by frag->num_ops operation headers. Each
 * handler reports how many bytes its operation occupied; that value is used
 * to locate the next header. An unknown header type or a non-positive
 * handler result aborts the process.
 *
 * Returns OMPI_SUCCESS once every operation has been handled.
 */
static inline int process_frag (ompi_osc_rdma_module_t *module,
                                ompi_osc_rdma_frag_header_t *frag)
{
    /* the first operation header sits directly behind the fragment header */
    ompi_osc_rdma_header_t *op = (ompi_osc_rdma_header_t *) &frag[1];
    int consumed;
    int op_index = 0;

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "osc rdma: process_frag: from %d, ops %d",
                         (int) frag->source, (int) frag->num_ops));

    while (op_index < frag->num_ops) {
        OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                             "osc rdma: process_frag: type 0x%x. offset = %u", op->base.type,
                             (unsigned) ((uintptr_t) op - (uintptr_t) frag)));

        switch (op->base.type) {
        /* data movement */
        case OMPI_OSC_RDMA_HDR_TYPE_PUT:
            consumed = process_put(module, frag->source, &op->put);
            break;
        case OMPI_OSC_RDMA_HDR_TYPE_PUT_LONG:
            consumed = process_put_long(module, frag->source, &op->put);
            break;
        case OMPI_OSC_RDMA_HDR_TYPE_GET:
            consumed = process_get (module, frag->source, &op->get);
            break;

        /* accumulate family */
        case OMPI_OSC_RDMA_HDR_TYPE_ACC:
            consumed = process_acc(module, frag->source, &op->acc);
            break;
        case OMPI_OSC_RDMA_HDR_TYPE_ACC_LONG:
            consumed = process_acc_long (module, frag->source, &op->acc);
            break;
        case OMPI_OSC_RDMA_HDR_TYPE_GET_ACC:
            consumed = process_get_acc (module, frag->source, &op->get_acc);
            break;
        case OMPI_OSC_RDMA_HDR_TYPE_GET_ACC_LONG:
            consumed = process_get_acc_long (module, frag->source, &op->get_acc);
            break;
        case OMPI_OSC_RDMA_HDR_TYPE_CSWAP:
            consumed = process_cswap (module, frag->source, &op->cswap);
            break;

        /* synchronization */
        case OMPI_OSC_RDMA_HDR_TYPE_LOCK_REQ:
            consumed = ompi_osc_rdma_process_lock(module, frag->source, &op->lock);
            /* the lock routine reports a status, not a size: translate
             * success into the header size */
            if (OPAL_LIKELY(OMPI_SUCCESS == consumed)) {
                consumed = sizeof (op->lock);
            }
            break;
        case OMPI_OSC_RDMA_HDR_TYPE_UNLOCK_REQ:
            consumed = process_unlock(module, frag->source, &op->unlock);
            break;
        case OMPI_OSC_RDMA_HDR_TYPE_FLUSH_REQ:
            consumed = process_flush (module, frag->source, &op->flush);
            break;
        case OMPI_OSC_RDMA_HDR_TYPE_COMPLETE:
            consumed = process_complete (module, frag->source, &op->complete);
            break;

        default:
            opal_output(0, "Unsupported fragment type 0x%x\n", op->base.type);
            abort(); /* FIX ME */
        }

        if (consumed <= 0) {
            opal_output(0, "Error processing fragment: %d", consumed);
            abort(); /* FIX ME */
        }

        /* the next header will start on an 8-byte boundary. this is done to ensure
         * that the next header and the packed datatype is properly aligned */
        op = (ompi_osc_rdma_header_t *) OPAL_ALIGN(((uintptr_t) op + consumed), 8, uintptr_t);

        ++op_index;
    }

    return OMPI_SUCCESS;
}
/**
 * Completion callback for the module's fragment receive request: dispatch on
 * the type of the received message, then re-post the receive.
 *
 * ompi_request_lock is released while the message is processed and taken
 * again before returning.
 *
 * @param request  completed receive request; its callback data is the module
 *
 * @return OMPI_SUCCESS
 */
static int ompi_osc_rdma_callback (ompi_request_t *request)
{
    ompi_osc_rdma_module_t *module = (ompi_osc_rdma_module_t *) request->req_complete_cb_data;
    ompi_osc_rdma_header_base_t *hdr = (ompi_osc_rdma_header_base_t *) module->incomming_buffer;
    size_t recv_len = request->req_status._ucount;
    int source = request->req_status.MPI_SOURCE;
    int completion_peer;

    OPAL_THREAD_UNLOCK(&ompi_request_lock);

    /* every message starts with at least a base header */
    assert(recv_len >= sizeof(ompi_osc_rdma_header_base_t));

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "received rdma callback for fragment. source = %d, count = %u, type = 0x%x",
                         source, (unsigned) recv_len, hdr->type));

    switch (hdr->type) {
    case OMPI_OSC_RDMA_HDR_TYPE_FRAG:
        process_frag(module, (ompi_osc_rdma_frag_header_t *) hdr);
        break;
    case OMPI_OSC_RDMA_HDR_TYPE_POST:
        (void) osc_rdma_incomming_post (module);
        break;
    case OMPI_OSC_RDMA_HDR_TYPE_LOCK_ACK:
        ompi_osc_rdma_process_lock_ack(module, (ompi_osc_rdma_header_lock_ack_t *) hdr);
        break;
    case OMPI_OSC_RDMA_HDR_TYPE_FLUSH_ACK:
        ompi_osc_rdma_process_flush_ack (module, source, (ompi_osc_rdma_header_flush_ack_t *) hdr);
        break;
    case OMPI_OSC_RDMA_HDR_TYPE_UNLOCK_ACK:
        ompi_osc_rdma_process_unlock_ack (module, source, (ompi_osc_rdma_header_unlock_ack_t *) hdr);
        break;
    default:
        /* unknown types are only traced */
        OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                             "received unexpected message of type %x",
                             (int) hdr->type));
    }

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "finished processing incomming messages"));

    /* restart the receive request */
    OPAL_THREAD_LOCK(&module->lock);

    /* passive-target messages are accounted against their sender, all
     * others against MPI_PROC_NULL */
    if (hdr->flags & OMPI_OSC_RDMA_HDR_FLAG_PASSIVE_TARGET) {
        completion_peer = source;
    } else {
        completion_peer = MPI_PROC_NULL;
    }
    mark_incoming_completion (module, completion_peer);

    /* clean the garbage-collection state first, then hand over this request
     * by appending it to the module's request_gc list */
    osc_rdma_gc_clean (module);
    opal_list_append (&module->request_gc, (opal_list_item_t *) request);

    ompi_osc_rdma_frag_start_receive (module);

    OPAL_THREAD_UNLOCK(&module->lock);

    OPAL_THREAD_LOCK(&ompi_request_lock);

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "finished posting receive request"));

    return OMPI_SUCCESS;
}
/**
 * Post the module's fragment receive: a receive of
 * (component buffer_size + one fragment header) bytes from any source into
 * module->incomming_buffer, completed through ompi_osc_rdma_callback. The
 * request handle is stored in module->frag_request.
 *
 * @param module  osc/rdma module to post the receive for
 *
 * @return result of ompi_osc_rdma_irecv_w_cb
 */
int ompi_osc_rdma_frag_start_receive (ompi_osc_rdma_module_t *module)
{
    int rc;

    rc = ompi_osc_rdma_irecv_w_cb (module->incomming_buffer,
                                   mca_osc_rdma_component.buffer_size + sizeof (ompi_osc_rdma_frag_header_t),
                                   MPI_BYTE, OMPI_ANY_SOURCE, OSC_RDMA_FRAG_TAG, module->comm,
                                   &module->frag_request, ompi_osc_rdma_callback, module);

    return rc;
}
/**
 * Post a receive whose completion is reported to
 * osc_rdma_incomming_req_omplete with the module as callback data. The
 * request handle is not returned to the caller.
 *
 * @param module    osc/rdma module passed to the completion callback
 * @param buf       receive buffer
 * @param count     number of elements of datatype to receive
 * @param datatype  datatype of the elements
 * @param src       rank to receive from
 * @param tag       message tag
 * @param comm      communicator to receive on
 *
 * @return result of ompi_osc_rdma_irecv_w_cb
 */
int ompi_osc_rdma_component_irecv (ompi_osc_rdma_module_t *module, void *buf,
                                   size_t count, struct ompi_datatype_t *datatype,
                                   int src, int tag, struct ompi_communicator_t *comm)
{
    int rc;

    /* NULL: the caller does not get the request handle back */
    rc = ompi_osc_rdma_irecv_w_cb (buf, count, datatype, src, tag, comm, NULL,
                                   osc_rdma_incomming_req_omplete, module);

    return rc;
}
/**
 * Completion callback for sends started by ompi_osc_rdma_component_isend.
 *
 * @param request  completed send request; its callback data is the module
 *
 * @return OMPI_SUCCESS
 */
static int
isend_completion_cb(ompi_request_t *request)
{
    ompi_osc_rdma_module_t *module = (ompi_osc_rdma_module_t*) request->req_complete_cb_data;
    opal_list_item_t *gc_item = (opal_list_item_t *) request;

    OPAL_OUTPUT_VERBOSE((10, ompi_osc_base_framework.framework_output,
                         "isend_completion_cb called"));

    mark_outgoing_completion(module);

    /* the request is not released here; it is appended to the module's
     * request_gc list while holding the module lock */
    OPAL_THREAD_LOCK(&module->lock);
    opal_list_append (&module->request_gc, gc_item);
    OPAL_THREAD_UNLOCK(&module->lock);

    return OMPI_SUCCESS;
}
/**
 * Start a send whose completion is reported to isend_completion_cb with the
 * module as callback data.
 *
 * @param module    osc/rdma module passed to the completion callback
 * @param buf       send buffer
 * @param count     number of elements of datatype to send
 * @param datatype  datatype of the elements
 * @param dest      destination rank
 * @param tag       message tag
 * @param comm      communicator to send on
 *
 * @return result of ompi_osc_rdma_isend_w_cb
 */
int
ompi_osc_rdma_component_isend(ompi_osc_rdma_module_t *module,
                              void *buf,
                              size_t count,
                              struct ompi_datatype_t *datatype,
                              int dest,
                              int tag,
                              struct ompi_communicator_t *comm)
{
    int rc;

    rc = ompi_osc_rdma_isend_w_cb (buf, count, datatype, dest, tag, comm,
                                   isend_completion_cb, module);

    return rc;
}
/**
 * Create a standard-mode send request through the PML, attach a completion
 * callback to it and start it.
 *
 * @param ptr       send buffer
 * @param count     number of elements of datatype to send
 * @param datatype  datatype of the elements
 * @param target    destination rank
 * @param tag       message tag
 * @param comm      communicator to send on
 * @param cb        function stored as the request's completion callback
 * @param ctx       value stored as the request's completion callback data
 *
 * @return the error from creating the request, otherwise the result of
 *         starting it
 */
int ompi_osc_rdma_isend_w_cb (void *ptr, int count, ompi_datatype_t *datatype, int target, int tag,
                              ompi_communicator_t *comm, ompi_request_complete_fn_t cb, void *ctx)
{
    ompi_request_t *send_req;
    int rc;

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "osc rdma: ompi_osc_rdma_isend_w_cb sending %d bytes to %d with tag %d",
                         count, target, tag));

    rc = MCA_PML_CALL(isend_init(ptr, count, datatype, target, tag,
                                 MCA_PML_BASE_SEND_STANDARD, comm, &send_req));
    if (OMPI_SUCCESS == rc) {
        /* the callback is attached before the request is started */
        send_req->req_complete_cb = cb;
        send_req->req_complete_cb_data = ctx;

        rc = MCA_PML_CALL(start(1, &send_req));
        return rc;
    }

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "error sending fragment. ret = %d", rc));

    return rc;
}
/**
 * Create a receive request through the PML, attach a completion callback to
 * it and start it.
 *
 * @param ptr          receive buffer
 * @param count        number of elements of datatype to receive
 * @param datatype     datatype of the elements
 * @param target       rank to receive from
 * @param tag          message tag
 * @param comm         communicator to receive on
 * @param request_out  if not NULL, receives the request handle before the
 *                     request is started
 * @param cb           function stored as the request's completion callback
 * @param ctx          value stored as the request's completion callback data
 *
 * @return the error from creating the request, otherwise the result of
 *         starting it
 */
int ompi_osc_rdma_irecv_w_cb (void *ptr, int count, ompi_datatype_t *datatype, int target, int tag,
                              ompi_communicator_t *comm, ompi_request_t **request_out,
                              ompi_request_complete_fn_t cb, void *ctx)
{
    ompi_request_t *recv_req;
    int rc;

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "osc rdma: ompi_osc_rdma_irecv_w_cb receiving %d bytes from %d with tag %d",
                         count, target, tag));

    rc = MCA_PML_CALL(irecv_init(ptr, count, datatype, target, tag, comm, &recv_req));
    if (OMPI_SUCCESS == rc) {
        /* the callback is attached before the request is started */
        recv_req->req_complete_cb = cb;
        recv_req->req_complete_cb_data = ctx;

        if (NULL != request_out) {
            *request_out = recv_req;
        }

        rc = MCA_PML_CALL(start(1, &recv_req));
        return rc;
    }

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "error posting receive. ret = %d", rc));

    return rc;
}