From 28b99299b203d0596f1198a19d0c4211d83f0aa9 Mon Sep 17 00:00:00 2001 From: Brian Barrett Date: Mon, 17 Jul 2006 22:08:55 +0000 Subject: [PATCH] * add new component (RDMA) for RDMA one-sided communication This commit was SVN r10861. --- ompi/mca/osc/rdma/.ompi_ignore | 0 ompi/mca/osc/rdma/.ompi_unignore | 3 + ompi/mca/osc/rdma/Makefile.am | 60 ++ ompi/mca/osc/rdma/configure.params | 18 + ompi/mca/osc/rdma/osc_rdma.c | 78 +++ ompi/mca/osc/rdma/osc_rdma.h | 260 +++++++ ompi/mca/osc/rdma/osc_rdma_comm.c | 210 ++++++ ompi/mca/osc/rdma/osc_rdma_component.c | 616 +++++++++++++++++ ompi/mca/osc/rdma/osc_rdma_data_move.c | 842 +++++++++++++++++++++++ ompi/mca/osc/rdma/osc_rdma_data_move.h | 55 ++ ompi/mca/osc/rdma/osc_rdma_header.h | 136 ++++ ompi/mca/osc/rdma/osc_rdma_longreq.c | 26 + ompi/mca/osc/rdma/osc_rdma_longreq.h | 71 ++ ompi/mca/osc/rdma/osc_rdma_obj_convert.c | 98 +++ ompi/mca/osc/rdma/osc_rdma_obj_convert.h | 72 ++ ompi/mca/osc/rdma/osc_rdma_replyreq.c | 79 +++ ompi/mca/osc/rdma/osc_rdma_replyreq.h | 139 ++++ ompi/mca/osc/rdma/osc_rdma_sendreq.c | 83 +++ ompi/mca/osc/rdma/osc_rdma_sendreq.h | 175 +++++ ompi/mca/osc/rdma/osc_rdma_sync.c | 620 +++++++++++++++++ 20 files changed, 3641 insertions(+) create mode 100644 ompi/mca/osc/rdma/.ompi_ignore create mode 100644 ompi/mca/osc/rdma/.ompi_unignore create mode 100644 ompi/mca/osc/rdma/Makefile.am create mode 100644 ompi/mca/osc/rdma/configure.params create mode 100644 ompi/mca/osc/rdma/osc_rdma.c create mode 100644 ompi/mca/osc/rdma/osc_rdma.h create mode 100644 ompi/mca/osc/rdma/osc_rdma_comm.c create mode 100644 ompi/mca/osc/rdma/osc_rdma_component.c create mode 100644 ompi/mca/osc/rdma/osc_rdma_data_move.c create mode 100644 ompi/mca/osc/rdma/osc_rdma_data_move.h create mode 100644 ompi/mca/osc/rdma/osc_rdma_header.h create mode 100644 ompi/mca/osc/rdma/osc_rdma_longreq.c create mode 100644 ompi/mca/osc/rdma/osc_rdma_longreq.h create mode 100644 ompi/mca/osc/rdma/osc_rdma_obj_convert.c create mode 100644 ompi/mca/osc/rdma/osc_rdma_obj_convert.h create mode 100644 ompi/mca/osc/rdma/osc_rdma_replyreq.c create mode 100644 ompi/mca/osc/rdma/osc_rdma_replyreq.h create mode 100644 ompi/mca/osc/rdma/osc_rdma_sendreq.c create mode 100644 ompi/mca/osc/rdma/osc_rdma_sendreq.h create mode 100644 ompi/mca/osc/rdma/osc_rdma_sync.c diff --git a/ompi/mca/osc/rdma/.ompi_ignore b/ompi/mca/osc/rdma/.ompi_ignore new file mode 100644 index 0000000000..e69de29bb2 diff --git a/ompi/mca/osc/rdma/.ompi_unignore b/ompi/mca/osc/rdma/.ompi_unignore new file mode 100644 index 0000000000..12935bab8c --- /dev/null +++ b/ompi/mca/osc/rdma/.ompi_unignore @@ -0,0 +1,3 @@ +bbarrett +brbarret +bwbarre diff --git a/ompi/mca/osc/rdma/Makefile.am b/ompi/mca/osc/rdma/Makefile.am new file mode 100644 index 0000000000..d0d075d36b --- /dev/null +++ b/ompi/mca/osc/rdma/Makefile.am @@ -0,0 +1,60 @@ +# +# Copyright (c) 2004-2005 The Trustees of Indiana University. +# All rights reserved. +# Copyright (c) 2004-2005 The Trustees of the University of Tennessee. +# All rights reserved. +# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, +# University of Stuttgart. All rights reserved. +# Copyright (c) 2004-2005 The Regents of the University of California. +# All rights reserved. 
+# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +# Use the top-level Makefile.options + +include $(top_ompi_srcdir)/config/Makefile.options + +pt2pt_sources = \ + osc_rdma.h \ + osc_rdma.c \ + osc_rdma_comm.c \ + osc_rdma_component.c \ + osc_rdma_data_move.h \ + osc_rdma_data_move.c \ + osc_rdma_header.h \ + osc_rdma_longreq.h \ + osc_rdma_longreq.c \ + osc_rdma_obj_convert.h \ + osc_rdma_obj_convert.c \ + osc_rdma_replyreq.h \ + osc_rdma_replyreq.c \ + osc_rdma_sendreq.h \ + osc_rdma_sendreq.c \ + osc_rdma_sync.c + +# Make the output library in this directory, and name it either +# mca__.la (for DSO builds) or libmca__.la +# (for static builds). + +if OMPI_BUILD_osc_rdma_DSO +component_noinst = +component_install = mca_osc_rdma.la +else +component_noinst = libmca_osc_rdma.la +component_install = +endif + +mcacomponentdir = $(libdir)/openmpi +mcacomponent_LTLIBRARIES = $(component_install) +mca_osc_rdma_la_SOURCES = $(pt2pt_sources) +mca_osc_rdma_la_LIBADD = +mca_osc_rdma_la_LDFLAGS = -module -avoid-version + +noinst_LTLIBRARIES = $(component_noinst) +libmca_osc_rdma_la_SOURCES = $(pt2pt_sources) +libmca_osc_rdma_la_LIBADD = +libmca_osc_rdma_la_LDFLAGS = -module -avoid-version diff --git a/ompi/mca/osc/rdma/configure.params b/ompi/mca/osc/rdma/configure.params new file mode 100644 index 0000000000..c6e9ce75a8 --- /dev/null +++ b/ompi/mca/osc/rdma/configure.params @@ -0,0 +1,18 @@ +# -*- shell-script -*- +# +# Copyright (c) 2004-2005 The Trustees of Indiana University. +# All rights reserved. +# Copyright (c) 2004-2005 The Trustees of the University of Tennessee. +# All rights reserved. +# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, +# University of Stuttgart. All rights reserved. +# Copyright (c) 2004-2005 The Regents of the University of California. +# All rights reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +PARAM_CONFIG_FILES="Makefile" diff --git a/ompi/mca/osc/rdma/osc_rdma.c b/ompi/mca/osc/rdma/osc_rdma.c new file mode 100644 index 0000000000..af58c6e689 --- /dev/null +++ b/ompi/mca/osc/rdma/osc_rdma.c @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. 
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#include "ompi_config.h"
+
+#include "osc_rdma.h"
+#include "osc_rdma_sendreq.h"
+
+#include "opal/threads/mutex.h"
+#include "ompi/win/win.h"
+#include "ompi/communicator/communicator.h"
+#include "ompi/mca/btl/btl.h"
+#include "mpi.h"
+
+
+int
+ompi_osc_rdma_module_free(ompi_win_t *win)
+{
+    int ret = OMPI_SUCCESS;
+    int tmp;
+    ompi_osc_rdma_module_t *module = P2P_MODULE(win);
+
+    /* finish with a barrier */
+    if (ompi_group_size(win->w_group) > 1) {
+        ret = module->p2p_comm->c_coll.coll_barrier(module->p2p_comm);
+    }
+
+    /* remove window information */
+    win->w_osc_module = NULL;
+
+    /* remove from component information */
+    OPAL_THREAD_LOCK(&mca_osc_rdma_component.p2p_c_lock);
+    tmp = opal_hash_table_remove_value_uint32(&mca_osc_rdma_component.p2p_c_modules,
+                                              module->p2p_comm->c_contextid);
+    /* only take the output of hash_table_remove if there wasn't already an error */
+    ret = (ret != OMPI_SUCCESS) ? ret : tmp;
+    OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.p2p_c_lock);
+
+    OBJ_DESTRUCT(&(module->p2p_locks_pending));
+
+    assert(module->p2p_sc_group == NULL);
+    assert(module->p2p_pw_group == NULL);
+    free(module->p2p_fence_coll_counts);
+
+    free(module->p2p_copy_num_pending_sendreqs);
+    OBJ_DESTRUCT(&(module->p2p_copy_pending_sendreqs));
+
+    OBJ_DESTRUCT(&(module->p2p_long_msgs));
+
+    free(module->p2p_num_pending_sendreqs);
+
+    OBJ_DESTRUCT(&(module->p2p_pending_sendreqs));
+
+    ompi_comm_free(&(module->p2p_comm));
+    module->p2p_comm = NULL;
+
+    module->p2p_win = NULL;
+
+    OBJ_DESTRUCT(&(module->p2p_acc_lock));
+    OBJ_DESTRUCT(&(module->p2p_lock));
+
+    free(module);
+
+    return ret;
+}
diff --git a/ompi/mca/osc/rdma/osc_rdma.h b/ompi/mca/osc/rdma/osc_rdma.h
new file mode 100644
index 0000000000..ddbd655237
--- /dev/null
+++ b/ompi/mca/osc/rdma/osc_rdma.h
@@ -0,0 +1,260 @@
+/*
+ * Copyright (c) 2004-2005 The Trustees of Indiana University.
+ *                         All rights reserved.
+ * Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
+ *                         All rights reserved.
+ * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
+ *                         University of Stuttgart.  All rights reserved.
+ * Copyright (c) 2004-2005 The Regents of the University of California.
+ *                         All rights reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#ifndef OMPI_OSC_PT2PT_H
+#define OMPI_OSC_PT2PT_H
+
+#include "opal/class/opal_list.h"
+#include "opal/class/opal_free_list.h"
+#include "opal/class/opal_hash_table.h"
+
+#include "ompi/mca/osc/osc.h"
+#include "ompi/mca/btl/btl.h"
+#include "ompi/win/win.h"
+#include "ompi/communicator/communicator.h"
+
+struct ompi_osc_rdma_component_t {
+    /** Extend the basic osc component interface */
+    ompi_osc_base_component_t super;
+
+    /** store the state of progress threads for this instance of OMPI */
+    bool p2p_c_have_progress_threads;
+
+    /** lock access to data structures in the component structure */
+    opal_mutex_t p2p_c_lock;
+
+    /** List of ompi_osc_rdma_module_ts currently in existence.
+        Needed so that received fragments can be dispatched to the
+        correct module */
+    opal_hash_table_t p2p_c_modules;
+
+    /** free list of ompi_osc_rdma_sendreq_t structures */
+    opal_free_list_t p2p_c_sendreqs;
+    /** free list of ompi_osc_rdma_replyreq_t structures */
+    opal_free_list_t p2p_c_replyreqs;
+    /** free list of ompi_osc_rdma_longreq_t structures */
+    opal_free_list_t p2p_c_longreqs;
+};
+typedef struct ompi_osc_rdma_component_t ompi_osc_rdma_component_t;
+
+
+struct ompi_osc_rdma_module_t {
+    /** Extend the basic osc module interface */
+    ompi_osc_base_module_t super;
+
+    /** lock access to data structures in the current module */
+    opal_mutex_t p2p_lock;
+
+    /** lock for "atomic" window updates from reductions */
+    opal_mutex_t p2p_acc_lock;
+
+    /** pointer back to window */
+    ompi_win_t *p2p_win;
+
+    /** communicator created with this window */
+    ompi_communicator_t *p2p_comm;
+
+    /** list of ompi_osc_rdma_sendreq_t structures, and includes all
+        requests for this access epoch that have not already been
+        started.  p2p_lock must be held when modifying this field. */
+    opal_list_t p2p_pending_sendreqs;
+
+    /** list of int16_t counters for the number of requests to a
+        particular rank in p2p_comm for this access epoch.  p2p_lock
+        must be held when modifying this field */
+    short *p2p_num_pending_sendreqs;
+
+    /** For MPI_Fence synchronization, the number of messages to send
+        in epoch.  For Start/Complete, the number of updates for this
+        Complete.  For Post/Wait (poorly named), the number of
+        Complete counters we're waiting for.  For lock, the number of
+        messages waiting for completion on the origin side.  Not
+        protected by p2p_lock - must use atomic counter operations. */
+    volatile int32_t p2p_num_pending_out;
+
+    /** For MPI_Fence synchronization, the number of expected incoming
+        messages.  For Start/Complete, the number of expected Post
+        messages.  For Post/Wait, the number of expected updates from
+        complete.  For lock, the number of messages on the passive side
+        we are waiting for.  Not protected by p2p_lock - must use
+        atomic counter operations. */
+    volatile int32_t p2p_num_pending_in;
+
+    /** cyclic counter for a unique tag for long messages.  Not
+        protected by the p2p_lock - must use create_send_tag() to
+        create a send tag */
+    volatile int32_t p2p_tag_counter;
+
+    /** list of outstanding long messages that must be processed
+        (ompi_osc_rdma_request_long).  Protected by p2p_lock. */
+    opal_list_t p2p_long_msgs;
+
+    opal_list_t p2p_copy_pending_sendreqs;
+    short *p2p_copy_num_pending_sendreqs;
+
+    bool p2p_eager_send;
+
+    /* ********************* FENCE data ************************ */
+    /* an array of ints, each containing the value
+       1.
*/ + int *p2p_fence_coll_counts; + /* an array of shorts, for use in experimenting + with different synchronization costs */ + short *p2p_fence_coll_results; + + enum { OSC_SYNC_REDUCE_SCATTER, OSC_SYNC_ALLREDUCE, OSC_SYNC_ALLTOALL } p2p_fence_sync_type; + + /* ********************* PWSC data ************************ */ + + struct ompi_group_t *p2p_pw_group; + struct ompi_group_t *p2p_sc_group; + + /* ********************* LOCK data ************************ */ + int32_t p2p_lock_status; /* one of 0, MPI_LOCK_EXCLUSIVE, MPI_LOCK_SHARED */ + int32_t p2p_shared_count; + opal_list_t p2p_locks_pending; + int32_t p2p_lock_received_ack; +}; +typedef struct ompi_osc_rdma_module_t ompi_osc_rdma_module_t; + + +extern ompi_osc_rdma_component_t mca_osc_rdma_component; + + +/* + * Helper macro for grabbing the module structure from a window instance + */ +#if OMPI_ENABLE_DEBUG + +#if defined(c_plusplus) || defined(__cplusplus) +extern "C" { +#endif + +static inline ompi_osc_rdma_module_t* P2P_MODULE(struct ompi_win_t* win) +{ + ompi_osc_rdma_module_t *module = + (ompi_osc_rdma_module_t*) win->w_osc_module; + + assert(module->p2p_win == win); + + return module; +} + +#if defined(c_plusplus) || defined(__cplusplus) +} +#endif + + +#else +#define P2P_MODULE(win) ((ompi_osc_rdma_module_t*) win->w_osc_module) +#endif + +#if defined(c_plusplus) || defined(__cplusplus) +extern "C" { +#endif + + +/* + * Component functions + */ + +int ompi_osc_rdma_component_init(bool enable_progress_threads, + bool enable_mpi_threads); + +int ompi_osc_rdma_component_finalize(void); + +int ompi_osc_rdma_component_query(struct ompi_win_t *win, + struct ompi_info_t *info, + struct ompi_communicator_t *comm); + +int ompi_osc_rdma_component_select(struct ompi_win_t *win, + struct ompi_info_t *info, + struct ompi_communicator_t *comm); + + +/* + * Module interface function types + */ +int ompi_osc_rdma_module_free(struct ompi_win_t *win); + +int ompi_osc_rdma_module_put(void *origin_addr, + int origin_count, + struct ompi_datatype_t *origin_dt, + int target, + int target_disp, + int target_count, + struct ompi_datatype_t *target_dt, + struct ompi_win_t *win); + +int ompi_osc_rdma_module_accumulate(void *origin_addr, + int origin_count, + struct ompi_datatype_t *origin_dt, + int target, + int target_disp, + int target_count, + struct ompi_datatype_t *target_dt, + struct ompi_op_t *op, + struct ompi_win_t *win); + +int ompi_osc_rdma_module_get(void *origin_addr, + int origin_count, + struct ompi_datatype_t *origin_dt, + int target, + int target_disp, + int target_count, + struct ompi_datatype_t *target_dt, + struct ompi_win_t *win); + +int ompi_osc_rdma_module_fence(int assert, struct ompi_win_t *win); + +int ompi_osc_rdma_module_start(struct ompi_group_t *group, + int assert, + struct ompi_win_t *win); +int ompi_osc_rdma_module_complete(struct ompi_win_t *win); + +int ompi_osc_rdma_module_post(struct ompi_group_t *group, + int assert, + struct ompi_win_t *win); + +int ompi_osc_rdma_module_wait(struct ompi_win_t *win); + +int ompi_osc_rdma_module_test(struct ompi_win_t *win, + int *flag); + +int ompi_osc_rdma_module_lock(int lock_type, + int target, + int assert, + struct ompi_win_t *win); + +int ompi_osc_rdma_module_unlock(int target, + struct ompi_win_t *win); + +/* + * passive side sync interface functions + */ +int ompi_osc_rdma_passive_lock(ompi_osc_rdma_module_t *module, + int32_t origin, + int32_t lock_type); + +int ompi_osc_rdma_passive_unlock(ompi_osc_rdma_module_t *module, + int32_t origin, + int32_t count); + +#if 
defined(c_plusplus) || defined(__cplusplus) +} +#endif + +#endif /* OMPI_OSC_PT2PT_H */ diff --git a/ompi/mca/osc/rdma/osc_rdma_comm.c b/ompi/mca/osc/rdma/osc_rdma_comm.c new file mode 100644 index 0000000000..d9dbc24c24 --- /dev/null +++ b/ompi/mca/osc/rdma/osc_rdma_comm.c @@ -0,0 +1,210 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" +#include "mpi.h" + +#include + +#include "osc_rdma.h" +#include "osc_rdma_sendreq.h" +#include "osc_rdma_header.h" +#include "osc_rdma_data_move.h" + +static int +enqueue_sendreq(ompi_osc_rdma_module_t *module, + ompi_osc_rdma_sendreq_t *sendreq) +{ + OPAL_THREAD_LOCK(&(module->p2p_lock)); + opal_list_append(&(module->p2p_pending_sendreqs), + (opal_list_item_t*) sendreq); + module->p2p_num_pending_sendreqs[sendreq->req_target_rank]++; + OPAL_THREAD_UNLOCK(&(module->p2p_lock)); + + return OMPI_SUCCESS; +} + + + +int +ompi_osc_rdma_module_accumulate(void *origin_addr, int origin_count, + struct ompi_datatype_t *origin_dt, + int target, int target_disp, int target_count, + struct ompi_datatype_t *target_dt, + struct ompi_op_t *op, ompi_win_t *win) +{ + int ret; + ompi_osc_rdma_sendreq_t *sendreq; + + if (OMPI_WIN_FENCE & ompi_win_get_mode(win)) { + /* well, we're definitely in an access epoch now */ + ompi_win_set_mode(win, OMPI_WIN_FENCE | OMPI_WIN_ACCESS_EPOCH | + OMPI_WIN_EXPOSE_EPOCH); + } + + if (op != &ompi_mpi_op_replace && + !ompi_ddt_is_predefined(target_dt)) { + fprintf(stderr, "MPI_Accumulate currently does not support reductions\n"); + fprintf(stderr, "with any user-defined types. 
This will be rectified\n"); + fprintf(stderr, "in a future release.\n"); + return MPI_ERR_UNSUPPORTED_OPERATION; + } + + /* shortcut 0 count case */ + if (0 == origin_count || 0 == target_count) { + return OMPI_SUCCESS; + } + + /* create sendreq */ + ret = ompi_osc_rdma_sendreq_alloc_init(OMPI_OSC_PT2PT_ACC, + origin_addr, + origin_count, + origin_dt, + target, + target_disp, + target_count, + target_dt, + P2P_MODULE(win), + &sendreq); + if (OMPI_SUCCESS != ret) return ret; + + sendreq->req_op_id = op->o_f_to_c_index; + + /* enqueue sendreq */ + ret = enqueue_sendreq(P2P_MODULE(win), sendreq); + + return ret; +} + + +int +ompi_osc_rdma_module_get(void *origin_addr, + int origin_count, + struct ompi_datatype_t *origin_dt, + int target, + int target_disp, + int target_count, + struct ompi_datatype_t *target_dt, + ompi_win_t *win) +{ + int ret; + ompi_osc_rdma_sendreq_t *sendreq; + + if (OMPI_WIN_FENCE & ompi_win_get_mode(win)) { + /* well, we're definitely in an access epoch now */ + ompi_win_set_mode(win, OMPI_WIN_FENCE | OMPI_WIN_ACCESS_EPOCH | + OMPI_WIN_EXPOSE_EPOCH); + } + + /* shortcut 0 count case */ + if (0 == origin_count || 0 == target_count) { + return OMPI_SUCCESS; + } + + /* create sendreq */ + ret = ompi_osc_rdma_sendreq_alloc_init(OMPI_OSC_PT2PT_GET, + origin_addr, + origin_count, + origin_dt, + target, + target_disp, + target_count, + target_dt, + P2P_MODULE(win), + &sendreq); + if (OMPI_SUCCESS != ret) return ret; + + /* if we're doing fence synchronization, try to actively send + right now */ + if (P2P_MODULE(win)->p2p_eager_send && + (OMPI_WIN_FENCE & ompi_win_get_mode(win))) { + OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), 1); + + ret = ompi_osc_rdma_sendreq_send(P2P_MODULE(win), sendreq); + + if (OMPI_SUCCESS == ret) { + OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock)); + P2P_MODULE(win)->p2p_num_pending_sendreqs[sendreq->req_target_rank]++; + OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock)); + } else { + OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1); + ret = enqueue_sendreq(P2P_MODULE(win), sendreq); + } + } else { + /* enqueue sendreq */ + ret = enqueue_sendreq(P2P_MODULE(win), sendreq); + } + + return ret; +} + + +int +ompi_osc_rdma_module_put(void *origin_addr, int origin_count, + struct ompi_datatype_t *origin_dt, + int target, int target_disp, int target_count, + struct ompi_datatype_t *target_dt, ompi_win_t *win) +{ + int ret; + ompi_osc_rdma_sendreq_t *sendreq; + + if (OMPI_WIN_FENCE & ompi_win_get_mode(win)) { + /* well, we're definitely in an access epoch now */ + ompi_win_set_mode(win, OMPI_WIN_FENCE | OMPI_WIN_ACCESS_EPOCH | + OMPI_WIN_EXPOSE_EPOCH); + } + + /* shortcut 0 count case */ + if (0 == origin_count || 0 == target_count) { + return OMPI_SUCCESS; + } + + /* create sendreq */ + ret = ompi_osc_rdma_sendreq_alloc_init(OMPI_OSC_PT2PT_PUT, + origin_addr, + origin_count, + origin_dt, + target, + target_disp, + target_count, + target_dt, + P2P_MODULE(win), + &sendreq); + if (OMPI_SUCCESS != ret) return ret; + + /* if we're doing fence synchronization, try to actively send + right now */ + if (P2P_MODULE(win)->p2p_eager_send && + (OMPI_WIN_FENCE & ompi_win_get_mode(win))) { + OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), 1); + + ret = ompi_osc_rdma_sendreq_send(P2P_MODULE(win), sendreq); + + if (OMPI_SUCCESS == ret) { + OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock)); + P2P_MODULE(win)->p2p_num_pending_sendreqs[sendreq->req_target_rank]++; + OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock)); + } else { + 
OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1); + ret = enqueue_sendreq(P2P_MODULE(win), sendreq); + } + } else { + /* enqueue sendreq */ + ret = enqueue_sendreq(P2P_MODULE(win), sendreq); + } + + return ret; +} diff --git a/ompi/mca/osc/rdma/osc_rdma_component.c b/ompi/mca/osc/rdma/osc_rdma_component.c new file mode 100644 index 0000000000..4ef4821de0 --- /dev/null +++ b/ompi/mca/osc/rdma/osc_rdma_component.c @@ -0,0 +1,616 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include + +#include "osc_rdma.h" +#include "osc_rdma_sendreq.h" +#include "osc_rdma_replyreq.h" +#include "osc_rdma_header.h" +#include "osc_rdma_obj_convert.h" +#include "osc_rdma_data_move.h" + +#include "opal/threads/mutex.h" +#include "ompi/info/info.h" +#include "ompi/communicator/communicator.h" +#include "ompi/mca/osc/osc.h" +#include "ompi/mca/btl/btl.h" +#include "ompi/mca/bml/bml.h" +#include "ompi/mca/bml/base/base.h" +#include "ompi/datatype/dt_arch.h" + +static int ompi_osc_rdma_component_open(void); + +ompi_osc_rdma_component_t mca_osc_rdma_component = { + { /* ompi_osc_base_component_t */ + { /* ompi_base_component_t */ + OMPI_OSC_BASE_VERSION_1_0_0, + "pt2pt", + 1, + 0, + 0, + ompi_osc_rdma_component_open, + NULL + }, + { /* mca_base_component_data */ + false /* checkpointable? */ + }, + ompi_osc_rdma_component_init, + ompi_osc_rdma_component_query, + ompi_osc_rdma_component_select, + ompi_osc_rdma_component_finalize + } +}; + + +ompi_osc_rdma_module_t ompi_osc_rdma_module_template = { + { + ompi_osc_rdma_module_free, + + ompi_osc_rdma_module_put, + ompi_osc_rdma_module_get, + ompi_osc_rdma_module_accumulate, + + ompi_osc_rdma_module_fence, + + ompi_osc_rdma_module_start, + ompi_osc_rdma_module_complete, + ompi_osc_rdma_module_post, + ompi_osc_rdma_module_wait, + ompi_osc_rdma_module_test, + + ompi_osc_rdma_module_lock, + ompi_osc_rdma_module_unlock, + } +}; + + +void ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl, + mca_btl_base_tag_t tag, + mca_btl_base_descriptor_t *descriptor, + void *cbdata); + +/* look up parameters for configuring this window. The code first + looks in the info structure passed by the user, then through mca + parameters. 
*/
+static bool
+check_config_value_bool(char *key, ompi_info_t *info)
+{
+    char *value_string;
+    int value_len, ret, flag, param;
+    bool result;
+
+    ret = ompi_info_get_valuelen(info, key, &value_len, &flag);
+    if (OMPI_SUCCESS != ret) goto info_not_found;
+    if (flag == 0) goto info_not_found;
+    value_len++;
+
+    value_string = malloc(sizeof(char) * value_len);
+    if (NULL == value_string) goto info_not_found;
+
+    ret = ompi_info_get(info, key, value_len, value_string, &flag);
+    if (OMPI_SUCCESS != ret) {
+        free(value_string);
+        goto info_not_found;
+    }
+    assert(flag != 0);
+    ret = ompi_info_value_to_bool(value_string, &result);
+    free(value_string);
+    if (OMPI_SUCCESS != ret) goto info_not_found;
+    return result;
+
+ info_not_found:
+    param = mca_base_param_find("osc", "pt2pt", key);
+    if (param == OPAL_ERROR) return false;
+
+    ret = mca_base_param_lookup_int(param, &flag);
+    if (OMPI_SUCCESS != ret) return false;
+
+    result = flag;
+
+    return result;
+}
+
+
+static int fence_sync_index;
+
+
+static int
+ompi_osc_rdma_component_open(void)
+{
+    fence_sync_index =
+        mca_base_param_reg_string(&mca_osc_rdma_component.super.osc_version,
+                                  "fence_sync_method",
+                                  "How to synchronize fence: reduce_scatter, allreduce, alltoall",
+                                  false, false, "reduce_scatter", NULL);
+
+    mca_base_param_reg_int(&mca_osc_rdma_component.super.osc_version,
+                           "eager_send",
+                           "Attempt to start data movement during communication call, "
+                           "instead of at synchronization time.  "
+                           "Info key of same name overrides this value, "
+                           "if info key given.",
+                           false, false, 0, NULL);
+
+    mca_base_param_reg_int(&mca_osc_rdma_component.super.osc_version,
+                           "no_locks",
+                           "Enable optimizations available only if MPI_LOCK is not used.",
+                           false, false, 0, NULL);
+
+    return OMPI_SUCCESS;
+}
+
+
+int
+ompi_osc_rdma_component_init(bool enable_progress_threads,
+                             bool enable_mpi_threads)
+{
+    /* we can run either with or without threads (may not be able
+       to do win locks)...
*/ + mca_osc_rdma_component.p2p_c_have_progress_threads = + enable_progress_threads; + + OBJ_CONSTRUCT(&mca_osc_rdma_component.p2p_c_lock, opal_mutex_t); + + OBJ_CONSTRUCT(&mca_osc_rdma_component.p2p_c_modules, + opal_hash_table_t); + opal_hash_table_init(&mca_osc_rdma_component.p2p_c_modules, 2); + + OBJ_CONSTRUCT(&mca_osc_rdma_component.p2p_c_sendreqs, opal_free_list_t); + opal_free_list_init(&mca_osc_rdma_component.p2p_c_sendreqs, + sizeof(ompi_osc_rdma_sendreq_t), + OBJ_CLASS(ompi_osc_rdma_sendreq_t), + 1, -1, 1); + + OBJ_CONSTRUCT(&mca_osc_rdma_component.p2p_c_replyreqs, opal_free_list_t); + opal_free_list_init(&mca_osc_rdma_component.p2p_c_replyreqs, + sizeof(ompi_osc_rdma_replyreq_t), + OBJ_CLASS(ompi_osc_rdma_replyreq_t), + 1, -1, 1); + + OBJ_CONSTRUCT(&mca_osc_rdma_component.p2p_c_longreqs, opal_free_list_t); + opal_free_list_init(&mca_osc_rdma_component.p2p_c_longreqs, + sizeof(ompi_osc_rdma_longreq_t), + OBJ_CLASS(ompi_osc_rdma_longreq_t), + 1, -1, 1); + + return OMPI_SUCCESS; +} + + +int +ompi_osc_rdma_component_finalize(void) +{ + size_t num_modules; + + if (0 != + (num_modules = opal_hash_table_get_size(&mca_osc_rdma_component.p2p_c_modules))) { + opal_output(0, "WARNING: There were %d Windows created but not freed.", + num_modules); + } + +#if 0 + mca_bml.bml_register(MCA_BTL_TAG_OSC_PT2PT, NULL, NULL); +#endif + + OBJ_DESTRUCT(&mca_osc_rdma_component.p2p_c_longreqs); + OBJ_DESTRUCT(&mca_osc_rdma_component.p2p_c_replyreqs); + OBJ_DESTRUCT(&mca_osc_rdma_component.p2p_c_sendreqs); + OBJ_DESTRUCT(&mca_osc_rdma_component.p2p_c_modules); + OBJ_DESTRUCT(&mca_osc_rdma_component.p2p_c_lock); + + return OMPI_SUCCESS; +} + + +int +ompi_osc_rdma_component_query(ompi_win_t *win, + ompi_info_t *info, + ompi_communicator_t *comm) +{ + /* we can always run - return a low priority */ + return 10; +} + + +int +ompi_osc_rdma_component_select(ompi_win_t *win, + ompi_info_t *info, + ompi_communicator_t *comm) +{ + ompi_osc_rdma_module_t *module; + int ret, i; + char *sync_string; + + /* create module structure */ + module = malloc(sizeof(ompi_osc_rdma_module_t)); + if (NULL == module) return OMPI_ERROR; + + /* fill in the function pointer part */ + memcpy(module, &ompi_osc_rdma_module_template, + sizeof(ompi_osc_base_module_t)); + + /* initialize the p2p part */ + OBJ_CONSTRUCT(&(module->p2p_lock), opal_mutex_t); + OBJ_CONSTRUCT(&(module->p2p_acc_lock), opal_mutex_t); + + module->p2p_win = win; + + ret = ompi_comm_dup(comm, &(module->p2p_comm)); + if (ret != OMPI_SUCCESS) { + OBJ_DESTRUCT(&(module->p2p_acc_lock)); + OBJ_DESTRUCT(&(module->p2p_lock)); + free(module); + return ret; + } + + OBJ_CONSTRUCT(&module->p2p_pending_sendreqs, opal_list_t); + module->p2p_num_pending_sendreqs = malloc(sizeof(short) * + ompi_comm_size(module->p2p_comm)); + if (NULL == module->p2p_num_pending_sendreqs) { + OBJ_DESTRUCT(&module->p2p_pending_sendreqs); + ompi_comm_free(&comm); + OBJ_DESTRUCT(&(module->p2p_acc_lock)); + OBJ_DESTRUCT(&(module->p2p_lock)); + free(module); + return ret; + } + memset(module->p2p_num_pending_sendreqs, 0, + sizeof(short) * ompi_comm_size(module->p2p_comm)); + + module->p2p_num_pending_out = 0; + module->p2p_num_pending_in = 0; + module->p2p_tag_counter = 0; + + OBJ_CONSTRUCT(&(module->p2p_long_msgs), opal_list_t); + + OBJ_CONSTRUCT(&(module->p2p_copy_pending_sendreqs), opal_list_t); + module->p2p_copy_num_pending_sendreqs = malloc(sizeof(short) * + ompi_comm_size(module->p2p_comm)); + if (NULL == module->p2p_copy_num_pending_sendreqs) { + 
OBJ_DESTRUCT(&module->p2p_copy_pending_sendreqs); + OBJ_DESTRUCT(&module->p2p_long_msgs); + free(module->p2p_num_pending_sendreqs); + OBJ_DESTRUCT(&module->p2p_pending_sendreqs); + ompi_comm_free(&comm); + OBJ_DESTRUCT(&(module->p2p_acc_lock)); + OBJ_DESTRUCT(&(module->p2p_lock)); + free(module); + return ret; + } + memset(module->p2p_num_pending_sendreqs, 0, + sizeof(short) * ompi_comm_size(module->p2p_comm)); + + module->p2p_eager_send = check_config_value_bool("eager_send", info); + + /* fence data */ + module->p2p_fence_coll_counts = malloc(sizeof(int) * + ompi_comm_size(module->p2p_comm)); + if (NULL == module->p2p_fence_coll_counts) { + free(module->p2p_copy_num_pending_sendreqs); + OBJ_DESTRUCT(&module->p2p_copy_pending_sendreqs); + OBJ_DESTRUCT(&module->p2p_long_msgs); + free(module->p2p_num_pending_sendreqs); + OBJ_DESTRUCT(&module->p2p_pending_sendreqs); + ompi_comm_free(&comm); + OBJ_DESTRUCT(&(module->p2p_acc_lock)); + OBJ_DESTRUCT(&(module->p2p_lock)); + free(module); + return ret; + } + for (i = 0 ; i < ompi_comm_size(module->p2p_comm) ; ++i) { + module->p2p_fence_coll_counts[i] = 1; + } + + module->p2p_fence_coll_results = malloc(sizeof(int) * + ompi_comm_size(module->p2p_comm)); + if (NULL == module->p2p_fence_coll_counts) { + free(module->p2p_fence_coll_counts); + free(module->p2p_copy_num_pending_sendreqs); + OBJ_DESTRUCT(&module->p2p_copy_pending_sendreqs); + OBJ_DESTRUCT(&module->p2p_long_msgs); + free(module->p2p_num_pending_sendreqs); + OBJ_DESTRUCT(&module->p2p_pending_sendreqs); + ompi_comm_free(&comm); + OBJ_DESTRUCT(&(module->p2p_acc_lock)); + OBJ_DESTRUCT(&(module->p2p_lock)); + free(module); + return ret; + } + + /* figure out what sync method to use */ + mca_base_param_lookup_string(fence_sync_index, &sync_string); + if (0 == strcmp(sync_string, "reduce_scatter")) { + module->p2p_fence_sync_type = OSC_SYNC_REDUCE_SCATTER; + } else if (0 == strcmp(sync_string, "allreduce")) { + module->p2p_fence_sync_type = OSC_SYNC_ALLREDUCE; + } else if (0 == strcmp(sync_string, "alltoall")) { + module->p2p_fence_sync_type = OSC_SYNC_ALLTOALL; + } else { + opal_output(0, "invalid value for fence_sync_method parameter: %s\n", sync_string); + return OMPI_ERROR; + } + + /* pwsc data */ + module->p2p_pw_group = NULL; + module->p2p_sc_group = NULL; + + /* lock data */ + module->p2p_lock_status = 0; + module->p2p_shared_count = 0; + OBJ_CONSTRUCT(&(module->p2p_locks_pending), opal_list_t); + module->p2p_lock_received_ack = 0; + + /* update component data */ + OPAL_THREAD_LOCK(&mca_osc_rdma_component.p2p_c_lock); + opal_hash_table_set_value_uint32(&mca_osc_rdma_component.p2p_c_modules, + module->p2p_comm->c_contextid, + module); + OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.p2p_c_lock); + + /* fill in window information */ + win->w_osc_module = (ompi_osc_base_module_t*) module; + if (check_config_value_bool("no_locks", info)) { + win->w_flags |= OMPI_WIN_NO_LOCKS; + } + + /* sync memory - make sure all initialization completed */ + opal_atomic_mb(); + + /* register to receive fragment callbacks */ + ret = mca_bml.bml_register(MCA_BTL_TAG_OSC_PT2PT, + ompi_osc_rdma_component_fragment_cb, + NULL); + + + if (module->p2p_eager_send) { + /* need to barrier if eager sending or we can receive before the + other side has been fully setup, causing much gnashing of + teeth. 
*/ + module->p2p_comm->c_coll.coll_barrier(module->p2p_comm); + } + + return ret; +} + + + +/* dispatch for callback on message completion */ +void +ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl, + mca_btl_base_tag_t tag, + mca_btl_base_descriptor_t *descriptor, + void *cbdata) +{ + int ret; + ompi_osc_rdma_module_t *module; + void *payload; + + assert(descriptor->des_dst[0].seg_len >= + sizeof(ompi_osc_rdma_base_header_t)); + + /* handle message */ + switch (((ompi_osc_rdma_base_header_t*) descriptor->des_dst[0].seg_addr.pval)->hdr_type) { + case OMPI_OSC_PT2PT_HDR_PUT: + { + ompi_osc_rdma_send_header_t *header; + + /* get our header and payload */ + header = (ompi_osc_rdma_send_header_t*) + descriptor->des_dst[0].seg_addr.pval; + payload = (void*) (header + 1); + +#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT + if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) { + OMPI_OSC_PT2PT_SEND_HDR_NTOH(*header); + } +#endif + + /* get our module pointer */ + module = ompi_osc_rdma_windx_to_module(header->hdr_windx); + if (NULL == module) return; + + ret = ompi_osc_rdma_sendreq_recv_put(module, header, payload); + } + break; + + case OMPI_OSC_PT2PT_HDR_ACC: + { + ompi_osc_rdma_send_header_t *header; + + /* get our header and payload */ + header = (ompi_osc_rdma_send_header_t*) + descriptor->des_dst[0].seg_addr.pval; + payload = (void*) (header + 1); + +#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT + if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) { + OMPI_OSC_PT2PT_SEND_HDR_NTOH(*header); + } +#endif + + /* get our module pointer */ + module = ompi_osc_rdma_windx_to_module(header->hdr_windx); + if (NULL == module) return; + + /* receive into temporary buffer */ + ret = ompi_osc_rdma_sendreq_recv_accum(module, header, payload); + } + break; + + case OMPI_OSC_PT2PT_HDR_GET: + { + ompi_datatype_t *datatype; + ompi_osc_rdma_send_header_t *header; + ompi_osc_rdma_replyreq_t *replyreq; + ompi_proc_t *proc; + + /* get our header and payload */ + header = (ompi_osc_rdma_send_header_t*) + descriptor->des_dst[0].seg_addr.pval; + payload = (void*) (header + 1); + +#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT + if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) { + OMPI_OSC_PT2PT_SEND_HDR_NTOH(*header); + } +#endif + + /* get our module pointer */ + module = ompi_osc_rdma_windx_to_module(header->hdr_windx); + if (NULL == module) return; + + /* create or get a pointer to our datatype */ + proc = module->p2p_comm->c_pml_procs[header->hdr_origin]->proc_ompi; + datatype = ompi_osc_rdma_datatype_create(proc, &payload); + + /* create replyreq sendreq */ + ret = ompi_osc_rdma_replyreq_alloc_init(module, + header->hdr_origin, + header->hdr_origin_sendreq, + header->hdr_target_disp, + header->hdr_target_count, + datatype, + &replyreq); + + /* send replyreq */ + ompi_osc_rdma_replyreq_send(module, replyreq); + + /* sendreq does the right retain, so we can release safely */ + OBJ_RELEASE(datatype); + } + break; + + case OMPI_OSC_PT2PT_HDR_REPLY: + { + ompi_osc_rdma_reply_header_t *header; + ompi_osc_rdma_sendreq_t *sendreq; + + /* get our header and payload */ + header = (ompi_osc_rdma_reply_header_t*) + descriptor->des_dst[0].seg_addr.pval; + payload = (void*) (header + 1); + +#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT + if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) { + OMPI_OSC_PT2PT_REPLY_HDR_NTOH(*header); + } +#endif + + /* get original sendreq pointer 
*/ + sendreq = (ompi_osc_rdma_sendreq_t*) header->hdr_origin_sendreq.pval; + module = sendreq->req_module; + + /* receive data */ + ompi_osc_rdma_replyreq_recv(module, sendreq, header, payload); + } + break; + case OMPI_OSC_PT2PT_HDR_POST: + { + ompi_osc_rdma_control_header_t *header = + (ompi_osc_rdma_control_header_t*) + descriptor->des_dst[0].seg_addr.pval; + +#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT + if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) { + OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(*header); + } +#endif + + /* get our module pointer */ + module = ompi_osc_rdma_windx_to_module(header->hdr_windx); + if (NULL == module) return; + + OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), -1); + } + break; + case OMPI_OSC_PT2PT_HDR_COMPLETE: + { + ompi_osc_rdma_control_header_t *header = + (ompi_osc_rdma_control_header_t*) + descriptor->des_dst[0].seg_addr.pval; + +#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT + if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) { + OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(*header); + } +#endif + + /* get our module pointer */ + module = ompi_osc_rdma_windx_to_module(header->hdr_windx); + if (NULL == module) return; + + /* we've heard from one more place, and have value reqs to + process */ + OPAL_THREAD_ADD32(&(module->p2p_num_pending_out), -1); + OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), header->hdr_value[0]); + } + break; + + case OMPI_OSC_PT2PT_HDR_LOCK_REQ: + { + ompi_osc_rdma_control_header_t *header = + (ompi_osc_rdma_control_header_t*) + descriptor->des_dst[0].seg_addr.pval; + +#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT + if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) { + OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(*header); + } +#endif + + /* get our module pointer */ + module = ompi_osc_rdma_windx_to_module(header->hdr_windx); + if (NULL == module) return; + + if (header->hdr_value[1] > 0) { + ompi_osc_rdma_passive_lock(module, header->hdr_value[0], + header->hdr_value[1]); + } else { + OPAL_THREAD_ADD32(&(module->p2p_lock_received_ack), 1); + } + } + break; + + case OMPI_OSC_PT2PT_HDR_UNLOCK_REQ: + { + ompi_osc_rdma_control_header_t *header = + (ompi_osc_rdma_control_header_t*) + descriptor->des_dst[0].seg_addr.pval; + +#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT + if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) { + OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(*header); + } +#endif + + /* get our module pointer */ + module = ompi_osc_rdma_windx_to_module(header->hdr_windx); + if (NULL == module) return; + + ompi_osc_rdma_passive_unlock(module, header->hdr_value[0], + header->hdr_value[1]); + } + break; + + default: + /* BWB - FIX ME - this sucks */ + opal_output(0, "received packet for Window with unknown type"); + abort(); + } +} diff --git a/ompi/mca/osc/rdma/osc_rdma_data_move.c b/ompi/mca/osc/rdma/osc_rdma_data_move.c new file mode 100644 index 0000000000..9b612ad867 --- /dev/null +++ b/ompi/mca/osc/rdma/osc_rdma_data_move.c @@ -0,0 +1,842 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "osc_rdma.h" +#include "osc_rdma_sendreq.h" +#include "osc_rdma_header.h" +#include "osc_rdma_data_move.h" +#include "osc_rdma_obj_convert.h" + +#include "opal/util/output.h" +#include "opal/sys/atomic.h" +#include "ompi/mca/bml/bml.h" +#include "ompi/mca/bml/base/base.h" +#include "ompi/mca/btl/btl.h" +#include "ompi/datatype/datatype.h" +#include "ompi/datatype/dt_arch.h" + +static inline int32_t +create_send_tag(ompi_osc_rdma_module_t *module) +{ +#if OMPI_HAVE_THREAD_SUPPORT && OPAL_HAVE_ATOMIC_CMPSET_32 + int32_t newval, oldval; + do { + oldval = module->p2p_tag_counter; + newval = (oldval + 1) % mca_pml.pml_max_tag; + } while (0 == opal_atomic_cmpset_32(&module->p2p_tag_counter, oldval, newval)); + return newval; +#elif OMPI_HAVE_THREAD_SUPPORT + int32_t ret; + /* no compare and swap - have to lock the module */ + OPAL_THREAD_LOCK(&module->p2p_lock); + module->p2p_tag_counter = (module->p2p_tag_counter + 1) % mca_pml.pml_max_tag; + ret = module->p2p_tag_counter; + OPAL_THREAD_UNLOCK(&module->p2p_lock); + return ret; +#else + module->p2p_tag_counter = (module->p2p_tag_counter + 1) % mca_pml.pml_max_tag; + return module->p2p_tag_counter; +#endif +} + + +/********************************************************************** + * + * Sending a sendreq to target + * + **********************************************************************/ +static void +ompi_osc_rdma_sendreq_send_long_cb(ompi_osc_rdma_longreq_t *longreq) +{ + ompi_osc_rdma_sendreq_t *sendreq = + (ompi_osc_rdma_sendreq_t*) longreq->req_comp_cbdata; + + opal_output(-1, "%d completed long sendreq to %d", + sendreq->req_module->p2p_comm->c_my_rank, + sendreq->req_target_rank); + + opal_list_remove_item(&(sendreq->req_module->p2p_long_msgs), + &(longreq->super.super)); + + ompi_osc_rdma_longreq_free(longreq); + + OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1); + ompi_osc_rdma_sendreq_free(sendreq); +} + + +static void +ompi_osc_rdma_sendreq_send_cb(struct mca_btl_base_module_t* btl, + struct mca_btl_base_endpoint_t *endpoint, + struct mca_btl_base_descriptor_t* descriptor, + int status) +{ + ompi_osc_rdma_sendreq_t *sendreq = + (ompi_osc_rdma_sendreq_t*) descriptor->des_cbdata; + ompi_osc_rdma_send_header_t *header = + (ompi_osc_rdma_send_header_t*) descriptor->des_src[0].seg_addr.pval; + + if (OMPI_SUCCESS != status) { + /* requeue and return */ + /* BWB - FIX ME - figure out where to put this bad boy */ + abort(); + return; + } + + /* have to look at header, and not the sendreq because in the case + of get, it's possible that the sendreq has been freed already + (if the remote side replies before we get our send completion + callback) and already allocated to another request. We don't + wait for this completion before exiting a synchronization point + in the case of get, as we really don't care when it completes - + only when the data arrives. */ + if (OMPI_OSC_PT2PT_HDR_GET != header->hdr_base.hdr_type) { +#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT + if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) { + OMPI_OSC_PT2PT_SEND_HDR_NTOH(*header); + } +#endif + /* do we need to post a send? */ + if (header->hdr_msg_length != 0) { + /* sendreq is done. 
Mark it as so and get out of here */ + OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1); + ompi_osc_rdma_sendreq_free(sendreq); + } else { + ompi_osc_rdma_longreq_t *longreq; + ompi_osc_rdma_longreq_alloc(&longreq); + + longreq->req_comp_cb = ompi_osc_rdma_sendreq_send_long_cb; + longreq->req_comp_cbdata = sendreq; + opal_output(-1, "%d starting long sendreq to %d (%d)", + sendreq->req_module->p2p_comm->c_my_rank, + sendreq->req_target_rank, + header->hdr_origin_tag); + + mca_pml.pml_isend(sendreq->req_origin_convertor.pBaseBuf, + sendreq->req_origin_convertor.count, + sendreq->req_origin_datatype, + sendreq->req_target_rank, + header->hdr_origin_tag, + MCA_PML_BASE_SEND_STANDARD, + sendreq->req_module->p2p_comm, + &(longreq->req_pml_req)); + + /* put the send request in the waiting list */ + OPAL_THREAD_LOCK(&(sendreq->req_module->p2p_lock)); + opal_list_append(&(sendreq->req_module->p2p_long_msgs), + &(longreq->super.super)); + OPAL_THREAD_UNLOCK(&(sendreq->req_module->p2p_lock)); + } + } + + /* release the descriptor and sendreq */ + btl->btl_free(btl, descriptor); + + /* any other sendreqs to restart? */ + /* BWB - FIX ME - implement sending the next sendreq here */ +} + + +/* create the initial fragment, pack header, datatype, and payload (if + size fits) and send */ +int +ompi_osc_rdma_sendreq_send(ompi_osc_rdma_module_t *module, + ompi_osc_rdma_sendreq_t *sendreq) +{ + int ret = OMPI_SUCCESS; + mca_bml_base_endpoint_t *endpoint = NULL; + mca_bml_base_btl_t *bml_btl = NULL; + mca_btl_base_descriptor_t *descriptor = NULL; + ompi_osc_rdma_send_header_t *header = NULL; + size_t written_data = 0; + size_t needed_len = sizeof(ompi_osc_rdma_send_header_t); + const void *packed_ddt; + size_t packed_ddt_len = ompi_ddt_pack_description_length(sendreq->req_target_datatype); + + /* we always need to send the ddt */ + needed_len += packed_ddt_len; + if (OMPI_OSC_PT2PT_GET != sendreq->req_type) { + needed_len += sendreq->req_origin_bytes_packed; + } + + /* Get a BTL so we have the eager limit */ + endpoint = (mca_bml_base_endpoint_t*) sendreq->req_target_proc->proc_bml; + bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager); + descriptor = bml_btl->btl_alloc(bml_btl->btl, + needed_len < bml_btl->btl_eager_limit ? 
needed_len : + bml_btl->btl_eager_limit); + if (NULL == descriptor) { + ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE; + goto cleanup; + } + + /* verify at least enough space for header */ + if (descriptor->des_src[0].seg_len < sizeof(ompi_osc_rdma_send_header_t)) { + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto cleanup; + } + + /* setup descriptor */ + descriptor->des_cbfunc = ompi_osc_rdma_sendreq_send_cb; + descriptor->des_cbdata = (void*) sendreq; + descriptor->des_flags = MCA_BTL_DES_FLAGS_PRIORITY; + + /* pack header */ + header = (ompi_osc_rdma_send_header_t*) descriptor->des_src[0].seg_addr.pval; + written_data += sizeof(ompi_osc_rdma_send_header_t); + header->hdr_base.hdr_flags = 0; + header->hdr_windx = sendreq->req_module->p2p_comm->c_contextid; + header->hdr_origin = sendreq->req_module->p2p_comm->c_my_rank; + header->hdr_origin_sendreq.pval = (void*) sendreq; + header->hdr_origin_tag = 0; + header->hdr_target_disp = sendreq->req_target_disp; + header->hdr_target_count = sendreq->req_target_count; + + switch (sendreq->req_type) { + case OMPI_OSC_PT2PT_PUT: + header->hdr_base.hdr_type = OMPI_OSC_PT2PT_HDR_PUT; +#if OMPI_ENABLE_MEM_DEBUG + header->hdr_target_op = 0; +#endif + break; + + case OMPI_OSC_PT2PT_ACC: + header->hdr_base.hdr_type = OMPI_OSC_PT2PT_HDR_ACC; + header->hdr_target_op = sendreq->req_op_id; + break; + + case OMPI_OSC_PT2PT_GET: + header->hdr_base.hdr_type = OMPI_OSC_PT2PT_HDR_GET; +#if OMPI_ENABLE_MEM_DEBUG + header->hdr_target_op = 0; +#endif + break; + } + + /* Set datatype id and / or pack datatype */ + ret = ompi_ddt_get_pack_description(sendreq->req_target_datatype, &packed_ddt); + if (OMPI_SUCCESS != ret) goto cleanup; + memcpy((unsigned char*) descriptor->des_src[0].seg_addr.pval + written_data, + packed_ddt, packed_ddt_len); + written_data += packed_ddt_len; + + if (OMPI_OSC_PT2PT_GET != sendreq->req_type) { + /* if sending data and it fits, pack payload */ + if (descriptor->des_src[0].seg_len >= + written_data + sendreq->req_origin_bytes_packed) { + struct iovec iov; + uint32_t iov_count = 1; + int32_t free_after; + size_t max_data = sendreq->req_origin_bytes_packed; + + iov.iov_len = max_data; + iov.iov_base = (void*) ((unsigned char*) descriptor->des_src[0].seg_addr.pval + written_data); + + ret = ompi_convertor_pack(&sendreq->req_origin_convertor, &iov, &iov_count, + &max_data, &free_after); + if (ret < 0) { + ret = OMPI_ERR_FATAL; + goto cleanup; + } + + assert(max_data == sendreq->req_origin_bytes_packed); + written_data += max_data; + descriptor->des_src[0].seg_len = written_data; + + header->hdr_msg_length = sendreq->req_origin_bytes_packed; + } else { + header->hdr_msg_length = 0; + header->hdr_origin_tag = create_send_tag(module); + } + } else { + descriptor->des_src[0].seg_len = written_data; + header->hdr_msg_length = 0; + } + +#ifdef WORDS_BIGENDIAN + header->hdr_base.hdr_flags |= OMPI_OSC_PT2PT_HDR_FLAG_NBO; +#elif OMPI_ENABLE_HETEROGENEOUS_SUPPORT + if (sendreq->req_target_proc->proc_arch & OMPI_ARCH_ISBIGENDIAN) { + header->hdr_base.hdr_flags |= OMPI_OSC_PT2PT_HDR_FLAG_NBO; + OMPI_OSC_PT2PT_SEND_HDR_HTON(*header); + } +#endif + + /* send fragment */ + opal_output(-1, "%d sending sendreq to %d", + sendreq->req_module->p2p_comm->c_my_rank, + sendreq->req_target_rank); + + ret = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_OSC_PT2PT); + goto done; + + cleanup: + if (descriptor != NULL) { + mca_bml_base_free(bml_btl, descriptor); + } + + done: + return ret; +} + + +/********************************************************************** + * + * Sending 
a replyreq back to origin + * + **********************************************************************/ +static void +ompi_osc_rdma_replyreq_send_long_cb(ompi_osc_rdma_longreq_t *longreq) +{ + ompi_osc_rdma_replyreq_t *replyreq = + (ompi_osc_rdma_replyreq_t*) longreq->req_comp_cbdata; + + opal_list_remove_item(&(replyreq->rep_module->p2p_long_msgs), + &(longreq->super.super)); + + ompi_osc_rdma_longreq_free(longreq); + + OPAL_THREAD_ADD32(&(replyreq->rep_module->p2p_num_pending_in), -1); + ompi_osc_rdma_replyreq_free(replyreq); +} + + +static void +ompi_osc_rdma_replyreq_send_cb(struct mca_btl_base_module_t* btl, + struct mca_btl_base_endpoint_t *endpoint, + struct mca_btl_base_descriptor_t* descriptor, + int status) +{ + ompi_osc_rdma_replyreq_t *replyreq = + (ompi_osc_rdma_replyreq_t*) descriptor->des_cbdata; + ompi_osc_rdma_reply_header_t *header = + (ompi_osc_rdma_reply_header_t*) descriptor->des_src[0].seg_addr.pval; + + if (OMPI_SUCCESS != status) { + /* requeue and return */ + /* BWB - FIX ME - figure out where to put this bad boy */ + abort(); + return; + } + +#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT + if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) { + OMPI_OSC_PT2PT_REPLY_HDR_NTOH(*header); + } +#endif + + /* do we need to post a send? */ + if (header->hdr_msg_length != 0) { + /* sendreq is done. Mark it as so and get out of here */ + OPAL_THREAD_ADD32(&(replyreq->rep_module->p2p_num_pending_in), -1); + ompi_osc_rdma_replyreq_free(replyreq); + } else { + ompi_osc_rdma_longreq_t *longreq; + ompi_osc_rdma_longreq_alloc(&longreq); + + longreq->req_comp_cb = ompi_osc_rdma_replyreq_send_long_cb; + longreq->req_comp_cbdata = replyreq; + + mca_pml.pml_isend(replyreq->rep_target_convertor.pBaseBuf, + replyreq->rep_target_convertor.count, + replyreq->rep_target_datatype, + replyreq->rep_origin_rank, + header->hdr_target_tag, + MCA_PML_BASE_SEND_STANDARD, + replyreq->rep_module->p2p_comm, + &(longreq->req_pml_req)); + + /* put the send request in the waiting list */ + OPAL_THREAD_LOCK(&(replyreq->rep_module->p2p_lock)); + opal_list_append(&(replyreq->rep_module->p2p_long_msgs), + &(longreq->super.super)); + OPAL_THREAD_UNLOCK(&(replyreq->rep_module->p2p_lock)); + } + + /* release the descriptor and replyreq */ + btl->btl_free(btl, descriptor); + + /* any other replyreqs to restart? 
*/ +} + + +int +ompi_osc_rdma_replyreq_send(ompi_osc_rdma_module_t *module, + ompi_osc_rdma_replyreq_t *replyreq) +{ + int ret = OMPI_SUCCESS; + mca_bml_base_endpoint_t *endpoint = NULL; + mca_bml_base_btl_t *bml_btl = NULL; + mca_btl_base_descriptor_t *descriptor = NULL; + ompi_osc_rdma_reply_header_t *header = NULL; + size_t written_data = 0; + + /* Get a BTL and a fragment to go with it */ + endpoint = (mca_bml_base_endpoint_t*) replyreq->rep_origin_proc->proc_bml; + bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager); + descriptor = bml_btl->btl_alloc(bml_btl->btl, + bml_btl->btl_eager_limit); + if (NULL == descriptor) { + ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE; + goto cleanup; + } + + /* verify at least enough space for header */ + if (descriptor->des_src[0].seg_len < sizeof(ompi_osc_rdma_reply_header_t)) { + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto cleanup; + } + + /* setup descriptor */ + descriptor->des_cbfunc = ompi_osc_rdma_replyreq_send_cb; + descriptor->des_cbdata = (void*) replyreq; + descriptor->des_flags = MCA_BTL_DES_FLAGS_PRIORITY; + + /* pack header */ + header = (ompi_osc_rdma_reply_header_t*) descriptor->des_src[0].seg_addr.pval; + written_data += sizeof(ompi_osc_rdma_reply_header_t); + header->hdr_base.hdr_type = OMPI_OSC_PT2PT_HDR_REPLY; + header->hdr_base.hdr_flags = 0; + header->hdr_origin_sendreq = replyreq->rep_origin_sendreq; + header->hdr_target_tag = 0; + + /* if sending data fits, pack payload */ + if (descriptor->des_src[0].seg_len >= + written_data + replyreq->rep_target_bytes_packed) { + struct iovec iov; + uint32_t iov_count = 1; + int32_t free_after; + size_t max_data = replyreq->rep_target_bytes_packed; + + iov.iov_len = max_data; + iov.iov_base = (void*) ((unsigned char*) descriptor->des_src[0].seg_addr.pval + written_data); + + ret = ompi_convertor_pack(&replyreq->rep_target_convertor, &iov, &iov_count, + &max_data, &free_after); + if (ret < 0) { + ret = OMPI_ERR_FATAL; + goto cleanup; + } + + assert(max_data == replyreq->rep_target_bytes_packed); + written_data += max_data; + descriptor->des_src[0].seg_len = written_data; + + header->hdr_msg_length = replyreq->rep_target_bytes_packed; + } else { + header->hdr_msg_length = 0; + header->hdr_target_tag = create_send_tag(module); + } + +#ifdef WORDS_BIGENDIAN + header->hdr_base.hdr_flags |= OMPI_OSC_PT2PT_HDR_FLAG_NBO; +#elif OMPI_ENABLE_HETEROGENEOUS_SUPPORT + if (replyreq->rep_origin_proc->proc_arch & OMPI_ARCH_ISBIGENDIAN) { + header->hdr_base.hdr_flags |= OMPI_OSC_PT2PT_HDR_FLAG_NBO; + OMPI_OSC_PT2PT_REPLY_HDR_HTON(*header); + } +#endif + + /* send fragment */ + ret = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_OSC_PT2PT); + goto done; + + cleanup: + if (descriptor != NULL) { + mca_bml_base_free(bml_btl, descriptor); + } + + done: + return ret; +} + + +/********************************************************************** + * + * Receive a put on the target side + * + **********************************************************************/ +static void +ompi_osc_rdma_sendreq_recv_put_long_cb(ompi_osc_rdma_longreq_t *longreq) +{ + opal_list_remove_item(&(longreq->req_module->p2p_long_msgs), + &(longreq->super.super)); + + OBJ_RELEASE(longreq->req_datatype); + ompi_osc_rdma_longreq_free(longreq); + + OPAL_THREAD_ADD32(&(longreq->req_module->p2p_num_pending_in), -1); +} + + +int +ompi_osc_rdma_sendreq_recv_put(ompi_osc_rdma_module_t *module, + ompi_osc_rdma_send_header_t *header, + void *inbuf) +{ + int ret = OMPI_SUCCESS; + void *target = (unsigned char*) module->p2p_win->w_baseptr + + 
(header->hdr_target_disp * module->p2p_win->w_disp_unit); + ompi_proc_t *proc = module->p2p_comm->c_pml_procs[header->hdr_origin]->proc_ompi; + struct ompi_datatype_t *datatype = + ompi_osc_rdma_datatype_create(proc, &inbuf); + + if (header->hdr_msg_length > 0) { + ompi_convertor_t convertor; + struct iovec iov; + uint32_t iov_count = 1; + int32_t free_after = 0; + size_t max_data; + ompi_proc_t *proc; + + /* create convertor */ + OBJ_CONSTRUCT(&convertor, ompi_convertor_t); + + /* initialize convertor */ + proc = ompi_comm_peer_lookup(module->p2p_comm, header->hdr_origin); + ompi_convertor_copy_and_prepare_for_recv(proc->proc_convertor, + datatype, + header->hdr_target_count, + target, + 0, + &convertor); + iov.iov_len = header->hdr_msg_length; + iov.iov_base = inbuf; + max_data = iov.iov_len; + ompi_convertor_unpack(&convertor, + &iov, + &iov_count, + &max_data, + &free_after); + OBJ_DESTRUCT(&convertor); + OBJ_RELEASE(datatype); + OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), -1); + + } else { + ompi_osc_rdma_longreq_t *longreq; + ompi_osc_rdma_longreq_alloc(&longreq); + + longreq->req_comp_cb = ompi_osc_rdma_sendreq_recv_put_long_cb; + longreq->req_comp_cbdata = NULL; + longreq->req_datatype = datatype; + longreq->req_module = module; + + ret = mca_pml.pml_irecv(target, + header->hdr_target_count, + datatype, + header->hdr_origin, + header->hdr_origin_tag, + module->p2p_comm, + &(longreq->req_pml_req)); + + /* put the send request in the waiting list */ + OPAL_THREAD_LOCK(&(module->p2p_lock)); + opal_list_append(&(module->p2p_long_msgs), + &(longreq->super.super)); + OPAL_THREAD_UNLOCK(&(module->p2p_lock)); + } + + return ret; +} + + + + +/********************************************************************** + * + * Receive an accumulate on the target side + * + **********************************************************************/ +static void +ompi_osc_rdma_sendreq_recv_accum_long_cb(ompi_osc_rdma_longreq_t *longreq) +{ + ompi_osc_rdma_send_header_t *header = + (ompi_osc_rdma_send_header_t*) longreq->req_comp_cbdata; + void *payload = (void*) (header + 1); + int ret; + + /* lock the window for accumulates */ + OPAL_THREAD_LOCK(&longreq->req_module->p2p_acc_lock); + + opal_list_remove_item(&(longreq->req_module->p2p_long_msgs), + &(longreq->super.super)); + + /* copy the data from the temporary buffer into the user window */ + ret = ompi_osc_rdma_process_op(longreq->req_module, + header, + longreq->req_datatype, + longreq->req_op, + payload, + header->hdr_msg_length); + + /* unlock the window for accumulates */ + OPAL_THREAD_UNLOCK(&longreq->req_module->p2p_acc_lock); + + opal_output(-1, "%d finished receiving long accum message from %d", + longreq->req_module->p2p_comm->c_my_rank, + header->hdr_origin); + + /* free the temp buffer */ + free(longreq->req_comp_cbdata); + + /* Release datatype & op */ + OBJ_RELEASE(longreq->req_datatype); + OBJ_RELEASE(longreq->req_op); + + OPAL_THREAD_ADD32(&(longreq->req_module->p2p_num_pending_in), -1); + + ompi_osc_rdma_longreq_free(longreq); +} + + +int +ompi_osc_rdma_sendreq_recv_accum(ompi_osc_rdma_module_t *module, + ompi_osc_rdma_send_header_t *header, + void *payload) +{ + int ret = OMPI_SUCCESS; + struct ompi_op_t *op = ompi_osc_rdma_op_create(header->hdr_target_op); + ompi_proc_t *proc = module->p2p_comm->c_pml_procs[header->hdr_origin]->proc_ompi; + struct ompi_datatype_t *datatype = + ompi_osc_rdma_datatype_create(proc, &payload); + + if (header->hdr_msg_length > 0) { + /* lock the window for accumulates */ + 
OPAL_THREAD_LOCK(&module->p2p_acc_lock); + + /* copy the data from the temporary buffer into the user window */ + ret = ompi_osc_rdma_process_op(module, header, datatype, op, payload, + header->hdr_msg_length); + + /* unlock the window for accumulates */ + OPAL_THREAD_UNLOCK(&module->p2p_acc_lock); + + /* Release datatype & op */ + OBJ_RELEASE(datatype); + OBJ_RELEASE(op); + + OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), -1); + + opal_output(-1, "%d received accum message from %d", + module->p2p_comm->c_my_rank, + header->hdr_origin); + + } else { + ompi_osc_rdma_longreq_t *longreq; + long lb, extent, true_lb, true_extent; + size_t buflen; + + /* figure out how big a buffer we need */ + ompi_ddt_get_extent(datatype, &lb, &extent); + ompi_ddt_get_true_extent(datatype, &true_lb, &true_extent); + buflen = true_extent + (header->hdr_target_count - 1) * extent; + + /* get a longreq and fill it in */ + ompi_osc_rdma_longreq_alloc(&longreq); + + longreq->req_comp_cb = ompi_osc_rdma_sendreq_recv_accum_long_cb; + longreq->req_datatype = datatype; + longreq->req_op = op; + longreq->req_module = module; + + /* allocate a buffer to receive into ... */ + longreq->req_comp_cbdata = malloc(buflen + sizeof(ompi_osc_rdma_send_header_t)); + + if (NULL == longreq->req_comp_cbdata) return OMPI_ERR_TEMP_OUT_OF_RESOURCE; + /* fill in tmp header */ + memcpy(longreq->req_comp_cbdata, header, + sizeof(ompi_osc_rdma_send_header_t)); + ((ompi_osc_rdma_send_header_t*) longreq->req_comp_cbdata)->hdr_msg_length = buflen; + + ret = mca_pml.pml_irecv(((char*) longreq->req_comp_cbdata) + sizeof(ompi_osc_rdma_send_header_t), + header->hdr_target_count, + datatype, + header->hdr_origin, + header->hdr_origin_tag, + module->p2p_comm, + &(longreq->req_pml_req)); + + opal_output(-1, "%d started long recv accum message from %d (%d)", + module->p2p_comm->c_my_rank, + header->hdr_origin, + header->hdr_origin_tag); + + /* put the send request in the waiting list */ + OPAL_THREAD_LOCK(&(module->p2p_lock)); + opal_list_append(&(module->p2p_long_msgs), + &(longreq->super.super)); + OPAL_THREAD_UNLOCK(&(module->p2p_lock)); + } + + return ret; +} + + +/********************************************************************** + * + * Recveive a get on the origin side + * + **********************************************************************/ +static void +ompi_osc_rdma_replyreq_recv_long_cb(ompi_osc_rdma_longreq_t *longreq) +{ + ompi_osc_rdma_sendreq_t *sendreq = + (ompi_osc_rdma_sendreq_t*) longreq->req_comp_cbdata; + + opal_list_remove_item(&(longreq->req_module->p2p_long_msgs), + &(longreq->super.super)); + + ompi_osc_rdma_longreq_free(longreq); + + OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1); + ompi_osc_rdma_sendreq_free(sendreq); +} + +int +ompi_osc_rdma_replyreq_recv(ompi_osc_rdma_module_t *module, + ompi_osc_rdma_sendreq_t *sendreq, + ompi_osc_rdma_reply_header_t *header, + void *payload) +{ + int ret = OMPI_SUCCESS; + + /* receive into user buffer */ + if (header->hdr_msg_length > 0) { + /* short message. woo! 
*/ + + struct iovec iov; + uint32_t iov_count = 1; + int32_t free_after = 0; + size_t max_data; + + iov.iov_len = header->hdr_msg_length; + iov.iov_base = payload; + max_data = iov.iov_len; + ompi_convertor_unpack(&sendreq->req_origin_convertor, + &iov, + &iov_count, + &max_data, + &free_after); + + OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1); + ompi_osc_rdma_sendreq_free(sendreq); + } else { + ompi_osc_rdma_longreq_t *longreq; + ompi_osc_rdma_longreq_alloc(&longreq); + + longreq->req_comp_cb = ompi_osc_rdma_replyreq_recv_long_cb; + longreq->req_comp_cbdata = sendreq; + longreq->req_module = module; + + /* BWB - FIX ME - George is going to kill me for this */ + ret = mca_pml.pml_irecv(sendreq->req_origin_convertor.pBaseBuf, + sendreq->req_origin_convertor.count, + sendreq->req_origin_datatype, + sendreq->req_target_rank, + header->hdr_target_tag, + module->p2p_comm, + &(longreq->req_pml_req)); + + /* put the send request in the waiting list */ + OPAL_THREAD_LOCK(&(module->p2p_lock)); + opal_list_append(&(module->p2p_long_msgs), + &(longreq->super.super)); + OPAL_THREAD_UNLOCK(&(module->p2p_lock)); + } + + return ret; +} + + +/********************************************************************** + * + * Control message communication + * + **********************************************************************/ +static void +ompi_osc_rdma_control_send_cb(struct mca_btl_base_module_t* btl, + struct mca_btl_base_endpoint_t *endpoint, + struct mca_btl_base_descriptor_t* descriptor, + int status) +{ + /* release the descriptor and sendreq */ + btl->btl_free(btl, descriptor); +} + + +int +ompi_osc_rdma_control_send(ompi_osc_rdma_module_t *module, + ompi_proc_t *proc, + uint8_t type, int32_t value0, int32_t value1) +{ + int ret = OMPI_SUCCESS; + mca_bml_base_endpoint_t *endpoint = NULL; + mca_bml_base_btl_t *bml_btl = NULL; + mca_btl_base_descriptor_t *descriptor = NULL; + ompi_osc_rdma_control_header_t *header = NULL; + + /* Get a BTL and a fragment to go with it */ + endpoint = (mca_bml_base_endpoint_t*) proc->proc_bml; + bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager); + descriptor = bml_btl->btl_alloc(bml_btl->btl, + sizeof(ompi_osc_rdma_control_header_t)); + if (NULL == descriptor) { + ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE; + goto cleanup; + } + + /* verify at least enough space for header */ + if (descriptor->des_src[0].seg_len < sizeof(ompi_osc_rdma_control_header_t)) { + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto cleanup; + } + + /* setup descriptor */ + descriptor->des_cbfunc = ompi_osc_rdma_control_send_cb; + descriptor->des_cbdata = NULL; + descriptor->des_flags = MCA_BTL_DES_FLAGS_PRIORITY; + descriptor->des_src[0].seg_len = sizeof(ompi_osc_rdma_control_header_t); + + /* pack header */ + header = (ompi_osc_rdma_control_header_t*) descriptor->des_src[0].seg_addr.pval; + header->hdr_base.hdr_type = type; + header->hdr_value[0] = value0; + header->hdr_value[1] = value1; + header->hdr_windx = module->p2p_comm->c_contextid; + +#ifdef WORDS_BIGENDIAN + header->hdr_base.hdr_flags |= OMPI_OSC_PT2PT_HDR_FLAG_NBO; +#elif OMPI_ENABLE_HETEROGENEOUS_SUPPORT + if (proc->proc_arch & OMPI_ARCH_ISBIGENDIAN) { + header->hdr_base.hdr_flags |= OMPI_OSC_PT2PT_HDR_FLAG_NBO; + OMPI_OSC_PT2PT_CONTROL_HDR_HTON(*header); + } +#endif + + /* send fragment */ + ret = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_OSC_PT2PT); + goto done; + + cleanup: + if (descriptor != NULL) { + mca_bml_base_free(bml_btl, descriptor); + } + + done: + return ret; +} diff --git 
a/ompi/mca/osc/rdma/osc_rdma_data_move.h b/ompi/mca/osc/rdma/osc_rdma_data_move.h
new file mode 100644
index 0000000000..b13ad9b146
--- /dev/null
+++ b/ompi/mca/osc/rdma/osc_rdma_data_move.h
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2004-2005 The Trustees of Indiana University.
+ *                         All rights reserved.
+ * Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
+ *                         All rights reserved.
+ * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
+ *                         University of Stuttgart.  All rights reserved.
+ * Copyright (c) 2004-2005 The Regents of the University of California.
+ *                         All rights reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#ifndef OMPI_MCA_OSC_PT2PT_DATA_MOVE_H
+#define OMPI_MCA_OSC_PT2PT_DATA_MOVE_H
+
+#include "osc_rdma_sendreq.h"
+#include "osc_rdma_replyreq.h"
+
+/* send a sendreq (the request from the origin for a Put, Get, or
+   Accumulate, including the payload for Put and Accumulate) */
+int ompi_osc_rdma_sendreq_send(ompi_osc_rdma_module_t *module,
+                               ompi_osc_rdma_sendreq_t *sendreq);
+
+/* send a replyreq (the request from the target of a Get, with the
+   payload for the origin) */
+int ompi_osc_rdma_replyreq_send(ompi_osc_rdma_module_t *module,
+                                ompi_osc_rdma_replyreq_t *replyreq);
+
+/* receive the target side of a sendreq for a put, directly into the user's window */
+int ompi_osc_rdma_sendreq_recv_put(ompi_osc_rdma_module_t *module,
+                                   ompi_osc_rdma_send_header_t *header,
+                                   void *payload);
+
+/* receive the target side of a sendreq for an accumulate, possibly
+   using a temporary buffer, then calling the reduction functions */
+int ompi_osc_rdma_sendreq_recv_accum(ompi_osc_rdma_module_t *module,
+                                     ompi_osc_rdma_send_header_t *header,
+                                     void *payload);
+
+/* receive the origin side of a replyreq (the reply part of an
+   MPI_Get), directly into the user's buffer */
+int ompi_osc_rdma_replyreq_recv(ompi_osc_rdma_module_t *module,
+                                ompi_osc_rdma_sendreq_t *sendreq,
+                                ompi_osc_rdma_reply_header_t *header,
+                                void *payload);
+
+int ompi_osc_rdma_control_send(ompi_osc_rdma_module_t *module,
+                               ompi_proc_t *proc,
+                               uint8_t type, int32_t value0, int32_t value1);
+
+#endif
diff --git a/ompi/mca/osc/rdma/osc_rdma_header.h b/ompi/mca/osc/rdma/osc_rdma_header.h
new file mode 100644
index 0000000000..dc2cc6e7b2
--- /dev/null
+++ b/ompi/mca/osc/rdma/osc_rdma_header.h
@@ -0,0 +1,136 @@
+/*
+ * Copyright (c) 2004-2005 The Trustees of Indiana University.
+ *                         All rights reserved.
+ * Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
+ *                         All rights reserved.
+ * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
+ *                         University of Stuttgart.  All rights reserved.
+ * Copyright (c) 2004-2005 The Regents of the University of California.
+ *                         All rights reserved.
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef OMPI_MCA_OSC_PT2PT_HDR_H +#define OMPI_MCA_OSC_PT2PT_HDR_H + +#ifdef HAVE_NETINET_IN_H +#include +#endif + +#include "opal/types.h" + +#define OMPI_OSC_PT2PT_HDR_PUT 0x0001 +#define OMPI_OSC_PT2PT_HDR_ACC 0x0002 +#define OMPI_OSC_PT2PT_HDR_GET 0x0004 +#define OMPI_OSC_PT2PT_HDR_REPLY 0x0008 +#define OMPI_OSC_PT2PT_HDR_POST 0x0010 +#define OMPI_OSC_PT2PT_HDR_COMPLETE 0x0020 +#define OMPI_OSC_PT2PT_HDR_LOCK_REQ 0x0040 +#define OMPI_OSC_PT2PT_HDR_UNLOCK_REQ 0x0080 + +#define OMPI_OSC_PT2PT_HDR_FLAG_NBO 0x0001 + +struct ompi_osc_rdma_base_header_t { + uint8_t hdr_type; + /* eventually, this will include endian information */ + uint8_t hdr_flags; +}; +typedef struct ompi_osc_rdma_base_header_t ompi_osc_rdma_base_header_t; + +#define OMPI_OSC_PT2PT_BASE_HDR_NTOH(h) +#define OMPI_OSC_PT2PT_BASE_HDR_HTON(h) + +struct ompi_osc_rdma_send_header_t { + ompi_osc_rdma_base_header_t hdr_base; + uint16_t hdr_windx; + + int32_t hdr_origin; + ompi_ptr_t hdr_origin_sendreq; + int32_t hdr_origin_tag; + + int32_t hdr_target_disp; + int32_t hdr_target_count; + int32_t hdr_target_op; + + int32_t hdr_msg_length; /* 0 if payload is not included */ +}; +typedef struct ompi_osc_rdma_send_header_t ompi_osc_rdma_send_header_t; + +#define OMPI_OSC_PT2PT_SEND_HDR_HTON(hdr) \ + do { \ + OMPI_OSC_PT2PT_BASE_HDR_HTON((hdr).hdr_base) \ + (hdr).hdr_windx = htons((hdr).hdr_windx); \ + (hdr).hdr_origin = htonl((hdr).hdr_origin); \ + (hdr).hdr_origin_tag = htonl((hdr).hdr_origin_tag); \ + (hdr).hdr_target_disp = htonl((hdr).hdr_target_disp); \ + (hdr).hdr_target_count = htonl((hdr).hdr_target_count); \ + (hdr).hdr_target_op = htonl((hdr).hdr_target_op); \ + (hdr).hdr_msg_length = htonl((hdr).hdr_msg_length); \ + } while (0) + +#define OMPI_OSC_PT2PT_SEND_HDR_NTOH(hdr) \ + do { \ + OMPI_OSC_PT2PT_BASE_HDR_NTOH((hdr).hdr_base) \ + (hdr).hdr_windx = ntohs((hdr).hdr_windx); \ + (hdr).hdr_origin = ntohl((hdr).hdr_origin); \ + (hdr).hdr_origin_tag = ntohl((hdr).hdr_origin_tag); \ + (hdr).hdr_target_disp = ntohl((hdr).hdr_target_disp); \ + (hdr).hdr_target_count = ntohl((hdr).hdr_target_count); \ + (hdr).hdr_target_op = ntohl((hdr).hdr_target_op); \ + (hdr).hdr_msg_length = ntohl((hdr).hdr_msg_length); \ + } while (0) + + +struct ompi_osc_rdma_reply_header_t { + ompi_osc_rdma_base_header_t hdr_base; + + ompi_ptr_t hdr_origin_sendreq; + + int32_t hdr_target_tag; + int32_t hdr_msg_length; +}; +typedef struct ompi_osc_rdma_reply_header_t ompi_osc_rdma_reply_header_t; + +#define OMPI_OSC_PT2PT_REPLY_HDR_HTON(hdr) \ + do { \ + OMPI_OSC_PT2PT_BASE_HDR_HTON((hdr).hdr_base) \ + (hdr).hdr_target_tag = htonl((hdr).hdr_target_tag); \ + (hdr).hdr_msg_length = htonl((hdr).hdr_msg_length); \ + } while (0) + +#define OMPI_OSC_PT2PT_REPLY_HDR_NTOH(hdr) \ + do { \ + OMPI_OSC_PT2PT_BASE_HDR_NTOH((hdr).hdr_base) \ + (hdr).hdr_target_tag = ntohl((hdr).hdr_target_tag); \ + (hdr).hdr_msg_length = ntohl((hdr).hdr_msg_length); \ + } while (0) + + +struct ompi_osc_rdma_control_header_t { + ompi_osc_rdma_base_header_t hdr_base; + int16_t hdr_windx; + int32_t hdr_value[2]; +}; +typedef struct ompi_osc_rdma_control_header_t ompi_osc_rdma_control_header_t; + +#define OMPI_OSC_PT2PT_CONTROL_HDR_HTON(hdr) \ + do { \ + OMPI_OSC_PT2PT_BASE_HDR_HTON((hdr).hdr_base) \ + (hdr).hdr_windx = htons((hdr).hdr_windx); \ + (hdr).hdr_value[0] = htonl((hdr).hdr_value[0]); \ + (hdr).hdr_value[1] = htonl((hdr).hdr_value[1]); \ + } while (0) + +#define OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(hdr) \ + 
do { \ + OMPI_OSC_PT2PT_BASE_HDR_NTOH((hdr).hdr_base) \ + (hdr).hdr_windx = ntohs((hdr).hdr_windx); \ + (hdr).hdr_value[0] = ntohl((hdr).hdr_value[0]); \ + (hdr).hdr_value[1] = ntohl((hdr).hdr_value[1]); \ + } while (0) + +#endif /* OMPI_MCA_OSC_PT2PT_HDR_H */ diff --git a/ompi/mca/osc/rdma/osc_rdma_longreq.c b/ompi/mca/osc/rdma/osc_rdma_longreq.c new file mode 100644 index 0000000000..fc59ef8ac0 --- /dev/null +++ b/ompi/mca/osc/rdma/osc_rdma_longreq.c @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "osc_rdma_longreq.h" + +#include "opal/class/opal_list.h" + + +OBJ_CLASS_INSTANCE(ompi_osc_rdma_longreq_t, opal_free_list_item_t, + NULL, NULL); + diff --git a/ompi/mca/osc/rdma/osc_rdma_longreq.h b/ompi/mca/osc/rdma/osc_rdma_longreq.h new file mode 100644 index 0000000000..1330562e0c --- /dev/null +++ b/ompi/mca/osc/rdma/osc_rdma_longreq.h @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef OSC_PT2PT_LONGREQ_H +#define OSC_PT2PT_LONGREQ_H + +#include "osc_rdma.h" + +#include "opal/class/opal_list.h" +#include "opal/class/opal_free_list.h" +#include "ompi/request/request.h" + +struct ompi_osc_rdma_longreq_t; +typedef struct ompi_osc_rdma_longreq_t ompi_osc_rdma_longreq_t; + +typedef void (*ompi_osc_rdma_longreq_comp_cb_t)(ompi_osc_rdma_longreq_t *longreq); + +struct ompi_osc_rdma_longreq_t { + opal_free_list_item_t super; + + /* warning - this doesn't always have a sane value */ + ompi_osc_rdma_module_t *req_module; + + ompi_request_t *req_pml_req; + ompi_osc_rdma_longreq_comp_cb_t req_comp_cb; + + /* general storage place - usually holds a request of some type */ + void *req_comp_cbdata; + + /* for long receives, to avoid a longrecvreq type */ + /* BWB - I don't like this, but I don't want another free list. What to do? 
*/ + struct ompi_op_t *req_op; + struct ompi_datatype_t *req_datatype; +}; +OBJ_CLASS_DECLARATION(ompi_osc_rdma_longreq_t); + +static inline int +ompi_osc_rdma_longreq_alloc(ompi_osc_rdma_longreq_t **longreq) +{ + opal_free_list_item_t *item; + int ret; + + OPAL_FREE_LIST_GET(&mca_osc_rdma_component.p2p_c_longreqs, + item, ret); + + *longreq = (ompi_osc_rdma_longreq_t*) item; + return ret; +} + +static inline int +ompi_osc_rdma_longreq_free(ompi_osc_rdma_longreq_t *longreq) +{ + OPAL_FREE_LIST_RETURN(&mca_osc_rdma_component.p2p_c_longreqs, + &longreq->super.super); + return OMPI_SUCCESS; +} + +#endif diff --git a/ompi/mca/osc/rdma/osc_rdma_obj_convert.c b/ompi/mca/osc/rdma/osc_rdma_obj_convert.c new file mode 100644 index 0000000000..dd29f343d2 --- /dev/null +++ b/ompi/mca/osc/rdma/osc_rdma_obj_convert.c @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +/* + * utility functions for dealing with remote datatype and op structures + */ + +#include "ompi_config.h" + +#include "ompi/op/op.h" + +#include "osc_rdma.h" +#include "osc_rdma_sendreq.h" +#include "osc_rdma_header.h" +#include "osc_rdma_obj_convert.h" + +int +ompi_osc_rdma_process_op(ompi_osc_rdma_module_t *module, + ompi_osc_rdma_send_header_t *header, + struct ompi_datatype_t *datatype, + ompi_op_t *op, + void *inbuf, + size_t inbuflen) +{ + unsigned char *target_buffer; + + /* compute target buffer location */ + target_buffer = (unsigned char*) module->p2p_win->w_baseptr + + (header->hdr_target_disp * module->p2p_win->w_disp_unit); + + /* BWB - fix me - change back to the pointer comparison when the + replace o_f_to_c_index is set properly */ + /* if (op == &ompi_mpi_op_replace) { */ + if (header->hdr_target_op == ompi_mpi_op_replace.o_f_to_c_index) { + ompi_convertor_t convertor; + struct iovec iov; + uint32_t iov_count = 1; + int32_t free_after = 0; + size_t max_data; + ompi_proc_t *proc; + + /* create convertor */ + OBJ_CONSTRUCT(&convertor, ompi_convertor_t); + + /* initialize convertor */ + proc = ompi_comm_peer_lookup(module->p2p_comm, header->hdr_origin); + ompi_convertor_copy_and_prepare_for_recv(proc->proc_convertor, + datatype, + header->hdr_target_count, + target_buffer, + 0, + &convertor); + + /* short circuit the reduction operation MPI_REPLACE - it just + replaces the data, so push it out into the user's buffer. + This lets us avoid both the overhead of using the op + invocation and dealing with non-contiguous reductions + (since there are never user-defined reductions in + MPI_ACCUMULATE) */ + iov.iov_len = inbuflen; + iov.iov_base = inbuf; + max_data = iov.iov_len; + ompi_convertor_unpack(&convertor, + &iov, + &iov_count, + &max_data, + &free_after); + OBJ_DESTRUCT(&convertor); + } else { + /* reductions other than MPI_REPLACE. Since user-defined + reductions aren't allowed, these all have to be over + contigous data. We make sure to only send complete + datatypes in these cases, so we can unpack directly from + the user buffer*/ + /* BWB - FIX ME - this won't work if endianness is different. 
+ Talk to George about a ddt function that allows us to fix + endianness "in place' or what else we could do here to keep + performance from sucking... */ + + ompi_op_reduce(op, inbuf, target_buffer, header->hdr_target_count, + datatype); + } + + return OMPI_SUCCESS; +} diff --git a/ompi/mca/osc/rdma/osc_rdma_obj_convert.h b/ompi/mca/osc/rdma/osc_rdma_obj_convert.h new file mode 100644 index 0000000000..db26cac515 --- /dev/null +++ b/ompi/mca/osc/rdma/osc_rdma_obj_convert.h @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +/* + * utility functions for dealing with remote datatype and op structures + */ + +#include "mpi.h" + +#include "ompi/datatype/datatype.h" + +static inline +struct ompi_datatype_t* +ompi_osc_rdma_datatype_create(ompi_proc_t *remote_proc, void **payload) +{ + struct ompi_datatype_t *datatype = + ompi_ddt_create_from_packed_description(payload, remote_proc); + if (ompi_ddt_is_predefined(datatype)) OBJ_RETAIN(datatype); + return datatype; +} + +static inline +ompi_op_t * +ompi_osc_rdma_op_create(int op_id) +{ + ompi_op_t *op = MPI_Op_f2c(op_id); + OBJ_RETAIN(op); + return op; +} + + +int ompi_osc_rdma_process_op(ompi_osc_rdma_module_t *module, + ompi_osc_rdma_send_header_t *header, + struct ompi_datatype_t *datatype, + ompi_op_t *op, + void *inbuf, + size_t inbuflen); + +/** + * Convert a window index number into a module instance. + */ +static inline ompi_osc_rdma_module_t* +ompi_osc_rdma_windx_to_module(uint32_t windx) +{ + int ret; + ompi_osc_rdma_module_t *module; + + /* find the right module and dispatch */ + ret = opal_hash_table_get_value_uint32(&mca_osc_rdma_component.p2p_c_modules, + windx, + (void**) (&module)); + if (OMPI_SUCCESS != ret) { + opal_output(0, "Could not translate windx %d to a local MPI_Win instance", + windx); + return NULL; + } + + return module; +} diff --git a/ompi/mca/osc/rdma/osc_rdma_replyreq.c b/ompi/mca/osc/rdma/osc_rdma_replyreq.c new file mode 100644 index 0000000000..80b3f3b37c --- /dev/null +++ b/ompi/mca/osc/rdma/osc_rdma_replyreq.c @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "osc_rdma_replyreq.h" + +#include "opal/class/opal_list.h" +#include "ompi/datatype/convertor.h" + +int +ompi_osc_rdma_replyreq_alloc_init(ompi_osc_rdma_module_t *module, + int origin, + ompi_ptr_t origin_request, + int target_displacement, + int target_count, + struct ompi_datatype_t *datatype, + ompi_osc_rdma_replyreq_t **replyreq) +{ + int ret; + void *target_addr = (unsigned char*) module->p2p_win->w_baseptr + + (target_displacement * module->p2p_win->w_disp_unit); + + + /* allocate a replyreq */ + ret = ompi_osc_rdma_replyreq_alloc(module, + origin, + replyreq); + if (OMPI_SUCCESS != ret) return ret; + + /* initialize local side of replyreq */ + ret = ompi_osc_rdma_replyreq_init_target(*replyreq, + target_addr, + target_count, + datatype); + if (OMPI_SUCCESS != ret) { + ompi_osc_rdma_replyreq_free(*replyreq); + return ret; + } + + /* initialize remote side of replyreq */ + ret = ompi_osc_rdma_replyreq_init_origin(*replyreq, + origin_request); + if (OMPI_SUCCESS != ret) { + ompi_osc_rdma_replyreq_free(*replyreq); + return ret; + } + + return OMPI_SUCCESS; +} + + +static void ompi_osc_rdma_replyreq_construct(ompi_osc_rdma_replyreq_t *replyreq) +{ + OBJ_CONSTRUCT(&(replyreq->rep_target_convertor), ompi_convertor_t); +} + +static void ompi_osc_rdma_replyreq_destruct(ompi_osc_rdma_replyreq_t *replyreq) +{ + OBJ_DESTRUCT(&(replyreq->rep_target_convertor)); +} + + +OBJ_CLASS_INSTANCE(ompi_osc_rdma_replyreq_t, opal_list_item_t, + ompi_osc_rdma_replyreq_construct, + ompi_osc_rdma_replyreq_destruct); diff --git a/ompi/mca/osc/rdma/osc_rdma_replyreq.h b/ompi/mca/osc/rdma/osc_rdma_replyreq.h new file mode 100644 index 0000000000..1de566e8ff --- /dev/null +++ b/ompi/mca/osc/rdma/osc_rdma_replyreq.h @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef OMPI_OSC_PT2PT_REPLYREQ_H +#define OMPI_OSC_PT2PT_REPLYREQ_H + +#include "osc_rdma.h" +#include "osc_rdma_longreq.h" + +#include "opal/class/opal_list.h" +#include "opal/threads/mutex.h" +#include "ompi/datatype/datatype.h" +#include "ompi/datatype/convertor.h" +#include "ompi/communicator/communicator.h" +#include "ompi/proc/proc.h" +#include "ompi/op/op.h" +#include "ompi/mca/pml/pml.h" + + +struct ompi_osc_rdma_replyreq_t { + opal_list_item_t super; + + /** pointer to the module that created the replyreq */ + ompi_osc_rdma_module_t *rep_module; + + /** Datatype for the target side of the operation */ + struct ompi_datatype_t *rep_target_datatype; + /** Convertor for the target. Always setup for send. 
*/ + ompi_convertor_t rep_target_convertor; + /** packed size of message on the target side */ + size_t rep_target_bytes_packed; + + /** rank in module's communicator for origin of operation */ + int rep_origin_rank; + /** pointer to the proc structure for the origin of the operation */ + ompi_proc_t *rep_origin_proc; + + ompi_ptr_t rep_origin_sendreq; +}; +typedef struct ompi_osc_rdma_replyreq_t ompi_osc_rdma_replyreq_t; +OBJ_CLASS_DECLARATION(ompi_osc_rdma_replyreq_t); + + +/** allocate and populate a replyreq structure. datatype is + RETAINed for the life of the replyreq */ +int +ompi_osc_rdma_replyreq_alloc_init(ompi_osc_rdma_module_t *module, + int origin, + ompi_ptr_t origin_request, + int target_displacement, + int target_count, + struct ompi_datatype_t *datatype, + ompi_osc_rdma_replyreq_t **replyreq); + + +static inline int +ompi_osc_rdma_replyreq_alloc(ompi_osc_rdma_module_t *module, + int origin_rank, + ompi_osc_rdma_replyreq_t **replyreq) +{ + int ret; + opal_free_list_item_t *item; + ompi_proc_t *proc = module->p2p_comm->c_pml_procs[origin_rank]->proc_ompi; + + /* BWB - FIX ME - is this really the right return code? */ + if (NULL == proc) return OMPI_ERR_OUT_OF_RESOURCE; + + OPAL_FREE_LIST_GET(&mca_osc_rdma_component.p2p_c_replyreqs, + item, ret); + if (OMPI_SUCCESS != ret) return ret; + *replyreq = (ompi_osc_rdma_replyreq_t*) item; + + (*replyreq)->rep_module = module; + (*replyreq)->rep_origin_rank = origin_rank; + (*replyreq)->rep_origin_proc = proc; + + return OMPI_SUCCESS; +} + + +static inline int +ompi_osc_rdma_replyreq_init_target(ompi_osc_rdma_replyreq_t *replyreq, + void *target_addr, + int target_count, + struct ompi_datatype_t *target_dt) +{ + OBJ_RETAIN(target_dt); + replyreq->rep_target_datatype = target_dt; + + ompi_convertor_copy_and_prepare_for_send(replyreq->rep_origin_proc->proc_convertor, + target_dt, + target_count, + target_addr, + 0, + &(replyreq->rep_target_convertor)); + ompi_convertor_get_packed_size(&replyreq->rep_target_convertor, + &replyreq->rep_target_bytes_packed); + + return OMPI_SUCCESS; +} + + +static inline int +ompi_osc_rdma_replyreq_init_origin(ompi_osc_rdma_replyreq_t *replyreq, + ompi_ptr_t origin_request) +{ + replyreq->rep_origin_sendreq = origin_request; + + return OMPI_SUCCESS; +} + + +static inline int +ompi_osc_rdma_replyreq_free(ompi_osc_rdma_replyreq_t *replyreq) +{ + ompi_convertor_cleanup(&replyreq->rep_target_convertor); + + OBJ_RELEASE(replyreq->rep_target_datatype); + + OPAL_FREE_LIST_RETURN(&mca_osc_rdma_component.p2p_c_replyreqs, + (opal_list_item_t*) replyreq); + + return OMPI_SUCCESS; +} + +#endif /* OMPI_OSC_PT2PT_REPLYREQ_H */ diff --git a/ompi/mca/osc/rdma/osc_rdma_sendreq.c b/ompi/mca/osc/rdma/osc_rdma_sendreq.c new file mode 100644 index 0000000000..1385f58c50 --- /dev/null +++ b/ompi/mca/osc/rdma/osc_rdma_sendreq.c @@ -0,0 +1,83 @@ + +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "osc_rdma_sendreq.h" + +#include "ompi/datatype/convertor.h" + + +int +ompi_osc_rdma_sendreq_alloc_init(ompi_osc_rdma_req_type_t req_type, + void *origin_addr, int origin_count, + struct ompi_datatype_t *origin_dt, + int target, int target_disp, int target_count, + struct ompi_datatype_t *target_dt, + ompi_osc_rdma_module_t *module, + ompi_osc_rdma_sendreq_t **sendreq) +{ + int ret; + + /* allocate a sendreq */ + ret = ompi_osc_rdma_sendreq_alloc(module, target, + sendreq); + if (OMPI_SUCCESS != ret) return ret; + + /* initialize local side of sendreq */ + ret = ompi_osc_rdma_sendreq_init_origin(*sendreq, + req_type, + origin_addr, + origin_count, + origin_dt); + if (OMPI_SUCCESS != ret) { + ompi_osc_rdma_sendreq_free(*sendreq); + return ret; + } + + /* initialize remote side of sendreq */ + ret = ompi_osc_rdma_sendreq_init_target(*sendreq, + target_disp, + target_count, + target_dt); + if (OMPI_SUCCESS != ret) { + ompi_osc_rdma_sendreq_free(*sendreq); + return ret; + } + + return OMPI_SUCCESS; +} + + +static void ompi_osc_rdma_sendreq_construct(ompi_osc_rdma_sendreq_t *req) +{ + req->super.req_type = OMPI_REQUEST_WIN; + req->super.req_free = NULL; + req->super.req_cancel = NULL; + OBJ_CONSTRUCT(&(req->req_origin_convertor), ompi_convertor_t); +} + + +static void ompi_osc_rdma_sendreq_destruct(ompi_osc_rdma_sendreq_t *req) +{ + OBJ_DESTRUCT(&(req->req_origin_convertor)); +} + + +OBJ_CLASS_INSTANCE(ompi_osc_rdma_sendreq_t, ompi_request_t, + ompi_osc_rdma_sendreq_construct, + ompi_osc_rdma_sendreq_destruct); diff --git a/ompi/mca/osc/rdma/osc_rdma_sendreq.h b/ompi/mca/osc/rdma/osc_rdma_sendreq.h new file mode 100644 index 0000000000..1dc780a95d --- /dev/null +++ b/ompi/mca/osc/rdma/osc_rdma_sendreq.h @@ -0,0 +1,175 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef OMPI_OSC_PT2PT_SENDREQ_H +#define OMPI_OSC_PT2PT_SENDREQ_H + +#include "osc_rdma.h" +#include "osc_rdma_longreq.h" + +#include "opal/class/opal_list.h" +#include "opal/threads/mutex.h" +#include "ompi/datatype/datatype.h" +#include "ompi/datatype/convertor.h" +#include "ompi/communicator/communicator.h" +#include "ompi/proc/proc.h" +#include "ompi/op/op.h" +#include "ompi/mca/pml/pml.h" + +typedef enum { + OMPI_OSC_PT2PT_GET, + OMPI_OSC_PT2PT_ACC, + OMPI_OSC_PT2PT_PUT +} ompi_osc_rdma_req_type_t; + + +struct ompi_osc_rdma_sendreq_t { + ompi_request_t super; + + /** type of sendreq (from ompi_osc_rdma_req_type_t) */ + ompi_osc_rdma_req_type_t req_type; + /** pointer to the module that created the sendreq */ + ompi_osc_rdma_module_t *req_module; + + /** Datatype for the origin side of the operation */ + struct ompi_datatype_t *req_origin_datatype; + /** Convertor for the origin side of the operation. 
Setup for + either send (Put / Accumulate) or receive (Get) */ + ompi_convertor_t req_origin_convertor; + /** packed size of message on the origin side */ + size_t req_origin_bytes_packed; + + /** rank in module's communicator for target of operation */ + int req_target_rank; + /** pointer to the proc structure for the target of the operation */ + ompi_proc_t *req_target_proc; + + /** displacement on target */ + int req_target_disp; + /** datatype count on target */ + int req_target_count; + /** datatype on target */ + struct ompi_datatype_t *req_target_datatype; + + /** op index on the target */ + int req_op_id; +}; +typedef struct ompi_osc_rdma_sendreq_t ompi_osc_rdma_sendreq_t; +OBJ_CLASS_DECLARATION(ompi_osc_rdma_sendreq_t); + + +/** allocate and populate a sendreq structure. Both datatypes are + RETAINed for the life of the sendreq */ +int +ompi_osc_rdma_sendreq_alloc_init(ompi_osc_rdma_req_type_t req_type, + void *origin_addr, int origin_count, + struct ompi_datatype_t *origin_dt, + int target, int target_disp, int target_count, + struct ompi_datatype_t *target_datatype, + ompi_osc_rdma_module_t *module, + ompi_osc_rdma_sendreq_t **sendreq); + +static inline int +ompi_osc_rdma_sendreq_alloc(ompi_osc_rdma_module_t *module, + int target_rank, + ompi_osc_rdma_sendreq_t **sendreq) +{ + int ret; + opal_free_list_item_t *item; + ompi_proc_t *proc = module->p2p_comm->c_pml_procs[target_rank]->proc_ompi; + + /* BWB - FIX ME - is this really the right return code? */ + if (NULL == proc) return OMPI_ERR_OUT_OF_RESOURCE; + + OPAL_FREE_LIST_GET(&mca_osc_rdma_component.p2p_c_sendreqs, + item, ret); + if (OMPI_SUCCESS != ret) return ret; + *sendreq = (ompi_osc_rdma_sendreq_t*) item; + + (*sendreq)->req_module = module; + (*sendreq)->req_target_rank = target_rank; + (*sendreq)->req_target_proc = proc; + + return OMPI_SUCCESS; +} + + +static inline int +ompi_osc_rdma_sendreq_init_origin(ompi_osc_rdma_sendreq_t *sendreq, + ompi_osc_rdma_req_type_t req_type, + void *origin_addr, + int origin_count, + struct ompi_datatype_t *origin_dt) +{ + OBJ_RETAIN(origin_dt); + sendreq->req_origin_datatype = origin_dt; + sendreq->req_type = req_type; + + if (req_type != OMPI_OSC_PT2PT_GET) { + ompi_convertor_copy_and_prepare_for_send(sendreq->req_target_proc->proc_convertor, + origin_dt, + origin_count, + origin_addr, + 0, + &(sendreq->req_origin_convertor)); + ompi_convertor_get_packed_size(&sendreq->req_origin_convertor, + &sendreq->req_origin_bytes_packed); + } else { + ompi_convertor_copy_and_prepare_for_recv(sendreq->req_target_proc->proc_convertor, + origin_dt, + origin_count, + origin_addr, + 0, + &(sendreq->req_origin_convertor)); + ompi_convertor_get_packed_size(&sendreq->req_origin_convertor, + &sendreq->req_origin_bytes_packed); + } + + return OMPI_SUCCESS; +} + + +static inline int +ompi_osc_rdma_sendreq_init_target(ompi_osc_rdma_sendreq_t *sendreq, + int target_disp, + int target_count, + struct ompi_datatype_t *target_datatype) +{ + OBJ_RETAIN(target_datatype); + + sendreq->req_target_disp = target_disp; + sendreq->req_target_count = target_count; + sendreq->req_target_datatype = target_datatype; + + return OMPI_SUCCESS; +} + + +static inline int +ompi_osc_rdma_sendreq_free(ompi_osc_rdma_sendreq_t *sendreq) +{ + ompi_convertor_cleanup(&sendreq->req_origin_convertor); + + OBJ_RELEASE(sendreq->req_target_datatype); + OBJ_RELEASE(sendreq->req_origin_datatype); + + OPAL_FREE_LIST_RETURN(&mca_osc_rdma_component.p2p_c_sendreqs, + (opal_list_item_t*) sendreq); + + return OMPI_SUCCESS; +} + +#endif /* 
OMPI_OSC_PT2PT_SENDREQ_H */ diff --git a/ompi/mca/osc/rdma/osc_rdma_sync.c b/ompi/mca/osc/rdma/osc_rdma_sync.c new file mode 100644 index 0000000000..61432066f6 --- /dev/null +++ b/ompi/mca/osc/rdma/osc_rdma_sync.c @@ -0,0 +1,620 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "osc_rdma.h" +#include "osc_rdma_sendreq.h" +#include "osc_rdma_longreq.h" +#include "osc_rdma_header.h" +#include "osc_rdma_data_move.h" + +#include "mpi.h" +#include "opal/runtime/opal_progress.h" +#include "opal/threads/mutex.h" +#include "ompi/communicator/communicator.h" + + +/* should have p2p_lock before calling */ +static inline void +ompi_osc_rdma_progress(ompi_osc_rdma_module_t *module) +{ + if (0 != opal_list_get_size(&(module->p2p_long_msgs))) { + opal_list_item_t *item, *next; + + OPAL_THREAD_LOCK(&(module->p2p_lock)); + + /* Have to go the convoluted while() route instead of a for() + loop because the callback will likely remove the request + from the list and free it, and that would lead to much + badness. */ + next = opal_list_get_first(&(module->p2p_long_msgs)); + while (opal_list_get_end(&(module->p2p_long_msgs)) != (item = next)) { + ompi_osc_rdma_longreq_t *longreq = + (ompi_osc_rdma_longreq_t*) item; + int ret, completed; + next = opal_list_get_next(item); + + ret = ompi_request_test(&(longreq->req_pml_req), &completed, NULL); + /* BWB - FIX ME - error handling */ + if (completed > 0) { + longreq->req_comp_cb(longreq); + } + } + + OPAL_THREAD_UNLOCK(&(module->p2p_lock)); + } + opal_progress(); +} + +static inline void +ompi_osc_rdma_flip_sendreqs(ompi_osc_rdma_module_t *module) +{ + short *tmp; + + OPAL_THREAD_LOCK(&(module->p2p_lock)); + + tmp = module->p2p_copy_num_pending_sendreqs; + module->p2p_copy_num_pending_sendreqs = + module->p2p_num_pending_sendreqs; + module->p2p_num_pending_sendreqs = tmp; + memset(module->p2p_num_pending_sendreqs, 0, + sizeof(short) * ompi_comm_size(module->p2p_comm)); + + /* Copy in all the pending requests */ + opal_list_join(&module->p2p_copy_pending_sendreqs, + opal_list_get_end(&module->p2p_copy_pending_sendreqs), + &module->p2p_pending_sendreqs); + + OPAL_THREAD_UNLOCK(&(module->p2p_lock)); +} + + +int +ompi_osc_rdma_module_fence(int assert, ompi_win_t *win) +{ + short incoming_reqs; + int ret = OMPI_SUCCESS, i; + + if (0 != (assert & MPI_MODE_NOPRECEDE)) { + int num_pending; + + /* check that the user didn't lie to us - since NOPRECEDED + must be specified by all processes if it is specified by + any process, if we see this it is safe to assume that there + are no pending operations anywhere needed to close out this + epoch. */ + OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock)); + num_pending = opal_list_get_size(&(P2P_MODULE(win)->p2p_pending_sendreqs)); + OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock)); + if (0 != num_pending) { + return MPI_ERR_RMA_SYNC; + } + + } else { + opal_list_item_t *item; + + ompi_osc_rdma_flip_sendreqs(P2P_MODULE(win)); + + switch (P2P_MODULE(win)->p2p_fence_sync_type) { + + /* find out how much data everyone is going to send us. 
Need + to have the lock during this period so that we have a sane + view of the number of sendreqs */ + case OSC_SYNC_REDUCE_SCATTER: + ret = P2P_MODULE(win)->p2p_comm-> + c_coll.coll_reduce_scatter(P2P_MODULE(win)->p2p_copy_num_pending_sendreqs, + &incoming_reqs, + P2P_MODULE(win)->p2p_fence_coll_counts, + MPI_SHORT, + MPI_SUM, + P2P_MODULE(win)->p2p_comm); + break; + + case OSC_SYNC_ALLREDUCE: + ret = P2P_MODULE(win)->p2p_comm-> + c_coll.coll_allreduce(P2P_MODULE(win)->p2p_copy_num_pending_sendreqs, + P2P_MODULE(win)->p2p_fence_coll_results, + ompi_comm_size(P2P_MODULE(win)->p2p_comm), + MPI_SHORT, + MPI_SUM, + P2P_MODULE(win)->p2p_comm); + incoming_reqs = P2P_MODULE(win)-> + p2p_fence_coll_results[P2P_MODULE(win)->p2p_comm->c_my_rank]; + break; + + case OSC_SYNC_ALLTOALL: + ret = P2P_MODULE(win)->p2p_comm-> + c_coll.coll_alltoall(P2P_MODULE(win)->p2p_copy_num_pending_sendreqs, + 1, + MPI_SHORT, + P2P_MODULE(win)->p2p_fence_coll_results, + 1, + MPI_SHORT, + P2P_MODULE(win)->p2p_comm); + incoming_reqs = 0; + for (i = 0 ; i < ompi_comm_size(P2P_MODULE(win)->p2p_comm) ; ++i) { + incoming_reqs += P2P_MODULE(win)->p2p_fence_coll_results[i]; + } + break; + default: + assert(0 == 1); + } + + if (OMPI_SUCCESS != ret) { + /* put the stupid data back for the user. This is not + cheap, but the user lost his data if we don't. */ + OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock)); + opal_list_join(&P2P_MODULE(win)->p2p_pending_sendreqs, + opal_list_get_end(&P2P_MODULE(win)->p2p_pending_sendreqs), + &P2P_MODULE(win)->p2p_copy_pending_sendreqs); + + for (i = 0 ; i < ompi_comm_size(P2P_MODULE(win)->p2p_comm) ; ++i) { + P2P_MODULE(win)->p2p_num_pending_sendreqs[i] += + P2P_MODULE(win)->p2p_copy_num_pending_sendreqs[i]; + } + + OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock)); + return ret; + } + + /* possible we've already received a couple in messages, so + atomicall add however many we're going to wait for */ + OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_pending_in), incoming_reqs); + OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_pending_out), + opal_list_get_size(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs))); + + opal_output(-1, "fence: waiting on %d in and %d out", + P2P_MODULE(win)->p2p_num_pending_in, + P2P_MODULE(win)->p2p_num_pending_out); + + /* try to start all the requests. We've copied everything we + need out of pending_sendreqs, so don't need the lock + here */ + while (NULL != + (item = opal_list_remove_first(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs)))) { + ompi_osc_rdma_sendreq_t *req = + (ompi_osc_rdma_sendreq_t*) item; + + ret = ompi_osc_rdma_sendreq_send(P2P_MODULE(win), req); + + if (OMPI_SUCCESS != ret) { + opal_output(0, "fence: failure in starting sendreq (%d). Will try later.", + ret); + opal_list_append(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs), item); + } + } + + /* now we know how many things we're waiting for - wait for them... */ + while (P2P_MODULE(win)->p2p_num_pending_in > 0 || + 0 != P2P_MODULE(win)->p2p_num_pending_out) { + ompi_osc_rdma_progress(P2P_MODULE(win)); + } + } + + /* all transfers are done - back to the real world we go */ + if (0 == (assert & MPI_MODE_NOSUCCEED)) { + ompi_win_set_mode(win, OMPI_WIN_FENCE); + } else { + ompi_win_set_mode(win, 0); + } + + return OMPI_SUCCESS; +} + + +int +ompi_osc_rdma_module_start(ompi_group_t *group, + int assert, + ompi_win_t *win) +{ + assert(P2P_MODULE(win)->p2p_num_pending_in == 0); + assert(P2P_MODULE(win)->p2p_num_pending_out == 0); + + OBJ_RETAIN(group); + /* BWB - do I need this? 
*/ + ompi_group_increment_proc_count(group); + + OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock)); + assert(NULL == P2P_MODULE(win)->p2p_sc_group); + P2P_MODULE(win)->p2p_sc_group = group; + OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock)); + + /* Set our mode to access w/ start */ + ompi_win_set_mode(win, OMPI_WIN_ACCESS_EPOCH | OMPI_WIN_STARTED); + + /* possible we've already received a couple in messages, so + atomicall add however many we're going to wait for */ + OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_pending_in), + ompi_group_size(P2P_MODULE(win)->p2p_sc_group)); + + return OMPI_SUCCESS; +} + + +int +ompi_osc_rdma_module_complete(ompi_win_t *win) +{ + int i; + int ret = OMPI_SUCCESS; + ompi_group_t *group; + opal_list_item_t *item; + + /* wait for all the post messages */ + while (0 != P2P_MODULE(win)->p2p_num_pending_in) { + ompi_osc_rdma_progress(P2P_MODULE(win)); + } + + ompi_osc_rdma_flip_sendreqs(P2P_MODULE(win)); + + /* for each process in group, send a control message with number + of updates coming, then start all the requests */ + for (i = 0 ; i < ompi_group_size(P2P_MODULE(win)->p2p_sc_group) ; ++i) { + int comm_rank = -1, j; + /* no need to increment ref count - the communicator isn't + going anywhere while we're here */ + ompi_group_t *comm_group = P2P_MODULE(win)->p2p_comm->c_local_group; + + /* find the rank in the communicator associated with this windows */ + for (j = 0 ; + j < ompi_group_size(comm_group) ; + ++j) { + if (P2P_MODULE(win)->p2p_sc_group->grp_proc_pointers[i] == + comm_group->grp_proc_pointers[j]) { + comm_rank = j; + break; + } + } + if (comm_rank == -1) { + ret = MPI_ERR_RMA_SYNC; + goto cleanup; + } + + OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_pending_out), + P2P_MODULE(win)->p2p_copy_num_pending_sendreqs[comm_rank]); + ompi_osc_rdma_control_send(P2P_MODULE(win), + P2P_MODULE(win)->p2p_sc_group->grp_proc_pointers[i], + OMPI_OSC_PT2PT_HDR_COMPLETE, + P2P_MODULE(win)->p2p_copy_num_pending_sendreqs[comm_rank], + 0); + } + + /* try to start all the requests. We've copied everything we + need out of pending_sendreqs, so don't need the lock + here */ + while (NULL != + (item = opal_list_remove_first(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs)))) { + ompi_osc_rdma_sendreq_t *req = + (ompi_osc_rdma_sendreq_t*) item; + + ret = ompi_osc_rdma_sendreq_send(P2P_MODULE(win), req); + + if (OMPI_SUCCESS != ret) { + opal_output(0, "complete: failure in starting sendreq (%d). Will try later.", + ret); + opal_list_append(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs), item); + } + } + + /* wait for all the requests */ + while (0 != P2P_MODULE(win)->p2p_num_pending_out) { + ompi_osc_rdma_progress(P2P_MODULE(win)); + } + + cleanup: + /* set our mode back to nothing */ + ompi_win_set_mode(win, 0); + + OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock)); + group = P2P_MODULE(win)->p2p_sc_group; + P2P_MODULE(win)->p2p_sc_group = NULL; + OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock)); + + /* BWB - do I need this? */ + ompi_group_decrement_proc_count(group); + OBJ_RELEASE(group); + + return ret; +} + +int +ompi_osc_rdma_module_post(ompi_group_t *group, + int assert, + ompi_win_t *win) +{ + int i; + + assert(P2P_MODULE(win)->p2p_num_pending_in == 0); + assert(P2P_MODULE(win)->p2p_num_pending_out == 0); + + OBJ_RETAIN(group); + /* BWB - do I need this? 
*/ + ompi_group_increment_proc_count(group); + + OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock)); + assert(NULL == P2P_MODULE(win)->p2p_pw_group); + P2P_MODULE(win)->p2p_pw_group = group; + OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock)); + + /* Set our mode to expose w/ post */ + ompi_win_set_mode(win, OMPI_WIN_EXPOSE_EPOCH | OMPI_WIN_POSTED); + + /* list how many complete counters we're still waiting on */ + OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_pending_out), + ompi_group_size(P2P_MODULE(win)->p2p_pw_group)); + + /* send a hello counter to everyone in group */ + for (i = 0 ; i < ompi_group_size(P2P_MODULE(win)->p2p_pw_group) ; ++i) { + ompi_osc_rdma_control_send(P2P_MODULE(win), + group->grp_proc_pointers[i], + OMPI_OSC_PT2PT_HDR_POST, 1, 0); + } + + return OMPI_SUCCESS; +} + + +int +ompi_osc_rdma_module_wait(ompi_win_t *win) +{ + ompi_group_t *group; + + while (0 != (P2P_MODULE(win)->p2p_num_pending_in) || + 0 != (P2P_MODULE(win)->p2p_num_pending_out)) { + ompi_osc_rdma_progress(P2P_MODULE(win)); + } + + ompi_win_set_mode(win, 0); + + OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock)); + group = P2P_MODULE(win)->p2p_pw_group; + P2P_MODULE(win)->p2p_pw_group = NULL; + OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock)); + + /* BWB - do I need this? */ + ompi_group_decrement_proc_count(group); + OBJ_RELEASE(group); + + return OMPI_SUCCESS; +} + + +int +ompi_osc_rdma_module_test(ompi_win_t *win, + int *flag) +{ + ompi_group_t *group; + + if (0 != (P2P_MODULE(win)->p2p_num_pending_in) || + 0 != (P2P_MODULE(win)->p2p_num_pending_out)) { + ompi_osc_rdma_progress(P2P_MODULE(win)); + if (0 != (P2P_MODULE(win)->p2p_num_pending_in) || + 0 != (P2P_MODULE(win)->p2p_num_pending_out)) { + *flag = 0; + return OMPI_SUCCESS; + } + } + + *flag = 1; + + ompi_win_set_mode(win, 0); + + OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock)); + group = P2P_MODULE(win)->p2p_pw_group; + P2P_MODULE(win)->p2p_pw_group = NULL; + OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock)); + + /* BWB - do I need this? */ + ompi_group_decrement_proc_count(group); + OBJ_RELEASE(group); + + return OMPI_SUCCESS; +} + + +struct ompi_osc_rdma_pending_lock_t { + opal_list_item_t super; + ompi_proc_t *proc; + int32_t lock_type; +}; +typedef struct ompi_osc_rdma_pending_lock_t ompi_osc_rdma_pending_lock_t; +OBJ_CLASS_INSTANCE(ompi_osc_rdma_pending_lock_t, opal_list_item_t, + NULL, NULL); + + +int +ompi_osc_rdma_module_lock(int lock_type, + int target, + int assert, + ompi_win_t *win) +{ + ompi_proc_t *proc = P2P_MODULE(win)->p2p_comm->c_pml_procs[target]->proc_ompi; + + assert(lock_type != 0); + + /* set our mode on the window */ + ompi_win_set_mode(win, OMPI_WIN_ACCESS_EPOCH | OMPI_WIN_LOCK_ACCESS); + + opal_output(-1, "%d sending lock request to %d", + P2P_MODULE(win)->p2p_comm->c_my_rank, target); + /* generate a lock request */ + ompi_osc_rdma_control_send(P2P_MODULE(win), + proc, + OMPI_OSC_PT2PT_HDR_LOCK_REQ, + P2P_MODULE(win)->p2p_comm->c_my_rank, + lock_type); + + /* return */ + return OMPI_SUCCESS; +} + + +int +ompi_osc_rdma_module_unlock(int target, + ompi_win_t *win) +{ + int32_t out_count; + opal_list_item_t *item; + int ret; + ompi_proc_t *proc = P2P_MODULE(win)->p2p_comm->c_pml_procs[target]->proc_ompi; + + while (0 == P2P_MODULE(win)->p2p_lock_received_ack) { + ompi_osc_rdma_progress(P2P_MODULE(win)); + } + P2P_MODULE(win)->p2p_lock_received_ack = 0; + + /* start all the requests */ + ompi_osc_rdma_flip_sendreqs(P2P_MODULE(win)); + + /* try to start all the requests. 
We've copied everything we need + out of pending_sendreqs, so don't need the lock here */ + out_count = opal_list_get_size(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs)); + + OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_pending_out), out_count); + + while (NULL != + (item = opal_list_remove_first(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs)))) { + ompi_osc_rdma_sendreq_t *req = + (ompi_osc_rdma_sendreq_t*) item; + + ret = ompi_osc_rdma_sendreq_send(P2P_MODULE(win), req); + + if (OMPI_SUCCESS != ret) { + opal_output(0, "unlock: failure in starting sendreq (%d). Will try later.", + ret); + opal_list_append(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs), item); + } + } + + /* wait for all the requests */ + while (0 != P2P_MODULE(win)->p2p_num_pending_out) { + ompi_osc_rdma_progress(P2P_MODULE(win)); + } + + /* send the unlock request */ + opal_output(-1, "%d sending unlock request to %d", + P2P_MODULE(win)->p2p_comm->c_my_rank, target); + ompi_osc_rdma_control_send(P2P_MODULE(win), + proc, + OMPI_OSC_PT2PT_HDR_UNLOCK_REQ, + P2P_MODULE(win)->p2p_comm->c_my_rank, + out_count); + + /* set our mode on the window */ + ompi_win_set_mode(win, 0); + + return OMPI_SUCCESS; +} + + +int +ompi_osc_rdma_passive_lock(ompi_osc_rdma_module_t *module, + int32_t origin, + int32_t lock_type) +{ + bool send_ack = false; + int ret = OMPI_SUCCESS; + ompi_proc_t *proc = module->p2p_comm->c_pml_procs[origin]->proc_ompi; + ompi_osc_rdma_pending_lock_t *new_pending; + + OPAL_THREAD_LOCK(&(module->p2p_lock)); + if (lock_type == MPI_LOCK_EXCLUSIVE) { + if (module->p2p_lock_status == 0) { + module->p2p_lock_status = MPI_LOCK_EXCLUSIVE; + send_ack = true; + } else { + opal_output(-1, "%d queuing lock request from %d (%d)", + module->p2p_comm->c_my_rank, origin, lock_type); + new_pending = OBJ_NEW(ompi_osc_rdma_pending_lock_t); + new_pending->proc = proc; + new_pending->lock_type = lock_type; + opal_list_append(&(module->p2p_locks_pending), &(new_pending->super)); + } + } else if (lock_type == MPI_LOCK_SHARED) { + if (module->p2p_lock_status != MPI_LOCK_EXCLUSIVE) { + module->p2p_lock_status = MPI_LOCK_SHARED; + module->p2p_shared_count++; + send_ack = true; + } else { + opal_output(-1, "queuing lock request from %d (%d)", + module->p2p_comm->c_my_rank, origin, lock_type); + new_pending = OBJ_NEW(ompi_osc_rdma_pending_lock_t); + new_pending->proc = proc; + new_pending->lock_type = lock_type; + opal_list_append(&(module->p2p_locks_pending), &(new_pending->super)); + } + } else { + ret = OMPI_ERROR; + } + OPAL_THREAD_UNLOCK(&(module->p2p_lock)); + + if (send_ack) { + opal_output(-1, "%d sending lock ack to %d", + module->p2p_comm->c_my_rank, origin); + ompi_osc_rdma_control_send(module, proc, + OMPI_OSC_PT2PT_HDR_LOCK_REQ, + module->p2p_comm->c_my_rank, + OMPI_SUCCESS); + } + + return OMPI_SUCCESS; +} + + +int +ompi_osc_rdma_passive_unlock(ompi_osc_rdma_module_t *module, + int32_t origin, + int32_t count) +{ + ompi_osc_rdma_pending_lock_t *new_pending = NULL; + + assert(module->p2p_lock_status != 0); + + OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), count); + + while (0 != module->p2p_num_pending_in) { + ompi_osc_rdma_progress(module); + } + + OPAL_THREAD_LOCK(&(module->p2p_lock)); + if (module->p2p_lock_status == MPI_LOCK_EXCLUSIVE) { + module->p2p_lock_status = 0; + } else { + module->p2p_shared_count--; + if (module->p2p_shared_count == 0) { + module->p2p_lock_status = 0; + } + } + + /* if we were really unlocked, see if we have more to process */ + new_pending = (ompi_osc_rdma_pending_lock_t*) + 
        opal_list_remove_first(&(module->p2p_locks_pending));
+    OPAL_THREAD_UNLOCK(&(module->p2p_lock));
+
+    if (NULL != new_pending) {
+        opal_output(-1, "sending lock request to proc");
+        /* set lock state and generate a lock request */
+        module->p2p_lock_status = new_pending->lock_type;
+        ompi_osc_rdma_control_send(module,
+                                   new_pending->proc,
+                                   OMPI_OSC_PT2PT_HDR_LOCK_REQ,
+                                   module->p2p_comm->c_my_rank,
+                                   OMPI_SUCCESS);
+        OBJ_RELEASE(new_pending);
+    }
+
+    return OMPI_SUCCESS;
+}
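
Note (illustrative only, not part of the patch): the synchronization entry points above (ompi_osc_rdma_module_fence, _start/_complete, _post/_wait, _lock/_unlock) are the component-side implementations of the MPI-2 RMA synchronization calls, so they are exercised from application code through MPI_Win_fence and MPI_Win_lock/MPI_Win_unlock. The sketch below is a minimal example of that, assuming a standard MPI-2 build with this component selected; all variable names are invented for the example.

#include <stdio.h>
#include <mpi.h>

int main(int argc, char **argv)
{
    int rank, size;
    int winbuf = 0;           /* one int exposed per process */
    int one = 1, result = 0;
    MPI_Win win;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    /* expose winbuf; displacement unit is sizeof(int) */
    MPI_Win_create(&winbuf, (MPI_Aint) sizeof(int), sizeof(int),
                   MPI_INFO_NULL, MPI_COMM_WORLD, &win);

    /* active target: a fence epoch (ompi_osc_rdma_module_fence).
       Each rank adds 1 to its right neighbor's window. */
    MPI_Win_fence(0, win);
    MPI_Accumulate(&one, 1, MPI_INT, (rank + 1) % size,
                   0, 1, MPI_INT, MPI_SUM, win);
    MPI_Win_fence(0, win);

    /* passive target: a lock/unlock epoch
       (ompi_osc_rdma_module_lock / _unlock).
       Every rank reads rank 0's window value. */
    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, win);
    MPI_Get(&result, 1, MPI_INT, 0, 0, 1, MPI_INT, win);
    MPI_Win_unlock(0, win);

    printf("rank %d read %d from rank 0's window\n", rank, result);

    MPI_Win_free(&win);
    MPI_Finalize();
    return 0;
}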