
Some much-needed cleanup of the rdma one-sided component, similar to r14703 for the point-to-point component.

  * Associate the list of long message requests to poll with the
    component, not the individual modules.
  * Add a progress thread that waits on the OMPI request condition
    variable and wakes up at the appropriate time to poll the message
    list and move long messages asynchronously.
  * Instead of calling opal_progress() all over the place, move to
    using condition variables like the rest of the project (a sketch
    of this pattern follows the list). This also moves the component
    slightly further toward thread safety.
  * Fix a problem with the passive side of unlock where it could go
    recursive and cause all kinds of problems, especially when
    progress threads are used. Instead, split passive unlock into two
    parts -- one to start the unlock, and another to complete the
    lock and send the ack back. The data movement code triggers the
    second part at the right time.
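
A minimal sketch of the condition-variable pattern described above, assuming the module definitions from osc_rdma.h; the helper names (mark_one_complete, wait_for_completion) are hypothetical and only illustrate the shape of the change, not the literal implementation:

    /* Completion side: decrement a pending-operation counter under the
       module lock and wake any thread blocked in a synchronization call. */
    static void mark_one_complete(ompi_osc_rdma_module_t *module)
    {
        int32_t remaining;

        OPAL_THREAD_LOCK(&module->m_lock);
        remaining = (module->m_num_pending_out -= 1);
        OPAL_THREAD_UNLOCK(&module->m_lock);

        if (0 == remaining) {
            opal_condition_broadcast(&module->m_cond);
        }
    }

    /* Waiting side: instead of spinning on opal_progress(), block on the
       module's condition variable until the pending operations drain. */
    static void wait_for_completion(ompi_osc_rdma_module_t *module)
    {
        OPAL_THREAD_LOCK(&module->m_lock);
        while (0 != module->m_num_pending_out) {
            opal_condition_wait(&module->m_cond, &module->m_lock);
        }
        OPAL_THREAD_UNLOCK(&module->m_lock);
    }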

This commit was SVN r14751.

The following SVN revision numbers were found above:
  r14703 --> open-mpi/ompi@2b4b754925
This commit is contained in:
Brian Barrett 2007-05-24 15:41:24 +00:00
parent 48c026ce6b
commit 1a9f48c89d
13 changed files with 910 additions and 770 deletions

View file

@ -7,6 +7,8 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -33,57 +35,71 @@ ompi_osc_rdma_module_free(ompi_win_t *win)
{
int ret = OMPI_SUCCESS;
int tmp;
ompi_osc_rdma_module_t *module = P2P_MODULE(win);
ompi_osc_rdma_module_t *module = GET_MODULE(win);
opal_output_verbose(1, ompi_osc_base_output,
"rdma component destroying window with id %d",
module->m_comm->c_contextid);
OPAL_THREAD_LOCK(&module->m_lock);
while (OMPI_WIN_EXPOSE_EPOCH & ompi_win_get_mode(win)) {
opal_progress();
opal_condition_wait(&module->m_cond, &module->m_lock);
}
opal_output_verbose(50, ompi_osc_base_output,
"Finalizing window %d", module->p2p_comm->c_contextid);
OPAL_THREAD_UNLOCK(&module->m_lock);
/* finish with a barrier */
if (ompi_group_size(win->w_group) > 1) {
ret = module->p2p_comm->c_coll.coll_barrier(module->p2p_comm);
ret = module->m_comm->c_coll.coll_barrier(module->m_comm);
}
/* remove window information */
win->w_osc_module = NULL;
/* remove from component information */
OPAL_THREAD_LOCK(&mca_osc_rdma_component.p2p_c_lock);
tmp = opal_hash_table_remove_value_uint32(&mca_osc_rdma_component.p2p_c_modules,
module->p2p_comm->c_contextid);
OPAL_THREAD_LOCK(&mca_osc_rdma_component.c_lock);
tmp = opal_hash_table_remove_value_uint32(&mca_osc_rdma_component.c_modules,
module->m_comm->c_contextid);
/* only take the output of hash_table_remove if there wasn't already an error */
ret = (ret != OMPI_SUCCESS) ? ret : tmp;
OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.p2p_c_lock);
OBJ_DESTRUCT(&(module->p2p_locks_pending));
if (0 == opal_hash_table_get_size(&mca_osc_rdma_component.c_modules)) {
#if OMPI_ENABLE_PROGRESS_THREADS
void *foo;
free(module->p2p_sc_remote_ranks);
free(module->p2p_sc_remote_active_ranks);
assert(module->p2p_sc_group == NULL);
assert(module->p2p_pw_group == NULL);
free(module->p2p_fence_coll_counts);
mca_osc_rdma_component.c_thread_run = false;
opal_condition_broadcast(&ompi_request_cond);
opal_thread_join(&mca_osc_rdma_component.c_thread, &foo);
#else
opal_progress_unregister(ompi_osc_rdma_component_progress);
#endif
}
OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.c_lock);
free(module->p2p_copy_num_pending_sendreqs);
OBJ_DESTRUCT(&(module->p2p_copy_pending_sendreqs));
win->w_osc_module = NULL;
OBJ_DESTRUCT(&(module->p2p_long_msgs));
OBJ_DESTRUCT(&module->m_unlocks_pending);
OBJ_DESTRUCT(&module->m_locks_pending);
OBJ_DESTRUCT(&module->m_copy_pending_sendreqs);
OBJ_DESTRUCT(&module->m_pending_sendreqs);
OBJ_DESTRUCT(&module->m_acc_lock);
OBJ_DESTRUCT(&module->m_cond);
OBJ_DESTRUCT(&module->m_lock);
free(module->p2p_num_pending_sendreqs);
if (NULL != module->m_sc_remote_ranks) {
free(module->m_sc_remote_ranks);
}
if (NULL != module->m_sc_remote_active_ranks) {
free(module->m_sc_remote_active_ranks);
}
if (NULL != module->m_fence_coll_counts) {
free(module->m_fence_coll_counts);
}
if (NULL != module->m_copy_num_pending_sendreqs) {
free(module->m_copy_num_pending_sendreqs);
}
if (NULL != module->m_num_pending_sendreqs) {
free(module->m_num_pending_sendreqs);
}
if (NULL != module->m_comm) ompi_comm_free(&module->m_comm);
OBJ_DESTRUCT(&(module->p2p_pending_sendreqs));
ompi_comm_free(&(module->p2p_comm));
module->p2p_comm = NULL;
module->p2p_win = NULL;
OBJ_DESTRUCT(&(module->p2p_acc_lock));
OBJ_DESTRUCT(&(module->p2p_lock));
free(module);
if (NULL != module) free(module);
return ret;
}

View file

@ -7,6 +7,8 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -21,32 +23,50 @@
#include "opal/class/opal_free_list.h"
#include "opal/class/opal_hash_table.h"
#include "ompi/mca/osc/osc.h"
#include "ompi/mca/btl/btl.h"
#include "ompi/win/win.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/osc/osc.h"
#include "ompi/mca/btl/btl.h"
BEGIN_C_DECLS
struct ompi_osc_rdma_component_t {
/** Extend the basic osc component interface */
ompi_osc_base_component_t super;
/** store the state of progress threads for this instance of OMPI */
bool p2p_c_have_progress_threads;
bool c_have_progress_threads;
/** lock access to datastructures in the component structure */
opal_mutex_t p2p_c_lock;
opal_mutex_t c_lock;
/** List of ompi_osc_rdma_module_ts currently in existence.
Needed so that received fragments can be dispatched to the
correct module */
opal_hash_table_t p2p_c_modules;
opal_hash_table_t c_modules;
/** Lock for request management */
opal_mutex_t c_request_lock;
/** Condition variable for request management */
opal_condition_t c_request_cond;
/** free list of ompi_osc_rdma_sendreq_t structures */
opal_free_list_t p2p_c_sendreqs;
opal_free_list_t c_sendreqs;
/** free list of ompi_osc_rdma_replyreq_t structures */
opal_free_list_t p2p_c_replyreqs;
opal_free_list_t c_replyreqs;
/** free list of ompi_osc_rdma_longreq_t structures */
opal_free_list_t p2p_c_longreqs;
opal_free_list_t c_longreqs;
/** list of outstanding requests, of type ompi_osc_pt2pt_longreq_t */
opal_list_t c_pending_requests;
#if OMPI_ENABLE_PROGRESS_THREADS
opal_thread_t c_thread;
bool c_thread_run;
#endif
bool c_btl_registered;
};
typedef struct ompi_osc_rdma_component_t ompi_osc_rdma_component_t;
@ -56,123 +76,85 @@ struct ompi_osc_rdma_module_t {
ompi_osc_base_module_t super;
/** lock access to data structures in the current module */
opal_mutex_t p2p_lock;
opal_mutex_t m_lock;
/** condition variable for access to current module */
opal_condition_t m_cond;
/** lock for "atomic" window updates from reductions */
opal_mutex_t p2p_acc_lock;
opal_mutex_t m_acc_lock;
/** pointer back to window */
ompi_win_t *p2p_win;
ompi_win_t *m_win;
/** communicator created with this window */
ompi_communicator_t *p2p_comm;
ompi_communicator_t *m_comm;
/** list of ompi_osc_rdma_sendreq_t structures, and includes all
requests for this access epoch that have not already been
started. p2p_lock must be held when modifying this field. */
opal_list_t p2p_pending_sendreqs;
started. m_lock must be held when modifying this field. */
opal_list_t m_pending_sendreqs;
/** list of unsigned int counters for the number of requests to a
particular rank in p2p_comm for this access epoch. p2p_lock
particular rank in m_comm for this access epoch. m_lock
must be held when modifying this field */
unsigned int *p2p_num_pending_sendreqs;
unsigned int *m_num_pending_sendreqs;
/** For MPI_Fence synchronization, the number of messages to send
in epoch. For Start/Complete, the number of updates for this
Complete. For lock, the number of
messages waiting for completion on the origin side. Not
protected by p2p_lock - must use atomic counter operations. */
volatile int32_t p2p_num_pending_out;
protected by m_lock - must use atomic counter operations. */
volatile int32_t m_num_pending_out;
/** For MPI_Fence synchronization, the number of expected incoming
messages. For Post/Wait, the number of expected updates from
complete. For lock, the number of messages on the passive side
we are waiting for. Not protected by p2p_lock - must use
we are waiting for. Not protected by m_lock - must use
atomic counter operations. */
volatile int32_t p2p_num_pending_in;
volatile int32_t m_num_pending_in;
/** Number of "ping" messages from the remote post group we've
received */
volatile int32_t p2p_num_post_msgs;
volatile int32_t m_num_post_msgs;
/** Number of "count" messages from the remote complete group
we've received */
volatile int32_t p2p_num_complete_msgs;
volatile int32_t m_num_complete_msgs;
/** cyclic counter for a unique tag for long messages. Not
protected by the p2p_lock - must use create_send_tag() to
protected by the m_lock - must use create_send_tag() to
create a send tag */
volatile int32_t p2p_tag_counter;
volatile int32_t m_tag_counter;
/** list of outstanding long messages that must be processed
(ompi_osc_rdma_request_long). Protected by p2p_lock. */
opal_list_t p2p_long_msgs;
opal_list_t m_copy_pending_sendreqs;
unsigned int *m_copy_num_pending_sendreqs;
opal_list_t p2p_copy_pending_sendreqs;
unsigned int *p2p_copy_num_pending_sendreqs;
bool p2p_eager_send;
/** start sending data eagerly */
bool m_eager_send;
/* ********************* FENCE data ************************ */
/* an array of <sizeof(p2p_comm)> ints, each containing the value
/* an array of <sizeof(m_comm)> ints, each containing the value
1. */
int *p2p_fence_coll_counts;
/* an array of <sizeof(p2p_comm)> unsigned ints, for use in experimenting
with different synchronization costs */
unsigned int *p2p_fence_coll_results;
mca_osc_fence_sync_t p2p_fence_sync_type;
int *m_fence_coll_counts;
/* ********************* PWSC data ************************ */
struct ompi_group_t *p2p_pw_group;
struct ompi_group_t *p2p_sc_group;
bool *p2p_sc_remote_active_ranks;
int *p2p_sc_remote_ranks;
struct ompi_group_t *m_pw_group;
struct ompi_group_t *m_sc_group;
bool *m_sc_remote_active_ranks;
int *m_sc_remote_ranks;
/* ********************* LOCK data ************************ */
int32_t p2p_lock_status; /* one of 0, MPI_LOCK_EXCLUSIVE, MPI_LOCK_SHARED */
int32_t p2p_shared_count;
opal_list_t p2p_locks_pending;
int32_t p2p_lock_received_ack;
int32_t m_lock_status; /* one of 0, MPI_LOCK_EXCLUSIVE, MPI_LOCK_SHARED */
int32_t m_shared_count;
opal_list_t m_locks_pending;
opal_list_t m_unlocks_pending;
int32_t m_lock_received_ack;
};
typedef struct ompi_osc_rdma_module_t ompi_osc_rdma_module_t;
/*
* Helper macro for grabbing the module structure from a window instance
*/
#if OMPI_ENABLE_DEBUG
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
OMPI_MODULE_DECLSPEC extern ompi_osc_rdma_component_t mca_osc_rdma_component;
static inline ompi_osc_rdma_module_t* P2P_MODULE(struct ompi_win_t* win)
{
ompi_osc_rdma_module_t *module =
(ompi_osc_rdma_module_t*) win->w_osc_module;
assert(module->p2p_win == win);
return module;
}
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
#else
#define P2P_MODULE(win) ((ompi_osc_rdma_module_t*) win->w_osc_module)
#endif
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
OMPI_MODULE_DECLSPEC extern ompi_osc_rdma_component_t mca_osc_rdma_component;
#define GET_MODULE(win) ((ompi_osc_rdma_module_t*) win->w_osc_module)
/*
* Component functions
@ -191,6 +173,7 @@ int ompi_osc_rdma_component_select(struct ompi_win_t *win,
struct ompi_info_t *info,
struct ompi_communicator_t *comm);
int ompi_osc_rdma_component_progress(void);
/*
* Module interface function types
@ -260,8 +243,8 @@ int ompi_osc_rdma_passive_unlock(ompi_osc_rdma_module_t *module,
int32_t origin,
int32_t count);
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
int ompi_osc_rdma_passive_unlock_complete(ompi_osc_rdma_module_t *module);
END_C_DECLS
#endif /* OMPI_OSC_RDMA_H */

View file

@ -7,6 +7,8 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -28,11 +30,11 @@ static int
enqueue_sendreq(ompi_osc_rdma_module_t *module,
ompi_osc_rdma_sendreq_t *sendreq)
{
OPAL_THREAD_LOCK(&(module->p2p_lock));
opal_list_append(&(module->p2p_pending_sendreqs),
OPAL_THREAD_LOCK(&(module->m_lock));
opal_list_append(&(module->m_pending_sendreqs),
(opal_list_item_t*) sendreq);
module->p2p_num_pending_sendreqs[sendreq->req_target_rank]++;
OPAL_THREAD_UNLOCK(&(module->p2p_lock));
module->m_num_pending_sendreqs[sendreq->req_target_rank]++;
OPAL_THREAD_UNLOCK(&(module->m_lock));
return OMPI_SUCCESS;
}
@ -50,7 +52,7 @@ ompi_osc_rdma_module_accumulate(void *origin_addr, int origin_count,
ompi_osc_rdma_sendreq_t *sendreq;
if ((OMPI_WIN_STARTED & ompi_win_get_mode(win)) &&
(!P2P_MODULE(win)->p2p_sc_remote_active_ranks[target])) {
(!GET_MODULE(win)->m_sc_remote_active_ranks[target])) {
return MPI_ERR_RMA_SYNC;
}
@ -82,14 +84,14 @@ ompi_osc_rdma_module_accumulate(void *origin_addr, int origin_count,
target_disp,
target_count,
target_dt,
P2P_MODULE(win),
GET_MODULE(win),
&sendreq);
if (OMPI_SUCCESS != ret) return ret;
sendreq->req_op_id = op->o_f_to_c_index;
/* enqueue sendreq */
ret = enqueue_sendreq(P2P_MODULE(win), sendreq);
ret = enqueue_sendreq(GET_MODULE(win), sendreq);
return ret;
}
@ -107,9 +109,10 @@ ompi_osc_rdma_module_get(void *origin_addr,
{
int ret;
ompi_osc_rdma_sendreq_t *sendreq;
ompi_osc_rdma_module_t *module = GET_MODULE(win);
if ((OMPI_WIN_STARTED & ompi_win_get_mode(win)) &&
(!P2P_MODULE(win)->p2p_sc_remote_active_ranks[target])) {
(!module->m_sc_remote_active_ranks[target])) {
return MPI_ERR_RMA_SYNC;
}
@ -126,36 +129,39 @@ ompi_osc_rdma_module_get(void *origin_addr,
/* create sendreq */
ret = ompi_osc_rdma_sendreq_alloc_init(OMPI_OSC_RDMA_GET,
origin_addr,
origin_count,
origin_dt,
target,
target_disp,
target_count,
target_dt,
P2P_MODULE(win),
&sendreq);
origin_addr,
origin_count,
origin_dt,
target,
target_disp,
target_count,
target_dt,
module,
&sendreq);
if (OMPI_SUCCESS != ret) return ret;
/* if we're doing fence synchronization, try to actively send
right now */
if (P2P_MODULE(win)->p2p_eager_send &&
if (module->m_eager_send &&
(OMPI_WIN_FENCE & ompi_win_get_mode(win))) {
OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), 1);
OPAL_THREAD_LOCK(&module->m_lock);
sendreq->req_module->m_num_pending_out += 1;
module->m_num_pending_sendreqs[sendreq->req_target_rank] += 1;
OPAL_THREAD_UNLOCK(&(module->m_lock));
ret = ompi_osc_rdma_sendreq_send(P2P_MODULE(win), sendreq);
ret = ompi_osc_rdma_sendreq_send(module, sendreq);
if (OMPI_SUCCESS == ret) {
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
P2P_MODULE(win)->p2p_num_pending_sendreqs[sendreq->req_target_rank]++;
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
} else {
OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1);
ret = enqueue_sendreq(P2P_MODULE(win), sendreq);
if (OMPI_SUCCESS != ret) {
OPAL_THREAD_LOCK(&module->m_lock);
sendreq->req_module->m_num_pending_out -= 1;
opal_list_append(&(module->m_pending_sendreqs),
(opal_list_item_t*) sendreq);
OPAL_THREAD_UNLOCK(&module->m_lock);
ret = OMPI_SUCCESS;
}
} else {
/* enqueue sendreq */
ret = enqueue_sendreq(P2P_MODULE(win), sendreq);
ret = enqueue_sendreq(module, sendreq);
}
return ret;
@ -170,9 +176,10 @@ ompi_osc_rdma_module_put(void *origin_addr, int origin_count,
{
int ret;
ompi_osc_rdma_sendreq_t *sendreq;
ompi_osc_rdma_module_t *module = GET_MODULE(win);
if ((OMPI_WIN_STARTED & ompi_win_get_mode(win)) &&
(!P2P_MODULE(win)->p2p_sc_remote_active_ranks[target])) {
(!module->m_sc_remote_active_ranks[target])) {
return MPI_ERR_RMA_SYNC;
}
@ -196,29 +203,32 @@ ompi_osc_rdma_module_put(void *origin_addr, int origin_count,
target_disp,
target_count,
target_dt,
P2P_MODULE(win),
module,
&sendreq);
if (OMPI_SUCCESS != ret) return ret;
/* if we're doing fence synchronization, try to actively send
right now */
if (P2P_MODULE(win)->p2p_eager_send &&
if (module->m_eager_send &&
(OMPI_WIN_FENCE & ompi_win_get_mode(win))) {
OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), 1);
OPAL_THREAD_LOCK(&module->m_lock);
sendreq->req_module->m_num_pending_out += 1;
module->m_num_pending_sendreqs[sendreq->req_target_rank] += 1;
OPAL_THREAD_UNLOCK(&(module->m_lock));
ret = ompi_osc_rdma_sendreq_send(P2P_MODULE(win), sendreq);
ret = ompi_osc_rdma_sendreq_send(module, sendreq);
if (OMPI_SUCCESS == ret) {
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
P2P_MODULE(win)->p2p_num_pending_sendreqs[sendreq->req_target_rank]++;
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
} else {
OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1);
ret = enqueue_sendreq(P2P_MODULE(win), sendreq);
if (OMPI_SUCCESS != ret) {
OPAL_THREAD_LOCK(&module->m_lock);
sendreq->req_module->m_num_pending_out -= 1;
opal_list_append(&(module->m_pending_sendreqs),
(opal_list_item_t*) sendreq);
OPAL_THREAD_UNLOCK(&module->m_lock);
ret = OMPI_SUCCESS;
}
} else {
/* enqueue sendreq */
ret = enqueue_sendreq(P2P_MODULE(win), sendreq);
ret = enqueue_sendreq(module, sendreq);
}
return ret;

View file

@ -7,6 +7,8 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -25,6 +27,7 @@
#include "osc_rdma_obj_convert.h"
#include "osc_rdma_data_move.h"
#include "opal/threads/condition.h"
#include "opal/threads/mutex.h"
#include "ompi/info/info.h"
#include "ompi/communicator/communicator.h"
@ -32,11 +35,18 @@
#include "ompi/mca/osc/base/base.h"
#include "ompi/mca/btl/btl.h"
#include "ompi/mca/bml/bml.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/bml/base/base.h"
#include "ompi/datatype/dt_arch.h"
static int ompi_osc_rdma_component_open(void);
static int32_t registered_callback = 0;
static int component_open(void);
static void component_fragment_cb(struct mca_btl_base_module_t *btl,
mca_btl_base_tag_t tag,
mca_btl_base_descriptor_t *descriptor,
void *cbdata);
#if OMPI_ENABLE_PROGRESS_THREADS
static void* component_thread_fn(opal_object_t *obj);
#endif
ompi_osc_rdma_component_t mca_osc_rdma_component = {
{ /* ompi_osc_base_component_t */
@ -46,7 +56,7 @@ ompi_osc_rdma_component_t mca_osc_rdma_component = {
OMPI_MAJOR_VERSION, /* MCA component major version */
OMPI_MINOR_VERSION, /* MCA component minor version */
OMPI_RELEASE_VERSION, /* MCA component release version */
ompi_osc_rdma_component_open,
component_open,
NULL
},
{ /* mca_base_component_data */
@ -83,11 +93,6 @@ ompi_osc_rdma_module_t ompi_osc_rdma_module_template = {
};
void ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
mca_btl_base_tag_t tag,
mca_btl_base_descriptor_t *descriptor,
void *cbdata);
/* look up parameters for configuring this window. The code first
looks in the info structure passed by the user, then through mca
parameters. */
@ -128,18 +133,9 @@ check_config_value_bool(char *key, ompi_info_t *info)
}
static int fence_sync_index;
static int
ompi_osc_rdma_component_open(void)
component_open(void)
{
fence_sync_index =
mca_base_param_reg_string(&mca_osc_rdma_component.super.osc_version,
"fence_sync_method",
"How to synchronize fence: reduce_scatter, allreduce, alltoall",
false, false, "reduce_scatter", NULL);
mca_base_param_reg_int(&mca_osc_rdma_component.super.osc_version,
"eager_send",
"Attempt to start data movement during communication call, "
@ -165,33 +161,48 @@ ompi_osc_rdma_component_init(bool enable_progress_threads,
/* we can run with either threads or not threads (may not be able
to do win locks)... */
mca_osc_rdma_component.p2p_c_have_progress_threads =
mca_osc_rdma_component.c_have_progress_threads =
enable_progress_threads;
OBJ_CONSTRUCT(&mca_osc_rdma_component.p2p_c_lock, opal_mutex_t);
OBJ_CONSTRUCT(&mca_osc_rdma_component.c_lock, opal_mutex_t);
OBJ_CONSTRUCT(&mca_osc_rdma_component.p2p_c_modules,
OBJ_CONSTRUCT(&mca_osc_rdma_component.c_modules,
opal_hash_table_t);
opal_hash_table_init(&mca_osc_rdma_component.p2p_c_modules, 2);
opal_hash_table_init(&mca_osc_rdma_component.c_modules, 2);
OBJ_CONSTRUCT(&mca_osc_rdma_component.p2p_c_sendreqs, opal_free_list_t);
opal_free_list_init(&mca_osc_rdma_component.p2p_c_sendreqs,
OBJ_CONSTRUCT(&mca_osc_rdma_component.c_request_lock,
opal_mutex_t);
OBJ_CONSTRUCT(&mca_osc_rdma_component.c_request_cond,
opal_condition_t);
OBJ_CONSTRUCT(&mca_osc_rdma_component.c_sendreqs, opal_free_list_t);
opal_free_list_init(&mca_osc_rdma_component.c_sendreqs,
sizeof(ompi_osc_rdma_sendreq_t),
OBJ_CLASS(ompi_osc_rdma_sendreq_t),
1, -1, 1);
OBJ_CONSTRUCT(&mca_osc_rdma_component.p2p_c_replyreqs, opal_free_list_t);
opal_free_list_init(&mca_osc_rdma_component.p2p_c_replyreqs,
OBJ_CONSTRUCT(&mca_osc_rdma_component.c_replyreqs, opal_free_list_t);
opal_free_list_init(&mca_osc_rdma_component.c_replyreqs,
sizeof(ompi_osc_rdma_replyreq_t),
OBJ_CLASS(ompi_osc_rdma_replyreq_t),
1, -1, 1);
OBJ_CONSTRUCT(&mca_osc_rdma_component.p2p_c_longreqs, opal_free_list_t);
opal_free_list_init(&mca_osc_rdma_component.p2p_c_longreqs,
OBJ_CONSTRUCT(&mca_osc_rdma_component.c_longreqs, opal_free_list_t);
opal_free_list_init(&mca_osc_rdma_component.c_longreqs,
sizeof(ompi_osc_rdma_longreq_t),
OBJ_CLASS(ompi_osc_rdma_longreq_t),
1, -1, 1);
OBJ_CONSTRUCT(&mca_osc_rdma_component.c_pending_requests,
opal_list_t);
#if OMPI_ENABLE_PROGRESS_THREADS
OBJ_CONSTRUCT(&mca_osc_rdma_component.c_thread, opal_thread_t);
mca_osc_rdma_component.c_thread_run = false;
#endif
mca_osc_rdma_component.c_btl_registered = false;
return OMPI_SUCCESS;
}
@ -202,19 +213,32 @@ ompi_osc_rdma_component_finalize(void)
size_t num_modules;
if (0 !=
(num_modules = opal_hash_table_get_size(&mca_osc_rdma_component.p2p_c_modules))) {
(num_modules = opal_hash_table_get_size(&mca_osc_rdma_component.c_modules))) {
opal_output(ompi_osc_base_output,
"WARNING: There were %d Windows created but not freed.",
(int) num_modules);
#if OMPI_ENABLE_PROGRESS_THREADS
mca_osc_rdma_component.c_thread_run = false;
opal_condition_broadcast(&ompi_request_cond);
opal_thread_join(&mca_osc_rdma_component.c_thread, &ret);
#else
opal_progress_unregister(ompi_osc_rdma_component_progress);
#endif
}
mca_bml.bml_register(MCA_BTL_TAG_OSC_RDMA, NULL, NULL);
OBJ_DESTRUCT(&mca_osc_rdma_component.p2p_c_longreqs);
OBJ_DESTRUCT(&mca_osc_rdma_component.p2p_c_replyreqs);
OBJ_DESTRUCT(&mca_osc_rdma_component.p2p_c_sendreqs);
OBJ_DESTRUCT(&mca_osc_rdma_component.p2p_c_modules);
OBJ_DESTRUCT(&mca_osc_rdma_component.p2p_c_lock);
#if OMPI_ENABLE_PROGRESS_THREADS
OBJ_DESTRUCT(&mca_osc_rdma_component.c_thread);
#endif
OBJ_DESTRUCT(&mca_osc_rdma_component.c_pending_requests);
OBJ_DESTRUCT(&mca_osc_rdma_component.c_longreqs);
OBJ_DESTRUCT(&mca_osc_rdma_component.c_replyreqs);
OBJ_DESTRUCT(&mca_osc_rdma_component.c_sendreqs);
OBJ_DESTRUCT(&mca_osc_rdma_component.c_request_cond);
OBJ_DESTRUCT(&mca_osc_rdma_component.c_request_lock);
OBJ_DESTRUCT(&mca_osc_rdma_component.c_modules);
OBJ_DESTRUCT(&mca_osc_rdma_component.c_lock);
return OMPI_SUCCESS;
}
@ -240,171 +264,114 @@ ompi_osc_rdma_component_select(ompi_win_t *win,
ompi_info_t *info,
ompi_communicator_t *comm)
{
ompi_osc_rdma_module_t *module;
ompi_osc_rdma_module_t *module = NULL;
int ret, i;
char *sync_string;
/* create module structure */
module = (ompi_osc_rdma_module_t*)malloc(sizeof(ompi_osc_rdma_module_t));
if (NULL == module) return OMPI_ERROR;
module = (ompi_osc_rdma_module_t*)
calloc(1, sizeof(ompi_osc_rdma_module_t));
if (NULL == module) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
/* fill in the function pointer part */
memcpy(module, &ompi_osc_rdma_module_template,
sizeof(ompi_osc_base_module_t));
/* initialize the p2p part */
OBJ_CONSTRUCT(&(module->p2p_lock), opal_mutex_t);
OBJ_CONSTRUCT(&(module->p2p_acc_lock), opal_mutex_t);
/* initialize the module part */
OBJ_CONSTRUCT(&module->m_lock, opal_mutex_t);
OBJ_CONSTRUCT(&module->m_cond, opal_condition_t);
OBJ_CONSTRUCT(&module->m_acc_lock, opal_mutex_t);
OBJ_CONSTRUCT(&module->m_pending_sendreqs, opal_list_t);
OBJ_CONSTRUCT(&module->m_copy_pending_sendreqs, opal_list_t);
OBJ_CONSTRUCT(&module->m_locks_pending, opal_list_t);
OBJ_CONSTRUCT(&module->m_unlocks_pending, opal_list_t);
module->p2p_win = win;
module->m_win = win;
ret = ompi_comm_dup(comm, &(module->p2p_comm), 0);
if (ret != OMPI_SUCCESS) {
OBJ_DESTRUCT(&(module->p2p_acc_lock));
OBJ_DESTRUCT(&(module->p2p_lock));
free(module);
return ret;
ret = ompi_comm_dup(comm, &module->m_comm, 0);
if (ret != OMPI_SUCCESS) goto cleanup;
opal_output_verbose(1, ompi_osc_base_output,
"rdma component creating window with id %d",
module->m_comm->c_contextid);
module->m_num_pending_sendreqs = (unsigned int*)
malloc(sizeof(unsigned int) * ompi_comm_size(module->m_comm));
if (NULL == module->m_num_pending_sendreqs) {
ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
goto cleanup;
}
memset(module->m_num_pending_sendreqs, 0,
sizeof(unsigned int) * ompi_comm_size(module->m_comm));
OBJ_CONSTRUCT(&module->p2p_pending_sendreqs, opal_list_t);
module->p2p_num_pending_sendreqs = (unsigned int*)malloc(sizeof(unsigned int) *
ompi_comm_size(module->p2p_comm));
if (NULL == module->p2p_num_pending_sendreqs) {
OBJ_DESTRUCT(&module->p2p_pending_sendreqs);
ompi_comm_free(&comm);
OBJ_DESTRUCT(&(module->p2p_acc_lock));
OBJ_DESTRUCT(&(module->p2p_lock));
free(module);
return ret;
module->m_num_pending_out = 0;
module->m_num_pending_in = 0;
module->m_num_post_msgs = 0;
module->m_num_complete_msgs = 0;
module->m_tag_counter = 0;
module->m_copy_num_pending_sendreqs = (unsigned int*)
malloc(sizeof(unsigned int) * ompi_comm_size(module->m_comm));
if (NULL == module->m_copy_num_pending_sendreqs) {
ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
goto cleanup;
}
memset(module->p2p_num_pending_sendreqs, 0,
sizeof(unsigned int) * ompi_comm_size(module->p2p_comm));
memset(module->m_num_pending_sendreqs, 0,
sizeof(unsigned int) * ompi_comm_size(module->m_comm));
module->p2p_num_pending_out = 0;
module->p2p_num_pending_in = 0;
module->p2p_num_post_msgs = 0;
module->p2p_num_complete_msgs = 0;
module->p2p_tag_counter = 0;
OBJ_CONSTRUCT(&(module->p2p_long_msgs), opal_list_t);
OBJ_CONSTRUCT(&(module->p2p_copy_pending_sendreqs), opal_list_t);
module->p2p_copy_num_pending_sendreqs = (unsigned int*)malloc(sizeof(unsigned int) *
ompi_comm_size(module->p2p_comm));
if (NULL == module->p2p_copy_num_pending_sendreqs) {
OBJ_DESTRUCT(&module->p2p_copy_pending_sendreqs);
OBJ_DESTRUCT(&module->p2p_long_msgs);
free(module->p2p_num_pending_sendreqs);
OBJ_DESTRUCT(&module->p2p_pending_sendreqs);
ompi_comm_free(&comm);
OBJ_DESTRUCT(&(module->p2p_acc_lock));
OBJ_DESTRUCT(&(module->p2p_lock));
free(module);
return ret;
}
memset(module->p2p_num_pending_sendreqs, 0,
sizeof(unsigned int) * ompi_comm_size(module->p2p_comm));
module->p2p_eager_send = check_config_value_bool("eager_send", info);
module->m_eager_send = check_config_value_bool("eager_send", info);
/* fence data */
module->p2p_fence_coll_counts = (int*)malloc(sizeof(int) *
ompi_comm_size(module->p2p_comm));
if (NULL == module->p2p_fence_coll_counts) {
free(module->p2p_copy_num_pending_sendreqs);
OBJ_DESTRUCT(&module->p2p_copy_pending_sendreqs);
OBJ_DESTRUCT(&module->p2p_long_msgs);
free(module->p2p_num_pending_sendreqs);
OBJ_DESTRUCT(&module->p2p_pending_sendreqs);
ompi_comm_free(&comm);
OBJ_DESTRUCT(&(module->p2p_acc_lock));
OBJ_DESTRUCT(&(module->p2p_lock));
free(module);
return ret;
module->m_fence_coll_counts = (int*)
malloc(sizeof(int) * ompi_comm_size(module->m_comm));
if (NULL == module->m_fence_coll_counts) {
ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
goto cleanup;
}
for (i = 0 ; i < ompi_comm_size(module->p2p_comm) ; ++i) {
module->p2p_fence_coll_counts[i] = 1;
}
module->p2p_fence_coll_results = (unsigned int*)malloc(sizeof(unsigned int) *
ompi_comm_size(module->p2p_comm));
if (NULL == module->p2p_fence_coll_results) {
free(module->p2p_fence_coll_counts);
free(module->p2p_copy_num_pending_sendreqs);
OBJ_DESTRUCT(&module->p2p_copy_pending_sendreqs);
OBJ_DESTRUCT(&module->p2p_long_msgs);
free(module->p2p_num_pending_sendreqs);
OBJ_DESTRUCT(&module->p2p_pending_sendreqs);
ompi_comm_free(&comm);
OBJ_DESTRUCT(&(module->p2p_acc_lock));
OBJ_DESTRUCT(&(module->p2p_lock));
free(module);
return OMPI_ERROR;
}
/* figure out what sync method to use */
mca_base_param_lookup_string(fence_sync_index, &sync_string);
if (0 == strcmp(sync_string, "reduce_scatter")) {
module->p2p_fence_sync_type = OSC_SYNC_REDUCE_SCATTER;
} else if (0 == strcmp(sync_string, "allreduce")) {
module->p2p_fence_sync_type = OSC_SYNC_ALLREDUCE;
} else if (0 == strcmp(sync_string, "alltoall")) {
module->p2p_fence_sync_type = OSC_SYNC_ALLTOALL;
} else {
opal_output(ompi_osc_base_output,
"invalid value for fence_sync_method parameter: %s\n", sync_string);
return OMPI_ERROR;
for (i = 0 ; i < ompi_comm_size(module->m_comm) ; ++i) {
module->m_fence_coll_counts[i] = 1;
}
/* pwsc data */
module->p2p_pw_group = NULL;
module->p2p_sc_group = NULL;
module->p2p_sc_remote_active_ranks =
(bool*)malloc(sizeof(bool) * ompi_comm_size(module->p2p_comm));
if (NULL == module->p2p_sc_remote_active_ranks) {
free(module->p2p_fence_coll_results);
free(module->p2p_fence_coll_counts);
free(module->p2p_copy_num_pending_sendreqs);
OBJ_DESTRUCT(&module->p2p_copy_pending_sendreqs);
OBJ_DESTRUCT(&module->p2p_long_msgs);
free(module->p2p_num_pending_sendreqs);
OBJ_DESTRUCT(&module->p2p_pending_sendreqs);
ompi_comm_free(&comm);
OBJ_DESTRUCT(&(module->p2p_acc_lock));
OBJ_DESTRUCT(&(module->p2p_lock));
free(module);
return OMPI_ERROR;
module->m_pw_group = NULL;
module->m_sc_group = NULL;
module->m_sc_remote_active_ranks = (bool*)
malloc(sizeof(bool) * ompi_comm_size(module->m_comm));
if (NULL == module->m_sc_remote_active_ranks) {
ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
goto cleanup;
}
module->p2p_sc_remote_ranks =
(int*)malloc(sizeof(int) * ompi_comm_size(module->p2p_comm));
if (NULL == module->p2p_sc_remote_ranks) {
free(module->p2p_sc_remote_active_ranks);
free(module->p2p_fence_coll_results);
free(module->p2p_fence_coll_counts);
free(module->p2p_copy_num_pending_sendreqs);
OBJ_DESTRUCT(&module->p2p_copy_pending_sendreqs);
OBJ_DESTRUCT(&module->p2p_long_msgs);
free(module->p2p_num_pending_sendreqs);
OBJ_DESTRUCT(&module->p2p_pending_sendreqs);
ompi_comm_free(&comm);
OBJ_DESTRUCT(&(module->p2p_acc_lock));
OBJ_DESTRUCT(&(module->p2p_lock));
free(module);
return OMPI_ERROR;
module->m_sc_remote_ranks = (int*)
malloc(sizeof(int) * ompi_comm_size(module->m_comm));
if (NULL == module->m_sc_remote_ranks) {
ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
goto cleanup;
}
/* lock data */
module->p2p_lock_status = 0;
module->p2p_shared_count = 0;
OBJ_CONSTRUCT(&(module->p2p_locks_pending), opal_list_t);
module->p2p_lock_received_ack = 0;
module->m_lock_status = 0;
module->m_shared_count = 0;
module->m_lock_received_ack = 0;
/* update component data */
OPAL_THREAD_LOCK(&mca_osc_rdma_component.p2p_c_lock);
opal_hash_table_set_value_uint32(&mca_osc_rdma_component.p2p_c_modules,
module->p2p_comm->c_contextid,
OPAL_THREAD_LOCK(&mca_osc_rdma_component.c_lock);
opal_hash_table_set_value_uint32(&mca_osc_rdma_component.c_modules,
module->m_comm->c_contextid,
module);
OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.p2p_c_lock);
ret = opal_hash_table_get_size(&mca_osc_rdma_component.c_modules);
if (ret == 1) {
#if OMPI_ENABLE_PROGRESS_THREADS
mca_osc_rdma_component.c_thread_run = true;
mca_osc_rdma_component.c_thread.t_run = component_thread_fn;
mca_osc_rdma_component.c_thread.t_arg = NULL;
ret = opal_thread_start(&mca_osc_rdma_component.c_thread);
#else
ret = opal_progress_register(ompi_osc_rdma_component_progress);
#endif
} else {
ret = OMPI_SUCCESS;
}
OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.c_lock);
if (OMPI_SUCCESS != ret) goto cleanup;
/* fill in window information */
win->w_osc_module = (ompi_osc_base_module_t*) module;
@ -416,27 +383,60 @@ ompi_osc_rdma_component_select(ompi_win_t *win,
opal_atomic_mb();
/* register to receive fragment callbacks, if not already done */
if (OPAL_THREAD_ADD32(&registered_callback, 1) <= 1) {
OPAL_THREAD_LOCK(&mca_osc_rdma_component.c_lock);
if (!mca_osc_rdma_component.c_btl_registered) {
mca_osc_rdma_component.c_btl_registered = true;
ret = mca_bml.bml_register(MCA_BTL_TAG_OSC_RDMA,
ompi_osc_rdma_component_fragment_cb,
component_fragment_cb,
NULL);
}
OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.c_lock);
if (OMPI_SUCCESS != ret) goto cleanup;
/* need to make create a collective, or lock requests can come in
before the window is fully created... */
module->p2p_comm->c_coll.coll_barrier(module->p2p_comm);
module->m_comm->c_coll.coll_barrier(module->m_comm);
opal_output_verbose(50, ompi_osc_base_output,
"created window %d", module->p2p_comm->c_contextid);
"done creating window %d", module->m_comm->c_contextid);
return OMPI_SUCCESS;
cleanup:
OBJ_DESTRUCT(&module->m_unlocks_pending);
OBJ_DESTRUCT(&module->m_locks_pending);
OBJ_DESTRUCT(&module->m_copy_pending_sendreqs);
OBJ_DESTRUCT(&module->m_pending_sendreqs);
OBJ_DESTRUCT(&module->m_acc_lock);
OBJ_DESTRUCT(&module->m_cond);
OBJ_DESTRUCT(&module->m_lock);
if (NULL != module->m_sc_remote_ranks) {
free(module->m_sc_remote_ranks);
}
if (NULL != module->m_sc_remote_active_ranks) {
free(module->m_sc_remote_active_ranks);
}
if (NULL != module->m_fence_coll_counts) {
free(module->m_fence_coll_counts);
}
if (NULL != module->m_copy_num_pending_sendreqs) {
free(module->m_copy_num_pending_sendreqs);
}
if (NULL != module->m_num_pending_sendreqs) {
free(module->m_num_pending_sendreqs);
}
if (NULL != module->m_comm) ompi_comm_free(&module->m_comm);
if (NULL != module) free(module);
return ret;
}
/* dispatch for callback on message completion */
void
ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
static void
component_fragment_cb(struct mca_btl_base_module_t *btl,
mca_btl_base_tag_t tag,
mca_btl_base_descriptor_t *descriptor,
void *cbdata)
@ -444,12 +444,16 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
int ret;
ompi_osc_rdma_module_t *module;
void *payload;
uint8_t hdr_type;
assert(descriptor->des_dst[0].seg_len >=
sizeof(ompi_osc_rdma_base_header_t));
hdr_type = ((ompi_osc_rdma_base_header_t*)
descriptor->des_dst[0].seg_addr.pval)->hdr_type;
/* handle message */
switch (((ompi_osc_rdma_base_header_t*) descriptor->des_dst[0].seg_addr.pval)->hdr_type) {
switch (hdr_type) {
case OMPI_OSC_RDMA_HDR_PUT:
{
ompi_osc_rdma_send_header_t *header;
@ -469,10 +473,10 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
module = ompi_osc_rdma_windx_to_module(header->hdr_windx);
if (NULL == module) return;
if (!ompi_win_exposure_epoch(module->p2p_win)) {
if (OMPI_WIN_FENCE & ompi_win_get_mode(module->p2p_win)) {
if (!ompi_win_exposure_epoch(module->m_win)) {
if (OMPI_WIN_FENCE & ompi_win_get_mode(module->m_win)) {
/* well, we're definitely in an access epoch now */
ompi_win_set_mode(module->p2p_win,
ompi_win_set_mode(module->m_win,
OMPI_WIN_FENCE |
OMPI_WIN_ACCESS_EPOCH |
OMPI_WIN_EXPOSE_EPOCH);
@ -502,10 +506,10 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
module = ompi_osc_rdma_windx_to_module(header->hdr_windx);
if (NULL == module) return;
if (!ompi_win_exposure_epoch(module->p2p_win)) {
if (OMPI_WIN_FENCE & ompi_win_get_mode(module->p2p_win)) {
if (!ompi_win_exposure_epoch(module->m_win)) {
if (OMPI_WIN_FENCE & ompi_win_get_mode(module->m_win)) {
/* well, we're definitely in an access epoch now */
ompi_win_set_mode(module->p2p_win,
ompi_win_set_mode(module->m_win,
OMPI_WIN_FENCE |
OMPI_WIN_ACCESS_EPOCH |
OMPI_WIN_EXPOSE_EPOCH);
@ -539,10 +543,10 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
module = ompi_osc_rdma_windx_to_module(header->hdr_windx);
if (NULL == module) return;
if (!ompi_win_exposure_epoch(module->p2p_win)) {
if (OMPI_WIN_FENCE & ompi_win_get_mode(module->p2p_win)) {
if (!ompi_win_exposure_epoch(module->m_win)) {
if (OMPI_WIN_FENCE & ompi_win_get_mode(module->m_win)) {
/* well, we're definitely in an access epoch now */
ompi_win_set_mode(module->p2p_win,
ompi_win_set_mode(module->m_win,
OMPI_WIN_FENCE |
OMPI_WIN_ACCESS_EPOCH |
OMPI_WIN_EXPOSE_EPOCH);
@ -550,7 +554,7 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
}
/* create or get a pointer to our datatype */
proc = ompi_comm_peer_lookup( module->p2p_comm, header->hdr_origin );
proc = ompi_comm_peer_lookup( module->m_comm, header->hdr_origin );
datatype = ompi_osc_rdma_datatype_create(proc, &payload);
/* create replyreq sendreq */
@ -599,6 +603,7 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
ompi_osc_rdma_control_header_t *header =
(ompi_osc_rdma_control_header_t*)
descriptor->des_dst[0].seg_addr.pval;
int32_t count;
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
@ -610,7 +615,10 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
module = ompi_osc_rdma_windx_to_module(header->hdr_windx);
if (NULL == module) return;
OPAL_THREAD_ADD32(&(module->p2p_num_post_msgs), -1);
OPAL_THREAD_LOCK(&module->m_lock);
count = (module->m_num_post_msgs -= 1);
OPAL_THREAD_UNLOCK(&module->m_lock);
if (count == 0) opal_condition_broadcast(&module->m_cond);
}
break;
case OMPI_OSC_RDMA_HDR_COMPLETE:
@ -618,6 +626,7 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
ompi_osc_rdma_control_header_t *header =
(ompi_osc_rdma_control_header_t*)
descriptor->des_dst[0].seg_addr.pval;
int32_t count;
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
@ -631,8 +640,12 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
/* we've heard from one more place, and have value reqs to
process */
OPAL_THREAD_ADD32(&(module->p2p_num_complete_msgs), -1);
OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), header->hdr_value[0]);
OPAL_THREAD_LOCK(&module->m_lock);
count = (module->m_num_complete_msgs -= 1);
count += (module->m_num_pending_in += header->hdr_value[0]);
OPAL_THREAD_UNLOCK(&module->m_lock);
if (count == 0) opal_condition_broadcast(&module->m_cond);
}
break;
@ -641,6 +654,7 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
ompi_osc_rdma_control_header_t *header =
(ompi_osc_rdma_control_header_t*)
descriptor->des_dst[0].seg_addr.pval;
int32_t count;
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
@ -656,7 +670,11 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
ompi_osc_rdma_passive_lock(module, header->hdr_value[0],
header->hdr_value[1]);
} else {
OPAL_THREAD_ADD32(&(module->p2p_lock_received_ack), 1);
OPAL_THREAD_LOCK(&module->m_lock);
count = (module->m_lock_received_ack += 1);
OPAL_THREAD_UNLOCK(&module->m_lock);
if (count != 0) opal_condition_broadcast(&module->m_cond);
}
}
break;
@ -682,9 +700,99 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
}
break;
case OMPI_OSC_RDMA_HDR_UNLOCK_REPLY:
{
ompi_osc_rdma_control_header_t *header =
(ompi_osc_rdma_control_header_t*)
descriptor->des_dst[0].seg_addr.pval;
int32_t count;
/* get our module pointer */
module = ompi_osc_rdma_windx_to_module(header->hdr_windx);
if (NULL == module) return;
OPAL_THREAD_LOCK(&module->m_lock);
count = (module->m_num_pending_out -= 1);
OPAL_THREAD_UNLOCK(&module->m_lock);
if (count == 0) opal_condition_broadcast(&module->m_cond);
}
break;
default:
/* BWB - FIX ME - this sucks */
opal_output(ompi_osc_base_output,
"received packet for Window with unknown type");
}
}
int
ompi_osc_rdma_component_progress(void)
{
opal_list_item_t *item;
int ret, done = 0;
#if OMPI_ENABLE_PROGRESS_THREADS
ret = OPAL_THREAD_LOCK(&mca_osc_rdma_component.c_lock);
#else
ret = OPAL_THREAD_TRYLOCK(&mca_osc_rdma_component.c_lock);
#endif
if (ret != 0) return 0;
for (item = opal_list_get_first(&mca_osc_rdma_component.c_pending_requests) ;
item != opal_list_get_end(&mca_osc_rdma_component.c_pending_requests) ;
item = opal_list_get_next(item)) {
ompi_osc_rdma_longreq_t *longreq =
(ompi_osc_rdma_longreq_t*) item;
/* BWB - FIX ME */
#if OMPI_ENABLE_PROGRESS_THREADS == 0
if (longreq->request->req_state == OMPI_REQUEST_INACTIVE ||
longreq->request->req_complete) {
ret = ompi_request_test(&longreq->request,
&done,
0);
} else {
done = 0;
ret = OMPI_SUCCESS;
}
#else
ret = ompi_request_test(&longreq->request,
&done,
&longreq->status);
#endif
if (OMPI_SUCCESS == ret && 0 != done) {
opal_list_remove_item(&mca_osc_rdma_component.c_pending_requests,
item);
OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.c_lock);
longreq->cbfunc(longreq);
OPAL_THREAD_LOCK(&mca_osc_rdma_component.c_lock);
break;
}
}
OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.c_lock);
return done;
}
#if OMPI_ENABLE_PROGRESS_THREADS
static void*
component_thread_fn(opal_object_t *obj)
{
struct timespec waittime;
while (mca_osc_rdma_component.c_thread_run) {
/* wake up whenever a request completes, to make sure it's not
for us */
waittime.tv_sec = 1;
waittime.tv_nsec = 0;
OPAL_THREAD_LOCK(&ompi_request_lock);
opal_condition_timedwait(&ompi_request_cond, &ompi_request_lock, &waittime);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
ompi_osc_rdma_component_progress();
}
return NULL;
}
#endif

View file

@ -7,6 +7,8 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -37,25 +39,46 @@ create_send_tag(ompi_osc_rdma_module_t *module)
#if OMPI_HAVE_THREAD_SUPPORT && OPAL_HAVE_ATOMIC_CMPSET_32
int32_t newval, oldval;
do {
oldval = module->p2p_tag_counter;
oldval = module->m_tag_counter;
newval = (oldval + 1) % mca_pml.pml_max_tag;
} while (0 == opal_atomic_cmpset_32(&module->p2p_tag_counter, oldval, newval));
} while (0 == opal_atomic_cmpset_32(&module->m_tag_counter, oldval, newval));
return newval;
#elif OMPI_HAVE_THREAD_SUPPORT
int32_t ret;
/* no compare and swap - have to lock the module */
OPAL_THREAD_LOCK(&module->p2p_lock);
module->p2p_tag_counter = (module->p2p_tag_counter + 1) % mca_pml.pml_max_tag;
ret = module->p2p_tag_counter;
OPAL_THREAD_UNLOCK(&module->p2p_lock);
OPAL_THREAD_LOCK(&module->m_lock);
module->m_tag_counter = (module->m_tag_counter + 1) % mca_pml.pml_max_tag;
ret = module->m_tag_counter;
OPAL_THREAD_UNLOCK(&module->m_lock);
return ret;
#else
module->p2p_tag_counter = (module->p2p_tag_counter + 1) % mca_pml.pml_max_tag;
return module->p2p_tag_counter;
module->m_tag_counter = (module->m_tag_counter + 1) % mca_pml.pml_max_tag;
return module->m_tag_counter;
#endif
}
static inline void
inmsg_mark_complete(ompi_osc_rdma_module_t *module)
{
int32_t count;
bool need_unlock = false;
OPAL_THREAD_LOCK(&module->m_lock);
count = (module->m_num_pending_in -= 1);
if ((0 != module->m_lock_status) &&
(opal_list_get_size(&module->m_unlocks_pending) != 0)) {
need_unlock = true;
}
OPAL_THREAD_UNLOCK(&module->m_lock);
if (0 == count) {
if (need_unlock) ompi_osc_rdma_passive_unlock_complete(module);
opal_condition_broadcast(&module->m_cond);
}
}
/**********************************************************************
*
* Sending a sendreq to target
@ -65,20 +88,22 @@ static void
ompi_osc_rdma_sendreq_send_long_cb(ompi_osc_rdma_longreq_t *longreq)
{
ompi_osc_rdma_sendreq_t *sendreq =
(ompi_osc_rdma_sendreq_t*) longreq->req_comp_cbdata;
(ompi_osc_rdma_sendreq_t*) longreq->cbdata;
int32_t count;
opal_output_verbose(50, ompi_osc_base_output,
"%d completed long sendreq to %d",
sendreq->req_module->p2p_comm->c_my_rank,
sendreq->req_module->m_comm->c_my_rank,
sendreq->req_target_rank);
opal_list_remove_item(&(sendreq->req_module->p2p_long_msgs),
&(longreq->super.super));
OPAL_THREAD_LOCK(&sendreq->req_module->m_lock);
count = (sendreq->req_module->m_num_pending_out -= 1);
OPAL_THREAD_UNLOCK(&sendreq->req_module->m_lock);
ompi_osc_rdma_longreq_free(longreq);
OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1);
ompi_osc_rdma_sendreq_free(sendreq);
if (0 == count) opal_condition_broadcast(&sendreq->req_module->m_cond);
}
@ -94,6 +119,7 @@ ompi_osc_rdma_sendreq_send_cb(struct mca_btl_base_module_t* btl,
(ompi_osc_rdma_send_header_t*) descriptor->des_src[0].seg_addr.pval;
opal_list_item_t *item;
ompi_osc_rdma_module_t *module = sendreq->req_module;
int32_t count;
if (OMPI_SUCCESS != status) {
/* requeue and return */
@ -118,19 +144,20 @@ ompi_osc_rdma_sendreq_send_cb(struct mca_btl_base_module_t* btl,
/* do we need to post a send? */
if (header->hdr_msg_length != 0) {
/* sendreq is done. Mark it as so and get out of here */
OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1);
count = sendreq->req_module->m_num_pending_out -= 1;
ompi_osc_rdma_sendreq_free(sendreq);
if (0 == count) opal_condition_broadcast(&sendreq->req_module->m_cond);
} else {
ompi_osc_rdma_longreq_t *longreq;
ompi_osc_rdma_longreq_alloc(&longreq);
longreq->req_comp_cb = ompi_osc_rdma_sendreq_send_long_cb;
longreq->req_comp_cbdata = sendreq;
opal_output_verbose(50, ompi_osc_base_output,
"%d starting long sendreq to %d (%d)",
sendreq->req_module->p2p_comm->c_my_rank,
sendreq->req_target_rank,
header->hdr_origin_tag);
longreq->cbfunc = ompi_osc_rdma_sendreq_send_long_cb;
longreq->cbdata = sendreq;
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
"%d starting long sendreq to %d (%d)",
sendreq->req_module->m_comm->c_my_rank,
sendreq->req_target_rank,
header->hdr_origin_tag));
mca_pml.pml_isend(sendreq->req_origin_convertor.pBaseBuf,
sendreq->req_origin_convertor.count,
@ -138,14 +165,14 @@ ompi_osc_rdma_sendreq_send_cb(struct mca_btl_base_module_t* btl,
sendreq->req_target_rank,
header->hdr_origin_tag,
MCA_PML_BASE_SEND_STANDARD,
sendreq->req_module->p2p_comm,
&(longreq->req_pml_req));
sendreq->req_module->m_comm,
&(longreq->request));
/* put the send request in the waiting list */
OPAL_THREAD_LOCK(&(sendreq->req_module->p2p_lock));
opal_list_append(&(sendreq->req_module->p2p_long_msgs),
OPAL_THREAD_LOCK(&mca_osc_rdma_component.c_lock);
opal_list_append(&mca_osc_rdma_component.c_pending_requests,
&(longreq->super.super));
OPAL_THREAD_UNLOCK(&(sendreq->req_module->p2p_lock));
OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.c_lock);
}
}
@ -154,7 +181,7 @@ ompi_osc_rdma_sendreq_send_cb(struct mca_btl_base_module_t* btl,
/* any other sendreqs to restart? */
while (NULL !=
(item = opal_list_remove_first(&(module->p2p_copy_pending_sendreqs)))) {
(item = opal_list_remove_first(&(module->m_copy_pending_sendreqs)))) {
ompi_osc_rdma_sendreq_t *req =
(ompi_osc_rdma_sendreq_t*) item;
int ret;
@ -165,7 +192,7 @@ ompi_osc_rdma_sendreq_send_cb(struct mca_btl_base_module_t* btl,
opal_output_verbose(5, ompi_osc_base_output,
"fence: failure in starting sendreq (%d). Will try later.",
ret);
opal_list_append(&(module->p2p_copy_pending_sendreqs), item);
opal_list_append(&(module->m_copy_pending_sendreqs), item);
if (OMPI_ERR_TEMP_OUT_OF_RESOURCE == ret ||
OMPI_ERR_OUT_OF_RESOURCE == ret) {
@ -224,8 +251,8 @@ ompi_osc_rdma_sendreq_send(ompi_osc_rdma_module_t *module,
header = (ompi_osc_rdma_send_header_t*) descriptor->des_src[0].seg_addr.pval;
written_data += sizeof(ompi_osc_rdma_send_header_t);
header->hdr_base.hdr_flags = 0;
header->hdr_windx = sendreq->req_module->p2p_comm->c_contextid;
header->hdr_origin = sendreq->req_module->p2p_comm->c_my_rank;
header->hdr_windx = sendreq->req_module->m_comm->c_contextid;
header->hdr_origin = sendreq->req_module->m_comm->c_my_rank;
header->hdr_origin_sendreq.pval = (void*) sendreq;
header->hdr_origin_tag = 0;
header->hdr_target_disp = sendreq->req_target_disp;
@ -303,7 +330,7 @@ ompi_osc_rdma_sendreq_send(ompi_osc_rdma_module_t *module,
/* send fragment */
opal_output_verbose(50, ompi_osc_base_output,
"%d sending sendreq to %d",
sendreq->req_module->p2p_comm->c_my_rank,
sendreq->req_module->m_comm->c_my_rank,
sendreq->req_target_rank);
ret = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_OSC_RDMA);
@ -328,14 +355,11 @@ static void
ompi_osc_rdma_replyreq_send_long_cb(ompi_osc_rdma_longreq_t *longreq)
{
ompi_osc_rdma_replyreq_t *replyreq =
(ompi_osc_rdma_replyreq_t*) longreq->req_comp_cbdata;
(ompi_osc_rdma_replyreq_t*) longreq->cbdata;
opal_list_remove_item(&(replyreq->rep_module->p2p_long_msgs),
&(longreq->super.super));
inmsg_mark_complete(replyreq->rep_module);
ompi_osc_rdma_longreq_free(longreq);
OPAL_THREAD_ADD32(&(replyreq->rep_module->p2p_num_pending_in), -1);
ompi_osc_rdma_replyreq_free(replyreq);
}
@ -367,14 +391,14 @@ ompi_osc_rdma_replyreq_send_cb(struct mca_btl_base_module_t* btl,
/* do we need to post a send? */
if (header->hdr_msg_length != 0) {
/* sendreq is done. Mark it as so and get out of here */
OPAL_THREAD_ADD32(&(replyreq->rep_module->p2p_num_pending_in), -1);
inmsg_mark_complete(replyreq->rep_module);
ompi_osc_rdma_replyreq_free(replyreq);
} else {
ompi_osc_rdma_longreq_t *longreq;
ompi_osc_rdma_longreq_alloc(&longreq);
longreq->req_comp_cb = ompi_osc_rdma_replyreq_send_long_cb;
longreq->req_comp_cbdata = replyreq;
longreq->cbfunc = ompi_osc_rdma_replyreq_send_long_cb;
longreq->cbdata = replyreq;
mca_pml.pml_isend(replyreq->rep_target_convertor.pBaseBuf,
replyreq->rep_target_convertor.count,
@ -382,20 +406,18 @@ ompi_osc_rdma_replyreq_send_cb(struct mca_btl_base_module_t* btl,
replyreq->rep_origin_rank,
header->hdr_target_tag,
MCA_PML_BASE_SEND_STANDARD,
replyreq->rep_module->p2p_comm,
&(longreq->req_pml_req));
replyreq->rep_module->m_comm,
&(longreq->request));
/* put the send request in the waiting list */
OPAL_THREAD_LOCK(&(replyreq->rep_module->p2p_lock));
opal_list_append(&(replyreq->rep_module->p2p_long_msgs),
OPAL_THREAD_LOCK(&mca_osc_rdma_component.c_lock);
opal_list_append(&mca_osc_rdma_component.c_pending_requests,
&(longreq->super.super));
OPAL_THREAD_UNLOCK(&(replyreq->rep_module->p2p_lock));
OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.c_lock);
}
/* release the descriptor and replyreq */
btl->btl_free(btl, descriptor);
/* any other replyreqs to restart? */
}
@ -497,25 +519,22 @@ ompi_osc_rdma_replyreq_send(ompi_osc_rdma_module_t *module,
static void
ompi_osc_rdma_sendreq_recv_put_long_cb(ompi_osc_rdma_longreq_t *longreq)
{
opal_list_remove_item(&(longreq->req_module->p2p_long_msgs),
&(longreq->super.super));
OBJ_RELEASE(longreq->req_datatype);
ompi_osc_rdma_longreq_free(longreq);
OPAL_THREAD_ADD32(&(longreq->req_module->p2p_num_pending_in), -1);
inmsg_mark_complete(longreq->req_module);
}
int
ompi_osc_rdma_sendreq_recv_put(ompi_osc_rdma_module_t *module,
ompi_osc_rdma_send_header_t *header,
void *inbuf)
ompi_osc_rdma_send_header_t *header,
void *inbuf)
{
int ret = OMPI_SUCCESS;
void *target = (unsigned char*) module->p2p_win->w_baseptr +
(header->hdr_target_disp * module->p2p_win->w_disp_unit);
ompi_proc_t *proc = ompi_comm_peer_lookup( module->p2p_comm, header->hdr_origin );
void *target = (unsigned char*) module->m_win->w_baseptr +
(header->hdr_target_disp * module->m_win->w_disp_unit);
ompi_proc_t *proc = ompi_comm_peer_lookup( module->m_comm, header->hdr_origin );
struct ompi_datatype_t *datatype =
ompi_osc_rdma_datatype_create(proc, &inbuf);
@ -530,7 +549,7 @@ ompi_osc_rdma_sendreq_recv_put(ompi_osc_rdma_module_t *module,
OBJ_CONSTRUCT(&convertor, ompi_convertor_t);
/* initialize convertor */
proc = ompi_comm_peer_lookup(module->p2p_comm, header->hdr_origin);
proc = ompi_comm_peer_lookup(module->m_comm, header->hdr_origin);
ompi_convertor_copy_and_prepare_for_recv(proc->proc_convertor,
datatype,
header->hdr_target_count,
@ -546,14 +565,13 @@ ompi_osc_rdma_sendreq_recv_put(ompi_osc_rdma_module_t *module,
&max_data );
OBJ_DESTRUCT(&convertor);
OBJ_RELEASE(datatype);
OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), -1);
inmsg_mark_complete(module);
} else {
ompi_osc_rdma_longreq_t *longreq;
ompi_osc_rdma_longreq_alloc(&longreq);
longreq->req_comp_cb = ompi_osc_rdma_sendreq_recv_put_long_cb;
longreq->req_comp_cbdata = NULL;
longreq->cbfunc = ompi_osc_rdma_sendreq_recv_put_long_cb;
longreq->cbdata = NULL;
longreq->req_datatype = datatype;
longreq->req_module = module;
@ -562,14 +580,14 @@ ompi_osc_rdma_sendreq_recv_put(ompi_osc_rdma_module_t *module,
datatype,
header->hdr_origin,
header->hdr_origin_tag,
module->p2p_comm,
&(longreq->req_pml_req));
module->m_comm,
&(longreq->request));
/* put the send request in the waiting list */
OPAL_THREAD_LOCK(&(module->p2p_lock));
opal_list_append(&(module->p2p_long_msgs),
OPAL_THREAD_LOCK(&mca_osc_rdma_component.c_lock);
opal_list_append(&mca_osc_rdma_component.c_pending_requests,
&(longreq->super.super));
OPAL_THREAD_UNLOCK(&(module->p2p_lock));
OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.c_lock);
}
return ret;
@ -587,40 +605,37 @@ static void
ompi_osc_rdma_sendreq_recv_accum_long_cb(ompi_osc_rdma_longreq_t *longreq)
{
ompi_osc_rdma_send_header_t *header =
(ompi_osc_rdma_send_header_t*) longreq->req_comp_cbdata;
(ompi_osc_rdma_send_header_t*) longreq->cbdata;
void *payload = (void*) (header + 1);
int ret;
/* lock the window for accumulates */
OPAL_THREAD_LOCK(&longreq->req_module->p2p_acc_lock);
opal_list_remove_item(&(longreq->req_module->p2p_long_msgs),
&(longreq->super.super));
OPAL_THREAD_LOCK(&longreq->req_module->m_acc_lock);
/* copy the data from the temporary buffer into the user window */
ret = ompi_osc_rdma_process_op(longreq->req_module,
header,
longreq->req_datatype,
longreq->req_op,
payload,
header->hdr_msg_length);
header,
longreq->req_datatype,
longreq->req_op,
payload,
header->hdr_msg_length);
/* unlock the window for accumulates */
OPAL_THREAD_UNLOCK(&longreq->req_module->p2p_acc_lock);
OPAL_THREAD_UNLOCK(&longreq->req_module->m_acc_lock);
opal_output_verbose(50, ompi_osc_base_output,
"%d finished receiving long accum message from %d",
longreq->req_module->p2p_comm->c_my_rank,
longreq->req_module->m_comm->c_my_rank,
header->hdr_origin);
/* free the temp buffer */
free(longreq->req_comp_cbdata);
free(longreq->cbdata);
/* Release datatype & op */
OBJ_RELEASE(longreq->req_datatype);
OBJ_RELEASE(longreq->req_op);
OPAL_THREAD_ADD32(&(longreq->req_module->p2p_num_pending_in), -1);
inmsg_mark_complete(longreq->req_module);
ompi_osc_rdma_longreq_free(longreq);
}
@ -633,32 +648,31 @@ ompi_osc_rdma_sendreq_recv_accum(ompi_osc_rdma_module_t *module,
{
int ret = OMPI_SUCCESS;
struct ompi_op_t *op = ompi_osc_rdma_op_create(header->hdr_target_op);
ompi_proc_t *proc = ompi_comm_peer_lookup( module->p2p_comm, header->hdr_origin );
ompi_proc_t *proc = ompi_comm_peer_lookup( module->m_comm, header->hdr_origin );
struct ompi_datatype_t *datatype =
ompi_osc_rdma_datatype_create(proc, &payload);
if (header->hdr_msg_length > 0) {
/* lock the window for accumulates */
OPAL_THREAD_LOCK(&module->p2p_acc_lock);
OPAL_THREAD_LOCK(&module->m_acc_lock);
/* copy the data from the temporary buffer into the user window */
ret = ompi_osc_rdma_process_op(module, header, datatype, op, payload,
header->hdr_msg_length);
/* unlock the window for accumulates */
OPAL_THREAD_UNLOCK(&module->p2p_acc_lock);
OPAL_THREAD_UNLOCK(&module->m_acc_lock);
/* Release datatype & op */
OBJ_RELEASE(datatype);
OBJ_RELEASE(op);
OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), -1);
inmsg_mark_complete(module);
opal_output_verbose(50, ompi_osc_base_output,
"%d received accum message from %d",
module->p2p_comm->c_my_rank,
module->m_comm->c_my_rank,
header->hdr_origin);
} else {
ompi_osc_rdma_longreq_t *longreq;
ptrdiff_t lb, extent, true_lb, true_extent;
@ -672,39 +686,39 @@ ompi_osc_rdma_sendreq_recv_accum(ompi_osc_rdma_module_t *module,
/* get a longreq and fill it in */
ompi_osc_rdma_longreq_alloc(&longreq);
longreq->req_comp_cb = ompi_osc_rdma_sendreq_recv_accum_long_cb;
longreq->cbfunc = ompi_osc_rdma_sendreq_recv_accum_long_cb;
longreq->req_datatype = datatype;
longreq->req_op = op;
longreq->req_module = module;
/* allocate a buffer to receive into ... */
longreq->req_comp_cbdata = malloc(buflen + sizeof(ompi_osc_rdma_send_header_t));
longreq->cbdata = malloc(buflen + sizeof(ompi_osc_rdma_send_header_t));
if (NULL == longreq->req_comp_cbdata) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
if (NULL == longreq->cbdata) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
/* fill in tmp header */
memcpy(longreq->req_comp_cbdata, header,
memcpy(longreq->cbdata, header,
sizeof(ompi_osc_rdma_send_header_t));
((ompi_osc_rdma_send_header_t*) longreq->req_comp_cbdata)->hdr_msg_length = buflen;
((ompi_osc_rdma_send_header_t*) longreq->cbdata)->hdr_msg_length = buflen;
ret = mca_pml.pml_irecv(((char*) longreq->req_comp_cbdata) + sizeof(ompi_osc_rdma_send_header_t),
ret = mca_pml.pml_irecv(((char*) longreq->cbdata) + sizeof(ompi_osc_rdma_send_header_t),
header->hdr_target_count,
datatype,
header->hdr_origin,
header->hdr_origin_tag,
module->p2p_comm,
&(longreq->req_pml_req));
module->m_comm,
&(longreq->request));
opal_output_verbose(50, ompi_osc_base_output,
"%d started long recv accum message from %d (%d)",
module->p2p_comm->c_my_rank,
module->m_comm->c_my_rank,
header->hdr_origin,
header->hdr_origin_tag);
/* put the send request in the waiting list */
OPAL_THREAD_LOCK(&(module->p2p_lock));
opal_list_append(&(module->p2p_long_msgs),
OPAL_THREAD_LOCK(&mca_osc_rdma_component.c_lock);
opal_list_append(&mca_osc_rdma_component.c_pending_requests,
&(longreq->super.super));
OPAL_THREAD_UNLOCK(&(module->p2p_lock));
OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.c_lock);
}
return ret;
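
The hunk above parks the long accumulate receive on the component-wide c_pending_requests list (guarded by c_lock) instead of a per-module list. The code that drains that list is not part of this excerpt; a minimal sketch of what one drain pass could look like, modelled on the per-module polling loop removed later in this diff and assuming the poller rather than the callback unlinks completed entries, is:

/* Hedged sketch only: one drain pass over the component's pending
 * long-message requests.  Everything except c_lock, c_pending_requests,
 * longreq->request and longreq->cbfunc is illustrative, including the
 * choice to unlink completed entries here rather than in the callback. */
static void
osc_rdma_drain_pending_requests_sketch(void)
{
    opal_list_item_t *item, *next;

    OPAL_THREAD_LOCK(&mca_osc_rdma_component.c_lock);
    next = opal_list_get_first(&mca_osc_rdma_component.c_pending_requests);
    while (opal_list_get_end(&mca_osc_rdma_component.c_pending_requests) !=
           (item = next)) {
        ompi_osc_rdma_longreq_t *longreq = (ompi_osc_rdma_longreq_t*) item;
        int completed = 0;

        /* grab the successor first: the callback may free this entry */
        next = opal_list_get_next(item);

        ompi_request_test(&longreq->request, &completed, NULL);
        if (completed) {
            opal_list_remove_item(&mca_osc_rdma_component.c_pending_requests,
                                  item);
            longreq->cbfunc(longreq);
        }
    }
    OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.c_lock);
}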
@ -720,17 +734,20 @@ static void
ompi_osc_rdma_replyreq_recv_long_cb(ompi_osc_rdma_longreq_t *longreq)
{
ompi_osc_rdma_sendreq_t *sendreq =
(ompi_osc_rdma_sendreq_t*) longreq->req_comp_cbdata;
(ompi_osc_rdma_sendreq_t*) longreq->cbdata;
int32_t count;
opal_list_remove_item(&(longreq->req_module->p2p_long_msgs),
&(longreq->super.super));
OPAL_THREAD_LOCK(&sendreq->req_module->m_lock);
count = (sendreq->req_module->m_num_pending_out -= 1);
OPAL_THREAD_UNLOCK(&sendreq->req_module->m_lock);
ompi_osc_rdma_longreq_free(longreq);
OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1);
ompi_osc_rdma_sendreq_free(sendreq);
if (0 == count) opal_condition_broadcast(&sendreq->req_module->m_cond);
}
int
ompi_osc_rdma_replyreq_recv(ompi_osc_rdma_module_t *module,
ompi_osc_rdma_sendreq_t *sendreq,
@ -746,6 +763,7 @@ ompi_osc_rdma_replyreq_recv(ompi_osc_rdma_module_t *module,
struct iovec iov;
uint32_t iov_count = 1;
size_t max_data;
int32_t count;
iov.iov_len = header->hdr_msg_length;
iov.iov_base = (IOVBASE_TYPE*)payload;
@ -755,14 +773,15 @@ ompi_osc_rdma_replyreq_recv(ompi_osc_rdma_module_t *module,
&iov_count,
&max_data );
OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1);
count = sendreq->req_module->m_num_pending_out -= 1;
ompi_osc_rdma_sendreq_free(sendreq);
if (0 == count) opal_condition_broadcast(&sendreq->req_module->m_cond);
} else {
ompi_osc_rdma_longreq_t *longreq;
ompi_osc_rdma_longreq_alloc(&longreq);
longreq->req_comp_cb = ompi_osc_rdma_replyreq_recv_long_cb;
longreq->req_comp_cbdata = sendreq;
longreq->cbfunc = ompi_osc_rdma_replyreq_recv_long_cb;
longreq->cbdata = sendreq;
longreq->req_module = module;
/* BWB - FIX ME - George is going to kill me for this */
@ -771,14 +790,14 @@ ompi_osc_rdma_replyreq_recv(ompi_osc_rdma_module_t *module,
sendreq->req_origin_datatype,
sendreq->req_target_rank,
header->hdr_target_tag,
module->p2p_comm,
&(longreq->req_pml_req));
module->m_comm,
&(longreq->request));
/* put the send request in the waiting list */
OPAL_THREAD_LOCK(&(module->p2p_lock));
opal_list_append(&(module->p2p_long_msgs),
OPAL_THREAD_LOCK(&mca_osc_rdma_component.c_lock);
opal_list_append(&mca_osc_rdma_component.c_pending_requests,
&(longreq->super.super));
OPAL_THREAD_UNLOCK(&(module->p2p_lock));
OPAL_THREAD_UNLOCK(&mca_osc_rdma_component.c_lock);
}
return ret;
@ -839,7 +858,7 @@ ompi_osc_rdma_control_send(ompi_osc_rdma_module_t *module,
header->hdr_base.hdr_type = type;
header->hdr_value[0] = value0;
header->hdr_value[1] = value1;
header->hdr_windx = module->p2p_comm->c_contextid;
header->hdr_windx = module->m_comm->c_contextid;
#ifdef WORDS_BIGENDIAN
header->hdr_base.hdr_flags |= OMPI_OSC_RDMA_HDR_FLAG_NBO;


@ -7,6 +7,8 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -23,16 +25,17 @@
#include "opal/types.h"
#define OMPI_OSC_RDMA_HDR_PUT 0x0001
#define OMPI_OSC_RDMA_HDR_ACC 0x0002
#define OMPI_OSC_RDMA_HDR_GET 0x0004
#define OMPI_OSC_RDMA_HDR_REPLY 0x0008
#define OMPI_OSC_RDMA_HDR_POST 0x0010
#define OMPI_OSC_RDMA_HDR_COMPLETE 0x0020
#define OMPI_OSC_RDMA_HDR_LOCK_REQ 0x0040
#define OMPI_OSC_RDMA_HDR_UNLOCK_REQ 0x0080
#define OMPI_OSC_RDMA_HDR_PUT 0x0001
#define OMPI_OSC_RDMA_HDR_ACC 0x0002
#define OMPI_OSC_RDMA_HDR_GET 0x0003
#define OMPI_OSC_RDMA_HDR_REPLY 0x0004
#define OMPI_OSC_RDMA_HDR_POST 0x0005
#define OMPI_OSC_RDMA_HDR_COMPLETE 0x0006
#define OMPI_OSC_RDMA_HDR_LOCK_REQ 0x0007
#define OMPI_OSC_RDMA_HDR_UNLOCK_REQ 0x0008
#define OMPI_OSC_RDMA_HDR_UNLOCK_REPLY 0x0009
#define OMPI_OSC_RDMA_HDR_FLAG_NBO 0x0001
#define OMPI_OSC_RDMA_HDR_FLAG_NBO 0x0001
struct ompi_osc_rdma_base_header_t {
uint8_t hdr_type;


@ -7,6 +7,8 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -26,22 +28,18 @@
struct ompi_osc_rdma_longreq_t;
typedef struct ompi_osc_rdma_longreq_t ompi_osc_rdma_longreq_t;
typedef void (*ompi_osc_rdma_longreq_comp_cb_t)(ompi_osc_rdma_longreq_t *longreq);
typedef void (*ompi_osc_rdma_longreq_cb_fn_t)(ompi_osc_rdma_longreq_t *longreq);
struct ompi_osc_rdma_longreq_t {
opal_free_list_item_t super;
ompi_request_t *request;
ompi_osc_rdma_longreq_cb_fn_t cbfunc;
void *cbdata;
/* warning - this doesn't always have a sane value */
ompi_osc_rdma_module_t *req_module;
ompi_request_t *req_pml_req;
ompi_osc_rdma_longreq_comp_cb_t req_comp_cb;
/* general storage place - usually holds a request of some type */
void *req_comp_cbdata;
/* for long receives, to avoid a longrecvreq type */
/* BWB - I don't like this, but I don't want another free list. What to do? */
struct ompi_op_t *req_op;
struct ompi_datatype_t *req_datatype;
};
@ -53,7 +51,7 @@ ompi_osc_rdma_longreq_alloc(ompi_osc_rdma_longreq_t **longreq)
opal_free_list_item_t *item;
int ret;
OPAL_FREE_LIST_GET(&mca_osc_rdma_component.p2p_c_longreqs,
OPAL_FREE_LIST_GET(&mca_osc_rdma_component.c_longreqs,
item, ret);
*longreq = (ompi_osc_rdma_longreq_t*) item;
@ -63,7 +61,7 @@ ompi_osc_rdma_longreq_alloc(ompi_osc_rdma_longreq_t **longreq)
static inline int
ompi_osc_rdma_longreq_free(ompi_osc_rdma_longreq_t *longreq)
{
OPAL_FREE_LIST_RETURN(&mca_osc_rdma_component.p2p_c_longreqs,
OPAL_FREE_LIST_RETURN(&mca_osc_rdma_component.c_longreqs,
&longreq->super.super);
return OMPI_SUCCESS;
}


@ -7,6 +7,8 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -38,8 +40,8 @@ ompi_osc_rdma_process_op(ompi_osc_rdma_module_t *module,
unsigned char *target_buffer;
/* compute target buffer location */
target_buffer = (unsigned char*) module->p2p_win->w_baseptr +
(header->hdr_target_disp * module->p2p_win->w_disp_unit);
target_buffer = (unsigned char*) module->m_win->w_baseptr +
(header->hdr_target_disp * module->m_win->w_disp_unit);
/* BWB - fix me - change back to the pointer comparison when the
replace o_f_to_c_index is set properly */
@ -55,7 +57,7 @@ ompi_osc_rdma_process_op(ompi_osc_rdma_module_t *module,
OBJ_CONSTRUCT(&convertor, ompi_convertor_t);
/* initialize convertor */
proc = ompi_comm_peer_lookup(module->p2p_comm, header->hdr_origin);
proc = ompi_comm_peer_lookup(module->m_comm, header->hdr_origin);
ompi_convertor_copy_and_prepare_for_recv(proc->proc_convertor,
datatype,
header->hdr_target_count,


@ -7,6 +7,8 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -59,7 +61,7 @@ ompi_osc_rdma_windx_to_module(uint32_t windx)
ompi_osc_rdma_module_t *module;
/* find the right module and dispatch */
ret = opal_hash_table_get_value_uint32(&mca_osc_rdma_component.p2p_c_modules,
ret = opal_hash_table_get_value_uint32(&mca_osc_rdma_component.c_modules,
windx,
(void**) (&module));
if (OMPI_SUCCESS != ret) {


@ -31,8 +31,8 @@ ompi_osc_rdma_replyreq_alloc_init(ompi_osc_rdma_module_t *module,
ompi_osc_rdma_replyreq_t **replyreq)
{
int ret;
void *target_addr = (unsigned char*) module->p2p_win->w_baseptr +
(target_displacement * module->p2p_win->w_disp_unit);
void *target_addr = (unsigned char*) module->m_win->w_baseptr +
(target_displacement * module->m_win->w_disp_unit);
/* allocate a replyreq */


@ -7,6 +7,8 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -73,12 +75,12 @@ ompi_osc_rdma_replyreq_alloc(ompi_osc_rdma_module_t *module,
{
int ret;
opal_free_list_item_t *item;
ompi_proc_t *proc = ompi_comm_peer_lookup( module->p2p_comm, origin_rank );
ompi_proc_t *proc = ompi_comm_peer_lookup( module->m_comm, origin_rank );
/* BWB - FIX ME - is this really the right return code? */
if (NULL == proc) return OMPI_ERR_OUT_OF_RESOURCE;
OPAL_FREE_LIST_GET(&mca_osc_rdma_component.p2p_c_replyreqs,
OPAL_FREE_LIST_GET(&mca_osc_rdma_component.c_replyreqs,
item, ret);
if (OMPI_SUCCESS != ret) return ret;
*replyreq = (ompi_osc_rdma_replyreq_t*) item;
@ -130,7 +132,7 @@ ompi_osc_rdma_replyreq_free(ompi_osc_rdma_replyreq_t *replyreq)
OBJ_RELEASE(replyreq->rep_target_datatype);
OPAL_FREE_LIST_RETURN(&mca_osc_rdma_component.p2p_c_replyreqs,
OPAL_FREE_LIST_RETURN(&mca_osc_rdma_component.c_replyreqs,
(opal_list_item_t*) replyreq);
return OMPI_SUCCESS;


@ -7,6 +7,8 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -89,12 +91,12 @@ ompi_osc_rdma_sendreq_alloc(ompi_osc_rdma_module_t *module,
{
int ret;
opal_free_list_item_t *item;
ompi_proc_t *proc = ompi_comm_peer_lookup( module->p2p_comm, target_rank );
ompi_proc_t *proc = ompi_comm_peer_lookup( module->m_comm, target_rank );
/* BWB - FIX ME - is this really the right return code? */
if (NULL == proc) return OMPI_ERR_OUT_OF_RESOURCE;
OPAL_FREE_LIST_GET(&mca_osc_rdma_component.p2p_c_sendreqs,
OPAL_FREE_LIST_GET(&mca_osc_rdma_component.c_sendreqs,
item, ret);
if (OMPI_SUCCESS != ret) return ret;
*sendreq = (ompi_osc_rdma_sendreq_t*) item;
@ -166,7 +168,7 @@ ompi_osc_rdma_sendreq_free(ompi_osc_rdma_sendreq_t *sendreq)
OBJ_RELEASE(sendreq->req_target_datatype);
OBJ_RELEASE(sendreq->req_origin_datatype);
OPAL_FREE_LIST_RETURN(&mca_osc_rdma_component.p2p_c_sendreqs,
OPAL_FREE_LIST_RETURN(&mca_osc_rdma_component.c_sendreqs,
(opal_list_item_t*) sendreq);
return OMPI_SUCCESS;


@ -7,6 +7,8 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -29,58 +31,23 @@
#include "ompi/mca/osc/base/base.h"
/* should have p2p_lock before calling */
static inline void
ompi_osc_rdma_progress(ompi_osc_rdma_module_t *module)
{
if (0 != opal_list_get_size(&(module->p2p_long_msgs))) {
opal_list_item_t *item, *next;
OPAL_THREAD_LOCK(&(module->p2p_lock));
/* Have to go the convoluted while() route instead of a for()
loop because the callback will likely remove the request
from the list and free it, and that would lead to much
badness. */
next = opal_list_get_first(&(module->p2p_long_msgs));
while (opal_list_get_end(&(module->p2p_long_msgs)) != (item = next)) {
ompi_osc_rdma_longreq_t *longreq =
(ompi_osc_rdma_longreq_t*) item;
int ret, completed;
next = opal_list_get_next(item);
ret = ompi_request_test(&(longreq->req_pml_req), &completed, NULL);
/* BWB - FIX ME - error handling */
if (completed > 0) {
longreq->req_comp_cb(longreq);
}
}
OPAL_THREAD_UNLOCK(&(module->p2p_lock));
}
opal_progress();
}
/* Must hold module's lock before calling... */
static inline void
ompi_osc_rdma_flip_sendreqs(ompi_osc_rdma_module_t *module)
{
unsigned int *tmp;
OPAL_THREAD_LOCK(&(module->p2p_lock));
tmp = module->p2p_copy_num_pending_sendreqs;
module->p2p_copy_num_pending_sendreqs =
module->p2p_num_pending_sendreqs;
module->p2p_num_pending_sendreqs = tmp;
memset(module->p2p_num_pending_sendreqs, 0,
sizeof(unsigned int) * ompi_comm_size(module->p2p_comm));
tmp = module->m_copy_num_pending_sendreqs;
module->m_copy_num_pending_sendreqs =
module->m_num_pending_sendreqs;
module->m_num_pending_sendreqs = tmp;
memset(module->m_num_pending_sendreqs, 0,
sizeof(unsigned int) * ompi_comm_size(module->m_comm));
/* Copy in all the pending requests */
opal_list_join(&module->p2p_copy_pending_sendreqs,
opal_list_get_end(&module->p2p_copy_pending_sendreqs),
&module->p2p_pending_sendreqs);
OPAL_THREAD_UNLOCK(&(module->p2p_lock));
opal_list_join(&module->m_copy_pending_sendreqs,
opal_list_get_end(&module->m_copy_pending_sendreqs),
&module->m_pending_sendreqs);
}
@ -89,115 +56,79 @@ ompi_osc_rdma_module_fence(int assert, ompi_win_t *win)
{
unsigned int incoming_reqs;
int ret = OMPI_SUCCESS, i;
ompi_osc_rdma_module_t *module = GET_MODULE(win);
int num_outgoing = 0;
if (0 != (assert & MPI_MODE_NOPRECEDE)) {
size_t num_pending;
/* check that the user didn't lie to us - since NOPRECEDE
must be specified by all processes if it is specified by
any process, if we see this it is safe to assume that there
are no pending operations anywhere needed to close out this
epoch. */
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
num_pending = opal_list_get_size(&(P2P_MODULE(win)->p2p_pending_sendreqs));
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
if (0 != num_pending) {
if (0 != opal_list_get_size(&(module->m_pending_sendreqs))) {
return MPI_ERR_RMA_SYNC;
}
} else {
opal_list_item_t *item;
ompi_osc_rdma_flip_sendreqs(P2P_MODULE(win));
/* "atomically" copy all the data we're going to be modifying
into the copy... */
OPAL_THREAD_LOCK(&module->m_lock);
ompi_osc_rdma_flip_sendreqs(module);
OPAL_THREAD_UNLOCK(&module->m_lock);
switch (P2P_MODULE(win)->p2p_fence_sync_type) {
num_outgoing = opal_list_get_size(&(module->m_copy_pending_sendreqs));
/* find out how much data everyone is going to send us. Need
to have the lock during this period so that we have a sane
view of the number of sendreqs */
case OSC_SYNC_REDUCE_SCATTER:
ret = P2P_MODULE(win)->p2p_comm->
c_coll.coll_reduce_scatter(P2P_MODULE(win)->p2p_copy_num_pending_sendreqs,
&incoming_reqs,
P2P_MODULE(win)->p2p_fence_coll_counts,
MPI_UNSIGNED,
MPI_SUM,
P2P_MODULE(win)->p2p_comm);
break;
case OSC_SYNC_ALLREDUCE:
ret = P2P_MODULE(win)->p2p_comm->
c_coll.coll_allreduce(P2P_MODULE(win)->p2p_copy_num_pending_sendreqs,
P2P_MODULE(win)->p2p_fence_coll_results,
ompi_comm_size(P2P_MODULE(win)->p2p_comm),
MPI_UNSIGNED,
MPI_SUM,
P2P_MODULE(win)->p2p_comm);
incoming_reqs = P2P_MODULE(win)->
p2p_fence_coll_results[P2P_MODULE(win)->p2p_comm->c_my_rank];
break;
case OSC_SYNC_ALLTOALL:
ret = P2P_MODULE(win)->p2p_comm->
c_coll.coll_alltoall(P2P_MODULE(win)->p2p_copy_num_pending_sendreqs,
1,
MPI_UNSIGNED,
P2P_MODULE(win)->p2p_fence_coll_results,
1,
MPI_UNSIGNED,
P2P_MODULE(win)->p2p_comm);
incoming_reqs = 0;
for (i = 0 ; i < ompi_comm_size(P2P_MODULE(win)->p2p_comm) ; ++i) {
incoming_reqs += P2P_MODULE(win)->p2p_fence_coll_results[i];
}
break;
default:
assert(0 == 1);
}
/* find out how much data everyone is going to send us. Need
to have the lock during this period so that we have a sane
view of the number of sendreqs */
ret = module->m_comm->
c_coll.coll_reduce_scatter(module->m_copy_num_pending_sendreqs,
&incoming_reqs,
module->m_fence_coll_counts,
MPI_UNSIGNED,
MPI_SUM,
module->m_comm);
if (OMPI_SUCCESS != ret) {
/* put the stupid data back for the user. This is not
cheap, but the user lost his data if we don't. */
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
opal_list_join(&P2P_MODULE(win)->p2p_pending_sendreqs,
opal_list_get_end(&P2P_MODULE(win)->p2p_pending_sendreqs),
&P2P_MODULE(win)->p2p_copy_pending_sendreqs);
OPAL_THREAD_LOCK(&(module->m_lock));
opal_list_join(&module->m_pending_sendreqs,
opal_list_get_end(&module->m_pending_sendreqs),
&module->m_copy_pending_sendreqs);
for (i = 0 ; i < ompi_comm_size(P2P_MODULE(win)->p2p_comm) ; ++i) {
P2P_MODULE(win)->p2p_num_pending_sendreqs[i] +=
P2P_MODULE(win)->p2p_copy_num_pending_sendreqs[i];
for (i = 0 ; i < ompi_comm_size(module->m_comm) ; ++i) {
module->m_num_pending_sendreqs[i] +=
module->m_copy_num_pending_sendreqs[i];
}
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
OPAL_THREAD_UNLOCK(&(module->m_lock));
return ret;
}
/* possible we've already received a couple in messages, so
atomically add however many we're going to wait for */
OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_pending_in), incoming_reqs);
OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_pending_out),
(int32_t)opal_list_get_size(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs)));
opal_output_verbose(50, ompi_osc_base_output,
"fence: waiting on %d in and %d out",
P2P_MODULE(win)->p2p_num_pending_in,
P2P_MODULE(win)->p2p_num_pending_out);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
"fence: waiting on %d in and %d out",
module->m_num_pending_in,
module->m_num_pending_out));
/* try to start all the requests. We've copied everything we
need out of pending_sendreqs, so don't need the lock
here */
while (NULL !=
(item = opal_list_remove_first(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs)))) {
(item = opal_list_remove_first(&(module->m_copy_pending_sendreqs)))) {
ompi_osc_rdma_sendreq_t *req =
(ompi_osc_rdma_sendreq_t*) item;
ret = ompi_osc_rdma_sendreq_send(P2P_MODULE(win), req);
ret = ompi_osc_rdma_sendreq_send(module, req);
if (OMPI_SUCCESS != ret) {
opal_output_verbose(5, ompi_osc_base_output,
"fence: failure in starting sendreq (%d). Will try later.",
ret);
opal_list_append(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs), item);
"fence: failure in starting sendreq (%d). "
"Will try later.",
ret);
opal_list_append(&(module->m_copy_pending_sendreqs), item);
if (OMPI_ERR_TEMP_OUT_OF_RESOURCE == ret ||
OMPI_ERR_OUT_OF_RESOURCE == ret) {
@ -206,11 +137,18 @@ ompi_osc_rdma_module_fence(int assert, ompi_win_t *win)
}
}
OPAL_THREAD_LOCK(&module->m_lock);
/* possible we've already received a couple in messages, so
atomically add however many we're going to wait for */
module->m_num_pending_in += incoming_reqs;
module->m_num_pending_out += num_outgoing;
/* now we know how many things we're waiting for - wait for them... */
while (P2P_MODULE(win)->p2p_num_pending_in > 0 ||
0 != P2P_MODULE(win)->p2p_num_pending_out) {
ompi_osc_rdma_progress(P2P_MODULE(win));
while (module->m_num_pending_in > 0 ||
0 != module->m_num_pending_out) {
opal_condition_wait(&module->m_cond, &module->m_lock);
}
OPAL_THREAD_UNLOCK(&module->m_lock);
}
/* all transfers are done - back to the real world we go */
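
The fence epoch now ends by sleeping on the module's condition variable instead of spinning in a progress call. The matching wake-up side appears in the receive callbacks earlier in this diff: decrement the counter while holding m_lock and broadcast once it reaches zero. A small sketch of the two halves of that handshake (function names here are illustrative, not from the component):

/* Completion side (sketch): retire one outstanding outgoing operation and
 * wake anyone blocked on the module condition variable. */
static void
osc_rdma_signal_one_completion_sketch(ompi_osc_rdma_module_t *module)
{
    int32_t count;

    OPAL_THREAD_LOCK(&module->m_lock);
    count = (module->m_num_pending_out -= 1);
    OPAL_THREAD_UNLOCK(&module->m_lock);

    if (0 == count) opal_condition_broadcast(&module->m_cond);
}

/* Waiting side (sketch): the same shape as the fence, complete, and unlock
 * wait loops in this file. */
static void
osc_rdma_wait_for_outgoing_sketch(ompi_osc_rdma_module_t *module)
{
    OPAL_THREAD_LOCK(&module->m_lock);
    while (0 != module->m_num_pending_out) {
        opal_condition_wait(&module->m_cond, &module->m_lock);
    }
    OPAL_THREAD_UNLOCK(&module->m_lock);
}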
@ -229,19 +167,27 @@ ompi_osc_rdma_module_start(ompi_group_t *group,
int assert,
ompi_win_t *win)
{
int i;
int i, ret = OMPI_SUCCESS;
ompi_osc_rdma_module_t *module = GET_MODULE(win);
OBJ_RETAIN(group);
/* BWB - do I need this? */
ompi_group_increment_proc_count(group);
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
assert(NULL == P2P_MODULE(win)->p2p_sc_group);
P2P_MODULE(win)->p2p_sc_group = group;
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
OPAL_THREAD_LOCK(&module->m_lock);
if (NULL != module->m_sc_group) {
OPAL_THREAD_UNLOCK(&module->m_lock);
ret = MPI_ERR_RMA_SYNC;
goto clean;
}
module->m_sc_group = group;
memset(P2P_MODULE(win)->p2p_sc_remote_active_ranks, 0,
sizeof(bool) * ompi_comm_size(P2P_MODULE(win)->p2p_comm));
/* possible we've already received a couple in messages, so
add however many we're going to wait for */
module->m_num_post_msgs += ompi_group_size(module->m_sc_group);
OPAL_THREAD_UNLOCK(&(module->m_lock));
memset(module->m_sc_remote_active_ranks, 0,
sizeof(bool) * ompi_comm_size(module->m_comm));
/* for each process in the specified group, find its rank in our
communicator, store those indexes, and set the true / false in
@ -251,36 +197,37 @@ ompi_osc_rdma_module_start(ompi_group_t *group,
/* no need to increment ref count - the communicator isn't
going anywhere while we're here */
ompi_group_t *comm_group = P2P_MODULE(win)->p2p_comm->c_local_group;
ompi_group_t *comm_group = module->m_comm->c_local_group;
/* find the rank in the communicator associated with this window */
for (j = 0 ;
j < ompi_group_size(comm_group) ;
++j) {
if (P2P_MODULE(win)->p2p_sc_group->grp_proc_pointers[i] ==
if (module->m_sc_group->grp_proc_pointers[i] ==
comm_group->grp_proc_pointers[j]) {
comm_rank = j;
break;
}
}
if (comm_rank == -1) {
return MPI_ERR_RMA_SYNC;
ret = MPI_ERR_RMA_SYNC;
goto clean;
}
P2P_MODULE(win)->p2p_sc_remote_active_ranks[comm_rank] = true;
P2P_MODULE(win)->p2p_sc_remote_ranks[i] = comm_rank;
module->m_sc_remote_active_ranks[comm_rank] = true;
module->m_sc_remote_ranks[i] = comm_rank;
}
/* Set our mode to access w/ start */
ompi_win_remove_mode(win, OMPI_WIN_FENCE);
ompi_win_append_mode(win, OMPI_WIN_ACCESS_EPOCH | OMPI_WIN_STARTED);
/* possible we've already received a couple in messages, so
atomically add however many we're going to wait for */
OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_post_msgs),
ompi_group_size(P2P_MODULE(win)->p2p_sc_group));
return OMPI_SUCCESS;
clean:
ompi_group_decrement_proc_count(group);
OBJ_RELEASE(group);
return ret;
}
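
A small worked example of the rank translation performed in ompi_osc_rdma_module_start above, using hypothetical processes that are not taken from the source:

/* Worked example (hypothetical): the window communicator's local group is
 * { P0, P1, P2, P3 } and MPI_Win_start was called with group = { P2, P0 }.
 * After the loop above:
 *   m_sc_remote_ranks[]        = { 2, 0 }       (group index -> comm rank)
 *   m_sc_remote_active_ranks[] = { true, false, true, false }
 * and m_num_post_msgs was raised by ompi_group_size(group) == 2, i.e. one
 * post message is expected from each of P2 and P0. */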
@ -291,25 +238,32 @@ ompi_osc_rdma_module_complete(ompi_win_t *win)
int ret = OMPI_SUCCESS;
ompi_group_t *group;
opal_list_item_t *item;
ompi_osc_rdma_module_t *module = GET_MODULE(win);
/* wait for all the post messages */
while (0 != P2P_MODULE(win)->p2p_num_post_msgs) {
ompi_osc_rdma_progress(P2P_MODULE(win));
OPAL_THREAD_LOCK(&module->m_lock);
while (0 != module->m_num_post_msgs) {
opal_condition_wait(&module->m_cond, &module->m_lock);
}
ompi_osc_rdma_flip_sendreqs(P2P_MODULE(win));
ompi_osc_rdma_flip_sendreqs(module);
/* for each process in group, send a control message with number
of updates coming, then start all the requests */
for (i = 0 ; i < ompi_group_size(P2P_MODULE(win)->p2p_sc_group) ; ++i) {
int comm_rank = P2P_MODULE(win)->p2p_sc_remote_ranks[i];
for (i = 0 ; i < ompi_group_size(module->m_sc_group) ; ++i) {
int comm_rank = module->m_sc_remote_ranks[i];
OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_pending_out),
P2P_MODULE(win)->p2p_copy_num_pending_sendreqs[comm_rank]);
ret = ompi_osc_rdma_control_send(P2P_MODULE(win),
P2P_MODULE(win)->p2p_sc_group->grp_proc_pointers[i],
module->m_num_pending_out +=
module->m_copy_num_pending_sendreqs[comm_rank];
}
OPAL_THREAD_UNLOCK(&module->m_lock);
for (i = 0 ; i < ompi_group_size(module->m_sc_group) ; ++i) {
int comm_rank = module->m_sc_remote_ranks[i];
ret = ompi_osc_rdma_control_send(module,
module->m_sc_group->grp_proc_pointers[i],
OMPI_OSC_RDMA_HDR_COMPLETE,
P2P_MODULE(win)->p2p_copy_num_pending_sendreqs[comm_rank],
module->m_copy_num_pending_sendreqs[comm_rank],
0);
assert(ret == OMPI_SUCCESS);
}
@ -318,68 +272,68 @@ ompi_osc_rdma_module_complete(ompi_win_t *win)
need out of pending_sendreqs, so don't need the lock
here */
while (NULL !=
(item = opal_list_remove_first(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs)))) {
(item = opal_list_remove_first(&(module->m_copy_pending_sendreqs)))) {
ompi_osc_rdma_sendreq_t *req =
(ompi_osc_rdma_sendreq_t*) item;
ret = ompi_osc_rdma_sendreq_send(P2P_MODULE(win), req);
ret = ompi_osc_rdma_sendreq_send(module, req);
if (OMPI_SUCCESS != ret) {
opal_output_verbose(5, ompi_osc_base_output,
"complete: failure in starting sendreq (%d). Will try later.",
ret);
opal_list_append(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs), item);
opal_list_append(&(module->m_copy_pending_sendreqs), item);
}
}
/* wait for all the requests */
ompi_osc_rdma_progress(P2P_MODULE(win));
while (0 != P2P_MODULE(win)->p2p_num_pending_out) {
ompi_osc_rdma_progress(P2P_MODULE(win));
OPAL_THREAD_LOCK(&module->m_lock);
while (0 != module->m_num_pending_out) {
opal_condition_wait(&module->m_cond, &module->m_lock);
}
group = module->m_sc_group;
module->m_sc_group = NULL;
OPAL_THREAD_UNLOCK(&(module->m_lock));
/* remove WIN_POSTED from our mode */
ompi_win_remove_mode(win, OMPI_WIN_ACCESS_EPOCH | OMPI_WIN_STARTED);
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
group = P2P_MODULE(win)->p2p_sc_group;
P2P_MODULE(win)->p2p_sc_group = NULL;
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
/* BWB - do I need this? */
ompi_group_decrement_proc_count(group);
OBJ_RELEASE(group);
return ret;
}
int
ompi_osc_rdma_module_post(ompi_group_t *group,
int assert,
ompi_win_t *win)
{
int i;
ompi_osc_rdma_module_t *module = GET_MODULE(win);
OBJ_RETAIN(group);
/* BWB - do I need this? */
ompi_group_increment_proc_count(group);
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
assert(NULL == P2P_MODULE(win)->p2p_pw_group);
P2P_MODULE(win)->p2p_pw_group = group;
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
OPAL_THREAD_LOCK(&(module->m_lock));
assert(NULL == module->m_pw_group);
module->m_pw_group = group;
/* Set our mode to expose w/ post */
ompi_win_remove_mode(win, OMPI_WIN_FENCE);
ompi_win_append_mode(win, OMPI_WIN_EXPOSE_EPOCH | OMPI_WIN_POSTED);
/* list how many complete counters we're still waiting on */
OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_complete_msgs),
ompi_group_size(P2P_MODULE(win)->p2p_pw_group));
module->m_num_complete_msgs +=
ompi_group_size(module->m_pw_group);
OPAL_THREAD_UNLOCK(&(module->m_lock));
/* send a hello counter to everyone in group */
for (i = 0 ; i < ompi_group_size(P2P_MODULE(win)->p2p_pw_group) ; ++i) {
ompi_osc_rdma_control_send(P2P_MODULE(win),
for (i = 0 ; i < ompi_group_size(module->m_pw_group) ; ++i) {
ompi_osc_rdma_control_send(module,
group->grp_proc_pointers[i],
OMPI_OSC_RDMA_HDR_POST, 1, 0);
}
@ -392,20 +346,20 @@ int
ompi_osc_rdma_module_wait(ompi_win_t *win)
{
ompi_group_t *group;
ompi_osc_rdma_module_t *module = GET_MODULE(win);
while (0 != (P2P_MODULE(win)->p2p_num_pending_in) ||
0 != (P2P_MODULE(win)->p2p_num_complete_msgs)) {
ompi_osc_rdma_progress(P2P_MODULE(win));
OPAL_THREAD_LOCK(&module->m_lock);
while (0 != (module->m_num_pending_in) ||
0 != (module->m_num_complete_msgs)) {
opal_condition_wait(&module->m_cond, &module->m_lock);
}
group = module->m_pw_group;
module->m_pw_group = NULL;
OPAL_THREAD_UNLOCK(&module->m_lock);
ompi_win_remove_mode(win, OMPI_WIN_EXPOSE_EPOCH | OMPI_WIN_POSTED);
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
group = P2P_MODULE(win)->p2p_pw_group;
P2P_MODULE(win)->p2p_pw_group = NULL;
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
/* BWB - do I need this? */
ompi_group_decrement_proc_count(group);
OBJ_RELEASE(group);
@ -418,27 +372,27 @@ ompi_osc_rdma_module_test(ompi_win_t *win,
int *flag)
{
ompi_group_t *group;
ompi_osc_rdma_module_t *module = GET_MODULE(win);
if (0 != (P2P_MODULE(win)->p2p_num_pending_in) ||
0 != (P2P_MODULE(win)->p2p_num_complete_msgs)) {
ompi_osc_rdma_progress(P2P_MODULE(win));
if (0 != (P2P_MODULE(win)->p2p_num_pending_in) ||
0 != (P2P_MODULE(win)->p2p_num_complete_msgs)) {
*flag = 0;
return OMPI_SUCCESS;
}
#if !OMPI_ENABLE_PROGRESS_THREADS
opal_progress();
#endif
if (0 != (module->m_num_pending_in) ||
0 != (module->m_num_complete_msgs)) {
*flag = 0;
return OMPI_SUCCESS;
}
*flag = 1;
ompi_win_remove_mode(win, OMPI_WIN_EXPOSE_EPOCH | OMPI_WIN_POSTED);
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
group = P2P_MODULE(win)->p2p_pw_group;
P2P_MODULE(win)->p2p_pw_group = NULL;
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
OPAL_THREAD_LOCK(&(module->m_lock));
group = module->m_pw_group;
module->m_pw_group = NULL;
OPAL_THREAD_UNLOCK(&(module->m_lock));
/* BWB - do I need this? */
ompi_group_decrement_proc_count(group);
OBJ_RELEASE(group);
@ -462,7 +416,8 @@ ompi_osc_rdma_module_lock(int lock_type,
int assert,
ompi_win_t *win)
{
ompi_proc_t *proc = ompi_comm_peer_lookup( P2P_MODULE(win)->p2p_comm, target );
ompi_osc_rdma_module_t *module = GET_MODULE(win);
ompi_proc_t *proc = ompi_comm_peer_lookup( module->m_comm, target );
assert(lock_type != 0);
@ -470,15 +425,15 @@ ompi_osc_rdma_module_lock(int lock_type,
ompi_win_remove_mode(win, OMPI_WIN_FENCE);
ompi_win_append_mode(win, OMPI_WIN_ACCESS_EPOCH | OMPI_WIN_LOCK_ACCESS);
opal_output_verbose(50, ompi_osc_base_output,
"%d sending lock request to %d",
P2P_MODULE(win)->p2p_comm->c_my_rank, target);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
"%d sending lock request to %d",
module->m_comm->c_my_rank, target));
/* generate a lock request */
ompi_osc_rdma_control_send(P2P_MODULE(win),
proc,
OMPI_OSC_RDMA_HDR_LOCK_REQ,
P2P_MODULE(win)->p2p_comm->c_my_rank,
lock_type);
ompi_osc_rdma_control_send(module,
proc,
OMPI_OSC_RDMA_HDR_LOCK_REQ,
module->m_comm->c_my_rank,
lock_type);
/* return */
return OMPI_SUCCESS;
@ -492,51 +447,61 @@ ompi_osc_rdma_module_unlock(int target,
int32_t out_count;
opal_list_item_t *item;
int ret;
ompi_proc_t *proc = ompi_comm_peer_lookup( P2P_MODULE(win)->p2p_comm, target );
ompi_osc_rdma_module_t *module = GET_MODULE(win);
ompi_proc_t *proc = ompi_comm_peer_lookup( module->m_comm, target );
while (0 == P2P_MODULE(win)->p2p_lock_received_ack) {
ompi_osc_rdma_progress(P2P_MODULE(win));
OPAL_THREAD_LOCK(&module->m_lock);
while (0 == module->m_lock_received_ack) {
opal_condition_wait(&module->m_cond, &module->m_lock);
}
P2P_MODULE(win)->p2p_lock_received_ack = 0;
module->m_lock_received_ack -= 1;
/* start all the requests */
ompi_osc_rdma_flip_sendreqs(P2P_MODULE(win));
ompi_osc_rdma_flip_sendreqs(module);
/* try to start all the requests. We've copied everything we need
out of pending_sendreqs, so don't need the lock here */
out_count = (int32_t)opal_list_get_size(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs));
out_count = opal_list_get_size(&module->m_copy_pending_sendreqs);
OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_pending_out), out_count);
/* we want to send all the requests, plus we wait for one more
completion event for the control message ack from the unlocker
saying we're done */
module->m_num_pending_out += (out_count + 1);
OPAL_THREAD_UNLOCK(&module->m_lock);
/* send the unlock request */
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
"%d sending unlock request to %d",
module->m_comm->c_my_rank, target));
ompi_osc_rdma_control_send(module,
proc,
OMPI_OSC_RDMA_HDR_UNLOCK_REQ,
module->m_comm->c_my_rank,
out_count);
while (NULL !=
(item = opal_list_remove_first(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs)))) {
(item = opal_list_remove_first(&(module->m_copy_pending_sendreqs)))) {
ompi_osc_rdma_sendreq_t *req =
(ompi_osc_rdma_sendreq_t*) item;
ret = ompi_osc_rdma_sendreq_send(P2P_MODULE(win), req);
ret = ompi_osc_rdma_sendreq_send(module, req);
if (OMPI_SUCCESS != ret) {
opal_output_verbose(5, ompi_osc_base_output,
"unlock: failure in starting sendreq (%d). Will try later.",
ret);
opal_list_append(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs), item);
opal_list_append(&(module->m_copy_pending_sendreqs), item);
break;
}
}
/* wait for all the requests */
while (0 != P2P_MODULE(win)->p2p_num_pending_out) {
ompi_osc_rdma_progress(P2P_MODULE(win));
OPAL_THREAD_LOCK(&module->m_lock);
while (0 != module->m_num_pending_out) {
opal_condition_wait(&module->m_cond, &module->m_lock);
}
/* send the unlock request */
opal_output_verbose(50, ompi_osc_base_output,
"%d sending unlock request to %d",
P2P_MODULE(win)->p2p_comm->c_my_rank, target);
ompi_osc_rdma_control_send(P2P_MODULE(win),
proc,
OMPI_OSC_RDMA_HDR_UNLOCK_REQ,
P2P_MODULE(win)->p2p_comm->c_my_rank,
out_count);
OPAL_THREAD_LOCK(&module->m_lock);
/* set our mode on the window */
ompi_win_remove_mode(win, OMPI_WIN_ACCESS_EPOCH | OMPI_WIN_LOCK_ACCESS);
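
To make the out_count + 1 accounting in ompi_osc_rdma_module_unlock concrete, here is a worked example with hypothetical numbers; the reading that the extra completion is retired by the target's OMPI_OSC_RDMA_HDR_UNLOCK_REPLY ack follows from the comment in the hunk above, while the origin-side handling of that reply is outside this excerpt:

/* Worked example (hypothetical numbers): three sendreqs were queued against
 * the lock target, so out_count == 3 and m_num_pending_out grows by 3 + 1.
 * Three completions come back from the queued data transfers and one from
 * the target's OMPI_OSC_RDMA_HDR_UNLOCK_REPLY ack, so the wait loop above
 * cannot finish until the target has confirmed the unlock. */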
@ -547,57 +512,57 @@ ompi_osc_rdma_module_unlock(int target,
int
ompi_osc_rdma_passive_lock(ompi_osc_rdma_module_t *module,
int32_t origin,
int32_t lock_type)
int32_t origin,
int32_t lock_type)
{
bool send_ack = false;
int ret = OMPI_SUCCESS;
ompi_proc_t *proc = ompi_comm_peer_lookup( module->p2p_comm, origin );
ompi_proc_t *proc = ompi_comm_peer_lookup( module->m_comm, origin );
ompi_osc_rdma_pending_lock_t *new_pending;
OPAL_THREAD_LOCK(&(module->p2p_lock));
OPAL_THREAD_LOCK(&(module->m_lock));
if (lock_type == MPI_LOCK_EXCLUSIVE) {
if (module->p2p_lock_status == 0) {
module->p2p_lock_status = MPI_LOCK_EXCLUSIVE;
ompi_win_append_mode(module->p2p_win, OMPI_WIN_EXPOSE_EPOCH);
if (module->m_lock_status == 0) {
module->m_lock_status = MPI_LOCK_EXCLUSIVE;
ompi_win_append_mode(module->m_win, OMPI_WIN_EXPOSE_EPOCH);
send_ack = true;
} else {
opal_output_verbose(50, ompi_osc_base_output,
"%d queuing lock request from %d (%d)",
module->p2p_comm->c_my_rank, origin, lock_type);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
"%d queuing lock request from %d (%d)",
module->m_comm->c_my_rank, origin, lock_type));
new_pending = OBJ_NEW(ompi_osc_rdma_pending_lock_t);
new_pending->proc = proc;
new_pending->lock_type = lock_type;
opal_list_append(&(module->p2p_locks_pending), &(new_pending->super));
opal_list_append(&(module->m_locks_pending), &(new_pending->super));
}
} else if (lock_type == MPI_LOCK_SHARED) {
if (module->p2p_lock_status != MPI_LOCK_EXCLUSIVE) {
module->p2p_lock_status = MPI_LOCK_SHARED;
module->p2p_shared_count++;
ompi_win_append_mode(module->p2p_win, OMPI_WIN_EXPOSE_EPOCH);
if (module->m_lock_status != MPI_LOCK_EXCLUSIVE) {
module->m_lock_status = MPI_LOCK_SHARED;
module->m_shared_count++;
ompi_win_append_mode(module->m_win, OMPI_WIN_EXPOSE_EPOCH);
send_ack = true;
} else {
opal_output_verbose(50, ompi_osc_base_output,
"queuing lock request from %d (%d) lock_type:%d",
module->p2p_comm->c_my_rank, origin, lock_type);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
"queuing lock request from %d (%d) lock_type:%d",
module->m_comm->c_my_rank, origin, lock_type));
new_pending = OBJ_NEW(ompi_osc_rdma_pending_lock_t);
new_pending->proc = proc;
new_pending->lock_type = lock_type;
opal_list_append(&(module->p2p_locks_pending), &(new_pending->super));
opal_list_append(&(module->m_locks_pending), &(new_pending->super));
}
} else {
ret = OMPI_ERROR;
}
OPAL_THREAD_UNLOCK(&(module->p2p_lock));
OPAL_THREAD_UNLOCK(&(module->m_lock));
if (send_ack) {
opal_output_verbose(50, ompi_osc_base_output,
"%d sending lock ack to %d",
module->p2p_comm->c_my_rank, origin);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
"%d sending lock ack to %d",
module->m_comm->c_my_rank, origin));
ompi_osc_rdma_control_send(module, proc,
OMPI_OSC_RDMA_HDR_LOCK_REQ,
module->p2p_comm->c_my_rank,
OMPI_SUCCESS);
OMPI_OSC_RDMA_HDR_LOCK_REQ,
module->m_comm->c_my_rank,
OMPI_SUCCESS);
}
return OMPI_SUCCESS;
@ -606,46 +571,76 @@ ompi_osc_rdma_passive_lock(ompi_osc_rdma_module_t *module,
int
ompi_osc_rdma_passive_unlock(ompi_osc_rdma_module_t *module,
int32_t origin,
int32_t count)
int32_t origin,
int32_t count)
{
ompi_proc_t *proc = ompi_comm_peer_lookup( module->m_comm, origin );
ompi_osc_rdma_pending_lock_t *new_pending = NULL;
assert(module->m_lock_status != 0);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
"received unlock request from %d with %d requests\n",
origin, count));
new_pending = OBJ_NEW(ompi_osc_rdma_pending_lock_t);
new_pending->proc = proc;
new_pending->lock_type = 0;
OPAL_THREAD_LOCK(&(module->m_lock));
module->m_num_pending_in += count;
opal_list_append(&module->m_unlocks_pending, &(new_pending->super));
OPAL_THREAD_UNLOCK(&(module->m_lock));
return ompi_osc_rdma_passive_unlock_complete(module);
}
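
The passive unlock is now split in two: the handler above only records the pending unlock and how many incoming messages are still owed, and ompi_osc_rdma_passive_unlock_complete() below does the real work once m_num_pending_in reaches zero. A hedged sketch of the trigger on the data-movement side, assuming inmsg_mark_complete() (whose body is not shown in this excerpt) is the routine that retires incoming operations:

/* Sketch (assumption): how the incoming-message completion path could trip
 * the second half of the passive unlock.  The real inmsg_mark_complete()
 * body is not part of this excerpt. */
static void
inmsg_mark_complete_sketch(ompi_osc_rdma_module_t *module)
{
    int32_t count;

    OPAL_THREAD_LOCK(&module->m_lock);
    count = (module->m_num_pending_in -= 1);
    OPAL_THREAD_UNLOCK(&module->m_lock);

    if (0 == count) {
        opal_condition_broadcast(&module->m_cond);
        /* a passive-target unlock may have been waiting on this last message */
        ompi_osc_rdma_passive_unlock_complete(module);
    }
}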
int
ompi_osc_rdma_passive_unlock_complete(ompi_osc_rdma_module_t *module)
{
ompi_osc_rdma_pending_lock_t *new_pending = NULL;
assert(module->p2p_lock_status != 0);
if (module->m_num_pending_in != 0) return OMPI_SUCCESS;
OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), count);
while (0 != module->p2p_num_pending_in) {
ompi_osc_rdma_progress(module);
OPAL_THREAD_LOCK(&module->m_lock);
if (module->m_lock_status == MPI_LOCK_EXCLUSIVE) {
ompi_win_remove_mode(module->m_win, OMPI_WIN_EXPOSE_EPOCH);
module->m_lock_status = 0;
} else {
module->m_shared_count -= opal_list_get_size(&module->m_unlocks_pending);
if (module->m_shared_count == 0) {
ompi_win_remove_mode(module->m_win, OMPI_WIN_EXPOSE_EPOCH);
module->m_lock_status = 0;
}
}
OPAL_THREAD_LOCK(&(module->p2p_lock));
if (module->p2p_lock_status == MPI_LOCK_EXCLUSIVE) {
ompi_win_remove_mode(module->p2p_win, OMPI_WIN_EXPOSE_EPOCH);
module->p2p_lock_status = 0;
} else {
module->p2p_shared_count--;
if (module->p2p_shared_count == 0) {
ompi_win_remove_mode(module->p2p_win, OMPI_WIN_EXPOSE_EPOCH);
module->p2p_lock_status = 0;
}
/* issue whichever unlock acks we should issue */
while (NULL != (new_pending = (ompi_osc_rdma_pending_lock_t*)
opal_list_remove_first(&module->m_unlocks_pending))) {
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
"sending unlock reply to proc"));
ompi_osc_rdma_control_send(module,
new_pending->proc,
OMPI_OSC_RDMA_HDR_UNLOCK_REPLY,
OMPI_SUCCESS, OMPI_SUCCESS);
OBJ_DESTRUCT(new_pending);
}
/* if we were really unlocked, see if we have more to process */
new_pending = (ompi_osc_rdma_pending_lock_t*)
opal_list_remove_first(&(module->p2p_locks_pending));
OPAL_THREAD_UNLOCK(&(module->p2p_lock));
opal_list_remove_first(&(module->m_locks_pending));
OPAL_THREAD_UNLOCK(&(module->m_lock));
if (NULL != new_pending) {
opal_output_verbose(50, ompi_osc_base_output,
"sending lock request to proc");
ompi_win_append_mode(module->p2p_win, OMPI_WIN_EXPOSE_EPOCH);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
"sending lock request to proc"));
ompi_win_append_mode(module->m_win, OMPI_WIN_EXPOSE_EPOCH);
/* set lock state and generate a lock request */
module->p2p_lock_status = new_pending->lock_type;
module->m_lock_status = new_pending->lock_type;
ompi_osc_rdma_control_send(module,
new_pending->proc,
OMPI_OSC_RDMA_HDR_LOCK_REQ,
module->p2p_comm->c_my_rank,
module->m_comm->c_my_rank,
OMPI_SUCCESS);
OBJ_DESTRUCT(new_pending);
}