openmpi/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c
Nathan Hjelm 29b435a5a4 osc/pt2pt: fix bugs that caused incorrect fragment counting
This commit fixes a bug identified by MTT that occurred when mixing
passive and active target synchronization. The bugs fixed in this
commit are:

 - Do not update incoming fragment counts for any type of unbuffered
   control message. These messages are out-of-band and should not be
   considered towards the signal counts.

 - Complete a change from using received counts to expected counts for
   lock, unlock, and flush acks. Part of the change made it into
   master before the rest was ready. This was preventing wakeups in
   some cases.

 - Turn the passive_target_access_epoch module member into a
   counter. As long as at least one peer is locked we are in a
   passive-target epoch and not an active target one. This fix will
   ensure that fragment flags are set appropriately.

fixes #538

Signed-off-by: Nathan Hjelm <hjelmn@lanl.gov>
2015-04-23 13:22:24 -06:00


/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2006 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2008 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007-2014 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2009-2011 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved.
* Copyright (c) 2014 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "osc_pt2pt.h"
#include "osc_pt2pt_header.h"
#include "osc_pt2pt_data_move.h"
#include "osc_pt2pt_frag.h"
#include "osc_pt2pt_request.h"
#include "opal/threads/condition.h"
#include "opal/threads/mutex.h"
#include "opal/util/arch.h"
#include "opal/util/output.h"
#include "opal/sys/atomic.h"
#include "opal/align.h"
#include "opal/mca/btl/btl.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/pml/base/pml_base_sendreq.h"
#include "opal/mca/btl/btl.h"
#include "ompi/mca/osc/base/base.h"
#include "ompi/mca/osc/base/osc_base_obj_convert.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/op/op.h"
#include "ompi/memchecker.h"
/**
* struct osc_pt2pt_accumulate_data_t:
*
* @brief Data associated with an in-progress accumulation operation.
*/
struct osc_pt2pt_accumulate_data_t {
opal_list_item_t super;
ompi_osc_pt2pt_module_t* module;
void *target;
void *source;
size_t source_len;
ompi_proc_t *proc;
int count;
int peer;
ompi_datatype_t *datatype;
ompi_op_t *op;
int request_count;
};
typedef struct osc_pt2pt_accumulate_data_t osc_pt2pt_accumulate_data_t;
static void osc_pt2pt_accumulate_data_constructor (osc_pt2pt_accumulate_data_t *acc_data)
{
acc_data->source = NULL;
acc_data->datatype = NULL;
acc_data->op = NULL;
}
static void osc_pt2pt_accumulate_data_destructor (osc_pt2pt_accumulate_data_t *acc_data)
{
if (acc_data->source) {
/* the source buffer is always alloc'd */
free (acc_data->source);
}
if (acc_data->datatype) {
OBJ_RELEASE(acc_data->datatype);
}
if (acc_data->op) {
OBJ_RELEASE(acc_data->op);
}
}
OBJ_CLASS_DECLARATION(osc_pt2pt_accumulate_data_t);
OBJ_CLASS_INSTANCE(osc_pt2pt_accumulate_data_t, opal_list_item_t, osc_pt2pt_accumulate_data_constructor,
osc_pt2pt_accumulate_data_destructor);
/**
* osc_pt2pt_pending_acc_t:
*
* @brief Keep track of accumulate and cswap operations that are
* waiting on the accumulate lock.
*
* Since an accumulate operation may take several steps to
* complete, the accumulate lock must be held until the operation
* is complete. While the lock is held it is possible that additional
* accumulate operations will arrive. This structure keeps track of
* those operations.
*/
struct osc_pt2pt_pending_acc_t {
opal_list_item_t super;
ompi_osc_pt2pt_header_t header;
int source;
void *data;
size_t data_len;
ompi_datatype_t *datatype;
};
typedef struct osc_pt2pt_pending_acc_t osc_pt2pt_pending_acc_t;
static void osc_pt2pt_pending_acc_constructor (osc_pt2pt_pending_acc_t *pending)
{
pending->data = NULL;
pending->datatype = NULL;
}
static void osc_pt2pt_pending_acc_destructor (osc_pt2pt_pending_acc_t *pending)
{
if (NULL != pending->data) {
free (pending->data);
}
if (NULL != pending->datatype) {
OBJ_RELEASE(pending->datatype);
}
}
OBJ_CLASS_DECLARATION(osc_pt2pt_pending_acc_t);
OBJ_CLASS_INSTANCE(osc_pt2pt_pending_acc_t, opal_list_item_t,
osc_pt2pt_pending_acc_constructor, osc_pt2pt_pending_acc_destructor);
/* end ompi_osc_pt2pt_pending_acc_t class */
/**
* @brief Class for large datatype descriptions
*
* This class is used to keep track of buffers for large datatype descriptions
* (datatypes that do not fit in an eager fragment). The structure is designed
* to take advantage of the small datatype description code path.
*/
struct ompi_osc_pt2pt_ddt_buffer_t {
/** allows this class to be stored in the buffer garbage collection
* list */
opal_list_item_t super;
/** OSC PT2PT module */
ompi_osc_pt2pt_module_t *module;
/** source of this header */
int source;
/** header + datatype data */
ompi_osc_pt2pt_header_t *header;
};
typedef struct ompi_osc_pt2pt_ddt_buffer_t ompi_osc_pt2pt_ddt_buffer_t;
static void ompi_osc_pt2pt_ddt_buffer_constructor (ompi_osc_pt2pt_ddt_buffer_t *ddt_buffer)
{
ddt_buffer->header = NULL;
}
static void ompi_osc_pt2pt_ddt_buffer_destructor (ompi_osc_pt2pt_ddt_buffer_t *ddt_buffer)
{
if (ddt_buffer->header) {
free (ddt_buffer->header);
ddt_buffer->header = NULL;
}
}
OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_ddt_buffer_t);
OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_ddt_buffer_t, opal_list_item_t, ompi_osc_pt2pt_ddt_buffer_constructor,
ompi_osc_pt2pt_ddt_buffer_destructor);
/* end ompi_osc_pt2pt_ddt_buffer_t class */
/**
* datatype_buffer_length:
*
* @brief Determine the buffer size needed to hold count elements of datatype.
*
* @param[in] datatype - Element type
* @param[in] count - Element count
*
* @returns buflen Buffer length needed to hold count elements of datatype
*/
static inline int datatype_buffer_length (ompi_datatype_t *datatype, int count)
{
ompi_datatype_t *primitive_datatype = NULL;
uint32_t primitive_count;
size_t buflen;
ompi_osc_base_get_primitive_type_info(datatype, &primitive_datatype, &primitive_count);
primitive_count *= count;
/* figure out how big a buffer we need */
ompi_datatype_type_size(primitive_datatype, &buflen);
return buflen * primitive_count;
}
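/*
* Worked example (illustrative, not from the original source): for a
* datatype whose primitive description is three MPI_DOUBLEs
* (primitive_count = 3, 8 bytes each) and count = 4, the routine above
* returns 8 * (3 * 4) = 96 bytes.
*/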
/**
* ompi_osc_pt2pt_control_send:
*
* @brief send a control message as part of a fragment
*
* @param[in] module - OSC PT2PT module
* @param[in] target - Target peer's rank
* @param[in] data - Data to send
* @param[in] len - Length of data
*
* @returns error OMPI error code or OMPI_SUCCESS
*
* "send" a control messages. Adds it to the active fragment, so the
* caller will still need to explicitly flush (either to everyone or
* to a target) before this is sent.
*/
int ompi_osc_pt2pt_control_send (ompi_osc_pt2pt_module_t *module, int target,
void *data, size_t len)
{
ompi_osc_pt2pt_frag_t *frag;
char *ptr;
int ret;
ret = ompi_osc_pt2pt_frag_alloc(module, target, len, &frag, &ptr);
if (OPAL_LIKELY(OMPI_SUCCESS == ret)) {
memcpy (ptr, data, len);
ret = ompi_osc_pt2pt_frag_finish(module, frag);
}
return ret;
}
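/*
* Hypothetical usage sketch (illustrative only; not a caller from this
* file): queue a small control header on the peer's active fragment and
* rely on a later fragment flush to push it out. The header type chosen
* here is just an example and its remaining fields are elided.
*
*   ompi_osc_pt2pt_header_flush_ack_t ack;
*   ack.base.type = OMPI_OSC_PT2PT_HDR_TYPE_FLUSH_ACK;
*   // ... fill in the remaining header fields ...
*   ret = ompi_osc_pt2pt_control_send (module, target, &ack, sizeof (ack));
*/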
static int ompi_osc_pt2pt_control_send_unbuffered_cb (ompi_request_t *request)
{
void *ctx = request->req_complete_cb_data;
ompi_osc_pt2pt_module_t *module;
/* get module pointer and data */
module = *(ompi_osc_pt2pt_module_t **)ctx;
/* mark this send as complete */
mark_outgoing_completion (module);
/* free the temporary buffer */
free (ctx);
/* put this request on the garbage collection list */
osc_pt2pt_gc_add_request (module, request);
return OMPI_SUCCESS;
}
/**
* ompi_osc_pt2pt_control_send_unbuffered:
*
* @brief Send an unbuffered control message to a peer.
*
* @param[in] module - OSC PT2PT module
* @param[in] target - Target rank
* @param[in] data - Data to send
* @param[in] len - Length of data
*
* Directly send a control message. This does not allocate a
* fragment, so should only be used when sending other messages would
* be erroneous (such as complete messages, when there may be queued
* transactions from an overlapping post that has already heard back
* from its peer). The buffer specified by data will be available
* when this call returns.
*/
int ompi_osc_pt2pt_control_send_unbuffered(ompi_osc_pt2pt_module_t *module,
int target, void *data, size_t len)
{
void *ctx, *data_copy;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc pt2pt: sending unbuffered fragment to %d", target));
/* allocate a temporary buffer for this send */
ctx = malloc (sizeof(ompi_osc_pt2pt_module_t*) + len);
if (OPAL_UNLIKELY(NULL == ctx)) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* increment the outgoing signal count. this send is not part of a passive epoch,
* so it would be erroneous to increment the epoch counters. */
ompi_osc_signal_outgoing (module, MPI_PROC_NULL, 1);
/* store module pointer and data in the buffer */
*(ompi_osc_pt2pt_module_t**)ctx = module;
data_copy = (ompi_osc_pt2pt_module_t**)ctx + 1;
memcpy (data_copy, data, len);
return ompi_osc_pt2pt_isend_w_cb (data_copy, len, MPI_BYTE, target, OSC_PT2PT_FRAG_TAG,
module->comm, ompi_osc_pt2pt_control_send_unbuffered_cb, ctx);
}
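/*
* Layout of the temporary buffer built above (illustrative):
*
*   ctx -> [ ompi_osc_pt2pt_module_t * | copy of the caller's data (len bytes) ]
*
* The completion callback recovers the module pointer from the start of the
* allocation, marks the outgoing send complete, and releases the whole block
* with a single free (ctx).
*/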
/**
* datatype_create:
*
* @brief Utility function that creates a new datatype from a packed
* description.
*
* @param[in] module - OSC PT2PT module
* @param[in] peer - Peer rank
* @param[out] datatype - New datatype. Must be released with OBJ_RELEASE.
* @param[out] proc - Optional. Proc for peer.
* @param[inout] data - Pointer to a pointer where the description is stored. This
* pointer will be updated to the location after the packed
* description.
*/
static inline int datatype_create (ompi_osc_pt2pt_module_t *module, int peer, ompi_proc_t **proc, ompi_datatype_t **datatype, void **data)
{
ompi_datatype_t *new_datatype = NULL;
ompi_proc_t *peer_proc;
int ret = OMPI_SUCCESS;
do {
peer_proc = ompi_comm_peer_lookup(module->comm, peer);
if (OPAL_UNLIKELY(NULL == peer_proc)) {
OPAL_OUTPUT_VERBOSE((1, ompi_osc_base_framework.framework_output,
"%d: datatype_create: could not resolve proc pointer for peer %d",
ompi_comm_rank(module->comm),
peer));
ret = OMPI_ERROR;
break;
}
new_datatype = ompi_osc_base_datatype_create(peer_proc, data);
if (OPAL_UNLIKELY(NULL == new_datatype)) {
OPAL_OUTPUT_VERBOSE((1, ompi_osc_base_framework.framework_output,
"%d: datatype_create: could not resolve datatype for peer %d",
ompi_comm_rank(module->comm), peer));
ret = OMPI_ERROR;
}
} while (0);
*datatype = new_datatype;
if (proc) *proc = peer_proc;
return ret;
}
/**
* process_put:
*
* @brief Process a put w/ data message
*
* @param[in] module - OSC PT2PT module
* @param[in] source - Message source
* @param[in] put_header - Message header + data
*
* Process a put message and copy the message data to the specified
* memory region. Note, this function does not handle any bounds
* checking at the moment.
*/
static inline int process_put(ompi_osc_pt2pt_module_t* module, int source,
ompi_osc_pt2pt_header_put_t* put_header)
{
char *data = (char*) (put_header + 1);
ompi_proc_t *proc;
struct ompi_datatype_t *datatype;
size_t data_len;
void *target = (unsigned char*) module->baseptr +
((unsigned long) put_header->displacement * module->disp_unit);
int ret;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"%d: process_put: received message from %d",
ompi_comm_rank(module->comm),
source));
ret = datatype_create (module, source, &proc, &datatype, (void **) &data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return ret;
}
data_len = put_header->len - ((uintptr_t) data - (uintptr_t) put_header);
osc_pt2pt_copy_on_recv (target, data, data_len, proc, put_header->count, datatype);
OBJ_RELEASE(datatype);
return put_header->len;
}
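/*
* Fragment layout consumed above (illustrative):
*
*   [ ompi_osc_pt2pt_header_put_t | packed datatype description | user data ]
*
* datatype_create() advances the data pointer past the packed description,
* so data_len is put_header->len minus the header and description bytes,
* and the payload is copied to baseptr + displacement * disp_unit.
*/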
static inline int process_put_long(ompi_osc_pt2pt_module_t* module, int source,
ompi_osc_pt2pt_header_put_t* put_header)
{
char *data = (char*) (put_header + 1);
struct ompi_datatype_t *datatype;
void *target = (unsigned char*) module->baseptr +
((unsigned long) put_header->displacement * module->disp_unit);
int ret;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"%d: process_put_long: received message from %d",
ompi_comm_rank(module->comm),
source));
ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return ret;
}
ret = ompi_osc_pt2pt_component_irecv (module, target,
put_header->count,
datatype, source,
put_header->tag,
module->comm);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
OPAL_OUTPUT_VERBOSE((1, ompi_osc_base_framework.framework_output,
"%d: process_put_long: irecv error: %d",
ompi_comm_rank(module->comm),
ret));
return OMPI_ERROR;
}
OBJ_RELEASE(datatype);
return put_header->len;
}
/**
* osc_pt2pt_incoming_req_complete:
*
* @brief Completion callback for a receive associated with an access
* epoch.
*
* @param[in] request - PML request with an OSC PT2PT module as the callback data.
*
* This function is called when a send or receive associated with an
* access epoch completes. When fired this function will increment the
* passive or active incoming count.
*/
static int osc_pt2pt_incoming_req_complete (ompi_request_t *request)
{
ompi_osc_pt2pt_module_t *module = (ompi_osc_pt2pt_module_t *) request->req_complete_cb_data;
int rank = MPI_PROC_NULL;
if (request->req_status.MPI_TAG & 0x01) {
rank = request->req_status.MPI_SOURCE;
}
mark_incoming_completion (module, rank);
/* put this request on the garbage collection list */
osc_pt2pt_gc_add_request (module, request);
return OMPI_SUCCESS;
}
struct osc_pt2pt_get_post_send_cb_data_t {
ompi_osc_pt2pt_module_t *module;
int peer;
};
static int osc_pt2pt_get_post_send_cb (ompi_request_t *request)
{
struct osc_pt2pt_get_post_send_cb_data_t *data =
(struct osc_pt2pt_get_post_send_cb_data_t *) request->req_complete_cb_data;
ompi_osc_pt2pt_module_t *module = data->module;
int rank = data->peer;
free (data);
/* mark this as a completed "incoming" request */
mark_incoming_completion (module, rank);
/* put this request on the garbage collection list */
osc_pt2pt_gc_add_request (module, request);
return OMPI_SUCCESS;
}
/**
* @brief Post a send to match the remote receive for a get operation.
*
* @param[in] module - OSC PT2PT module
* @param[in] source - Source buffer
* @param[in] count - Number of elements in the source buffer
* @param[in] datatype - Type of source elements.
* @param[in] peer - Remote process that has the receive posted
* @param[in] tag - Tag for the send
*
* This function posts a send to match the receive posted as part
* of a get operation. When this send is complete the get is considered
* complete at the target (this process).
*/
static int osc_pt2pt_get_post_send (ompi_osc_pt2pt_module_t *module, void *source, int count,
ompi_datatype_t *datatype, int peer, int tag)
{
struct osc_pt2pt_get_post_send_cb_data_t *data;
data = malloc (sizeof (*data));
if (OPAL_UNLIKELY(NULL == data)) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
data->module = module;
/* for incoming completion we need to know the peer (MPI_PROC_NULL if this is
* in an active target epoch) */
data->peer = (tag & 0x1) ? peer : MPI_PROC_NULL;
return ompi_osc_pt2pt_isend_w_cb (source, count, datatype, peer, tag, module->comm,
osc_pt2pt_get_post_send_cb, (void *) data);
}
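/*
* Tag convention assumed throughout this file (based on the uses of
* "tag & 0x1" here and in the completion callbacks above): the low bit of
* an operation tag appears to mark a passive-target operation, in which
* case completion is counted against the specific peer; otherwise the
* completion is counted globally by passing MPI_PROC_NULL, e.g.:
*
*   int rank = (tag & 0x1) ? peer : MPI_PROC_NULL;
*   mark_incoming_completion (module, rank);
*/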
/**
* process_get:
*
* @brief Process a get message from a remote peer
*
* @param[in] module - OSC PT2PT module
* @param[in] target - Peer process
* @param[in] get_header - Incoming message header
*/
static inline int process_get (ompi_osc_pt2pt_module_t* module, int target,
ompi_osc_pt2pt_header_get_t* get_header)
{
char *data = (char *) (get_header + 1);
struct ompi_datatype_t *datatype;
void *source = (unsigned char*) module->baseptr +
((unsigned long) get_header->displacement * module->disp_unit);
int ret;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"%d: process_get: received message from %d",
ompi_comm_rank(module->comm),
target));
ret = datatype_create (module, target, NULL, &datatype, (void **) &data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return ret;
}
/* send get data */
ret = osc_pt2pt_get_post_send (module, source, get_header->count, datatype, target, get_header->tag);
OBJ_RELEASE(datatype);
return OMPI_SUCCESS == ret ? (int) get_header->len : ret;
}
/**
* osc_pt2pt_accumulate_buffer:
*
* @brief Accumulate data into the target buffer.
*
* @param[in] target - Target buffer
* @param[in] source - Source buffer
* @param[in] source_len - Length of source buffer in bytes
* @param[in] proc - Source proc
* @param[in] count - Number of elements in target buffer
* @param[in] datatype - Type of elements in target buffer
* @param[in] op - Operation to be performed
*/
static inline int osc_pt2pt_accumulate_buffer (void *target, void *source, size_t source_len, ompi_proc_t *proc,
int count, ompi_datatype_t *datatype, ompi_op_t *op)
{
void *buffer = source;
int ret;
assert (NULL != target && NULL != source);
if (op == &ompi_mpi_op_replace.op) {
osc_pt2pt_copy_on_recv (target, source, source_len, proc, count, datatype);
return OMPI_SUCCESS;
}
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
if (proc->super.proc_arch != ompi_proc_local()->super.proc_arch) {
ompi_datatype_t *primitive_datatype = NULL;
uint32_t primitive_count;
size_t buflen;
ompi_osc_base_get_primitive_type_info(datatype, &primitive_datatype, &primitive_count);
primitive_count *= count;
/* figure out how big a buffer we need */
ompi_datatype_type_size(primitive_datatype, &buflen);
buflen *= primitive_count;
buffer = malloc (buflen);
if (OPAL_UNLIKELY(NULL == buffer)) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
osc_pt2pt_copy_on_recv (buffer, source, source_len, proc, count, datatype);
}
#endif
/* apply the reduction operation, accumulating the data into the user window */
ret = ompi_osc_base_process_op(target, buffer, source_len, datatype,
count, op);
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
if (proc->super.proc_arch != ompi_proc_local()->super.proc_arch) {
free(buffer);
}
#endif
return ret;
}
/**
* @brief Create an accumulate data object.
*
* @param[in] module - PT2PT OSC module
* @param[in] target - Target for the accumulation
* @param[in] source - Source of accumulate data. Must be allocated with malloc/calloc/etc
* @param[in] source_len - Length of the source buffer in bytes
* @param[in] proc - Source proc
* @param[in] count - Number of elements to accumulate
* @param[in] datatype - Datatype to accumulate
* @param[in] op - Operator
* @param[in] request_count - Number of prerequisite requests
* @param[out] acc_data_out - New accumulation data
*
* This function is used to create a copy of the data needed to perform an accumulation.
* This data should be provided to ompi_osc_pt2pt_isend_w_cb or ompi_osc_pt2pt_irecv_w_cb
* as the ctx parameter with accumulate_cb as the cb parameter.
*/
static int osc_pt2pt_accumulate_allocate (ompi_osc_pt2pt_module_t *module, int peer, void *target, void *source, size_t source_len,
ompi_proc_t *proc, int count, ompi_datatype_t *datatype, ompi_op_t *op,
int request_count, osc_pt2pt_accumulate_data_t **acc_data_out)
{
osc_pt2pt_accumulate_data_t *acc_data;
acc_data = OBJ_NEW(osc_pt2pt_accumulate_data_t);
if (OPAL_UNLIKELY(NULL == acc_data)) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
acc_data->module = module;
acc_data->peer = peer;
acc_data->target = target;
acc_data->source = source;
acc_data->source_len = source_len;
acc_data->proc = proc;
acc_data->count = count;
acc_data->datatype = datatype;
OBJ_RETAIN(datatype);
acc_data->op = op;
OBJ_RETAIN(op);
acc_data->request_count = request_count;
*acc_data_out = acc_data;
return OMPI_SUCCESS;
}
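/*
* Typical usage pattern (a sketch of what ompi_osc_pt2pt_acc_long_start
* does below): allocate the bookkeeping object, then hand it to a
* non-blocking receive with accumulate_cb so the accumulation runs once
* the prerequisite request completes.
*
*   ret = osc_pt2pt_accumulate_allocate (module, source, target, buffer, buflen,
*                                        proc, count, datatype, op, 1, &acc_data);
*   if (OMPI_SUCCESS == ret) {
*       ret = ompi_osc_pt2pt_irecv_w_cb (buffer, primitive_count, primitive_datatype,
*                                        source, tag, module->comm, NULL,
*                                        accumulate_cb, acc_data);
*   }
*/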
/**
* @brief Execute the accumulate once the request counter reaches 0.
*
* @param[in] request - request
*
* The request should be created with ompi_osc_pt2pt_isend_w_cb or ompi_osc_pt2pt_irecv_w_cb
* with ctx allocated by osc_pt2pt_accumulate_allocate. This callback will free the accumulate
* data once the accumulation operation is complete.
*/
static int accumulate_cb (ompi_request_t *request)
{
struct osc_pt2pt_accumulate_data_t *acc_data = (struct osc_pt2pt_accumulate_data_t *) request->req_complete_cb_data;
ompi_osc_pt2pt_module_t *module = acc_data->module;
int rank = MPI_PROC_NULL;
int ret = OMPI_SUCCESS;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"accumulate_cb, request_count = %d", acc_data->request_count));
if (request->req_status.MPI_TAG & 0x01) {
rank = acc_data->peer;
}
mark_incoming_completion (module, rank);
if (0 == OPAL_THREAD_ADD32(&acc_data->request_count, -1)) {
/* no more requests needed before the buffer can be accumulated */
if (acc_data->source) {
ret = osc_pt2pt_accumulate_buffer (acc_data->target, acc_data->source, acc_data->source_len,
acc_data->proc, acc_data->count, acc_data->datatype, acc_data->op);
}
/* drop the accumulate lock */
ompi_osc_pt2pt_accumulate_unlock (module);
osc_pt2pt_gc_add_buffer (module, &acc_data->super);
}
/* put this request on the garbage collection list */
osc_pt2pt_gc_add_request (module, request);
return ret;
}
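/*
* Example of the request_count countdown (taken from the long
* get-accumulate path below): ompi_osc_gacc_long_start allocates the
* accumulate data with request_count = 2, one for the receive of the
* origin data and one for the send of the current target contents. The
* buffer is accumulated, and the accumulate lock released, only when the
* second of those requests fires accumulate_cb.
*/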
static int ompi_osc_pt2pt_acc_op_queue (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_header_t *header, int source,
char *data, size_t data_len, ompi_datatype_t *datatype)
{
ompi_osc_pt2pt_peer_t *peer = module->peers + source;
osc_pt2pt_pending_acc_t *pending_acc;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"%d: queuing accumulate operation", ompi_comm_size (module->comm)));
pending_acc = OBJ_NEW(osc_pt2pt_pending_acc_t);
if (OPAL_UNLIKELY(NULL == pending_acc)) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* NTH: ensure we don't leave wait/process_flush/etc until this
* accumulate operation is complete. */
OPAL_THREAD_ADD32(&peer->passive_incoming_frag_count, -1);
pending_acc->source = source;
/* save any inline data (eager acc, gacc only) */
pending_acc->data_len = data_len;
if (data_len) {
pending_acc->data = malloc (data_len);
memcpy (pending_acc->data, data, data_len);
}
/* save the datatype */
pending_acc->datatype = datatype;
OBJ_RETAIN(datatype);
/* save the header */
switch (header->base.type) {
case OMPI_OSC_PT2PT_HDR_TYPE_ACC:
case OMPI_OSC_PT2PT_HDR_TYPE_ACC_LONG:
case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC:
case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC_LONG:
pending_acc->header.acc = header->acc;
break;
case OMPI_OSC_PT2PT_HDR_TYPE_CSWAP:
pending_acc->header.cswap = header->cswap;
break;
default:
/* it is a coding error if any other header types are queued this way */
assert (0);
}
/* add to the pending acc queue */
OPAL_THREAD_SCOPED_LOCK(&module->lock, opal_list_append (&module->pending_acc, &pending_acc->super));
return OMPI_SUCCESS;
}
static int replace_cb (ompi_request_t *request)
{
ompi_osc_pt2pt_module_t *module = (ompi_osc_pt2pt_module_t *) request->req_complete_cb_data;
int rank = MPI_PROC_NULL;
if (request->req_status.MPI_TAG & 0x01) {
rank = request->req_status.MPI_SOURCE;
}
mark_incoming_completion (module, rank);
/* put this request on the garbage collection list */
osc_pt2pt_gc_add_request (module, request);
/* unlock the accumulate lock */
ompi_osc_pt2pt_accumulate_unlock (module);
return OMPI_SUCCESS;
}
/**
* ompi_osc_pt2pt_acc_start:
*
* @brief Start an accumulate with data operation.
*
* @param[in] module - OSC PT2PT module
* @param[in] source - Source rank
* @param[in] data - Accumulate data
* @param[in] data_len - Length of the accumulate data
* @param[in] datatype - Accumulation datatype
* @param[in] acc_header - Accumulate header
*
* The module's accumulation lock must be held before calling this
* function. It will release the lock when the operation is complete.
*/
static int ompi_osc_pt2pt_acc_start (ompi_osc_pt2pt_module_t *module, int source, void *data, size_t data_len,
ompi_datatype_t *datatype, ompi_osc_pt2pt_header_acc_t *acc_header)
{
void *target = (unsigned char*) module->baseptr +
((unsigned long) acc_header->displacement * module->disp_unit);
struct ompi_op_t *op = ompi_osc_base_op_create(acc_header->op);
ompi_proc_t *proc;
int ret;
proc = ompi_comm_peer_lookup(module->comm, source);
assert (NULL != proc);
ret = osc_pt2pt_accumulate_buffer (target, data, data_len, proc, acc_header->count,
datatype, op);
OBJ_RELEASE(op);
ompi_osc_pt2pt_accumulate_unlock (module);
return ret;
}
/**
* ompi_osc_pt2pt_acc_long_start:
*
* @brief Start a long accumulate operation.
*
* @param[in] module - OSC PT2PT module
* @param[in] source - Source rank
* @param[in] datatype - Accumulation datatype
* @param[in] acc_header - Accumulate header
*
* The module's accumulation lock must be held before calling this
* function. It will release the lock when the operation is complete.
*/
static int ompi_osc_pt2pt_acc_long_start (ompi_osc_pt2pt_module_t *module, int source, ompi_datatype_t *datatype,
ompi_osc_pt2pt_header_acc_t *acc_header) {
struct osc_pt2pt_accumulate_data_t *acc_data;
size_t buflen;
void *buffer;
ompi_proc_t *proc;
void *target = (unsigned char*) module->baseptr +
((unsigned long) acc_header->displacement * module->disp_unit);
struct ompi_op_t *op = ompi_osc_base_op_create(acc_header->op);
ompi_datatype_t *primitive_datatype;
uint32_t primitive_count;
int ret;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"ompi_osc_pt2pt_acc_long_start starting..."));
proc = ompi_comm_peer_lookup(module->comm, source);
assert (NULL != proc);
do {
if (op == &ompi_mpi_op_replace.op) {
ret = ompi_osc_pt2pt_irecv_w_cb (target, acc_header->count, datatype, source,
acc_header->tag, module->comm, NULL,
replace_cb, module);
break;
}
ret = ompi_osc_base_get_primitive_type_info (datatype, &primitive_datatype, &primitive_count);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
break;
}
primitive_count *= acc_header->count;
buflen = datatype_buffer_length (datatype, acc_header->count);
/* allocate a temporary buffer to receive the accumulate data */
buffer = malloc (buflen);
if (OPAL_UNLIKELY(NULL == buffer)) {
ret = OMPI_ERR_OUT_OF_RESOURCE;
break;
}
ret = osc_pt2pt_accumulate_allocate (module, source, target, buffer, buflen, proc, acc_header->count,
datatype, op, 1, &acc_data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
free (buffer);
break;
}
ret = ompi_osc_pt2pt_irecv_w_cb (buffer, primitive_count, primitive_datatype, source,
acc_header->tag, module->comm, NULL, accumulate_cb, acc_data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
OBJ_RELEASE(acc_data);
}
} while (0);
OBJ_RELEASE(op);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
ompi_osc_pt2pt_accumulate_unlock (module);
}
return ret;
}
/**
* ompi_osc_pt2pt_gacc_start:
*
* @brief Start an accumulate with data + get operation.
*
* @param[in] module - OSC PT2PT module
* @param[in] source - Source rank
* @param[in] data - Accumulate data. Must be allocated on the heap.
* @param[in] data_len - Length of the accumulate data
* @param[in] datatype - Accumulation datatype
* @param[in] get_acc_header - Accumulate header
*
* The module's accumulation lock must be held before calling this
* function. It will release the lock when the operation is complete.
*/
static int ompi_osc_pt2pt_gacc_start (ompi_osc_pt2pt_module_t *module, int source, void *data, size_t data_len,
ompi_datatype_t *datatype, ompi_osc_pt2pt_header_acc_t *acc_header)
{
void *target = (unsigned char*) module->baseptr +
((unsigned long) acc_header->displacement * module->disp_unit);
struct ompi_op_t *op = ompi_osc_base_op_create(acc_header->op);
struct osc_pt2pt_accumulate_data_t *acc_data;
ompi_proc_t *proc;
int ret;
proc = ompi_comm_peer_lookup(module->comm, source);
assert (NULL != proc);
do {
ret = osc_pt2pt_accumulate_allocate (module, source, target, data, data_len, proc, acc_header->count,
datatype, op, 1, &acc_data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
break;
}
ret = ompi_osc_pt2pt_isend_w_cb (target, acc_header->count, datatype, source, acc_header->tag,
module->comm, accumulate_cb, acc_data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
OBJ_RELEASE(acc_data);
}
} while (0);
OBJ_RELEASE(op);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
ompi_osc_pt2pt_accumulate_unlock (module);
}
return ret;
}
/**
* ompi_osc_gacc_long_start:
*
* @brief Start a long accumulate + get operation.
*
* @param[in] module - OSC PT2PT module
* @param[in] source - Source rank
* @param[in] datatype - Accumulation datatype
* @param[in] acc_header - Accumulate header
*
* The module's accumulation lock must be held before calling this
* function. It will release the lock when the operation is complete.
*/
static int ompi_osc_gacc_long_start (ompi_osc_pt2pt_module_t *module, int source, ompi_datatype_t *datatype,
ompi_osc_pt2pt_header_acc_t *acc_header)
{
void *target = (unsigned char*) module->baseptr +
((unsigned long) acc_header->displacement * module->disp_unit);
struct ompi_op_t *op = ompi_osc_base_op_create(acc_header->op);
struct osc_pt2pt_accumulate_data_t *acc_data;
ompi_datatype_t *primitive_datatype;
ompi_request_t *recv_request;
uint32_t primitive_count;
ompi_proc_t *proc;
size_t buflen;
void *buffer;
int ret;
proc = ompi_comm_peer_lookup(module->comm, source);
assert (NULL != proc);
/* allocate a temporary buffer to receive the accumulate data */
buflen = datatype_buffer_length (datatype, acc_header->count);
do {
buffer = malloc (buflen);
if (OPAL_UNLIKELY(NULL == buffer)) {
ret = OMPI_ERR_OUT_OF_RESOURCE;
break;
}
ret = ompi_osc_base_get_primitive_type_info (datatype, &primitive_datatype, &primitive_count);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
break;
}
primitive_count *= acc_header->count;
ret = osc_pt2pt_accumulate_allocate (module, source, target, buffer, buflen, proc, acc_header->count,
datatype, op, 2, &acc_data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
break;
}
ret = ompi_osc_pt2pt_irecv_w_cb (buffer, acc_header->count, datatype, source, acc_header->tag,
module->comm, &recv_request, accumulate_cb, acc_data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
OBJ_RELEASE(acc_data);
break;
}
ret = ompi_osc_pt2pt_isend_w_cb (target, primitive_count, primitive_datatype, source, acc_header->tag,
module->comm, accumulate_cb, acc_data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
/* cancel the receive and free the accumulate data */
ompi_request_cancel (recv_request);
OBJ_RELEASE(acc_data);
break;
}
} while (0);
OBJ_RELEASE(op);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
ompi_osc_pt2pt_accumulate_unlock (module);
}
return ret;
}
/**
* ompi_osc_pt2pt_cswap_start:
*
* @brief Start a compare and swap operation
*
* @param[in] module - OSC PT2PT module
* @param[in] source - Source rank
* @param[in] data - Compare and swap data: the origin value followed by
* the compare value (exactly twice the size of the datatype)
* @param[in] datatype - Compare and swap datatype
* @param[in] cswap_header - Compare and swap header
*
* The module's accumulation lock must be held before calling this
* function. It will release the lock when the operation is complete.
*/
static int ompi_osc_pt2pt_cswap_start (ompi_osc_pt2pt_module_t *module, int source, void *data, ompi_datatype_t *datatype,
ompi_osc_pt2pt_header_cswap_t *cswap_header)
{
void *target = (unsigned char*) module->baseptr +
((unsigned long) cswap_header->displacement * module->disp_unit);
void *compare_addr, *origin_addr;
size_t datatype_size;
ompi_proc_t *proc;
int ret;
proc = ompi_comm_peer_lookup(module->comm, source);
assert (NULL != proc);
datatype_size = datatype->super.size;
origin_addr = data;
compare_addr = (void *)((uintptr_t) data + datatype_size);
do {
/* no reason to do a non-blocking send here */
ret = MCA_PML_CALL(send(target, 1, datatype, source, cswap_header->tag, MCA_PML_BASE_SEND_STANDARD,
module->comm));
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
break;
}
/* increment the incoming fragment count so it matches what is expected */
mark_incoming_completion (module, (cswap_header->tag & 0x1) ? source : MPI_PROC_NULL);
if (0 == memcmp (target, compare_addr, datatype_size)) {
osc_pt2pt_copy_on_recv (target, origin_addr, datatype_size, proc, 1, datatype);
}
} while (0);
ompi_osc_pt2pt_accumulate_unlock (module);
return ret;
}
/**
* ompi_osc_pt2pt_progress_pending_acc:
*
* @brief Progress one pending accumulation or compare and swap operation.
*
* @param[in] module - OSC PT2PT module
*
* If the accumulation lock can be acquired, progress one pending
* accumulate or compare and swap operation.
*/
int ompi_osc_pt2pt_progress_pending_acc (ompi_osc_pt2pt_module_t *module)
{
osc_pt2pt_pending_acc_t *pending_acc;
int ret;
/* try to acquire the lock. it will be unlocked when the accumulate or cswap
* operation completes */
if (ompi_osc_pt2pt_accumulate_trylock (module)) {
return OMPI_SUCCESS;
}
pending_acc = (osc_pt2pt_pending_acc_t *) opal_list_remove_first (&module->pending_acc);
if (OPAL_UNLIKELY(NULL == pending_acc)) {
/* called without any pending accumulation operations */
ompi_osc_pt2pt_accumulate_unlock (module);
return OMPI_SUCCESS;
}
switch (pending_acc->header.base.type) {
case OMPI_OSC_PT2PT_HDR_TYPE_ACC:
ret = ompi_osc_pt2pt_acc_start (module, pending_acc->source, pending_acc->data, pending_acc->data_len,
pending_acc->datatype, &pending_acc->header.acc);
free (pending_acc->data);
break;
case OMPI_OSC_PT2PT_HDR_TYPE_ACC_LONG:
ret = ompi_osc_pt2pt_acc_long_start (module, pending_acc->source, pending_acc->datatype,
&pending_acc->header.acc);
break;
case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC:
ret = ompi_osc_pt2pt_gacc_start (module, pending_acc->source, pending_acc->data,
pending_acc->data_len, pending_acc->datatype,
&pending_acc->header.acc);
break;
case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC_LONG:
ret = ompi_osc_gacc_long_start (module, pending_acc->source, pending_acc->datatype,
&pending_acc->header.acc);
break;
case OMPI_OSC_PT2PT_HDR_TYPE_CSWAP:
ret = ompi_osc_pt2pt_cswap_start (module, pending_acc->source, pending_acc->data,
pending_acc->datatype, &pending_acc->header.cswap);
break;
default:
ret = OMPI_ERROR;
/* it is a coding error if this point is reached */
assert (0);
}
/* signal that an operation is complete */
mark_incoming_completion (module, pending_acc->source);
pending_acc->data = NULL;
OBJ_RELEASE(pending_acc);
return ret;
}
static inline int process_acc (ompi_osc_pt2pt_module_t *module, int source,
ompi_osc_pt2pt_header_acc_t *acc_header)
{
char *data = (char *) (acc_header + 1);
struct ompi_datatype_t *datatype;
uint64_t data_len;
int ret;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"%d: process_acc: received message from %d",
ompi_comm_rank(module->comm),
source));
ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return ret;
}
data_len = acc_header->len - ((char*) data - (char*) acc_header);
/* try to acquire the accumulate lock */
if (0 == ompi_osc_pt2pt_accumulate_trylock (module)) {
ret = ompi_osc_pt2pt_acc_start (module, source, data, data_len, datatype,
acc_header);
} else {
/* couldn't acquire the accumulate lock so queue up the accumulate operation */
ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) acc_header,
source, data, data_len, datatype);
}
/* Release datatype & op */
OBJ_RELEASE(datatype);
return (OMPI_SUCCESS == ret) ? (int) acc_header->len : ret;
}
static inline int process_acc_long (ompi_osc_pt2pt_module_t* module, int source,
ompi_osc_pt2pt_header_acc_t* acc_header)
{
char *data = (char *) (acc_header + 1);
struct ompi_datatype_t *datatype;
int ret;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"%d: process_acc_long: received message from %d",
ompi_comm_rank(module->comm),
source));
ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return ret;
}
if (0 == ompi_osc_pt2pt_accumulate_trylock (module)) {
ret = ompi_osc_pt2pt_acc_long_start (module, source, datatype, acc_header);
} else {
/* queue the operation */
ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) acc_header, source,
NULL, 0, datatype);
}
/* Release datatype & op */
OBJ_RELEASE(datatype);
return (OMPI_SUCCESS == ret) ? (int) acc_header->len : ret;
}
static inline int process_get_acc(ompi_osc_pt2pt_module_t *module, int source,
ompi_osc_pt2pt_header_acc_t *acc_header)
{
char *data = (char *) (acc_header + 1);
struct ompi_datatype_t *datatype;
void *buffer = NULL;
uint64_t data_len;
int ret;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"%d: process_get_acc: received message from %d",
ompi_comm_rank(module->comm),
source));
ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return ret;
}
data_len = acc_header->len - ((char*) data - (char*) acc_header);
if (0 == ompi_osc_pt2pt_accumulate_trylock (module)) {
/* make a copy of the data since the buffer needs to be returned */
if (data_len) {
buffer = malloc (data_len);
if (OPAL_UNLIKELY(NULL == buffer)) {
OBJ_RELEASE(datatype);
return OMPI_ERR_OUT_OF_RESOURCE;
}
memcpy (buffer, data, data_len);
}
ret = ompi_osc_pt2pt_gacc_start (module, source, buffer, data_len, datatype,
acc_header);
} else {
/* queue the operation */
ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) acc_header,
source, data, data_len, datatype);
}
/* Release datatype & op */
OBJ_RELEASE(datatype);
return (OMPI_SUCCESS == ret) ? (int) acc_header->len : ret;
}
static inline int process_get_acc_long(ompi_osc_pt2pt_module_t *module, int source,
ompi_osc_pt2pt_header_acc_t *acc_header)
{
char *data = (char *) (acc_header + 1);
struct ompi_datatype_t *datatype;
int ret;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"%d: process_acc: received message from %d",
ompi_comm_rank(module->comm),
source));
ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return ret;
}
if (0 == ompi_osc_pt2pt_accumulate_trylock (module)) {
ret = ompi_osc_gacc_long_start (module, source, datatype, acc_header);
} else {
/* queue the operation */
ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) acc_header,
source, NULL, 0, datatype);
}
/* Release datatype & op */
OBJ_RELEASE(datatype);
return OMPI_SUCCESS == ret ? (int) acc_header->len : ret;
}
static inline int process_cswap (ompi_osc_pt2pt_module_t *module, int source,
ompi_osc_pt2pt_header_cswap_t *cswap_header)
{
char *data = (char*) (cswap_header + 1);
struct ompi_datatype_t *datatype;
int ret;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"%d: process_cswap: received message from %d",
ompi_comm_rank(module->comm),
source));
ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
return ret;
}
if (0 == ompi_osc_pt2pt_accumulate_trylock (module)) {
ret = ompi_osc_pt2pt_cswap_start (module, source, data, datatype, cswap_header);
} else {
/* queue the operation */
ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) cswap_header, source,
data, 2 * datatype->super.size, datatype);
}
/* Release datatype */
OBJ_RELEASE(datatype);
return (OMPI_SUCCESS == ret) ? (int) cswap_header->len : ret;
}
static inline int process_complete (ompi_osc_pt2pt_module_t *module, int source,
ompi_osc_pt2pt_header_complete_t *complete_header)
{
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc pt2pt: process_complete got complete message from %d. expected fragment count %d. "
"current signal count %d. current incomming count: %d",
source, complete_header->frag_count, module->active_incoming_frag_signal_count,
module->active_incoming_frag_count));
/* the current fragment is not part of the frag_count so we need to add it here */
OPAL_THREAD_ADD32((int32_t *) &module->active_incoming_frag_signal_count,
complete_header->frag_count + 1);
if (0 == OPAL_THREAD_ADD32((int32_t *) &module->num_complete_msgs, 1)) {
opal_condition_broadcast (&module->cond);
}
return sizeof (*complete_header);
}
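/*
* Worked example (illustrative): if a peer sent 3 data fragments during
* the access epoch, its complete message carries frag_count = 3. The
* fragment carrying the complete header is not included in that count,
* so the expected signal count is raised by frag_count + 1 = 4.
*/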
/* flush and unlock headers cannot be processed from the request callback
* because some btls do not provide re-entrant progress functions. these
* fragments will be progressed by the pt2pt component's progress function */
static inline int process_flush (ompi_osc_pt2pt_module_t *module, int source,
ompi_osc_pt2pt_header_flush_t *flush_header)
{
ompi_osc_pt2pt_peer_t *peer = module->peers + source;
int ret;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"process_flush header = {.frag_count = %d}", flush_header->frag_count));
/* subtract the number of incoming fragments the peer reports having sent; the
* counter returns to zero once they have all been processed */
OPAL_THREAD_ADD32(&peer->passive_incoming_frag_count, -(int32_t) flush_header->frag_count);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"%d: process_flush: received message from %d. passive_incoming_frag_count = %d",
ompi_comm_rank(module->comm), source, peer->passive_incoming_frag_count));
ret = ompi_osc_pt2pt_process_flush (module, source, flush_header);
if (OMPI_SUCCESS != ret) {
ompi_osc_pt2pt_pending_t *pending;
pending = OBJ_NEW(ompi_osc_pt2pt_pending_t);
pending->module = module;
pending->source = source;
pending->header.flush = *flush_header;
osc_pt2pt_add_pending (pending);
}
/* the incoming completion signal for this fragment will increment this counter */
OPAL_THREAD_ADD32(&peer->passive_incoming_frag_count, -1);
return sizeof (*flush_header);
}
static inline int process_unlock (ompi_osc_pt2pt_module_t *module, int source,
ompi_osc_pt2pt_header_unlock_t *unlock_header)
{
ompi_osc_pt2pt_peer_t *peer = module->peers + source;
int ret;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"process_unlock header = {.frag_count = %d}", unlock_header->frag_count));
/* subtract the number of incoming fragments the peer reports having sent; the
* counter returns to zero once they have all been processed */
OPAL_THREAD_ADD32(&peer->passive_incoming_frag_count, -(int32_t) unlock_header->frag_count);
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"osc pt2pt: processing unlock request from %d. frag count = %d, processed_count = %d",
source, unlock_header->frag_count, (int) peer->passive_incoming_frag_count));
ret = ompi_osc_pt2pt_process_unlock (module, source, unlock_header);
if (OMPI_SUCCESS != ret) {
ompi_osc_pt2pt_pending_t *pending;
pending = OBJ_NEW(ompi_osc_pt2pt_pending_t);
pending->module = module;
pending->source = source;
pending->header.unlock = *unlock_header;
osc_pt2pt_add_pending (pending);
}
/* the incoming completion signal for this fragment will increment this counter */
OPAL_THREAD_ADD32(&peer->passive_incoming_frag_count, -1);
return sizeof (*unlock_header);
}
static int process_large_datatype_request_cb (ompi_request_t *request)
{
ompi_osc_pt2pt_ddt_buffer_t *ddt_buffer = (ompi_osc_pt2pt_ddt_buffer_t *) request->req_complete_cb_data;
ompi_osc_pt2pt_module_t *module = ddt_buffer->module;
ompi_osc_pt2pt_header_t *header = ddt_buffer->header;
int source = ddt_buffer->source;
/* process the request */
switch (header->base.type) {
case OMPI_OSC_PT2PT_HDR_TYPE_PUT_LONG:
(void) process_put_long (module, source, &header->put);
break;
case OMPI_OSC_PT2PT_HDR_TYPE_GET:
(void) process_get (module, source, &header->get);
break;
case OMPI_OSC_PT2PT_HDR_TYPE_ACC_LONG:
(void) process_acc_long (module, source, &header->acc);
break;
case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC_LONG:
(void) process_get_acc_long (module, source, &header->acc);
break;
default:
/* developer error */
assert (0);
return OMPI_ERROR;
}
/* put this request on the garbage collection list */
osc_pt2pt_gc_add_request (module, request);
/* free the datatype buffer */
osc_pt2pt_gc_add_buffer (module, &ddt_buffer->super);
return OMPI_SUCCESS;
}
/**
* @short process a request with a large datatype
*
* @param[in] module - OSC PT2PT module
* @param[in] source - header source
* @param[in] header - header to process
*
* It is possible to construct datatypes whose description is too large
* to fit in an OSC PT2PT fragment. In this case the remote side posts
* a send of the datatype description. This function posts the matching
* receive and processes the header on completion.
*/
static int process_large_datatype_request (ompi_osc_pt2pt_module_t *module, int source, ompi_osc_pt2pt_header_t *header)
{
ompi_osc_pt2pt_ddt_buffer_t *ddt_buffer;
int header_len, tag, ret;
uint64_t ddt_len;
/* determine the header size and receive tag */
switch (header->base.type) {
case OMPI_OSC_PT2PT_HDR_TYPE_PUT_LONG:
header_len = sizeof (header->put);
tag = header->put.tag;
break;
case OMPI_OSC_PT2PT_HDR_TYPE_GET:
header_len = sizeof (header->get);
tag = header->get.tag;
break;
case OMPI_OSC_PT2PT_HDR_TYPE_ACC_LONG:
header_len = sizeof (header->acc);
tag = header->acc.tag;
break;
case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC_LONG:
header_len = sizeof (header->acc);
tag = header->acc.tag;
break;
default:
/* developer error */
opal_output (0, "Unsupported header/flag combination");
return OMPI_ERROR;
}
ddt_len = *((uint64_t *)((uintptr_t) header + header_len));
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"process_large_datatype_request: processing fragment with type %d. ddt_len %lu",
header->base.type, (unsigned long) ddt_len));
ddt_buffer = OBJ_NEW(ompi_osc_pt2pt_ddt_buffer_t);
if (OPAL_UNLIKELY(NULL == ddt_buffer)) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
ddt_buffer->module = module;
ddt_buffer->source = source;
ddt_buffer->header = malloc (ddt_len + header_len);
if (OPAL_UNLIKELY(NULL == ddt_buffer->header)) {
OBJ_RELEASE(ddt_buffer);
return OMPI_ERR_OUT_OF_RESOURCE;
}
memcpy (ddt_buffer->header, header, header_len);
ret = ompi_osc_pt2pt_irecv_w_cb ((void *)((uintptr_t) ddt_buffer->header + header_len),
ddt_len, MPI_BYTE, source, tag, module->comm, NULL,
process_large_datatype_request_cb, ddt_buffer);
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
OBJ_RELEASE(ddt_buffer);
return ret;
}
return header_len + 8;
}
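/*
* Fragment layout handled above (illustrative):
*
*   [ operation header (header_len bytes) | uint64_t ddt_len | next header ... ]
*
* The packed datatype description itself (ddt_len bytes) does not fit in the
* fragment and arrives as a separate message matched by the receive posted
* here, which is why only header_len + 8 bytes (8 being sizeof ddt_len) are
* consumed from the fragment.
*/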
/*
* Do all the data movement associated with a fragment
*/
static inline int process_frag (ompi_osc_pt2pt_module_t *module,
ompi_osc_pt2pt_frag_header_t *frag)
{
ompi_osc_pt2pt_header_t *header;
int ret;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc pt2pt: process_frag: from %d, ops %d",
(int) frag->source, (int) frag->num_ops));
header = (ompi_osc_pt2pt_header_t *) (frag + 1);
for (int i = 0 ; i < frag->num_ops ; ++i) {
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc pt2pt: process_frag: type 0x%x. flag 0x%x. offset %u",
header->base.type, (unsigned) ((uintptr_t)header - (uintptr_t)frag),
header->base.flags));
if (OPAL_LIKELY(!(header->base.flags & OMPI_OSC_PT2PT_HDR_FLAG_LARGE_DATATYPE))) {
switch (header->base.type) {
case OMPI_OSC_PT2PT_HDR_TYPE_PUT:
ret = process_put(module, frag->source, &header->put);
break;
case OMPI_OSC_PT2PT_HDR_TYPE_PUT_LONG:
ret = process_put_long(module, frag->source, &header->put);
break;
case OMPI_OSC_PT2PT_HDR_TYPE_ACC:
ret = process_acc(module, frag->source, &header->acc);
break;
case OMPI_OSC_PT2PT_HDR_TYPE_ACC_LONG:
ret = process_acc_long (module, frag->source, &header->acc);
break;
case OMPI_OSC_PT2PT_HDR_TYPE_LOCK_REQ:
ret = ompi_osc_pt2pt_process_lock(module, frag->source, &header->lock);
if (OPAL_LIKELY(OMPI_SUCCESS == ret)) {
ret = sizeof (header->lock);
}
break;
case OMPI_OSC_PT2PT_HDR_TYPE_UNLOCK_REQ:
ret = process_unlock(module, frag->source, &header->unlock);
break;
case OMPI_OSC_PT2PT_HDR_TYPE_GET:
ret = process_get (module, frag->source, &header->get);
break;
case OMPI_OSC_PT2PT_HDR_TYPE_CSWAP:
ret = process_cswap (module, frag->source, &header->cswap);
break;
case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC:
ret = process_get_acc (module, frag->source, &header->acc);
break;
case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC_LONG:
ret = process_get_acc_long (module, frag->source, &header->acc);
break;
case OMPI_OSC_PT2PT_HDR_TYPE_FLUSH_REQ:
ret = process_flush (module, frag->source, &header->flush);
break;
case OMPI_OSC_PT2PT_HDR_TYPE_COMPLETE:
ret = process_complete (module, frag->source, &header->complete);
break;
default:
opal_output(0, "Unsupported fragment type 0x%x\n", header->base.type);
abort(); /* FIX ME */
}
} else {
ret = process_large_datatype_request (module, frag->source, header);
}
if (ret <= 0) {
opal_output(0, "Error processing fragment: %d", ret);
abort(); /* FIX ME */
}
/* the next header will start on an 8-byte boundary. this is done to ensure
* that the next header and the packed datatype are properly aligned */
header = (ompi_osc_pt2pt_header_t *) OPAL_ALIGN(((uintptr_t) header + ret), 8, uintptr_t);
}
return OMPI_SUCCESS;
}
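/*
* Alignment example (illustrative): if a put header plus its inline data
* consumed ret = 52 bytes, the next header starts at the 8-byte aligned
* offset OPAL_ALIGN(52, 8, uintptr_t) = 56, keeping both the header and any
* packed datatype description that follows it properly aligned.
*/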
/* dispatch for callback on message completion */
static int ompi_osc_pt2pt_callback (ompi_request_t *request)
{
ompi_osc_pt2pt_module_t *module = (ompi_osc_pt2pt_module_t *) request->req_complete_cb_data;
ompi_osc_pt2pt_header_base_t *base_header =
(ompi_osc_pt2pt_header_base_t *) module->incoming_buffer;
size_t incoming_length = request->req_status._ucount;
int source = request->req_status.MPI_SOURCE;
OPAL_THREAD_UNLOCK(&ompi_request_lock);
assert(incoming_length >= sizeof(ompi_osc_pt2pt_header_base_t));
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"received pt2pt callback for fragment. source = %d, count = %u, type = 0x%x",
source, (unsigned) incoming_length, base_header->type));
switch (base_header->type) {
case OMPI_OSC_PT2PT_HDR_TYPE_FRAG:
process_frag(module, (ompi_osc_pt2pt_frag_header_t *) base_header);
/* only data fragments should be included in the completion counters */
mark_incoming_completion (module, (base_header->flags & OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET) ? source : MPI_PROC_NULL);
break;
case OMPI_OSC_PT2PT_HDR_TYPE_POST:
(void) osc_pt2pt_incoming_post (module, source);
break;
case OMPI_OSC_PT2PT_HDR_TYPE_LOCK_ACK:
ompi_osc_pt2pt_process_lock_ack(module, (ompi_osc_pt2pt_header_lock_ack_t *) base_header);
break;
case OMPI_OSC_PT2PT_HDR_TYPE_FLUSH_ACK:
ompi_osc_pt2pt_process_flush_ack (module, source, (ompi_osc_pt2pt_header_flush_ack_t *) base_header);
break;
case OMPI_OSC_PT2PT_HDR_TYPE_UNLOCK_ACK:
ompi_osc_pt2pt_process_unlock_ack (module, source, (ompi_osc_pt2pt_header_unlock_ack_t *) base_header);
break;
default:
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"received unexpected message of type %x",
(int) base_header->type));
}
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"finished processing incoming messages"));
osc_pt2pt_gc_clean (module);
/* put this request on the garbage collection list */
osc_pt2pt_gc_add_request (module, request);
ompi_osc_pt2pt_frag_start_receive (module);
OPAL_THREAD_LOCK(&ompi_request_lock);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"finished posting receive request"));
return OMPI_SUCCESS;
}
int ompi_osc_pt2pt_frag_start_receive (ompi_osc_pt2pt_module_t *module)
{
return ompi_osc_pt2pt_irecv_w_cb (module->incoming_buffer, mca_osc_pt2pt_component.buffer_size + sizeof (ompi_osc_pt2pt_frag_header_t),
MPI_BYTE, OMPI_ANY_SOURCE, OSC_PT2PT_FRAG_TAG, module->comm, &module->frag_request,
ompi_osc_pt2pt_callback, module);
}
int ompi_osc_pt2pt_component_irecv (ompi_osc_pt2pt_module_t *module, void *buf,
size_t count, struct ompi_datatype_t *datatype,
int src, int tag, struct ompi_communicator_t *comm)
{
return ompi_osc_pt2pt_irecv_w_cb (buf, count, datatype, src, tag, comm, NULL,
osc_pt2pt_incoming_req_complete, module);
}
static int
isend_completion_cb(ompi_request_t *request)
{
ompi_osc_pt2pt_module_t *module =
(ompi_osc_pt2pt_module_t*) request->req_complete_cb_data;
OPAL_OUTPUT_VERBOSE((10, ompi_osc_base_framework.framework_output,
"isend_completion_cb called"));
mark_outgoing_completion(module);
/* put this request on the garbage collection list */
osc_pt2pt_gc_add_request (module, request);
return OMPI_SUCCESS;
}
int ompi_osc_pt2pt_component_isend (ompi_osc_pt2pt_module_t *module, void *buf,
size_t count, struct ompi_datatype_t *datatype,
int dest, int tag, struct ompi_communicator_t *comm)
{
return ompi_osc_pt2pt_isend_w_cb (buf, count, datatype, dest, tag, comm,
isend_completion_cb, module);
}
int ompi_osc_pt2pt_isend_w_cb (void *ptr, int count, ompi_datatype_t *datatype, int target, int tag,
ompi_communicator_t *comm, ompi_request_complete_fn_t cb, void *ctx)
{
ompi_request_t *request;
int ret;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc pt2pt: ompi_osc_pt2pt_isend_w_cb sending %d bytes to %d with tag %d",
count, target, tag));
ret = MCA_PML_CALL(isend_init(ptr, count, datatype, target, tag,
MCA_PML_BASE_SEND_STANDARD, comm, &request));
if (OMPI_SUCCESS != ret) {
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"error sending fragment. ret = %d", ret));
return ret;
}
request->req_complete_cb = cb;
request->req_complete_cb_data = ctx;
ret = MCA_PML_CALL(start(1, &request));
return ret;
}
int ompi_osc_pt2pt_irecv_w_cb (void *ptr, int count, ompi_datatype_t *datatype, int target, int tag,
ompi_communicator_t *comm, ompi_request_t **request_out,
ompi_request_complete_fn_t cb, void *ctx)
{
ompi_request_t *request;
int ret;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc pt2pt: ompi_osc_pt2pt_irecv_w_cb receiving %d bytes from %d with tag %d",
count, target, tag));
ret = MCA_PML_CALL(irecv_init(ptr, count, datatype, target, tag, comm, &request));
if (OMPI_SUCCESS != ret) {
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"error posting receive. ret = %d", ret));
return ret;
}
request->req_complete_cb = cb;
request->req_complete_cb_data = ctx;
if (request_out) {
*request_out = request;
}
ret = MCA_PML_CALL(start(1, &request));
return ret;
}