0ed44f2fdb
This commit adds large datatype description support to the osc/rdma component. Support is provided by an additional send/recv of the datatype description if the description does not fit in an eager buffer. The code is designed to require minimal new code and not for speed. We consider this code path to be a slow path. Refs trac:1905 cmr=v1.8:reviewer=jsquyres This commit was SVN r31197. The following Trac tickets were found above: Ticket 1905 --> https://svn.open-mpi.org/trac/ompi/ticket/1905
215 строки
6.7 KiB
C
215 строки
6.7 KiB
C
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
|
|
/*
|
|
* Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved.
|
|
* Copyright (c) 2014 Los Alamos National Security, LLC. All rights
|
|
* reserved.
|
|
* $COPYRIGHT$
|
|
*
|
|
* Additional copyrights may follow
|
|
*
|
|
* $HEADER$
|
|
*/
|
|
|
|
#include "ompi_config.h"
|
|
|
|
#include "opal/class/opal_list.h"
|
|
#include "ompi/mca/osc/base/base.h"
|
|
#include "ompi/mca/pml/pml.h"
|
|
|
|
#include "osc_rdma.h"
|
|
#include "osc_rdma_frag.h"
|
|
#include "osc_rdma_data_move.h"
|
|
|
|
/*
 * Object constructor for ompi_osc_rdma_frag_t.
 *
 * Allocates the fragment buffer, sized as
 * mca_osc_rdma_component.buffer_size plus one ompi_osc_rdma_frag_header_t.
 *
 * NOTE(review): an allocation failure is only detected by assert(), so a
 * build with NDEBUG defined leaves frag->buffer NULL without any report.
 */
static void ompi_osc_rdma_frag_constructor (ompi_osc_rdma_frag_t *frag)
{
    const size_t alloc_size = sizeof (ompi_osc_rdma_frag_header_t) +
        mca_osc_rdma_component.buffer_size;

    frag->buffer = malloc (alloc_size);
    assert (NULL != frag->buffer);
}
|
|
|
|
/*
 * Object destructor for ompi_osc_rdma_frag_t: releases frag->buffer.
 */
static void ompi_osc_rdma_frag_destructor (ompi_osc_rdma_frag_t *frag)
{
    /* free(NULL) is defined to do nothing, so no NULL guard is required */
    free (frag->buffer);
}
|
|
|
|
/* Class instance for ompi_osc_rdma_frag_t: the parent class is
 * opal_list_item_t and the frag constructor/destructor are registered
 * as the object's construct/destruct hooks. */
OBJ_CLASS_INSTANCE(ompi_osc_rdma_frag_t,
                   opal_list_item_t,
                   ompi_osc_rdma_frag_constructor,
                   ompi_osc_rdma_frag_destructor);
|
|
|
|
/*
 * Callback handed to ompi_osc_rdma_isend_w_cb() by frag_send().
 *
 * request: the request whose req_complete_cb_data holds the
 *          ompi_osc_rdma_frag_t that was sent.
 *
 * Marks one outgoing completion on the frag's module, returns the frag to
 * the component free list, and hands the request to the garbage
 * collection list.  Always returns OMPI_SUCCESS.
 */
static int frag_send_cb (ompi_request_t *request)
{
    ompi_osc_rdma_frag_t *sent_frag =
        (ompi_osc_rdma_frag_t *) request->req_complete_cb_data;
    /* fetch the module while the frag is still ours; the frag is given
     * back to the free list below */
    ompi_osc_rdma_module_t *owner = sent_frag->module;

    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "osc rdma: frag_send complete to %d, frag = %p, request = %p",
                         sent_frag->target, (void *) sent_frag, (void *) request));

    mark_outgoing_completion (owner);
    OPAL_FREE_LIST_RETURN(&mca_osc_rdma_component.frags, &sent_frag->super);

    /* put this request on the garbage collection list */
    osc_rdma_gc_add_request (request);

    return OMPI_SUCCESS;
}
|
|
|
|
|
|
static int
|
|
frag_send(ompi_osc_rdma_module_t *module,
|
|
ompi_osc_rdma_frag_t *frag)
|
|
{
|
|
int count;
|
|
|
|
count = (int)((uintptr_t) frag->top - (uintptr_t) frag->buffer);
|
|
|
|
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
|
"osc rdma: frag_send called to %d, frag = %p, count = %d",
|
|
frag->target, (void *) frag, count));
|
|
|
|
return ompi_osc_rdma_isend_w_cb (frag->buffer, count, MPI_BYTE, frag->target, OSC_RDMA_FRAG_TAG,
|
|
module->comm, frag_send_cb, frag);
|
|
}
|
|
|
|
|
|
int
|
|
ompi_osc_rdma_frag_start(ompi_osc_rdma_module_t *module,
|
|
ompi_osc_rdma_frag_t *frag)
|
|
{
|
|
int ret;
|
|
|
|
assert(0 == frag->pending);
|
|
assert(module->peers[frag->target].active_frag != frag);
|
|
|
|
/* we need to signal now that a frag is outgoing to ensure the count sent
|
|
* with the unlock message is correct */
|
|
ompi_osc_signal_outgoing (module, frag->target, 1);
|
|
|
|
/* if eager sends are not active, can't send yet, so buffer and
|
|
get out... */
|
|
if (module->passive_target_access_epoch) {
|
|
if (!module->passive_eager_send_active[frag->target]) {
|
|
opal_list_append(&module->queued_frags, &frag->super);
|
|
return OMPI_SUCCESS;
|
|
}
|
|
} else {
|
|
if (!module->active_eager_send_active) {
|
|
opal_list_append(&module->queued_frags, &frag->super);
|
|
return OMPI_SUCCESS;
|
|
}
|
|
}
|
|
|
|
ret = frag_send(module, frag);
|
|
|
|
opal_condition_broadcast(&module->cond);
|
|
|
|
return ret;
|
|
}
|
|
|
|
|
|
int
|
|
ompi_osc_rdma_frag_flush_target(ompi_osc_rdma_module_t *module, int target)
|
|
{
|
|
int ret = OMPI_SUCCESS;
|
|
|
|
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
|
"osc rdma: frag flush target begin"));
|
|
|
|
/* flush the active frag */
|
|
if (NULL != module->peers[target].active_frag) {
|
|
ompi_osc_rdma_frag_t *frag = module->peers[target].active_frag;
|
|
|
|
if (0 != frag->pending) {
|
|
/* communication going on while synchronizing; this is a bug */
|
|
return OMPI_ERR_RMA_SYNC;
|
|
}
|
|
|
|
module->peers[target].active_frag = NULL;
|
|
|
|
ret = ompi_osc_rdma_frag_start(module, frag);
|
|
if (OMPI_SUCCESS != ret) return ret;
|
|
}
|
|
|
|
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
|
"osc rdma: frag flush target finished active frag"));
|
|
|
|
/* walk through the pending list and send */
|
|
ompi_osc_rdma_frag_t *frag, *next;
|
|
OPAL_LIST_FOREACH_SAFE(frag, next, &module->queued_frags, ompi_osc_rdma_frag_t) {
|
|
if (frag->target == target) {
|
|
opal_list_remove_item(&module->queued_frags, &frag->super);
|
|
ret = frag_send(module, frag);
|
|
if (OMPI_SUCCESS != ret) return ret;
|
|
}
|
|
}
|
|
|
|
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
|
"osc rdma: frag flush target finished"));
|
|
|
|
return OMPI_SUCCESS;
|
|
}
|
|
|
|
|
|
int
|
|
ompi_osc_rdma_frag_flush_all(ompi_osc_rdma_module_t *module)
|
|
{
|
|
int ret = OMPI_SUCCESS;
|
|
int i;
|
|
ompi_osc_rdma_frag_t *frag, *next;
|
|
|
|
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
|
"osc rdma: frag flush all begin"));
|
|
|
|
/* flush the active frag */
|
|
for (i = 0 ; i < ompi_comm_size(module->comm) ; ++i) {
|
|
if (NULL != module->peers[i].active_frag) {
|
|
ompi_osc_rdma_frag_t *frag = module->peers[i].active_frag;
|
|
|
|
if (0 != frag->pending) {
|
|
/* communication going on while synchronizing; this is a bug */
|
|
return OMPI_ERR_RMA_SYNC;
|
|
}
|
|
|
|
module->peers[i].active_frag = NULL;
|
|
|
|
ret = ompi_osc_rdma_frag_start(module, frag);
|
|
if (OMPI_SUCCESS != ret) return ret;
|
|
}
|
|
}
|
|
|
|
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
|
"osc rdma: frag flush all finished active frag"));
|
|
|
|
/* try to start all the queued frags */
|
|
OPAL_LIST_FOREACH_SAFE(frag, next, &module->queued_frags, ompi_osc_rdma_frag_t) {
|
|
opal_list_remove_item(&module->queued_frags, &frag->super);
|
|
ret = frag_send(module, frag);
|
|
if (OMPI_SUCCESS != ret) {
|
|
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
|
"osc rdma: failure for frag send: %d", ret));
|
|
return ret;
|
|
}
|
|
}
|
|
|
|
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
|
|
"osc rdma: frag flush all done"));
|
|
|
|
return OMPI_SUCCESS;
|
|
}
|
|
|
|
/*
 * Account for one received post message.
 *
 * module: window module the post message belongs to.
 *
 * Under module->lock: increments module->num_post_msgs, enables
 * module->active_eager_send_active when the counter is exactly zero
 * after the increment, and broadcasts module->cond.
 * NOTE(review): presumably the counter is primed to a negative value
 * elsewhere so that zero means "all expected posts arrived" -- confirm.
 *
 * Always returns OMPI_SUCCESS.
 */
int osc_rdma_incoming_post (ompi_osc_rdma_module_t *module)
{
    OPAL_THREAD_LOCK(&module->lock);

    ++module->num_post_msgs;
    OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                         "received post message. num_post_msgs = %d", module->num_post_msgs));

    if (0 == module->num_post_msgs) {
        module->active_eager_send_active = true;
    }

    /* wake any waiter on the module condition, whether or not eager
     * sends were just enabled */
    opal_condition_broadcast (&module->cond);

    OPAL_THREAD_UNLOCK(&module->lock);

    return OMPI_SUCCESS;
}
|