
* add new component (RDMA) for RDMA one-sided communication

This commit was SVN r10861.
This commit is contained in:
Brian Barrett 2006-07-17 22:08:55 +00:00
parent 2185c059e8
commit 28b99299b2
20 changed files with 3641 additions and 0 deletions
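
For reference, the kind of user code this component serves looks like the following: a window is exposed over a local buffer and data moves with one-sided calls bracketed by fence synchronization. This is a minimal sketch using only standard MPI calls, not part of the change set itself.

#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv)
{
    int rank, size, target_val = 0, origin_val = 42;
    MPI_Win win;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    /* expose one int per process; the selected osc component manages the window */
    MPI_Win_create(&target_val, sizeof(int), sizeof(int),
                   MPI_INFO_NULL, MPI_COMM_WORLD, &win);

    MPI_Win_fence(0, win);                 /* open the access/exposure epoch */
    if (0 == rank && size > 1) {
        /* handled by ompi_osc_rdma_module_put in this component: queued,
           or sent eagerly if eager_send is enabled */
        MPI_Put(&origin_val, 1, MPI_INT, 1, 0, 1, MPI_INT, win);
    }
    MPI_Win_fence(0, win);                 /* data movement completes here */

    if (1 == rank) printf("got %d\n", target_val);

    MPI_Win_free(&win);
    MPI_Finalize();
    return 0;
}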

0
ompi/mca/osc/rdma/.ompi_ignore Normal file

3
ompi/mca/osc/rdma/.ompi_unignore Normal file

@@ -0,0 +1,3 @@
bbarrett
brbarret
bwbarre

60
ompi/mca/osc/rdma/Makefile.am Normal file

@@ -0,0 +1,60 @@
#
# Copyright (c) 2004-2005 The Trustees of Indiana University.
# All rights reserved.
# Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
# All rights reserved.
# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
# University of Stuttgart. All rights reserved.
# Copyright (c) 2004-2005 The Regents of the University of California.
# All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
# Use the top-level Makefile.options
include $(top_ompi_srcdir)/config/Makefile.options
pt2pt_sources = \
osc_rdma.h \
osc_rdma.c \
osc_rdma_comm.c \
osc_rdma_component.c \
osc_rdma_data_move.h \
osc_rdma_data_move.c \
osc_rdma_header.h \
osc_rdma_longreq.h \
osc_rdma_longreq.c \
osc_rdma_obj_convert.h \
osc_rdma_obj_convert.c \
osc_rdma_replyreq.h \
osc_rdma_replyreq.c \
osc_rdma_sendreq.h \
osc_rdma_sendreq.c \
osc_rdma_sync.c
# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
# (for static builds).
if OMPI_BUILD_osc_rdma_DSO
component_noinst =
component_install = mca_osc_rdma.la
else
component_noinst = libmca_osc_rdma.la
component_install =
endif
mcacomponentdir = $(libdir)/openmpi
mcacomponent_LTLIBRARIES = $(component_install)
mca_osc_rdma_la_SOURCES = $(pt2pt_sources)
mca_osc_rdma_la_LIBADD =
mca_osc_rdma_la_LDFLAGS = -module -avoid-version
noinst_LTLIBRARIES = $(component_noinst)
libmca_osc_rdma_la_SOURCES = $(pt2pt_sources)
libmca_osc_rdma_la_LIBADD =
libmca_osc_rdma_la_LDFLAGS = -module -avoid-version

18
ompi/mca/osc/rdma/configure.params Normal file

@@ -0,0 +1,18 @@
# -*- shell-script -*-
#
# Copyright (c) 2004-2005 The Trustees of Indiana University.
# All rights reserved.
# Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
# All rights reserved.
# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
# University of Stuttgart. All rights reserved.
# Copyright (c) 2004-2005 The Regents of the University of California.
# All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#
PARAM_CONFIG_FILES="Makefile"

78
ompi/mca/osc/rdma/osc_rdma.c Normal file

@@ -0,0 +1,78 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "osc_rdma.h"
#include "osc_rdma_sendreq.h"
#include "opal/threads/mutex.h"
#include "ompi/win/win.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/btl/btl.h"
#include "mpi.h"
int
ompi_osc_rdma_module_free(ompi_win_t *win)
{
int ret = OMPI_SUCCESS;
int tmp;
ompi_osc_rdma_module_t *module = P2P_MODULE(win);
/* finish with a barrier */
if (ompi_group_size(win->w_group) > 1) {
ret = module->p2p_comm->c_coll.coll_barrier(module->p2p_comm);
}
/* remove window information */
win->w_osc_module = NULL;
/* remove from component information */
OPAL_THREAD_LOCK(&mca_osc_rdma_component.p2p_c_lock);
tmp = opal_hash_table_remove_value_uint32(&mca_osc_rdma_component.p2p_c_modules,
module->p2p_comm->c_contextid);
/* only take the output of hash_table_remove if there wasn't already an error */
ret = (ret != OMPI_SUCCESS) ? ret : tmp;
OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.p2p_c_lock);
OBJ_DESTRUCT(&(module->p2p_locks_pending));
assert(module->p2p_sc_group == NULL);
assert(module->p2p_pw_group == NULL);
free(module->p2p_fence_coll_counts);
free(module->p2p_copy_num_pending_sendreqs);
OBJ_DESTRUCT(&(module->p2p_copy_pending_sendreqs));
OBJ_DESTRUCT(&(module->p2p_long_msgs));
free(module->p2p_num_pending_sendreqs);
OBJ_DESTRUCT(&(module->p2p_pending_sendreqs));
ompi_comm_free(&(module->p2p_comm));
module->p2p_comm = NULL;
module->p2p_win = NULL;
OBJ_DESTRUCT(&(module->p2p_acc_lock));
OBJ_DESTRUCT(&(module->p2p_lock));
free(module);
return ret;
}

260
ompi/mca/osc/rdma/osc_rdma.h Normal file

@@ -0,0 +1,260 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef OMPI_OSC_PT2PT_H
#define OMPI_OSC_PT2PT_H
#include "opal/class/opal_list.h"
#include "opal/class/opal_free_list.h"
#include "opal/class/opal_hash_table.h"
#include "ompi/mca/osc/osc.h"
#include "ompi/mca/btl/btl.h"
#include "ompi/win/win.h"
#include "ompi/communicator/communicator.h"
struct ompi_osc_rdma_component_t {
/** Extend the basic osc component interface */
ompi_osc_base_component_t super;
/** store the state of progress threads for this instance of OMPI */
bool p2p_c_have_progress_threads;
/** lock access to datastructures in the component structure */
opal_mutex_t p2p_c_lock;
/** List of ompi_osc_rdma_module_ts currently in existence.
Needed so that received fragments can be dispatched to the
correct module */
opal_hash_table_t p2p_c_modules;
/** free list of ompi_osc_rdma_sendreq_t structures */
opal_free_list_t p2p_c_sendreqs;
/** free list of ompi_osc_rdma_replyreq_t structures */
opal_free_list_t p2p_c_replyreqs;
/** free list of ompi_osc_rdma_longreq_t structures */
opal_free_list_t p2p_c_longreqs;
};
typedef struct ompi_osc_rdma_component_t ompi_osc_rdma_component_t;
struct ompi_osc_rdma_module_t {
/** Extend the basic osc module interface */
ompi_osc_base_module_t super;
/** lock access to data structures in the current module */
opal_mutex_t p2p_lock;
/** lock for "atomic" window updates from reductions */
opal_mutex_t p2p_acc_lock;
/** pointer back to window */
ompi_win_t *p2p_win;
/** communicator created with this window */
ompi_communicator_t *p2p_comm;
/** list of ompi_osc_rdma_sendreq_t structures, and includes all
requests for this access epoch that have not already been
started. p2p_lock must be held when modifying this field. */
opal_list_t p2p_pending_sendreqs;
/** list of int16_t counters for the number of requests to a
particular rank in p2p_comm for this access epoch. p2p_lock
must be held when modifying this field */
short *p2p_num_pending_sendreqs;
/** For MPI_Fence synchronization, the number of messages to send
in epoch. For Start/Complete, the number of updates for this
Complete. For Post/Wait (poorly named), the number of
Complete counters we're waiting for. For lock, the number of
messages waiting for completion on the origin side. Not
protected by p2p_lock - must use atomic counter operations. */
volatile int32_t p2p_num_pending_out;
/** For MPI_Fence synchronization, the number of expected incoming
messages. For Start/Complete, the number of expected Post
messages. For Post/Wait, the number of expected updates from
complete. For lock, the number of messages on the passive side
we are waiting for. Not protected by p2p_lock - must use
atomic counter operations. */
volatile int32_t p2p_num_pending_in;
/** cyclic counter for a unique tag for long messages. Not
protected by the p2p_lock - must use create_send_tag() to
create a send tag */
volatile int32_t p2p_tag_counter;
/** list of outstanding long messages that must be processed
(ompi_osc_rdma_request_long). Protected by p2p_lock. */
opal_list_t p2p_long_msgs;
opal_list_t p2p_copy_pending_sendreqs;
short *p2p_copy_num_pending_sendreqs;
bool p2p_eager_send;
/* ********************* FENCE data ************************ */
/* an array of <sizeof(p2p_comm)> ints, each containing the value
1. */
int *p2p_fence_coll_counts;
/* an array of <sizeof(p2p_comm)> shorts, for use in experimenting
with different synchronization costs */
short *p2p_fence_coll_results;
enum { OSC_SYNC_REDUCE_SCATTER, OSC_SYNC_ALLREDUCE, OSC_SYNC_ALLTOALL } p2p_fence_sync_type;
/* ********************* PWSC data ************************ */
struct ompi_group_t *p2p_pw_group;
struct ompi_group_t *p2p_sc_group;
/* ********************* LOCK data ************************ */
int32_t p2p_lock_status; /* one of 0, MPI_LOCK_EXCLUSIVE, MPI_LOCK_SHARED */
int32_t p2p_shared_count;
opal_list_t p2p_locks_pending;
int32_t p2p_lock_received_ack;
};
typedef struct ompi_osc_rdma_module_t ompi_osc_rdma_module_t;
extern ompi_osc_rdma_component_t mca_osc_rdma_component;
/*
* Helper macro for grabbing the module structure from a window instance
*/
#if OMPI_ENABLE_DEBUG
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
static inline ompi_osc_rdma_module_t* P2P_MODULE(struct ompi_win_t* win)
{
ompi_osc_rdma_module_t *module =
(ompi_osc_rdma_module_t*) win->w_osc_module;
assert(module->p2p_win == win);
return module;
}
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
#else
#define P2P_MODULE(win) ((ompi_osc_rdma_module_t*) win->w_osc_module)
#endif
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
/*
* Component functions
*/
int ompi_osc_rdma_component_init(bool enable_progress_threads,
bool enable_mpi_threads);
int ompi_osc_rdma_component_finalize(void);
int ompi_osc_rdma_component_query(struct ompi_win_t *win,
struct ompi_info_t *info,
struct ompi_communicator_t *comm);
int ompi_osc_rdma_component_select(struct ompi_win_t *win,
struct ompi_info_t *info,
struct ompi_communicator_t *comm);
/*
* Module interface function types
*/
int ompi_osc_rdma_module_free(struct ompi_win_t *win);
int ompi_osc_rdma_module_put(void *origin_addr,
int origin_count,
struct ompi_datatype_t *origin_dt,
int target,
int target_disp,
int target_count,
struct ompi_datatype_t *target_dt,
struct ompi_win_t *win);
int ompi_osc_rdma_module_accumulate(void *origin_addr,
int origin_count,
struct ompi_datatype_t *origin_dt,
int target,
int target_disp,
int target_count,
struct ompi_datatype_t *target_dt,
struct ompi_op_t *op,
struct ompi_win_t *win);
int ompi_osc_rdma_module_get(void *origin_addr,
int origin_count,
struct ompi_datatype_t *origin_dt,
int target,
int target_disp,
int target_count,
struct ompi_datatype_t *target_dt,
struct ompi_win_t *win);
int ompi_osc_rdma_module_fence(int assert, struct ompi_win_t *win);
int ompi_osc_rdma_module_start(struct ompi_group_t *group,
int assert,
struct ompi_win_t *win);
int ompi_osc_rdma_module_complete(struct ompi_win_t *win);
int ompi_osc_rdma_module_post(struct ompi_group_t *group,
int assert,
struct ompi_win_t *win);
int ompi_osc_rdma_module_wait(struct ompi_win_t *win);
int ompi_osc_rdma_module_test(struct ompi_win_t *win,
int *flag);
int ompi_osc_rdma_module_lock(int lock_type,
int target,
int assert,
struct ompi_win_t *win);
int ompi_osc_rdma_module_unlock(int target,
struct ompi_win_t *win);
/*
* passive side sync interface functions
*/
int ompi_osc_rdma_passive_lock(ompi_osc_rdma_module_t *module,
int32_t origin,
int32_t lock_type);
int ompi_osc_rdma_passive_unlock(ompi_osc_rdma_module_t *module,
int32_t origin,
int32_t count);
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
#endif /* OMPI_OSC_PT2PT_H */

210
ompi/mca/osc/rdma/osc_rdma_comm.c Normal file

@@ -0,0 +1,210 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "mpi.h"
#include <stdio.h>
#include "osc_rdma.h"
#include "osc_rdma_sendreq.h"
#include "osc_rdma_header.h"
#include "osc_rdma_data_move.h"
static int
enqueue_sendreq(ompi_osc_rdma_module_t *module,
ompi_osc_rdma_sendreq_t *sendreq)
{
OPAL_THREAD_LOCK(&(module->p2p_lock));
opal_list_append(&(module->p2p_pending_sendreqs),
(opal_list_item_t*) sendreq);
module->p2p_num_pending_sendreqs[sendreq->req_target_rank]++;
OPAL_THREAD_UNLOCK(&(module->p2p_lock));
return OMPI_SUCCESS;
}
int
ompi_osc_rdma_module_accumulate(void *origin_addr, int origin_count,
struct ompi_datatype_t *origin_dt,
int target, int target_disp, int target_count,
struct ompi_datatype_t *target_dt,
struct ompi_op_t *op, ompi_win_t *win)
{
int ret;
ompi_osc_rdma_sendreq_t *sendreq;
if (OMPI_WIN_FENCE & ompi_win_get_mode(win)) {
/* well, we're definitely in an access epoch now */
ompi_win_set_mode(win, OMPI_WIN_FENCE | OMPI_WIN_ACCESS_EPOCH |
OMPI_WIN_EXPOSE_EPOCH);
}
if (op != &ompi_mpi_op_replace &&
!ompi_ddt_is_predefined(target_dt)) {
fprintf(stderr, "MPI_Accumulate currently does not support reductions\n");
fprintf(stderr, "with any user-defined types. This will be rectified\n");
fprintf(stderr, "in a future release.\n");
return MPI_ERR_UNSUPPORTED_OPERATION;
}
/* shortcut 0 count case */
if (0 == origin_count || 0 == target_count) {
return OMPI_SUCCESS;
}
/* create sendreq */
ret = ompi_osc_rdma_sendreq_alloc_init(OMPI_OSC_PT2PT_ACC,
origin_addr,
origin_count,
origin_dt,
target,
target_disp,
target_count,
target_dt,
P2P_MODULE(win),
&sendreq);
if (OMPI_SUCCESS != ret) return ret;
sendreq->req_op_id = op->o_f_to_c_index;
/* enqueue sendreq */
ret = enqueue_sendreq(P2P_MODULE(win), sendreq);
return ret;
}
int
ompi_osc_rdma_module_get(void *origin_addr,
int origin_count,
struct ompi_datatype_t *origin_dt,
int target,
int target_disp,
int target_count,
struct ompi_datatype_t *target_dt,
ompi_win_t *win)
{
int ret;
ompi_osc_rdma_sendreq_t *sendreq;
if (OMPI_WIN_FENCE & ompi_win_get_mode(win)) {
/* well, we're definitely in an access epoch now */
ompi_win_set_mode(win, OMPI_WIN_FENCE | OMPI_WIN_ACCESS_EPOCH |
OMPI_WIN_EXPOSE_EPOCH);
}
/* shortcut 0 count case */
if (0 == origin_count || 0 == target_count) {
return OMPI_SUCCESS;
}
/* create sendreq */
ret = ompi_osc_rdma_sendreq_alloc_init(OMPI_OSC_PT2PT_GET,
origin_addr,
origin_count,
origin_dt,
target,
target_disp,
target_count,
target_dt,
P2P_MODULE(win),
&sendreq);
if (OMPI_SUCCESS != ret) return ret;
/* if we're doing fence synchronization, try to actively send
right now */
if (P2P_MODULE(win)->p2p_eager_send &&
(OMPI_WIN_FENCE & ompi_win_get_mode(win))) {
OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), 1);
ret = ompi_osc_rdma_sendreq_send(P2P_MODULE(win), sendreq);
if (OMPI_SUCCESS == ret) {
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
P2P_MODULE(win)->p2p_num_pending_sendreqs[sendreq->req_target_rank]++;
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
} else {
OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1);
ret = enqueue_sendreq(P2P_MODULE(win), sendreq);
}
} else {
/* enqueue sendreq */
ret = enqueue_sendreq(P2P_MODULE(win), sendreq);
}
return ret;
}
int
ompi_osc_rdma_module_put(void *origin_addr, int origin_count,
struct ompi_datatype_t *origin_dt,
int target, int target_disp, int target_count,
struct ompi_datatype_t *target_dt, ompi_win_t *win)
{
int ret;
ompi_osc_rdma_sendreq_t *sendreq;
if (OMPI_WIN_FENCE & ompi_win_get_mode(win)) {
/* well, we're definitely in an access epoch now */
ompi_win_set_mode(win, OMPI_WIN_FENCE | OMPI_WIN_ACCESS_EPOCH |
OMPI_WIN_EXPOSE_EPOCH);
}
/* shortcut 0 count case */
if (0 == origin_count || 0 == target_count) {
return OMPI_SUCCESS;
}
/* create sendreq */
ret = ompi_osc_rdma_sendreq_alloc_init(OMPI_OSC_PT2PT_PUT,
origin_addr,
origin_count,
origin_dt,
target,
target_disp,
target_count,
target_dt,
P2P_MODULE(win),
&sendreq);
if (OMPI_SUCCESS != ret) return ret;
/* if we're doing fence synchronization, try to actively send
right now */
if (P2P_MODULE(win)->p2p_eager_send &&
(OMPI_WIN_FENCE & ompi_win_get_mode(win))) {
OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), 1);
ret = ompi_osc_rdma_sendreq_send(P2P_MODULE(win), sendreq);
if (OMPI_SUCCESS == ret) {
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
P2P_MODULE(win)->p2p_num_pending_sendreqs[sendreq->req_target_rank]++;
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
} else {
OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1);
ret = enqueue_sendreq(P2P_MODULE(win), sendreq);
}
} else {
/* enqueue sendreq */
ret = enqueue_sendreq(P2P_MODULE(win), sendreq);
}
return ret;
}
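
The accumulate path above rejects reduction operations whose target datatype is not predefined. In user terms it behaves as sketched below; win, vec, and user_vector_type are illustrative placeholders (a window over ints, a local buffer, and a committed user-defined datatype), and at least two ranks are assumed.

#include <mpi.h>

static void accumulate_examples(MPI_Win win, MPI_Datatype user_vector_type,
                                int *vec)
{
    int one = 1;

    MPI_Win_fence(0, win);

    /* supported: a reduction op on a predefined datatype */
    MPI_Accumulate(&one, 1, MPI_INT, 1, 0, 1, MPI_INT, MPI_SUM, win);

    /* supported: a user-defined datatype, but only with MPI_REPLACE */
    MPI_Accumulate(vec, 1, user_vector_type, 1, 0, 1, user_vector_type,
                   MPI_REPLACE, win);

    /* not supported by this component yet: any reduction op (e.g. MPI_SUM)
       combined with a user-defined target datatype fails with
       MPI_ERR_UNSUPPORTED_OPERATION, per the check in
       ompi_osc_rdma_module_accumulate above */

    MPI_Win_fence(0, win);
}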

616
ompi/mca/osc/rdma/osc_rdma_component.c Normal file

@@ -0,0 +1,616 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include <string.h>
#include "osc_rdma.h"
#include "osc_rdma_sendreq.h"
#include "osc_rdma_replyreq.h"
#include "osc_rdma_header.h"
#include "osc_rdma_obj_convert.h"
#include "osc_rdma_data_move.h"
#include "opal/threads/mutex.h"
#include "ompi/info/info.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/osc/osc.h"
#include "ompi/mca/btl/btl.h"
#include "ompi/mca/bml/bml.h"
#include "ompi/mca/bml/base/base.h"
#include "ompi/datatype/dt_arch.h"
static int ompi_osc_rdma_component_open(void);
ompi_osc_rdma_component_t mca_osc_rdma_component = {
{ /* ompi_osc_base_component_t */
{ /* ompi_base_component_t */
OMPI_OSC_BASE_VERSION_1_0_0,
"pt2pt",
1,
0,
0,
ompi_osc_rdma_component_open,
NULL
},
{ /* mca_base_component_data */
false /* checkpointable? */
},
ompi_osc_rdma_component_init,
ompi_osc_rdma_component_query,
ompi_osc_rdma_component_select,
ompi_osc_rdma_component_finalize
}
};
ompi_osc_rdma_module_t ompi_osc_rdma_module_template = {
{
ompi_osc_rdma_module_free,
ompi_osc_rdma_module_put,
ompi_osc_rdma_module_get,
ompi_osc_rdma_module_accumulate,
ompi_osc_rdma_module_fence,
ompi_osc_rdma_module_start,
ompi_osc_rdma_module_complete,
ompi_osc_rdma_module_post,
ompi_osc_rdma_module_wait,
ompi_osc_rdma_module_test,
ompi_osc_rdma_module_lock,
ompi_osc_rdma_module_unlock,
}
};
void ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
mca_btl_base_tag_t tag,
mca_btl_base_descriptor_t *descriptor,
void *cbdata);
/* look up parameters for configuring this window. The code first
looks in the info structure passed by the user, then through mca
parameters. */
static bool
check_config_value_bool(char *key, ompi_info_t *info)
{
char *value_string;
int value_len, ret, flag, param;
bool result;
ret = ompi_info_get_valuelen(info, key, &value_len, &flag);
if (OMPI_SUCCESS != ret) goto info_not_found;
if (flag == 0) goto info_not_found;
value_len++;
value_string = malloc(sizeof(char) * value_len);
if (NULL == value_string) goto info_not_found;
ret = ompi_info_get(info, key, value_len, value_string, &flag);
if (OMPI_SUCCESS != ret) {
free(value_string);
goto info_not_found;
}
assert(flag != 0);
ret = ompi_info_value_to_bool(value_string, &result);
free(value_string);
if (OMPI_SUCCESS != ret) goto info_not_found;
return result;
info_not_found:
param = mca_base_param_find("osc", "pt2pt", key);
if (param == OPAL_ERROR) return false;
ret = mca_base_param_lookup_int(param, &flag);
if (OMPI_SUCCESS != ret) return false;
result = flag;
return result;
}
static int fence_sync_index;
static int
ompi_osc_rdma_component_open(void)
{
fence_sync_index =
mca_base_param_reg_string(&mca_osc_rdma_component.super.osc_version,
"fence_sync_method",
"How to synchronize fence: reduce_scatter, allreduce, alltoall",
false, false, "reduce_scatter", NULL);
mca_base_param_reg_int(&mca_osc_rdma_component.super.osc_version,
"eager_send",
"Attempt to start data movement during communication call, "
"instead of at synchrnoization time. "
"Info key of same name overrides this value, "
"if info key given.",
false, false, 0, NULL);
mca_base_param_reg_int(&mca_osc_rdma_component.super.osc_version,
"no_locks",
"Enable optimizations available only if MPI_LOCK is not used.",
false, false, 0, NULL);
return OMPI_SUCCESS;
}
int
ompi_osc_rdma_component_init(bool enable_progress_threads,
bool enable_mpi_threads)
{
/* we can run with or without progress threads (may not be able
to do win locks)... */
mca_osc_rdma_component.p2p_c_have_progress_threads =
enable_progress_threads;
OBJ_CONSTRUCT(&mca_osc_rdma_component.p2p_c_lock, opal_mutex_t);
OBJ_CONSTRUCT(&mca_osc_rdma_component.p2p_c_modules,
opal_hash_table_t);
opal_hash_table_init(&mca_osc_rdma_component.p2p_c_modules, 2);
OBJ_CONSTRUCT(&mca_osc_rdma_component.p2p_c_sendreqs, opal_free_list_t);
opal_free_list_init(&mca_osc_rdma_component.p2p_c_sendreqs,
sizeof(ompi_osc_rdma_sendreq_t),
OBJ_CLASS(ompi_osc_rdma_sendreq_t),
1, -1, 1);
OBJ_CONSTRUCT(&mca_osc_rdma_component.p2p_c_replyreqs, opal_free_list_t);
opal_free_list_init(&mca_osc_rdma_component.p2p_c_replyreqs,
sizeof(ompi_osc_rdma_replyreq_t),
OBJ_CLASS(ompi_osc_rdma_replyreq_t),
1, -1, 1);
OBJ_CONSTRUCT(&mca_osc_rdma_component.p2p_c_longreqs, opal_free_list_t);
opal_free_list_init(&mca_osc_rdma_component.p2p_c_longreqs,
sizeof(ompi_osc_rdma_longreq_t),
OBJ_CLASS(ompi_osc_rdma_longreq_t),
1, -1, 1);
return OMPI_SUCCESS;
}
int
ompi_osc_rdma_component_finalize(void)
{
size_t num_modules;
if (0 !=
(num_modules = opal_hash_table_get_size(&mca_osc_rdma_component.p2p_c_modules))) {
opal_output(0, "WARNING: There were %d Windows created but not freed.",
num_modules);
}
#if 0
mca_bml.bml_register(MCA_BTL_TAG_OSC_PT2PT, NULL, NULL);
#endif
OBJ_DESTRUCT(&mca_osc_rdma_component.p2p_c_longreqs);
OBJ_DESTRUCT(&mca_osc_rdma_component.p2p_c_replyreqs);
OBJ_DESTRUCT(&mca_osc_rdma_component.p2p_c_sendreqs);
OBJ_DESTRUCT(&mca_osc_rdma_component.p2p_c_modules);
OBJ_DESTRUCT(&mca_osc_rdma_component.p2p_c_lock);
return OMPI_SUCCESS;
}
int
ompi_osc_rdma_component_query(ompi_win_t *win,
ompi_info_t *info,
ompi_communicator_t *comm)
{
/* we can always run - return a low priority */
return 10;
}
int
ompi_osc_rdma_component_select(ompi_win_t *win,
ompi_info_t *info,
ompi_communicator_t *comm)
{
ompi_osc_rdma_module_t *module;
int ret, i;
char *sync_string;
/* create module structure */
module = malloc(sizeof(ompi_osc_rdma_module_t));
if (NULL == module) return OMPI_ERROR;
/* fill in the function pointer part */
memcpy(module, &ompi_osc_rdma_module_template,
sizeof(ompi_osc_base_module_t));
/* initialize the p2p part */
OBJ_CONSTRUCT(&(module->p2p_lock), opal_mutex_t);
OBJ_CONSTRUCT(&(module->p2p_acc_lock), opal_mutex_t);
module->p2p_win = win;
ret = ompi_comm_dup(comm, &(module->p2p_comm));
if (ret != OMPI_SUCCESS) {
OBJ_DESTRUCT(&(module->p2p_acc_lock));
OBJ_DESTRUCT(&(module->p2p_lock));
free(module);
return ret;
}
OBJ_CONSTRUCT(&module->p2p_pending_sendreqs, opal_list_t);
module->p2p_num_pending_sendreqs = malloc(sizeof(short) *
ompi_comm_size(module->p2p_comm));
if (NULL == module->p2p_num_pending_sendreqs) {
OBJ_DESTRUCT(&module->p2p_pending_sendreqs);
ompi_comm_free(&comm);
OBJ_DESTRUCT(&(module->p2p_acc_lock));
OBJ_DESTRUCT(&(module->p2p_lock));
free(module);
return ret;
}
memset(module->p2p_num_pending_sendreqs, 0,
sizeof(short) * ompi_comm_size(module->p2p_comm));
module->p2p_num_pending_out = 0;
module->p2p_num_pending_in = 0;
module->p2p_tag_counter = 0;
OBJ_CONSTRUCT(&(module->p2p_long_msgs), opal_list_t);
OBJ_CONSTRUCT(&(module->p2p_copy_pending_sendreqs), opal_list_t);
module->p2p_copy_num_pending_sendreqs = malloc(sizeof(short) *
ompi_comm_size(module->p2p_comm));
if (NULL == module->p2p_copy_num_pending_sendreqs) {
OBJ_DESTRUCT(&module->p2p_copy_pending_sendreqs);
OBJ_DESTRUCT(&module->p2p_long_msgs);
free(module->p2p_num_pending_sendreqs);
OBJ_DESTRUCT(&module->p2p_pending_sendreqs);
ompi_comm_free(&comm);
OBJ_DESTRUCT(&(module->p2p_acc_lock));
OBJ_DESTRUCT(&(module->p2p_lock));
free(module);
return ret;
}
memset(module->p2p_copy_num_pending_sendreqs, 0,
sizeof(short) * ompi_comm_size(module->p2p_comm));
module->p2p_eager_send = check_config_value_bool("eager_send", info);
/* fence data */
module->p2p_fence_coll_counts = malloc(sizeof(int) *
ompi_comm_size(module->p2p_comm));
if (NULL == module->p2p_fence_coll_counts) {
free(module->p2p_copy_num_pending_sendreqs);
OBJ_DESTRUCT(&module->p2p_copy_pending_sendreqs);
OBJ_DESTRUCT(&module->p2p_long_msgs);
free(module->p2p_num_pending_sendreqs);
OBJ_DESTRUCT(&module->p2p_pending_sendreqs);
ompi_comm_free(&comm);
OBJ_DESTRUCT(&(module->p2p_acc_lock));
OBJ_DESTRUCT(&(module->p2p_lock));
free(module);
return ret;
}
for (i = 0 ; i < ompi_comm_size(module->p2p_comm) ; ++i) {
module->p2p_fence_coll_counts[i] = 1;
}
module->p2p_fence_coll_results = malloc(sizeof(int) *
ompi_comm_size(module->p2p_comm));
if (NULL == module->p2p_fence_coll_results) {
free(module->p2p_fence_coll_counts);
free(module->p2p_copy_num_pending_sendreqs);
OBJ_DESTRUCT(&module->p2p_copy_pending_sendreqs);
OBJ_DESTRUCT(&module->p2p_long_msgs);
free(module->p2p_num_pending_sendreqs);
OBJ_DESTRUCT(&module->p2p_pending_sendreqs);
ompi_comm_free(&comm);
OBJ_DESTRUCT(&(module->p2p_acc_lock));
OBJ_DESTRUCT(&(module->p2p_lock));
free(module);
return ret;
}
/* figure out what sync method to use */
mca_base_param_lookup_string(fence_sync_index, &sync_string);
if (0 == strcmp(sync_string, "reduce_scatter")) {
module->p2p_fence_sync_type = OSC_SYNC_REDUCE_SCATTER;
} else if (0 == strcmp(sync_string, "allreduce")) {
module->p2p_fence_sync_type = OSC_SYNC_ALLREDUCE;
} else if (0 == strcmp(sync_string, "alltoall")) {
module->p2p_fence_sync_type = OSC_SYNC_ALLTOALL;
} else {
opal_output(0, "invalid value for fence_sync_method parameter: %s\n", sync_string);
return OMPI_ERROR;
}
/* pwsc data */
module->p2p_pw_group = NULL;
module->p2p_sc_group = NULL;
/* lock data */
module->p2p_lock_status = 0;
module->p2p_shared_count = 0;
OBJ_CONSTRUCT(&(module->p2p_locks_pending), opal_list_t);
module->p2p_lock_received_ack = 0;
/* update component data */
OPAL_THREAD_LOCK(&mca_osc_rdma_component.p2p_c_lock);
opal_hash_table_set_value_uint32(&mca_osc_rdma_component.p2p_c_modules,
module->p2p_comm->c_contextid,
module);
OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.p2p_c_lock);
/* fill in window information */
win->w_osc_module = (ompi_osc_base_module_t*) module;
if (check_config_value_bool("no_locks", info)) {
win->w_flags |= OMPI_WIN_NO_LOCKS;
}
/* sync memory - make sure all initialization completed */
opal_atomic_mb();
/* register to receive fragment callbacks */
ret = mca_bml.bml_register(MCA_BTL_TAG_OSC_PT2PT,
ompi_osc_rdma_component_fragment_cb,
NULL);
if (module->p2p_eager_send) {
/* need to barrier if eager sending or we can receive before the
other side has been fully setup, causing much gnashing of
teeth. */
module->p2p_comm->c_coll.coll_barrier(module->p2p_comm);
}
return ret;
}
/* dispatch for callback on message completion */
void
ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
mca_btl_base_tag_t tag,
mca_btl_base_descriptor_t *descriptor,
void *cbdata)
{
int ret;
ompi_osc_rdma_module_t *module;
void *payload;
assert(descriptor->des_dst[0].seg_len >=
sizeof(ompi_osc_rdma_base_header_t));
/* handle message */
switch (((ompi_osc_rdma_base_header_t*) descriptor->des_dst[0].seg_addr.pval)->hdr_type) {
case OMPI_OSC_PT2PT_HDR_PUT:
{
ompi_osc_rdma_send_header_t *header;
/* get our header and payload */
header = (ompi_osc_rdma_send_header_t*)
descriptor->des_dst[0].seg_addr.pval;
payload = (void*) (header + 1);
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
OMPI_OSC_PT2PT_SEND_HDR_NTOH(*header);
}
#endif
/* get our module pointer */
module = ompi_osc_rdma_windx_to_module(header->hdr_windx);
if (NULL == module) return;
ret = ompi_osc_rdma_sendreq_recv_put(module, header, payload);
}
break;
case OMPI_OSC_PT2PT_HDR_ACC:
{
ompi_osc_rdma_send_header_t *header;
/* get our header and payload */
header = (ompi_osc_rdma_send_header_t*)
descriptor->des_dst[0].seg_addr.pval;
payload = (void*) (header + 1);
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
OMPI_OSC_PT2PT_SEND_HDR_NTOH(*header);
}
#endif
/* get our module pointer */
module = ompi_osc_rdma_windx_to_module(header->hdr_windx);
if (NULL == module) return;
/* receive into temporary buffer */
ret = ompi_osc_rdma_sendreq_recv_accum(module, header, payload);
}
break;
case OMPI_OSC_PT2PT_HDR_GET:
{
ompi_datatype_t *datatype;
ompi_osc_rdma_send_header_t *header;
ompi_osc_rdma_replyreq_t *replyreq;
ompi_proc_t *proc;
/* get our header and payload */
header = (ompi_osc_rdma_send_header_t*)
descriptor->des_dst[0].seg_addr.pval;
payload = (void*) (header + 1);
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
OMPI_OSC_PT2PT_SEND_HDR_NTOH(*header);
}
#endif
/* get our module pointer */
module = ompi_osc_rdma_windx_to_module(header->hdr_windx);
if (NULL == module) return;
/* create or get a pointer to our datatype */
proc = module->p2p_comm->c_pml_procs[header->hdr_origin]->proc_ompi;
datatype = ompi_osc_rdma_datatype_create(proc, &payload);
/* create replyreq sendreq */
ret = ompi_osc_rdma_replyreq_alloc_init(module,
header->hdr_origin,
header->hdr_origin_sendreq,
header->hdr_target_disp,
header->hdr_target_count,
datatype,
&replyreq);
/* send replyreq */
ompi_osc_rdma_replyreq_send(module, replyreq);
/* sendreq does the right retain, so we can release safely */
OBJ_RELEASE(datatype);
}
break;
case OMPI_OSC_PT2PT_HDR_REPLY:
{
ompi_osc_rdma_reply_header_t *header;
ompi_osc_rdma_sendreq_t *sendreq;
/* get our header and payload */
header = (ompi_osc_rdma_reply_header_t*)
descriptor->des_dst[0].seg_addr.pval;
payload = (void*) (header + 1);
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
OMPI_OSC_PT2PT_REPLY_HDR_NTOH(*header);
}
#endif
/* get original sendreq pointer */
sendreq = (ompi_osc_rdma_sendreq_t*) header->hdr_origin_sendreq.pval;
module = sendreq->req_module;
/* receive data */
ompi_osc_rdma_replyreq_recv(module, sendreq, header, payload);
}
break;
case OMPI_OSC_PT2PT_HDR_POST:
{
ompi_osc_rdma_control_header_t *header =
(ompi_osc_rdma_control_header_t*)
descriptor->des_dst[0].seg_addr.pval;
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(*header);
}
#endif
/* get our module pointer */
module = ompi_osc_rdma_windx_to_module(header->hdr_windx);
if (NULL == module) return;
OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), -1);
}
break;
case OMPI_OSC_PT2PT_HDR_COMPLETE:
{
ompi_osc_rdma_control_header_t *header =
(ompi_osc_rdma_control_header_t*)
descriptor->des_dst[0].seg_addr.pval;
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(*header);
}
#endif
/* get our module pointer */
module = ompi_osc_rdma_windx_to_module(header->hdr_windx);
if (NULL == module) return;
/* we've heard from one more place, and have value reqs to
process */
OPAL_THREAD_ADD32(&(module->p2p_num_pending_out), -1);
OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), header->hdr_value[0]);
}
break;
case OMPI_OSC_PT2PT_HDR_LOCK_REQ:
{
ompi_osc_rdma_control_header_t *header =
(ompi_osc_rdma_control_header_t*)
descriptor->des_dst[0].seg_addr.pval;
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(*header);
}
#endif
/* get our module pointer */
module = ompi_osc_rdma_windx_to_module(header->hdr_windx);
if (NULL == module) return;
if (header->hdr_value[1] > 0) {
ompi_osc_rdma_passive_lock(module, header->hdr_value[0],
header->hdr_value[1]);
} else {
OPAL_THREAD_ADD32(&(module->p2p_lock_received_ack), 1);
}
}
break;
case OMPI_OSC_PT2PT_HDR_UNLOCK_REQ:
{
ompi_osc_rdma_control_header_t *header =
(ompi_osc_rdma_control_header_t*)
descriptor->des_dst[0].seg_addr.pval;
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(*header);
}
#endif
/* get our module pointer */
module = ompi_osc_rdma_windx_to_module(header->hdr_windx);
if (NULL == module) return;
ompi_osc_rdma_passive_unlock(module, header->hdr_value[0],
header->hdr_value[1]);
}
break;
default:
/* BWB - FIX ME - this sucks */
opal_output(0, "received packet for Window with unknown type");
abort();
}
}
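
check_config_value_bool() above gives each window two ways to toggle a feature: an info key supplied at window creation takes precedence, and the component's MCA parameter (registered under the "pt2pt" name in this commit, so presumably reachable as something like "--mca osc_pt2pt_eager_send 1" on the mpirun command line) supplies the default. A sketch of the info-key side, with placeholder buffer arguments and not part of the diff:

#include <mpi.h>

static void create_window_with_osc_hints(void *base, MPI_Aint size,
                                         int disp_unit, MPI_Win *win)
{
    MPI_Info info;

    MPI_Info_create(&info);
    /* keys looked up by check_config_value_bool() during component_select */
    MPI_Info_set(info, "eager_send", "true"); /* start sends inside Put/Get/Accumulate */
    MPI_Info_set(info, "no_locks", "true");   /* window will never be locked */

    MPI_Win_create(base, size, disp_unit, info, MPI_COMM_WORLD, win);
    MPI_Info_free(&info);
}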

842
ompi/mca/osc/rdma/osc_rdma_data_move.c Normal file

@@ -0,0 +1,842 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "osc_rdma.h"
#include "osc_rdma_sendreq.h"
#include "osc_rdma_header.h"
#include "osc_rdma_data_move.h"
#include "osc_rdma_obj_convert.h"
#include "opal/util/output.h"
#include "opal/sys/atomic.h"
#include "ompi/mca/bml/bml.h"
#include "ompi/mca/bml/base/base.h"
#include "ompi/mca/btl/btl.h"
#include "ompi/datatype/datatype.h"
#include "ompi/datatype/dt_arch.h"
static inline int32_t
create_send_tag(ompi_osc_rdma_module_t *module)
{
#if OMPI_HAVE_THREAD_SUPPORT && OPAL_HAVE_ATOMIC_CMPSET_32
int32_t newval, oldval;
do {
oldval = module->p2p_tag_counter;
newval = (oldval + 1) % mca_pml.pml_max_tag;
} while (0 == opal_atomic_cmpset_32(&module->p2p_tag_counter, oldval, newval));
return newval;
#elif OMPI_HAVE_THREAD_SUPPORT
int32_t ret;
/* no compare and swap - have to lock the module */
OPAL_THREAD_LOCK(&module->p2p_lock);
module->p2p_tag_counter = (module->p2p_tag_counter + 1) % mca_pml.pml_max_tag;
ret = module->p2p_tag_counter;
OPAL_THREAD_UNLOCK(&module->p2p_lock);
return ret;
#else
module->p2p_tag_counter = (module->p2p_tag_counter + 1) % mca_pml.pml_max_tag;
return module->p2p_tag_counter;
#endif
}
/**********************************************************************
*
* Sending a sendreq to target
*
**********************************************************************/
static void
ompi_osc_rdma_sendreq_send_long_cb(ompi_osc_rdma_longreq_t *longreq)
{
ompi_osc_rdma_sendreq_t *sendreq =
(ompi_osc_rdma_sendreq_t*) longreq->req_comp_cbdata;
opal_output(-1, "%d completed long sendreq to %d",
sendreq->req_module->p2p_comm->c_my_rank,
sendreq->req_target_rank);
opal_list_remove_item(&(sendreq->req_module->p2p_long_msgs),
&(longreq->super.super));
ompi_osc_rdma_longreq_free(longreq);
OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1);
ompi_osc_rdma_sendreq_free(sendreq);
}
static void
ompi_osc_rdma_sendreq_send_cb(struct mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t *endpoint,
struct mca_btl_base_descriptor_t* descriptor,
int status)
{
ompi_osc_rdma_sendreq_t *sendreq =
(ompi_osc_rdma_sendreq_t*) descriptor->des_cbdata;
ompi_osc_rdma_send_header_t *header =
(ompi_osc_rdma_send_header_t*) descriptor->des_src[0].seg_addr.pval;
if (OMPI_SUCCESS != status) {
/* requeue and return */
/* BWB - FIX ME - figure out where to put this bad boy */
abort();
return;
}
/* have to look at header, and not the sendreq because in the case
of get, it's possible that the sendreq has been freed already
(if the remote side replies before we get our send completion
callback) and already allocated to another request. We don't
wait for this completion before exiting a synchronization point
in the case of get, as we really don't care when it completes -
only when the data arrives. */
if (OMPI_OSC_PT2PT_HDR_GET != header->hdr_base.hdr_type) {
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
OMPI_OSC_PT2PT_SEND_HDR_NTOH(*header);
}
#endif
/* do we need to post a send? */
if (header->hdr_msg_length != 0) {
/* sendreq is done. Mark it as so and get out of here */
OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1);
ompi_osc_rdma_sendreq_free(sendreq);
} else {
ompi_osc_rdma_longreq_t *longreq;
ompi_osc_rdma_longreq_alloc(&longreq);
longreq->req_comp_cb = ompi_osc_rdma_sendreq_send_long_cb;
longreq->req_comp_cbdata = sendreq;
opal_output(-1, "%d starting long sendreq to %d (%d)",
sendreq->req_module->p2p_comm->c_my_rank,
sendreq->req_target_rank,
header->hdr_origin_tag);
mca_pml.pml_isend(sendreq->req_origin_convertor.pBaseBuf,
sendreq->req_origin_convertor.count,
sendreq->req_origin_datatype,
sendreq->req_target_rank,
header->hdr_origin_tag,
MCA_PML_BASE_SEND_STANDARD,
sendreq->req_module->p2p_comm,
&(longreq->req_pml_req));
/* put the send request in the waiting list */
OPAL_THREAD_LOCK(&(sendreq->req_module->p2p_lock));
opal_list_append(&(sendreq->req_module->p2p_long_msgs),
&(longreq->super.super));
OPAL_THREAD_UNLOCK(&(sendreq->req_module->p2p_lock));
}
}
/* release the descriptor and sendreq */
btl->btl_free(btl, descriptor);
/* any other sendreqs to restart? */
/* BWB - FIX ME - implement sending the next sendreq here */
}
/* create the initial fragment, pack header, datatype, and payload (if
size fits) and send */
int
ompi_osc_rdma_sendreq_send(ompi_osc_rdma_module_t *module,
ompi_osc_rdma_sendreq_t *sendreq)
{
int ret = OMPI_SUCCESS;
mca_bml_base_endpoint_t *endpoint = NULL;
mca_bml_base_btl_t *bml_btl = NULL;
mca_btl_base_descriptor_t *descriptor = NULL;
ompi_osc_rdma_send_header_t *header = NULL;
size_t written_data = 0;
size_t needed_len = sizeof(ompi_osc_rdma_send_header_t);
const void *packed_ddt;
size_t packed_ddt_len = ompi_ddt_pack_description_length(sendreq->req_target_datatype);
/* we always need to send the ddt */
needed_len += packed_ddt_len;
if (OMPI_OSC_PT2PT_GET != sendreq->req_type) {
needed_len += sendreq->req_origin_bytes_packed;
}
/* Get a BTL so we have the eager limit */
endpoint = (mca_bml_base_endpoint_t*) sendreq->req_target_proc->proc_bml;
bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager);
descriptor = bml_btl->btl_alloc(bml_btl->btl,
needed_len < bml_btl->btl_eager_limit ? needed_len :
bml_btl->btl_eager_limit);
if (NULL == descriptor) {
ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
goto cleanup;
}
/* verify at least enough space for header */
if (descriptor->des_src[0].seg_len < sizeof(ompi_osc_rdma_send_header_t)) {
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto cleanup;
}
/* setup descriptor */
descriptor->des_cbfunc = ompi_osc_rdma_sendreq_send_cb;
descriptor->des_cbdata = (void*) sendreq;
descriptor->des_flags = MCA_BTL_DES_FLAGS_PRIORITY;
/* pack header */
header = (ompi_osc_rdma_send_header_t*) descriptor->des_src[0].seg_addr.pval;
written_data += sizeof(ompi_osc_rdma_send_header_t);
header->hdr_base.hdr_flags = 0;
header->hdr_windx = sendreq->req_module->p2p_comm->c_contextid;
header->hdr_origin = sendreq->req_module->p2p_comm->c_my_rank;
header->hdr_origin_sendreq.pval = (void*) sendreq;
header->hdr_origin_tag = 0;
header->hdr_target_disp = sendreq->req_target_disp;
header->hdr_target_count = sendreq->req_target_count;
switch (sendreq->req_type) {
case OMPI_OSC_PT2PT_PUT:
header->hdr_base.hdr_type = OMPI_OSC_PT2PT_HDR_PUT;
#if OMPI_ENABLE_MEM_DEBUG
header->hdr_target_op = 0;
#endif
break;
case OMPI_OSC_PT2PT_ACC:
header->hdr_base.hdr_type = OMPI_OSC_PT2PT_HDR_ACC;
header->hdr_target_op = sendreq->req_op_id;
break;
case OMPI_OSC_PT2PT_GET:
header->hdr_base.hdr_type = OMPI_OSC_PT2PT_HDR_GET;
#if OMPI_ENABLE_MEM_DEBUG
header->hdr_target_op = 0;
#endif
break;
}
/* Set datatype id and / or pack datatype */
ret = ompi_ddt_get_pack_description(sendreq->req_target_datatype, &packed_ddt);
if (OMPI_SUCCESS != ret) goto cleanup;
memcpy((unsigned char*) descriptor->des_src[0].seg_addr.pval + written_data,
packed_ddt, packed_ddt_len);
written_data += packed_ddt_len;
if (OMPI_OSC_PT2PT_GET != sendreq->req_type) {
/* if sending data and it fits, pack payload */
if (descriptor->des_src[0].seg_len >=
written_data + sendreq->req_origin_bytes_packed) {
struct iovec iov;
uint32_t iov_count = 1;
int32_t free_after;
size_t max_data = sendreq->req_origin_bytes_packed;
iov.iov_len = max_data;
iov.iov_base = (void*) ((unsigned char*) descriptor->des_src[0].seg_addr.pval + written_data);
ret = ompi_convertor_pack(&sendreq->req_origin_convertor, &iov, &iov_count,
&max_data, &free_after);
if (ret < 0) {
ret = OMPI_ERR_FATAL;
goto cleanup;
}
assert(max_data == sendreq->req_origin_bytes_packed);
written_data += max_data;
descriptor->des_src[0].seg_len = written_data;
header->hdr_msg_length = sendreq->req_origin_bytes_packed;
} else {
header->hdr_msg_length = 0;
header->hdr_origin_tag = create_send_tag(module);
}
} else {
descriptor->des_src[0].seg_len = written_data;
header->hdr_msg_length = 0;
}
#ifdef WORDS_BIGENDIAN
header->hdr_base.hdr_flags |= OMPI_OSC_PT2PT_HDR_FLAG_NBO;
#elif OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (sendreq->req_target_proc->proc_arch & OMPI_ARCH_ISBIGENDIAN) {
header->hdr_base.hdr_flags |= OMPI_OSC_PT2PT_HDR_FLAG_NBO;
OMPI_OSC_PT2PT_SEND_HDR_HTON(*header);
}
#endif
/* send fragment */
opal_output(-1, "%d sending sendreq to %d",
sendreq->req_module->p2p_comm->c_my_rank,
sendreq->req_target_rank);
ret = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_OSC_PT2PT);
goto done;
cleanup:
if (descriptor != NULL) {
mca_bml_base_free(bml_btl, descriptor);
}
done:
return ret;
}
/**********************************************************************
*
* Sending a replyreq back to origin
*
**********************************************************************/
static void
ompi_osc_rdma_replyreq_send_long_cb(ompi_osc_rdma_longreq_t *longreq)
{
ompi_osc_rdma_replyreq_t *replyreq =
(ompi_osc_rdma_replyreq_t*) longreq->req_comp_cbdata;
opal_list_remove_item(&(replyreq->rep_module->p2p_long_msgs),
&(longreq->super.super));
ompi_osc_rdma_longreq_free(longreq);
OPAL_THREAD_ADD32(&(replyreq->rep_module->p2p_num_pending_in), -1);
ompi_osc_rdma_replyreq_free(replyreq);
}
static void
ompi_osc_rdma_replyreq_send_cb(struct mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t *endpoint,
struct mca_btl_base_descriptor_t* descriptor,
int status)
{
ompi_osc_rdma_replyreq_t *replyreq =
(ompi_osc_rdma_replyreq_t*) descriptor->des_cbdata;
ompi_osc_rdma_reply_header_t *header =
(ompi_osc_rdma_reply_header_t*) descriptor->des_src[0].seg_addr.pval;
if (OMPI_SUCCESS != status) {
/* requeue and return */
/* BWB - FIX ME - figure out where to put this bad boy */
abort();
return;
}
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
OMPI_OSC_PT2PT_REPLY_HDR_NTOH(*header);
}
#endif
/* do we need to post a send? */
if (header->hdr_msg_length != 0) {
/* sendreq is done. Mark it as so and get out of here */
OPAL_THREAD_ADD32(&(replyreq->rep_module->p2p_num_pending_in), -1);
ompi_osc_rdma_replyreq_free(replyreq);
} else {
ompi_osc_rdma_longreq_t *longreq;
ompi_osc_rdma_longreq_alloc(&longreq);
longreq->req_comp_cb = ompi_osc_rdma_replyreq_send_long_cb;
longreq->req_comp_cbdata = replyreq;
mca_pml.pml_isend(replyreq->rep_target_convertor.pBaseBuf,
replyreq->rep_target_convertor.count,
replyreq->rep_target_datatype,
replyreq->rep_origin_rank,
header->hdr_target_tag,
MCA_PML_BASE_SEND_STANDARD,
replyreq->rep_module->p2p_comm,
&(longreq->req_pml_req));
/* put the send request in the waiting list */
OPAL_THREAD_LOCK(&(replyreq->rep_module->p2p_lock));
opal_list_append(&(replyreq->rep_module->p2p_long_msgs),
&(longreq->super.super));
OPAL_THREAD_UNLOCK(&(replyreq->rep_module->p2p_lock));
}
/* release the descriptor and replyreq */
btl->btl_free(btl, descriptor);
/* any other replyreqs to restart? */
}
int
ompi_osc_rdma_replyreq_send(ompi_osc_rdma_module_t *module,
ompi_osc_rdma_replyreq_t *replyreq)
{
int ret = OMPI_SUCCESS;
mca_bml_base_endpoint_t *endpoint = NULL;
mca_bml_base_btl_t *bml_btl = NULL;
mca_btl_base_descriptor_t *descriptor = NULL;
ompi_osc_rdma_reply_header_t *header = NULL;
size_t written_data = 0;
/* Get a BTL and a fragment to go with it */
endpoint = (mca_bml_base_endpoint_t*) replyreq->rep_origin_proc->proc_bml;
bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager);
descriptor = bml_btl->btl_alloc(bml_btl->btl,
bml_btl->btl_eager_limit);
if (NULL == descriptor) {
ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
goto cleanup;
}
/* verify at least enough space for header */
if (descriptor->des_src[0].seg_len < sizeof(ompi_osc_rdma_reply_header_t)) {
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto cleanup;
}
/* setup descriptor */
descriptor->des_cbfunc = ompi_osc_rdma_replyreq_send_cb;
descriptor->des_cbdata = (void*) replyreq;
descriptor->des_flags = MCA_BTL_DES_FLAGS_PRIORITY;
/* pack header */
header = (ompi_osc_rdma_reply_header_t*) descriptor->des_src[0].seg_addr.pval;
written_data += sizeof(ompi_osc_rdma_reply_header_t);
header->hdr_base.hdr_type = OMPI_OSC_PT2PT_HDR_REPLY;
header->hdr_base.hdr_flags = 0;
header->hdr_origin_sendreq = replyreq->rep_origin_sendreq;
header->hdr_target_tag = 0;
/* if sending data fits, pack payload */
if (descriptor->des_src[0].seg_len >=
written_data + replyreq->rep_target_bytes_packed) {
struct iovec iov;
uint32_t iov_count = 1;
int32_t free_after;
size_t max_data = replyreq->rep_target_bytes_packed;
iov.iov_len = max_data;
iov.iov_base = (void*) ((unsigned char*) descriptor->des_src[0].seg_addr.pval + written_data);
ret = ompi_convertor_pack(&replyreq->rep_target_convertor, &iov, &iov_count,
&max_data, &free_after);
if (ret < 0) {
ret = OMPI_ERR_FATAL;
goto cleanup;
}
assert(max_data == replyreq->rep_target_bytes_packed);
written_data += max_data;
descriptor->des_src[0].seg_len = written_data;
header->hdr_msg_length = replyreq->rep_target_bytes_packed;
} else {
header->hdr_msg_length = 0;
header->hdr_target_tag = create_send_tag(module);
}
#ifdef WORDS_BIGENDIAN
header->hdr_base.hdr_flags |= OMPI_OSC_PT2PT_HDR_FLAG_NBO;
#elif OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (replyreq->rep_origin_proc->proc_arch & OMPI_ARCH_ISBIGENDIAN) {
header->hdr_base.hdr_flags |= OMPI_OSC_PT2PT_HDR_FLAG_NBO;
OMPI_OSC_PT2PT_REPLY_HDR_HTON(*header);
}
#endif
/* send fragment */
ret = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_OSC_PT2PT);
goto done;
cleanup:
if (descriptor != NULL) {
mca_bml_base_free(bml_btl, descriptor);
}
done:
return ret;
}
/**********************************************************************
*
* Receive a put on the target side
*
**********************************************************************/
static void
ompi_osc_rdma_sendreq_recv_put_long_cb(ompi_osc_rdma_longreq_t *longreq)
{
opal_list_remove_item(&(longreq->req_module->p2p_long_msgs),
&(longreq->super.super));
OBJ_RELEASE(longreq->req_datatype);
ompi_osc_rdma_longreq_free(longreq);
OPAL_THREAD_ADD32(&(longreq->req_module->p2p_num_pending_in), -1);
}
int
ompi_osc_rdma_sendreq_recv_put(ompi_osc_rdma_module_t *module,
ompi_osc_rdma_send_header_t *header,
void *inbuf)
{
int ret = OMPI_SUCCESS;
void *target = (unsigned char*) module->p2p_win->w_baseptr +
(header->hdr_target_disp * module->p2p_win->w_disp_unit);
ompi_proc_t *proc = module->p2p_comm->c_pml_procs[header->hdr_origin]->proc_ompi;
struct ompi_datatype_t *datatype =
ompi_osc_rdma_datatype_create(proc, &inbuf);
if (header->hdr_msg_length > 0) {
ompi_convertor_t convertor;
struct iovec iov;
uint32_t iov_count = 1;
int32_t free_after = 0;
size_t max_data;
ompi_proc_t *proc;
/* create convertor */
OBJ_CONSTRUCT(&convertor, ompi_convertor_t);
/* initialize convertor */
proc = ompi_comm_peer_lookup(module->p2p_comm, header->hdr_origin);
ompi_convertor_copy_and_prepare_for_recv(proc->proc_convertor,
datatype,
header->hdr_target_count,
target,
0,
&convertor);
iov.iov_len = header->hdr_msg_length;
iov.iov_base = inbuf;
max_data = iov.iov_len;
ompi_convertor_unpack(&convertor,
&iov,
&iov_count,
&max_data,
&free_after);
OBJ_DESTRUCT(&convertor);
OBJ_RELEASE(datatype);
OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), -1);
} else {
ompi_osc_rdma_longreq_t *longreq;
ompi_osc_rdma_longreq_alloc(&longreq);
longreq->req_comp_cb = ompi_osc_rdma_sendreq_recv_put_long_cb;
longreq->req_comp_cbdata = NULL;
longreq->req_datatype = datatype;
longreq->req_module = module;
ret = mca_pml.pml_irecv(target,
header->hdr_target_count,
datatype,
header->hdr_origin,
header->hdr_origin_tag,
module->p2p_comm,
&(longreq->req_pml_req));
/* put the send request in the waiting list */
OPAL_THREAD_LOCK(&(module->p2p_lock));
opal_list_append(&(module->p2p_long_msgs),
&(longreq->super.super));
OPAL_THREAD_UNLOCK(&(module->p2p_lock));
}
return ret;
}
/**********************************************************************
*
* Receive an accumulate on the target side
*
**********************************************************************/
static void
ompi_osc_rdma_sendreq_recv_accum_long_cb(ompi_osc_rdma_longreq_t *longreq)
{
ompi_osc_rdma_send_header_t *header =
(ompi_osc_rdma_send_header_t*) longreq->req_comp_cbdata;
void *payload = (void*) (header + 1);
int ret;
/* lock the window for accumulates */
OPAL_THREAD_LOCK(&longreq->req_module->p2p_acc_lock);
opal_list_remove_item(&(longreq->req_module->p2p_long_msgs),
&(longreq->super.super));
/* copy the data from the temporary buffer into the user window */
ret = ompi_osc_rdma_process_op(longreq->req_module,
header,
longreq->req_datatype,
longreq->req_op,
payload,
header->hdr_msg_length);
/* unlock the window for accumulates */
OPAL_THREAD_UNLOCK(&longreq->req_module->p2p_acc_lock);
opal_output(-1, "%d finished receiving long accum message from %d",
longreq->req_module->p2p_comm->c_my_rank,
header->hdr_origin);
/* free the temp buffer */
free(longreq->req_comp_cbdata);
/* Release datatype & op */
OBJ_RELEASE(longreq->req_datatype);
OBJ_RELEASE(longreq->req_op);
OPAL_THREAD_ADD32(&(longreq->req_module->p2p_num_pending_in), -1);
ompi_osc_rdma_longreq_free(longreq);
}
int
ompi_osc_rdma_sendreq_recv_accum(ompi_osc_rdma_module_t *module,
ompi_osc_rdma_send_header_t *header,
void *payload)
{
int ret = OMPI_SUCCESS;
struct ompi_op_t *op = ompi_osc_rdma_op_create(header->hdr_target_op);
ompi_proc_t *proc = module->p2p_comm->c_pml_procs[header->hdr_origin]->proc_ompi;
struct ompi_datatype_t *datatype =
ompi_osc_rdma_datatype_create(proc, &payload);
if (header->hdr_msg_length > 0) {
/* lock the window for accumulates */
OPAL_THREAD_LOCK(&module->p2p_acc_lock);
/* copy the data from the temporary buffer into the user window */
ret = ompi_osc_rdma_process_op(module, header, datatype, op, payload,
header->hdr_msg_length);
/* unlock the window for accumulates */
OPAL_THREAD_UNLOCK(&module->p2p_acc_lock);
/* Release datatype & op */
OBJ_RELEASE(datatype);
OBJ_RELEASE(op);
OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), -1);
opal_output(-1, "%d received accum message from %d",
module->p2p_comm->c_my_rank,
header->hdr_origin);
} else {
ompi_osc_rdma_longreq_t *longreq;
long lb, extent, true_lb, true_extent;
size_t buflen;
/* figure out how big a buffer we need */
ompi_ddt_get_extent(datatype, &lb, &extent);
ompi_ddt_get_true_extent(datatype, &true_lb, &true_extent);
buflen = true_extent + (header->hdr_target_count - 1) * extent;
/* get a longreq and fill it in */
ompi_osc_rdma_longreq_alloc(&longreq);
longreq->req_comp_cb = ompi_osc_rdma_sendreq_recv_accum_long_cb;
longreq->req_datatype = datatype;
longreq->req_op = op;
longreq->req_module = module;
/* allocate a buffer to receive into ... */
longreq->req_comp_cbdata = malloc(buflen + sizeof(ompi_osc_rdma_send_header_t));
if (NULL == longreq->req_comp_cbdata) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
/* fill in tmp header */
memcpy(longreq->req_comp_cbdata, header,
sizeof(ompi_osc_rdma_send_header_t));
((ompi_osc_rdma_send_header_t*) longreq->req_comp_cbdata)->hdr_msg_length = buflen;
ret = mca_pml.pml_irecv(((char*) longreq->req_comp_cbdata) + sizeof(ompi_osc_rdma_send_header_t),
header->hdr_target_count,
datatype,
header->hdr_origin,
header->hdr_origin_tag,
module->p2p_comm,
&(longreq->req_pml_req));
opal_output(-1, "%d started long recv accum message from %d (%d)",
module->p2p_comm->c_my_rank,
header->hdr_origin,
header->hdr_origin_tag);
/* put the send request in the waiting list */
OPAL_THREAD_LOCK(&(module->p2p_lock));
opal_list_append(&(module->p2p_long_msgs),
&(longreq->super.super));
OPAL_THREAD_UNLOCK(&(module->p2p_lock));
}
return ret;
}
/**********************************************************************
*
* Receive a get on the origin side
*
**********************************************************************/
static void
ompi_osc_rdma_replyreq_recv_long_cb(ompi_osc_rdma_longreq_t *longreq)
{
ompi_osc_rdma_sendreq_t *sendreq =
(ompi_osc_rdma_sendreq_t*) longreq->req_comp_cbdata;
opal_list_remove_item(&(longreq->req_module->p2p_long_msgs),
&(longreq->super.super));
ompi_osc_rdma_longreq_free(longreq);
OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1);
ompi_osc_rdma_sendreq_free(sendreq);
}
int
ompi_osc_rdma_replyreq_recv(ompi_osc_rdma_module_t *module,
ompi_osc_rdma_sendreq_t *sendreq,
ompi_osc_rdma_reply_header_t *header,
void *payload)
{
int ret = OMPI_SUCCESS;
/* receive into user buffer */
if (header->hdr_msg_length > 0) {
/* short message. woo! */
struct iovec iov;
uint32_t iov_count = 1;
int32_t free_after = 0;
size_t max_data;
iov.iov_len = header->hdr_msg_length;
iov.iov_base = payload;
max_data = iov.iov_len;
ompi_convertor_unpack(&sendreq->req_origin_convertor,
&iov,
&iov_count,
&max_data,
&free_after);
OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1);
ompi_osc_rdma_sendreq_free(sendreq);
} else {
ompi_osc_rdma_longreq_t *longreq;
ompi_osc_rdma_longreq_alloc(&longreq);
longreq->req_comp_cb = ompi_osc_rdma_replyreq_recv_long_cb;
longreq->req_comp_cbdata = sendreq;
longreq->req_module = module;
/* BWB - FIX ME - George is going to kill me for this */
ret = mca_pml.pml_irecv(sendreq->req_origin_convertor.pBaseBuf,
sendreq->req_origin_convertor.count,
sendreq->req_origin_datatype,
sendreq->req_target_rank,
header->hdr_target_tag,
module->p2p_comm,
&(longreq->req_pml_req));
/* put the send request in the waiting list */
OPAL_THREAD_LOCK(&(module->p2p_lock));
opal_list_append(&(module->p2p_long_msgs),
&(longreq->super.super));
OPAL_THREAD_UNLOCK(&(module->p2p_lock));
}
return ret;
}
/**********************************************************************
*
* Control message communication
*
**********************************************************************/
static void
ompi_osc_rdma_control_send_cb(struct mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t *endpoint,
struct mca_btl_base_descriptor_t* descriptor,
int status)
{
/* release the descriptor and sendreq */
btl->btl_free(btl, descriptor);
}
int
ompi_osc_rdma_control_send(ompi_osc_rdma_module_t *module,
ompi_proc_t *proc,
uint8_t type, int32_t value0, int32_t value1)
{
int ret = OMPI_SUCCESS;
mca_bml_base_endpoint_t *endpoint = NULL;
mca_bml_base_btl_t *bml_btl = NULL;
mca_btl_base_descriptor_t *descriptor = NULL;
ompi_osc_rdma_control_header_t *header = NULL;
/* Get a BTL and a fragment to go with it */
endpoint = (mca_bml_base_endpoint_t*) proc->proc_bml;
bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager);
descriptor = bml_btl->btl_alloc(bml_btl->btl,
sizeof(ompi_osc_rdma_control_header_t));
if (NULL == descriptor) {
ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
goto cleanup;
}
/* verify at least enough space for header */
if (descriptor->des_src[0].seg_len < sizeof(ompi_osc_rdma_control_header_t)) {
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto cleanup;
}
/* setup descriptor */
descriptor->des_cbfunc = ompi_osc_rdma_control_send_cb;
descriptor->des_cbdata = NULL;
descriptor->des_flags = MCA_BTL_DES_FLAGS_PRIORITY;
descriptor->des_src[0].seg_len = sizeof(ompi_osc_rdma_control_header_t);
/* pack header */
header = (ompi_osc_rdma_control_header_t*) descriptor->des_src[0].seg_addr.pval;
header->hdr_base.hdr_type = type;
header->hdr_base.hdr_flags = 0;
header->hdr_value[0] = value0;
header->hdr_value[1] = value1;
header->hdr_windx = module->p2p_comm->c_contextid;
#ifdef WORDS_BIGENDIAN
header->hdr_base.hdr_flags |= OMPI_OSC_PT2PT_HDR_FLAG_NBO;
#elif OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (proc->proc_arch & OMPI_ARCH_ISBIGENDIAN) {
header->hdr_base.hdr_flags |= OMPI_OSC_PT2PT_HDR_FLAG_NBO;
OMPI_OSC_PT2PT_CONTROL_HDR_HTON(*header);
}
#endif
/* send fragment */
ret = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_OSC_PT2PT);
goto done;
cleanup:
if (descriptor != NULL) {
mca_bml_base_free(bml_btl, descriptor);
}
done:
return ret;
}

55
ompi/mca/osc/rdma/osc_rdma_data_move.h Normal file

@ -0,0 +1,55 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef OMPI_MCA_OSC_PT2PT_DATA_MOVE_H
#define OMPI_MCA_OSC_PT2PT_DATA_MOVE_H
#include "osc_rdma_sendreq.h"
#include "osc_rdma_replyreq.h"
/* send a sendreq (the request from the origin for a Put, Get, or
Accumulate, including the payload for Put and Accumulate) */
int ompi_osc_rdma_sendreq_send(ompi_osc_rdma_module_t *module,
ompi_osc_rdma_sendreq_t *sendreq);
/* send a replyreq (the request from the target of a Get, with the
payload for the origin) */
int ompi_osc_rdma_replyreq_send(ompi_osc_rdma_module_t *module,
ompi_osc_rdma_replyreq_t *replyreq);
/* receive the target side of a sendreq for a put, directly into the user's window */
int ompi_osc_rdma_sendreq_recv_put(ompi_osc_rdma_module_t *module,
ompi_osc_rdma_send_header_t *header,
void *payload);
/* receive the target side of a sendreq for an accumulate, possibly
using a temporary buffer, then calling the reduction functions */
int ompi_osc_rdma_sendreq_recv_accum(ompi_osc_rdma_module_t *module,
ompi_osc_rdma_send_header_t *header,
void *payload);
/* receive the origin side of a replyreq (the reply part of an
MPI_Get), directly into the user's window */
int ompi_osc_rdma_replyreq_recv(ompi_osc_rdma_module_t *module,
ompi_osc_rdma_sendreq_t *sendreq,
ompi_osc_rdma_reply_header_t *header,
void *payload);
int ompi_osc_rdma_control_send(ompi_osc_rdma_module_t *module,
ompi_proc_t *proc,
uint8_t type, int32_t value0, int32_t value1);
#endif

136
ompi/mca/osc/rdma/osc_rdma_header.h Normal file

@ -0,0 +1,136 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef OMPI_MCA_OSC_PT2PT_HDR_H
#define OMPI_MCA_OSC_PT2PT_HDR_H
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#include "opal/types.h"
#define OMPI_OSC_PT2PT_HDR_PUT 0x0001
#define OMPI_OSC_PT2PT_HDR_ACC 0x0002
#define OMPI_OSC_PT2PT_HDR_GET 0x0004
#define OMPI_OSC_PT2PT_HDR_REPLY 0x0008
#define OMPI_OSC_PT2PT_HDR_POST 0x0010
#define OMPI_OSC_PT2PT_HDR_COMPLETE 0x0020
#define OMPI_OSC_PT2PT_HDR_LOCK_REQ 0x0040
#define OMPI_OSC_PT2PT_HDR_UNLOCK_REQ 0x0080
#define OMPI_OSC_PT2PT_HDR_FLAG_NBO 0x0001
struct ompi_osc_rdma_base_header_t {
uint8_t hdr_type;
/* eventually, this will include endian information */
uint8_t hdr_flags;
};
typedef struct ompi_osc_rdma_base_header_t ompi_osc_rdma_base_header_t;
#define OMPI_OSC_PT2PT_BASE_HDR_NTOH(h)
#define OMPI_OSC_PT2PT_BASE_HDR_HTON(h)
struct ompi_osc_rdma_send_header_t {
ompi_osc_rdma_base_header_t hdr_base;
uint16_t hdr_windx;
int32_t hdr_origin;
ompi_ptr_t hdr_origin_sendreq;
int32_t hdr_origin_tag;
int32_t hdr_target_disp;
int32_t hdr_target_count;
int32_t hdr_target_op;
int32_t hdr_msg_length; /* 0 if payload is not included */
};
typedef struct ompi_osc_rdma_send_header_t ompi_osc_rdma_send_header_t;
#define OMPI_OSC_PT2PT_SEND_HDR_HTON(hdr) \
do { \
OMPI_OSC_PT2PT_BASE_HDR_HTON((hdr).hdr_base) \
(hdr).hdr_windx = htons((hdr).hdr_windx); \
(hdr).hdr_origin = htonl((hdr).hdr_origin); \
(hdr).hdr_origin_tag = htonl((hdr).hdr_origin_tag); \
(hdr).hdr_target_disp = htonl((hdr).hdr_target_disp); \
(hdr).hdr_target_count = htonl((hdr).hdr_target_count); \
(hdr).hdr_target_op = htonl((hdr).hdr_target_op); \
(hdr).hdr_msg_length = htonl((hdr).hdr_msg_length); \
} while (0)
#define OMPI_OSC_PT2PT_SEND_HDR_NTOH(hdr) \
do { \
OMPI_OSC_PT2PT_BASE_HDR_NTOH((hdr).hdr_base) \
(hdr).hdr_windx = ntohs((hdr).hdr_windx); \
(hdr).hdr_origin = ntohl((hdr).hdr_origin); \
(hdr).hdr_origin_tag = ntohl((hdr).hdr_origin_tag); \
(hdr).hdr_target_disp = ntohl((hdr).hdr_target_disp); \
(hdr).hdr_target_count = ntohl((hdr).hdr_target_count); \
(hdr).hdr_target_op = ntohl((hdr).hdr_target_op); \
(hdr).hdr_msg_length = ntohl((hdr).hdr_msg_length); \
} while (0)
struct ompi_osc_rdma_reply_header_t {
ompi_osc_rdma_base_header_t hdr_base;
ompi_ptr_t hdr_origin_sendreq;
int32_t hdr_target_tag;
int32_t hdr_msg_length;
};
typedef struct ompi_osc_rdma_reply_header_t ompi_osc_rdma_reply_header_t;
#define OMPI_OSC_PT2PT_REPLY_HDR_HTON(hdr) \
do { \
OMPI_OSC_PT2PT_BASE_HDR_HTON((hdr).hdr_base) \
(hdr).hdr_target_tag = htonl((hdr).hdr_target_tag); \
(hdr).hdr_msg_length = htonl((hdr).hdr_msg_length); \
} while (0)
#define OMPI_OSC_PT2PT_REPLY_HDR_NTOH(hdr) \
do { \
OMPI_OSC_PT2PT_BASE_HDR_NTOH((hdr).hdr_base) \
(hdr).hdr_target_tag = ntohl((hdr).hdr_target_tag); \
(hdr).hdr_msg_length = ntohl((hdr).hdr_msg_length); \
} while (0)
struct ompi_osc_rdma_control_header_t {
ompi_osc_rdma_base_header_t hdr_base;
int16_t hdr_windx;
int32_t hdr_value[2];
};
typedef struct ompi_osc_rdma_control_header_t ompi_osc_rdma_control_header_t;
#define OMPI_OSC_PT2PT_CONTROL_HDR_HTON(hdr) \
do { \
OMPI_OSC_PT2PT_BASE_HDR_HTON((hdr).hdr_base) \
(hdr).hdr_windx = htons((hdr).hdr_windx); \
(hdr).hdr_value[0] = htonl((hdr).hdr_value[0]); \
(hdr).hdr_value[1] = htonl((hdr).hdr_value[1]); \
} while (0)
#define OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(hdr) \
do { \
OMPI_OSC_PT2PT_BASE_HDR_NTOH((hdr).hdr_base) \
(hdr).hdr_windx = ntohs((hdr).hdr_windx); \
(hdr).hdr_value[0] = ntohl((hdr).hdr_value[0]); \
(hdr).hdr_value[1] = ntohl((hdr).hdr_value[1]); \
} while (0)
#endif /* OMPI_MCA_OSC_PT2PT_HDR_H */
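A sender marks a header with OMPI_OSC_PT2PT_HDR_FLAG_NBO only when it has byte-swapped it (big-endian host, or a big-endian peer in heterogeneous builds), as the control-send path in osc_rdma_data_move.c above shows. A receiver therefore has to undo the swap conditionally; the following is a minimal illustrative sketch of that fix-up (the helper name example_fixup_send_header is an assumption, not part of this commit):

/* Illustrative sketch only: undo network byte order on a received send
 * header when the origin flagged it; mirrors the sender-side logic. */
static inline void
example_fixup_send_header(ompi_osc_rdma_send_header_t *header)
{
    if (0 != (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO)) {
        OMPI_OSC_PT2PT_SEND_HDR_NTOH(*header);
    }
}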

26
ompi/mca/osc/rdma/osc_rdma_longreq.c Normal file

@ -0,0 +1,26 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "osc_rdma_longreq.h"
#include "opal/class/opal_list.h"
OBJ_CLASS_INSTANCE(ompi_osc_rdma_longreq_t, opal_free_list_item_t,
NULL, NULL);

71
ompi/mca/osc/rdma/osc_rdma_longreq.h Normal file

@ -0,0 +1,71 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef OSC_PT2PT_LONGREQ_H
#define OSC_PT2PT_LONGREQ_H
#include "osc_rdma.h"
#include "opal/class/opal_list.h"
#include "opal/class/opal_free_list.h"
#include "ompi/request/request.h"
struct ompi_osc_rdma_longreq_t;
typedef struct ompi_osc_rdma_longreq_t ompi_osc_rdma_longreq_t;
typedef void (*ompi_osc_rdma_longreq_comp_cb_t)(ompi_osc_rdma_longreq_t *longreq);
struct ompi_osc_rdma_longreq_t {
opal_free_list_item_t super;
/* warning - this doesn't always have a sane value */
ompi_osc_rdma_module_t *req_module;
ompi_request_t *req_pml_req;
ompi_osc_rdma_longreq_comp_cb_t req_comp_cb;
/* general storage place - usually holds a request of some type */
void *req_comp_cbdata;
/* for long receives, to avoid a longrecvreq type */
/* BWB - I don't like this, but I don't want another free list. What to do? */
struct ompi_op_t *req_op;
struct ompi_datatype_t *req_datatype;
};
OBJ_CLASS_DECLARATION(ompi_osc_rdma_longreq_t);
static inline int
ompi_osc_rdma_longreq_alloc(ompi_osc_rdma_longreq_t **longreq)
{
opal_free_list_item_t *item;
int ret;
OPAL_FREE_LIST_GET(&mca_osc_rdma_component.p2p_c_longreqs,
item, ret);
*longreq = (ompi_osc_rdma_longreq_t*) item;
return ret;
}
static inline int
ompi_osc_rdma_longreq_free(ompi_osc_rdma_longreq_t *longreq)
{
OPAL_FREE_LIST_RETURN(&mca_osc_rdma_component.p2p_c_longreqs,
&longreq->super.super);
return OMPI_SUCCESS;
}
#endif

98
ompi/mca/osc/rdma/osc_rdma_obj_convert.c Normal file

@ -0,0 +1,98 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/*
* utility functions for dealing with remote datatype and op structures
*/
#include "ompi_config.h"
#include "ompi/op/op.h"
#include "osc_rdma.h"
#include "osc_rdma_sendreq.h"
#include "osc_rdma_header.h"
#include "osc_rdma_obj_convert.h"
int
ompi_osc_rdma_process_op(ompi_osc_rdma_module_t *module,
ompi_osc_rdma_send_header_t *header,
struct ompi_datatype_t *datatype,
ompi_op_t *op,
void *inbuf,
size_t inbuflen)
{
unsigned char *target_buffer;
/* compute target buffer location */
target_buffer = (unsigned char*) module->p2p_win->w_baseptr +
(header->hdr_target_disp * module->p2p_win->w_disp_unit);
/* BWB - fix me - change back to the pointer comparison when the
replace o_f_to_c_index is set properly */
/* if (op == &ompi_mpi_op_replace) { */
if (header->hdr_target_op == ompi_mpi_op_replace.o_f_to_c_index) {
ompi_convertor_t convertor;
struct iovec iov;
uint32_t iov_count = 1;
int32_t free_after = 0;
size_t max_data;
ompi_proc_t *proc;
/* create convertor */
OBJ_CONSTRUCT(&convertor, ompi_convertor_t);
/* initialize convertor */
proc = ompi_comm_peer_lookup(module->p2p_comm, header->hdr_origin);
ompi_convertor_copy_and_prepare_for_recv(proc->proc_convertor,
datatype,
header->hdr_target_count,
target_buffer,
0,
&convertor);
/* short circuit the reduction operation MPI_REPLACE - it just
replaces the data, so push it out into the user's buffer.
This lets us avoid both the overhead of using the op
invocation and dealing with non-contiguous reductions
(since there are never user-defined reductions in
MPI_ACCUMULATE) */
iov.iov_len = inbuflen;
iov.iov_base = inbuf;
max_data = iov.iov_len;
ompi_convertor_unpack(&convertor,
&iov,
&iov_count,
&max_data,
&free_after);
OBJ_DESTRUCT(&convertor);
} else {
/* reductions other than MPI_REPLACE. Since user-defined
reductions aren't allowed, these all have to be over
contiguous data. We make sure to only send complete
datatypes in these cases, so we can unpack directly from
the user buffer*/
/* BWB - FIX ME - this won't work if endianness is different.
Talk to George about a ddt function that allows us to fix
endianness "in place' or what else we could do here to keep
performance from sucking... */
ompi_op_reduce(op, inbuf, target_buffer, header->hdr_target_count,
datatype);
}
return OMPI_SUCCESS;
}
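The accumulate receive path (in osc_rdma_data_move.c, not included in this excerpt) is the expected caller of ompi_osc_rdma_process_op(), together with the helpers declared in osc_rdma_obj_convert.h below. The following is a hedged sketch of how those pieces might fit together, assuming the packed datatype description travels ahead of the user data in the payload; the function name example_handle_accum is an illustration only:

/* Illustrative sketch only: handling an incoming accumulate with the
 * helpers from this file and osc_rdma_obj_convert.h. */
static int
example_handle_accum(ompi_osc_rdma_module_t *module,
                     ompi_osc_rdma_send_header_t *header,
                     void *payload)
{
    int ret;
    ompi_proc_t *proc = ompi_comm_peer_lookup(module->p2p_comm,
                                              header->hdr_origin);
    /* consume the packed datatype description from the payload */
    struct ompi_datatype_t *datatype =
        ompi_osc_rdma_datatype_create(proc, &payload);
    ompi_op_t *op = ompi_osc_rdma_op_create(header->hdr_target_op);

    ret = ompi_osc_rdma_process_op(module, header, datatype, op,
                                   payload, header->hdr_msg_length);

    /* drop the references taken when the datatype and op were created */
    OBJ_RELEASE(datatype);
    OBJ_RELEASE(op);
    return ret;
}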

72
ompi/mca/osc/rdma/osc_rdma_obj_convert.h Normal file

@ -0,0 +1,72 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/*
* utility functions for dealing with remote datatype and op structures
*/
#include "mpi.h"
#include "ompi/datatype/datatype.h"
static inline
struct ompi_datatype_t*
ompi_osc_rdma_datatype_create(ompi_proc_t *remote_proc, void **payload)
{
struct ompi_datatype_t *datatype =
ompi_ddt_create_from_packed_description(payload, remote_proc);
if (ompi_ddt_is_predefined(datatype)) OBJ_RETAIN(datatype);
return datatype;
}
static inline
ompi_op_t *
ompi_osc_rdma_op_create(int op_id)
{
ompi_op_t *op = MPI_Op_f2c(op_id);
OBJ_RETAIN(op);
return op;
}
int ompi_osc_rdma_process_op(ompi_osc_rdma_module_t *module,
ompi_osc_rdma_send_header_t *header,
struct ompi_datatype_t *datatype,
ompi_op_t *op,
void *inbuf,
size_t inbuflen);
/**
* Convert a window index number into a module instance.
*/
static inline ompi_osc_rdma_module_t*
ompi_osc_rdma_windx_to_module(uint32_t windx)
{
int ret;
ompi_osc_rdma_module_t *module;
/* find the right module and dispatch */
ret = opal_hash_table_get_value_uint32(&mca_osc_rdma_component.p2p_c_modules,
windx,
(void**) (&module));
if (OMPI_SUCCESS != ret) {
opal_output(0, "Could not translate windx %d to a local MPI_Win instance",
windx);
return NULL;
}
return module;
}
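Putting the pieces together: the component-level receive callback (osc_rdma_component.c, not part of this excerpt) is expected to map hdr_windx back to a module with ompi_osc_rdma_windx_to_module() and dispatch on hdr_type to the entry points declared in osc_rdma_data_move.h. The sketch below is illustrative only; it assumes any payload immediately follows the header in the fragment and that the origin's sendreq pointer rides in hdr_origin_sendreq.pval, and the name example_dispatch_fragment is mine:

/* Illustrative sketch only: dispatch an incoming fragment by header type. */
static void
example_dispatch_fragment(ompi_osc_rdma_base_header_t *base)
{
    switch (base->hdr_type) {
    case OMPI_OSC_PT2PT_HDR_PUT:
    case OMPI_OSC_PT2PT_HDR_ACC: {
        ompi_osc_rdma_send_header_t *header =
            (ompi_osc_rdma_send_header_t*) base;
        ompi_osc_rdma_module_t *module =
            ompi_osc_rdma_windx_to_module(header->hdr_windx);
        if (NULL == module) break;
        if (OMPI_OSC_PT2PT_HDR_PUT == base->hdr_type) {
            ompi_osc_rdma_sendreq_recv_put(module, header, (void*) (header + 1));
        } else {
            ompi_osc_rdma_sendreq_recv_accum(module, header, (void*) (header + 1));
        }
        break;
    }
    case OMPI_OSC_PT2PT_HDR_REPLY: {
        ompi_osc_rdma_reply_header_t *header =
            (ompi_osc_rdma_reply_header_t*) base;
        /* the origin's sendreq pointer was carried in the request header */
        ompi_osc_rdma_sendreq_t *sendreq =
            (ompi_osc_rdma_sendreq_t*) header->hdr_origin_sendreq.pval;
        ompi_osc_rdma_replyreq_recv(sendreq->req_module, sendreq, header,
                                    (void*) (header + 1));
        break;
    }
    default:
        /* GET, POST, COMPLETE, LOCK_REQ, UNLOCK_REQ handled elsewhere */
        break;
    }
}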

79
ompi/mca/osc/rdma/osc_rdma_replyreq.c Normal file

@ -0,0 +1,79 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "osc_rdma_replyreq.h"
#include "opal/class/opal_list.h"
#include "ompi/datatype/convertor.h"
int
ompi_osc_rdma_replyreq_alloc_init(ompi_osc_rdma_module_t *module,
int origin,
ompi_ptr_t origin_request,
int target_displacement,
int target_count,
struct ompi_datatype_t *datatype,
ompi_osc_rdma_replyreq_t **replyreq)
{
int ret;
void *target_addr = (unsigned char*) module->p2p_win->w_baseptr +
(target_displacement * module->p2p_win->w_disp_unit);
/* allocate a replyreq */
ret = ompi_osc_rdma_replyreq_alloc(module,
origin,
replyreq);
if (OMPI_SUCCESS != ret) return ret;
/* initialize local side of replyreq */
ret = ompi_osc_rdma_replyreq_init_target(*replyreq,
target_addr,
target_count,
datatype);
if (OMPI_SUCCESS != ret) {
ompi_osc_rdma_replyreq_free(*replyreq);
return ret;
}
/* initialize remote side of replyreq */
ret = ompi_osc_rdma_replyreq_init_origin(*replyreq,
origin_request);
if (OMPI_SUCCESS != ret) {
ompi_osc_rdma_replyreq_free(*replyreq);
return ret;
}
return OMPI_SUCCESS;
}
static void ompi_osc_rdma_replyreq_construct(ompi_osc_rdma_replyreq_t *replyreq)
{
OBJ_CONSTRUCT(&(replyreq->rep_target_convertor), ompi_convertor_t);
}
static void ompi_osc_rdma_replyreq_destruct(ompi_osc_rdma_replyreq_t *replyreq)
{
OBJ_DESTRUCT(&(replyreq->rep_target_convertor));
}
OBJ_CLASS_INSTANCE(ompi_osc_rdma_replyreq_t, opal_list_item_t,
ompi_osc_rdma_replyreq_construct,
ompi_osc_rdma_replyreq_destruct);

139
ompi/mca/osc/rdma/osc_rdma_replyreq.h Обычный файл
Просмотреть файл

@ -0,0 +1,139 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef OMPI_OSC_PT2PT_REPLYREQ_H
#define OMPI_OSC_PT2PT_REPLYREQ_H
#include "osc_rdma.h"
#include "osc_rdma_longreq.h"
#include "opal/class/opal_list.h"
#include "opal/threads/mutex.h"
#include "ompi/datatype/datatype.h"
#include "ompi/datatype/convertor.h"
#include "ompi/communicator/communicator.h"
#include "ompi/proc/proc.h"
#include "ompi/op/op.h"
#include "ompi/mca/pml/pml.h"
struct ompi_osc_rdma_replyreq_t {
opal_list_item_t super;
/** pointer to the module that created the replyreq */
ompi_osc_rdma_module_t *rep_module;
/** Datatype for the target side of the operation */
struct ompi_datatype_t *rep_target_datatype;
/** Convertor for the target. Always setup for send. */
ompi_convertor_t rep_target_convertor;
/** packed size of message on the target side */
size_t rep_target_bytes_packed;
/** rank in module's communicator for origin of operation */
int rep_origin_rank;
/** pointer to the proc structure for the origin of the operation */
ompi_proc_t *rep_origin_proc;
ompi_ptr_t rep_origin_sendreq;
};
typedef struct ompi_osc_rdma_replyreq_t ompi_osc_rdma_replyreq_t;
OBJ_CLASS_DECLARATION(ompi_osc_rdma_replyreq_t);
/** allocate and populate a replyreq structure. datatype is
RETAINed for the life of the replyreq */
int
ompi_osc_rdma_replyreq_alloc_init(ompi_osc_rdma_module_t *module,
int origin,
ompi_ptr_t origin_request,
int target_displacement,
int target_count,
struct ompi_datatype_t *datatype,
ompi_osc_rdma_replyreq_t **replyreq);
static inline int
ompi_osc_rdma_replyreq_alloc(ompi_osc_rdma_module_t *module,
int origin_rank,
ompi_osc_rdma_replyreq_t **replyreq)
{
int ret;
opal_free_list_item_t *item;
ompi_proc_t *proc = module->p2p_comm->c_pml_procs[origin_rank]->proc_ompi;
/* BWB - FIX ME - is this really the right return code? */
if (NULL == proc) return OMPI_ERR_OUT_OF_RESOURCE;
OPAL_FREE_LIST_GET(&mca_osc_rdma_component.p2p_c_replyreqs,
item, ret);
if (OMPI_SUCCESS != ret) return ret;
*replyreq = (ompi_osc_rdma_replyreq_t*) item;
(*replyreq)->rep_module = module;
(*replyreq)->rep_origin_rank = origin_rank;
(*replyreq)->rep_origin_proc = proc;
return OMPI_SUCCESS;
}
static inline int
ompi_osc_rdma_replyreq_init_target(ompi_osc_rdma_replyreq_t *replyreq,
void *target_addr,
int target_count,
struct ompi_datatype_t *target_dt)
{
OBJ_RETAIN(target_dt);
replyreq->rep_target_datatype = target_dt;
ompi_convertor_copy_and_prepare_for_send(replyreq->rep_origin_proc->proc_convertor,
target_dt,
target_count,
target_addr,
0,
&(replyreq->rep_target_convertor));
ompi_convertor_get_packed_size(&replyreq->rep_target_convertor,
&replyreq->rep_target_bytes_packed);
return OMPI_SUCCESS;
}
static inline int
ompi_osc_rdma_replyreq_init_origin(ompi_osc_rdma_replyreq_t *replyreq,
ompi_ptr_t origin_request)
{
replyreq->rep_origin_sendreq = origin_request;
return OMPI_SUCCESS;
}
static inline int
ompi_osc_rdma_replyreq_free(ompi_osc_rdma_replyreq_t *replyreq)
{
ompi_convertor_cleanup(&replyreq->rep_target_convertor);
OBJ_RELEASE(replyreq->rep_target_datatype);
OPAL_FREE_LIST_RETURN(&mca_osc_rdma_component.p2p_c_replyreqs,
(opal_list_item_t*) replyreq);
return OMPI_SUCCESS;
}
#endif /* OMPI_OSC_PT2PT_REPLYREQ_H */
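For context, a Get arriving at the target would use these replyreq helpers together with ompi_osc_rdma_replyreq_send() from osc_rdma_data_move.h. The following is a hedged sketch of that path, assuming the packed target datatype description rides in the payload; example_handle_get is an illustrative name, not code from this commit:

/* Illustrative sketch only: build and send a reply for an incoming Get. */
static int
example_handle_get(ompi_osc_rdma_module_t *module,
                   ompi_osc_rdma_send_header_t *header,
                   void *payload)
{
    ompi_osc_rdma_replyreq_t *replyreq;
    ompi_proc_t *proc = ompi_comm_peer_lookup(module->p2p_comm,
                                              header->hdr_origin);
    /* consume the packed target datatype description from the payload */
    struct ompi_datatype_t *datatype =
        ompi_osc_rdma_datatype_create(proc, &payload);

    int ret = ompi_osc_rdma_replyreq_alloc_init(module,
                                                header->hdr_origin,
                                                header->hdr_origin_sendreq,
                                                header->hdr_target_disp,
                                                header->hdr_target_count,
                                                datatype,
                                                &replyreq);
    if (OMPI_SUCCESS != ret) return ret;

    /* the replyreq RETAINed the datatype, so drop the creation reference */
    OBJ_RELEASE(datatype);

    /* push the window data back to the origin */
    return ompi_osc_rdma_replyreq_send(module, replyreq);
}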

83
ompi/mca/osc/rdma/osc_rdma_sendreq.c Normal file

@ -0,0 +1,83 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "osc_rdma_sendreq.h"
#include "ompi/datatype/convertor.h"
int
ompi_osc_rdma_sendreq_alloc_init(ompi_osc_rdma_req_type_t req_type,
void *origin_addr, int origin_count,
struct ompi_datatype_t *origin_dt,
int target, int target_disp, int target_count,
struct ompi_datatype_t *target_dt,
ompi_osc_rdma_module_t *module,
ompi_osc_rdma_sendreq_t **sendreq)
{
int ret;
/* allocate a sendreq */
ret = ompi_osc_rdma_sendreq_alloc(module, target,
sendreq);
if (OMPI_SUCCESS != ret) return ret;
/* initialize local side of sendreq */
ret = ompi_osc_rdma_sendreq_init_origin(*sendreq,
req_type,
origin_addr,
origin_count,
origin_dt);
if (OMPI_SUCCESS != ret) {
ompi_osc_rdma_sendreq_free(*sendreq);
return ret;
}
/* initialize remote side of sendreq */
ret = ompi_osc_rdma_sendreq_init_target(*sendreq,
target_disp,
target_count,
target_dt);
if (OMPI_SUCCESS != ret) {
ompi_osc_rdma_sendreq_free(*sendreq);
return ret;
}
return OMPI_SUCCESS;
}
static void ompi_osc_rdma_sendreq_construct(ompi_osc_rdma_sendreq_t *req)
{
req->super.req_type = OMPI_REQUEST_WIN;
req->super.req_free = NULL;
req->super.req_cancel = NULL;
OBJ_CONSTRUCT(&(req->req_origin_convertor), ompi_convertor_t);
}
static void ompi_osc_rdma_sendreq_destruct(ompi_osc_rdma_sendreq_t *req)
{
OBJ_DESTRUCT(&(req->req_origin_convertor));
}
OBJ_CLASS_INSTANCE(ompi_osc_rdma_sendreq_t, ompi_request_t,
ompi_osc_rdma_sendreq_construct,
ompi_osc_rdma_sendreq_destruct);

175
ompi/mca/osc/rdma/osc_rdma_sendreq.h Обычный файл
Просмотреть файл

@ -0,0 +1,175 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef OMPI_OSC_PT2PT_SENDREQ_H
#define OMPI_OSC_PT2PT_SENDREQ_H
#include "osc_rdma.h"
#include "osc_rdma_longreq.h"
#include "opal/class/opal_list.h"
#include "opal/threads/mutex.h"
#include "ompi/datatype/datatype.h"
#include "ompi/datatype/convertor.h"
#include "ompi/communicator/communicator.h"
#include "ompi/proc/proc.h"
#include "ompi/op/op.h"
#include "ompi/mca/pml/pml.h"
typedef enum {
OMPI_OSC_PT2PT_GET,
OMPI_OSC_PT2PT_ACC,
OMPI_OSC_PT2PT_PUT
} ompi_osc_rdma_req_type_t;
struct ompi_osc_rdma_sendreq_t {
ompi_request_t super;
/** type of sendreq (from ompi_osc_rdma_req_type_t) */
ompi_osc_rdma_req_type_t req_type;
/** pointer to the module that created the sendreq */
ompi_osc_rdma_module_t *req_module;
/** Datatype for the origin side of the operation */
struct ompi_datatype_t *req_origin_datatype;
/** Convertor for the origin side of the operation. Setup for
either send (Put / Accumulate) or receive (Get) */
ompi_convertor_t req_origin_convertor;
/** packed size of message on the origin side */
size_t req_origin_bytes_packed;
/** rank in module's communicator for target of operation */
int req_target_rank;
/** pointer to the proc structure for the target of the operation */
ompi_proc_t *req_target_proc;
/** displacement on target */
int req_target_disp;
/** datatype count on target */
int req_target_count;
/** datatype on target */
struct ompi_datatype_t *req_target_datatype;
/** op index on the target */
int req_op_id;
};
typedef struct ompi_osc_rdma_sendreq_t ompi_osc_rdma_sendreq_t;
OBJ_CLASS_DECLARATION(ompi_osc_rdma_sendreq_t);
/** allocate and populate a sendreq structure. Both datatypes are
RETAINed for the life of the sendreq */
int
ompi_osc_rdma_sendreq_alloc_init(ompi_osc_rdma_req_type_t req_type,
void *origin_addr, int origin_count,
struct ompi_datatype_t *origin_dt,
int target, int target_disp, int target_count,
struct ompi_datatype_t *target_datatype,
ompi_osc_rdma_module_t *module,
ompi_osc_rdma_sendreq_t **sendreq);
static inline int
ompi_osc_rdma_sendreq_alloc(ompi_osc_rdma_module_t *module,
int target_rank,
ompi_osc_rdma_sendreq_t **sendreq)
{
int ret;
opal_free_list_item_t *item;
ompi_proc_t *proc = module->p2p_comm->c_pml_procs[target_rank]->proc_ompi;
/* BWB - FIX ME - is this really the right return code? */
if (NULL == proc) return OMPI_ERR_OUT_OF_RESOURCE;
OPAL_FREE_LIST_GET(&mca_osc_rdma_component.p2p_c_sendreqs,
item, ret);
if (OMPI_SUCCESS != ret) return ret;
*sendreq = (ompi_osc_rdma_sendreq_t*) item;
(*sendreq)->req_module = module;
(*sendreq)->req_target_rank = target_rank;
(*sendreq)->req_target_proc = proc;
return OMPI_SUCCESS;
}
static inline int
ompi_osc_rdma_sendreq_init_origin(ompi_osc_rdma_sendreq_t *sendreq,
ompi_osc_rdma_req_type_t req_type,
void *origin_addr,
int origin_count,
struct ompi_datatype_t *origin_dt)
{
OBJ_RETAIN(origin_dt);
sendreq->req_origin_datatype = origin_dt;
sendreq->req_type = req_type;
if (req_type != OMPI_OSC_PT2PT_GET) {
ompi_convertor_copy_and_prepare_for_send(sendreq->req_target_proc->proc_convertor,
origin_dt,
origin_count,
origin_addr,
0,
&(sendreq->req_origin_convertor));
ompi_convertor_get_packed_size(&sendreq->req_origin_convertor,
&sendreq->req_origin_bytes_packed);
} else {
ompi_convertor_copy_and_prepare_for_recv(sendreq->req_target_proc->proc_convertor,
origin_dt,
origin_count,
origin_addr,
0,
&(sendreq->req_origin_convertor));
ompi_convertor_get_packed_size(&sendreq->req_origin_convertor,
&sendreq->req_origin_bytes_packed);
}
return OMPI_SUCCESS;
}
static inline int
ompi_osc_rdma_sendreq_init_target(ompi_osc_rdma_sendreq_t *sendreq,
int target_disp,
int target_count,
struct ompi_datatype_t *target_datatype)
{
OBJ_RETAIN(target_datatype);
sendreq->req_target_disp = target_disp;
sendreq->req_target_count = target_count;
sendreq->req_target_datatype = target_datatype;
return OMPI_SUCCESS;
}
static inline int
ompi_osc_rdma_sendreq_free(ompi_osc_rdma_sendreq_t *sendreq)
{
ompi_convertor_cleanup(&sendreq->req_origin_convertor);
OBJ_RELEASE(sendreq->req_target_datatype);
OBJ_RELEASE(sendreq->req_origin_datatype);
OPAL_FREE_LIST_RETURN(&mca_osc_rdma_component.p2p_c_sendreqs,
(opal_list_item_t*) sendreq);
return OMPI_SUCCESS;
}
#endif /* OMPI_OSC_PT2PT_SENDREQ_H */
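On the origin side, the MPI_Put/MPI_Get/MPI_Accumulate entry points (implemented in osc_rdma_comm.c, not included in this excerpt) are expected to build one of these sendreqs and queue it until the closing synchronization call starts it. Below is a hedged sketch of how a Put might be queued; the name example_queue_put and the defer-until-synchronization policy are assumptions for illustration:

/* Illustrative sketch only: allocate a Put sendreq and defer it. */
static int
example_queue_put(void *origin_addr, int origin_count,
                  struct ompi_datatype_t *origin_dt,
                  int target, int target_disp, int target_count,
                  struct ompi_datatype_t *target_dt,
                  ompi_win_t *win)
{
    ompi_osc_rdma_module_t *module = P2P_MODULE(win);
    ompi_osc_rdma_sendreq_t *sendreq;
    int ret = ompi_osc_rdma_sendreq_alloc_init(OMPI_OSC_PT2PT_PUT,
                                               origin_addr, origin_count,
                                               origin_dt,
                                               target, target_disp,
                                               target_count, target_dt,
                                               module, &sendreq);
    if (OMPI_SUCCESS != ret) return ret;

    /* defer the transfer until the closing synchronization call */
    OPAL_THREAD_LOCK(&module->p2p_lock);
    module->p2p_num_pending_sendreqs[target]++;
    opal_list_append(&module->p2p_pending_sendreqs,
                     (opal_list_item_t*) sendreq);
    OPAL_THREAD_UNLOCK(&module->p2p_lock);
    return OMPI_SUCCESS;
}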

620
ompi/mca/osc/rdma/osc_rdma_sync.c Normal file

@ -0,0 +1,620 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "osc_rdma.h"
#include "osc_rdma_sendreq.h"
#include "osc_rdma_longreq.h"
#include "osc_rdma_header.h"
#include "osc_rdma_data_move.h"
#include "mpi.h"
#include "opal/runtime/opal_progress.h"
#include "opal/threads/mutex.h"
#include "ompi/communicator/communicator.h"
/* should have p2p_lock before calling */
static inline void
ompi_osc_rdma_progress(ompi_osc_rdma_module_t *module)
{
if (0 != opal_list_get_size(&(module->p2p_long_msgs))) {
opal_list_item_t *item, *next;
OPAL_THREAD_LOCK(&(module->p2p_lock));
/* Have to go the convoluted while() route instead of a for()
loop because the callback will likely remove the request
from the list and free it, and that would lead to much
badness. */
next = opal_list_get_first(&(module->p2p_long_msgs));
while (opal_list_get_end(&(module->p2p_long_msgs)) != (item = next)) {
ompi_osc_rdma_longreq_t *longreq =
(ompi_osc_rdma_longreq_t*) item;
int ret, completed;
next = opal_list_get_next(item);
ret = ompi_request_test(&(longreq->req_pml_req), &completed, NULL);
/* BWB - FIX ME - error handling */
if (completed > 0) {
longreq->req_comp_cb(longreq);
}
}
OPAL_THREAD_UNLOCK(&(module->p2p_lock));
}
opal_progress();
}
static inline void
ompi_osc_rdma_flip_sendreqs(ompi_osc_rdma_module_t *module)
{
short *tmp;
OPAL_THREAD_LOCK(&(module->p2p_lock));
tmp = module->p2p_copy_num_pending_sendreqs;
module->p2p_copy_num_pending_sendreqs =
module->p2p_num_pending_sendreqs;
module->p2p_num_pending_sendreqs = tmp;
memset(module->p2p_num_pending_sendreqs, 0,
sizeof(short) * ompi_comm_size(module->p2p_comm));
/* Copy in all the pending requests */
opal_list_join(&module->p2p_copy_pending_sendreqs,
opal_list_get_end(&module->p2p_copy_pending_sendreqs),
&module->p2p_pending_sendreqs);
OPAL_THREAD_UNLOCK(&(module->p2p_lock));
}
int
ompi_osc_rdma_module_fence(int assert, ompi_win_t *win)
{
short incoming_reqs;
int ret = OMPI_SUCCESS, i;
if (0 != (assert & MPI_MODE_NOPRECEDE)) {
int num_pending;
/* check that the user didn't lie to us - since NOPRECEDED
must be specified by all processes if it is specified by
any process, if we see this it is safe to assume that there
are no pending operations anywhere needed to close out this
epoch. */
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
num_pending = opal_list_get_size(&(P2P_MODULE(win)->p2p_pending_sendreqs));
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
if (0 != num_pending) {
return MPI_ERR_RMA_SYNC;
}
} else {
opal_list_item_t *item;
ompi_osc_rdma_flip_sendreqs(P2P_MODULE(win));
switch (P2P_MODULE(win)->p2p_fence_sync_type) {
/* find out how much data everyone is going to send us. Need
to have the lock during this period so that we have a sane
view of the number of sendreqs */
case OSC_SYNC_REDUCE_SCATTER:
ret = P2P_MODULE(win)->p2p_comm->
c_coll.coll_reduce_scatter(P2P_MODULE(win)->p2p_copy_num_pending_sendreqs,
&incoming_reqs,
P2P_MODULE(win)->p2p_fence_coll_counts,
MPI_SHORT,
MPI_SUM,
P2P_MODULE(win)->p2p_comm);
break;
case OSC_SYNC_ALLREDUCE:
ret = P2P_MODULE(win)->p2p_comm->
c_coll.coll_allreduce(P2P_MODULE(win)->p2p_copy_num_pending_sendreqs,
P2P_MODULE(win)->p2p_fence_coll_results,
ompi_comm_size(P2P_MODULE(win)->p2p_comm),
MPI_SHORT,
MPI_SUM,
P2P_MODULE(win)->p2p_comm);
incoming_reqs = P2P_MODULE(win)->
p2p_fence_coll_results[P2P_MODULE(win)->p2p_comm->c_my_rank];
break;
case OSC_SYNC_ALLTOALL:
ret = P2P_MODULE(win)->p2p_comm->
c_coll.coll_alltoall(P2P_MODULE(win)->p2p_copy_num_pending_sendreqs,
1,
MPI_SHORT,
P2P_MODULE(win)->p2p_fence_coll_results,
1,
MPI_SHORT,
P2P_MODULE(win)->p2p_comm);
incoming_reqs = 0;
for (i = 0 ; i < ompi_comm_size(P2P_MODULE(win)->p2p_comm) ; ++i) {
incoming_reqs += P2P_MODULE(win)->p2p_fence_coll_results[i];
}
break;
default:
assert(0 == 1);
}
if (OMPI_SUCCESS != ret) {
/* put the stupid data back for the user. This is not
cheap, but the user lost his data if we don't. */
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
opal_list_join(&P2P_MODULE(win)->p2p_pending_sendreqs,
opal_list_get_end(&P2P_MODULE(win)->p2p_pending_sendreqs),
&P2P_MODULE(win)->p2p_copy_pending_sendreqs);
for (i = 0 ; i < ompi_comm_size(P2P_MODULE(win)->p2p_comm) ; ++i) {
P2P_MODULE(win)->p2p_num_pending_sendreqs[i] +=
P2P_MODULE(win)->p2p_copy_num_pending_sendreqs[i];
}
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
return ret;
}
/* it's possible we've already received a couple of incoming messages, so
   atomically add however many we're going to wait for */
OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_pending_in), incoming_reqs);
OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_pending_out),
opal_list_get_size(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs)));
opal_output(-1, "fence: waiting on %d in and %d out",
P2P_MODULE(win)->p2p_num_pending_in,
P2P_MODULE(win)->p2p_num_pending_out);
/* try to start all the requests. We've copied everything we
need out of pending_sendreqs, so don't need the lock
here */
while (NULL !=
(item = opal_list_remove_first(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs)))) {
ompi_osc_rdma_sendreq_t *req =
(ompi_osc_rdma_sendreq_t*) item;
ret = ompi_osc_rdma_sendreq_send(P2P_MODULE(win), req);
if (OMPI_SUCCESS != ret) {
opal_output(0, "fence: failure in starting sendreq (%d). Will try later.",
ret);
opal_list_append(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs), item);
}
}
/* now we know how many things we're waiting for - wait for them... */
while (P2P_MODULE(win)->p2p_num_pending_in > 0 ||
0 != P2P_MODULE(win)->p2p_num_pending_out) {
ompi_osc_rdma_progress(P2P_MODULE(win));
}
}
/* all transfers are done - back to the real world we go */
if (0 == (assert & MPI_MODE_NOSUCCEED)) {
ompi_win_set_mode(win, OMPI_WIN_FENCE);
} else {
ompi_win_set_mode(win, 0);
}
return OMPI_SUCCESS;
}
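/*
 * Illustrative aside (not from this file): the counting step in the fence
 * above is a transposed sum. Each rank contributes a vector whose j-th
 * entry is the number of sendreqs it queued for rank j; after the
 * reduce_scatter (or the allreduce/alltoall variants) rank j holds the
 * column sum, i.e. how many incoming requests it must wait for. A tiny
 * sketch of that arithmetic, fixed at three ranks; the helper name
 * example_incoming_reqs is hypothetical.
 */
static short
example_incoming_reqs(const short counts[3][3], int my_rank)
{
    short incoming = 0;
    int i;
    /* column sum over all origins = what coll_reduce_scatter with MPI_SUM
       delivers to my_rank */
    for (i = 0; i < 3; ++i) {
        incoming += counts[i][my_rank];
    }
    return incoming;
}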
int
ompi_osc_rdma_module_start(ompi_group_t *group,
int assert,
ompi_win_t *win)
{
assert(P2P_MODULE(win)->p2p_num_pending_in == 0);
assert(P2P_MODULE(win)->p2p_num_pending_out == 0);
OBJ_RETAIN(group);
/* BWB - do I need this? */
ompi_group_increment_proc_count(group);
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
assert(NULL == P2P_MODULE(win)->p2p_sc_group);
P2P_MODULE(win)->p2p_sc_group = group;
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
/* Set our mode to access w/ start */
ompi_win_set_mode(win, OMPI_WIN_ACCESS_EPOCH | OMPI_WIN_STARTED);
/* it's possible we've already received a couple of incoming messages, so
   atomically add however many we're going to wait for */
OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_pending_in),
ompi_group_size(P2P_MODULE(win)->p2p_sc_group));
return OMPI_SUCCESS;
}
int
ompi_osc_rdma_module_complete(ompi_win_t *win)
{
int i;
int ret = OMPI_SUCCESS;
ompi_group_t *group;
opal_list_item_t *item;
/* wait for all the post messages */
while (0 != P2P_MODULE(win)->p2p_num_pending_in) {
ompi_osc_rdma_progress(P2P_MODULE(win));
}
ompi_osc_rdma_flip_sendreqs(P2P_MODULE(win));
/* for each process in group, send a control message with number
of updates coming, then start all the requests */
for (i = 0 ; i < ompi_group_size(P2P_MODULE(win)->p2p_sc_group) ; ++i) {
int comm_rank = -1, j;
/* no need to increment ref count - the communicator isn't
going anywhere while we're here */
ompi_group_t *comm_group = P2P_MODULE(win)->p2p_comm->c_local_group;
/* find the rank in the communicator associated with this window */
for (j = 0 ;
j < ompi_group_size(comm_group) ;
++j) {
if (P2P_MODULE(win)->p2p_sc_group->grp_proc_pointers[i] ==
comm_group->grp_proc_pointers[j]) {
comm_rank = j;
break;
}
}
if (comm_rank == -1) {
ret = MPI_ERR_RMA_SYNC;
goto cleanup;
}
OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_pending_out),
P2P_MODULE(win)->p2p_copy_num_pending_sendreqs[comm_rank]);
ompi_osc_rdma_control_send(P2P_MODULE(win),
P2P_MODULE(win)->p2p_sc_group->grp_proc_pointers[i],
OMPI_OSC_PT2PT_HDR_COMPLETE,
P2P_MODULE(win)->p2p_copy_num_pending_sendreqs[comm_rank],
0);
}
/* try to start all the requests. We've copied everything we
need out of pending_sendreqs, so don't need the lock
here */
while (NULL !=
(item = opal_list_remove_first(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs)))) {
ompi_osc_rdma_sendreq_t *req =
(ompi_osc_rdma_sendreq_t*) item;
ret = ompi_osc_rdma_sendreq_send(P2P_MODULE(win), req);
if (OMPI_SUCCESS != ret) {
opal_output(0, "complete: failure in starting sendreq (%d). Will try later.",
ret);
opal_list_append(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs), item);
}
}
/* wait for all the requests */
while (0 != P2P_MODULE(win)->p2p_num_pending_out) {
ompi_osc_rdma_progress(P2P_MODULE(win));
}
cleanup:
/* set our mode back to nothing */
ompi_win_set_mode(win, 0);
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
group = P2P_MODULE(win)->p2p_sc_group;
P2P_MODULE(win)->p2p_sc_group = NULL;
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
/* BWB - do I need this? */
ompi_group_decrement_proc_count(group);
OBJ_RELEASE(group);
return ret;
}
int
ompi_osc_rdma_module_post(ompi_group_t *group,
int assert,
ompi_win_t *win)
{
int i;
assert(P2P_MODULE(win)->p2p_num_pending_in == 0);
assert(P2P_MODULE(win)->p2p_num_pending_out == 0);
OBJ_RETAIN(group);
/* BWB - do I need this? */
ompi_group_increment_proc_count(group);
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
assert(NULL == P2P_MODULE(win)->p2p_pw_group);
P2P_MODULE(win)->p2p_pw_group = group;
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
/* Set our mode to expose w/ post */
ompi_win_set_mode(win, OMPI_WIN_EXPOSE_EPOCH | OMPI_WIN_POSTED);
/* list how many complete counters we're still waiting on */
OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_pending_out),
ompi_group_size(P2P_MODULE(win)->p2p_pw_group));
/* send a hello counter to everyone in group */
for (i = 0 ; i < ompi_group_size(P2P_MODULE(win)->p2p_pw_group) ; ++i) {
ompi_osc_rdma_control_send(P2P_MODULE(win),
group->grp_proc_pointers[i],
OMPI_OSC_PT2PT_HDR_POST, 1, 0);
}
return OMPI_SUCCESS;
}
int
ompi_osc_rdma_module_wait(ompi_win_t *win)
{
ompi_group_t *group;
while (0 != (P2P_MODULE(win)->p2p_num_pending_in) ||
0 != (P2P_MODULE(win)->p2p_num_pending_out)) {
ompi_osc_rdma_progress(P2P_MODULE(win));
}
ompi_win_set_mode(win, 0);
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
group = P2P_MODULE(win)->p2p_pw_group;
P2P_MODULE(win)->p2p_pw_group = NULL;
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
/* BWB - do I need this? */
ompi_group_decrement_proc_count(group);
OBJ_RELEASE(group);
return OMPI_SUCCESS;
}
int
ompi_osc_rdma_module_test(ompi_win_t *win,
int *flag)
{
ompi_group_t *group;
if (0 != (P2P_MODULE(win)->p2p_num_pending_in) ||
0 != (P2P_MODULE(win)->p2p_num_pending_out)) {
ompi_osc_rdma_progress(P2P_MODULE(win));
if (0 != (P2P_MODULE(win)->p2p_num_pending_in) ||
0 != (P2P_MODULE(win)->p2p_num_pending_out)) {
*flag = 0;
return OMPI_SUCCESS;
}
}
*flag = 1;
ompi_win_set_mode(win, 0);
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
group = P2P_MODULE(win)->p2p_pw_group;
P2P_MODULE(win)->p2p_pw_group = NULL;
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
/* BWB - do I need this? */
ompi_group_decrement_proc_count(group);
OBJ_RELEASE(group);
return OMPI_SUCCESS;
}
struct ompi_osc_rdma_pending_lock_t {
opal_list_item_t super;
ompi_proc_t *proc;
int32_t lock_type;
};
typedef struct ompi_osc_rdma_pending_lock_t ompi_osc_rdma_pending_lock_t;
OBJ_CLASS_INSTANCE(ompi_osc_rdma_pending_lock_t, opal_list_item_t,
NULL, NULL);
int
ompi_osc_rdma_module_lock(int lock_type,
int target,
int assert,
ompi_win_t *win)
{
ompi_proc_t *proc = P2P_MODULE(win)->p2p_comm->c_pml_procs[target]->proc_ompi;
assert(lock_type != 0);
/* set our mode on the window */
ompi_win_set_mode(win, OMPI_WIN_ACCESS_EPOCH | OMPI_WIN_LOCK_ACCESS);
opal_output(-1, "%d sending lock request to %d",
P2P_MODULE(win)->p2p_comm->c_my_rank, target);
/* generate a lock request */
ompi_osc_rdma_control_send(P2P_MODULE(win),
proc,
OMPI_OSC_PT2PT_HDR_LOCK_REQ,
P2P_MODULE(win)->p2p_comm->c_my_rank,
lock_type);
/* return */
return OMPI_SUCCESS;
}
int
ompi_osc_rdma_module_unlock(int target,
ompi_win_t *win)
{
int32_t out_count;
opal_list_item_t *item;
int ret;
ompi_proc_t *proc = P2P_MODULE(win)->p2p_comm->c_pml_procs[target]->proc_ompi;
while (0 == P2P_MODULE(win)->p2p_lock_received_ack) {
ompi_osc_rdma_progress(P2P_MODULE(win));
}
P2P_MODULE(win)->p2p_lock_received_ack = 0;
/* start all the requests */
ompi_osc_rdma_flip_sendreqs(P2P_MODULE(win));
/* try to start all the requests. We've copied everything we need
out of pending_sendreqs, so don't need the lock here */
out_count = opal_list_get_size(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs));
OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_pending_out), out_count);
while (NULL !=
(item = opal_list_remove_first(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs)))) {
ompi_osc_rdma_sendreq_t *req =
(ompi_osc_rdma_sendreq_t*) item;
ret = ompi_osc_rdma_sendreq_send(P2P_MODULE(win), req);
if (OMPI_SUCCESS != ret) {
opal_output(0, "unlock: failure in starting sendreq (%d). Will try later.",
ret);
opal_list_append(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs), item);
}
}
/* wait for all the requests */
while (0 != P2P_MODULE(win)->p2p_num_pending_out) {
ompi_osc_rdma_progress(P2P_MODULE(win));
}
/* send the unlock request */
opal_output(-1, "%d sending unlock request to %d",
P2P_MODULE(win)->p2p_comm->c_my_rank, target);
ompi_osc_rdma_control_send(P2P_MODULE(win),
proc,
OMPI_OSC_PT2PT_HDR_UNLOCK_REQ,
P2P_MODULE(win)->p2p_comm->c_my_rank,
out_count);
/* set our mode on the window */
ompi_win_set_mode(win, 0);
return OMPI_SUCCESS;
}
int
ompi_osc_rdma_passive_lock(ompi_osc_rdma_module_t *module,
int32_t origin,
int32_t lock_type)
{
bool send_ack = false;
int ret = OMPI_SUCCESS;
ompi_proc_t *proc = module->p2p_comm->c_pml_procs[origin]->proc_ompi;
ompi_osc_rdma_pending_lock_t *new_pending;
OPAL_THREAD_LOCK(&(module->p2p_lock));
if (lock_type == MPI_LOCK_EXCLUSIVE) {
if (module->p2p_lock_status == 0) {
module->p2p_lock_status = MPI_LOCK_EXCLUSIVE;
send_ack = true;
} else {
opal_output(-1, "%d queuing lock request from %d (%d)",
module->p2p_comm->c_my_rank, origin, lock_type);
new_pending = OBJ_NEW(ompi_osc_rdma_pending_lock_t);
new_pending->proc = proc;
new_pending->lock_type = lock_type;
opal_list_append(&(module->p2p_locks_pending), &(new_pending->super));
}
} else if (lock_type == MPI_LOCK_SHARED) {
if (module->p2p_lock_status != MPI_LOCK_EXCLUSIVE) {
module->p2p_lock_status = MPI_LOCK_SHARED;
module->p2p_shared_count++;
send_ack = true;
} else {
opal_output(-1, "queuing lock request from %d (%d)",
module->p2p_comm->c_my_rank, origin, lock_type);
new_pending = OBJ_NEW(ompi_osc_rdma_pending_lock_t);
new_pending->proc = proc;
new_pending->lock_type = lock_type;
opal_list_append(&(module->p2p_locks_pending), &(new_pending->super));
}
} else {
ret = OMPI_ERROR;
}
OPAL_THREAD_UNLOCK(&(module->p2p_lock));
if (send_ack) {
opal_output(-1, "%d sending lock ack to %d",
module->p2p_comm->c_my_rank, origin);
ompi_osc_rdma_control_send(module, proc,
OMPI_OSC_PT2PT_HDR_LOCK_REQ,
module->p2p_comm->c_my_rank,
OMPI_SUCCESS);
}
return OMPI_SUCCESS;
}
int
ompi_osc_rdma_passive_unlock(ompi_osc_rdma_module_t *module,
int32_t origin,
int32_t count)
{
ompi_osc_rdma_pending_lock_t *new_pending = NULL;
assert(module->p2p_lock_status != 0);
OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), count);
while (0 != module->p2p_num_pending_in) {
ompi_osc_rdma_progress(module);
}
OPAL_THREAD_LOCK(&(module->p2p_lock));
if (module->p2p_lock_status == MPI_LOCK_EXCLUSIVE) {
module->p2p_lock_status = 0;
} else {
module->p2p_shared_count--;
if (module->p2p_shared_count == 0) {
module->p2p_lock_status = 0;
}
}
/* if we were really unlocked, see if we have more to process */
new_pending = (ompi_osc_rdma_pending_lock_t*)
opal_list_remove_first(&(module->p2p_locks_pending));
OPAL_THREAD_UNLOCK(&(module->p2p_lock));
if (NULL != new_pending) {
opal_output(-1, "sending lock request to proc");
/* set lock state and generate a lock request */
module->p2p_lock_status = new_pending->lock_type;
ompi_osc_rdma_control_send(module,
new_pending->proc,
OMPI_OSC_PT2PT_HDR_LOCK_REQ,
module->p2p_comm->c_my_rank,
OMPI_SUCCESS);
OBJ_RELEASE(new_pending);  /* allocated with OBJ_NEW, so release, not destruct */
}
return OMPI_SUCCESS;
}