Some much needed cleanup of the point-to-point one-sided component...
* Combine polling of the long requests and buffer requests into one
  request type, polled in one place.
* Associate the list of requests to poll with the component, not the
  individual modules.
* Add a progress thread that blocks on the OMPI request-completion
  condition variable and wakes up at the appropriate time to poll the
  message list. Not the best solution, but without some asynchronous
  notification from the PML that a given set of requests has completed,
  there isn't much better available.
* Instead of calling opal_progress() all over the place, move to using
  condition variables like the rest of the project. This has the
  advantage of moving the component slightly further along toward
  becoming thread safe.
* Fix a problem with the passive side of unlock where it could go
  recursive and cause all kinds of problems, especially when progress
  threads are used. Instead, passive unlock now has two parts -- one to
  start the unlock, and another to complete the lock and send the ack
  back. The data-movement code trips the second at the right time.

This commit was SVN r14703.
This commit is contained in:
parent 77a80b2612
commit 2b4b754925
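
Two of the changes above are easiest to see as idioms before reading the
hunks. The condition-variable pattern that replaces the opal_progress()
busy-wait recurs throughout the diff below -- a minimal sketch using names
from the diff (p2p_num_pending_out is just one of several counters handled
this way):

    /* Waiting side: hold the module lock and sleep on the condition
     * variable until the outstanding-message count drains to zero. */
    OPAL_THREAD_LOCK(&module->p2p_lock);
    while (0 != module->p2p_num_pending_out) {
        opal_condition_wait(&module->p2p_cond, &module->p2p_lock);
    }
    OPAL_THREAD_UNLOCK(&module->p2p_lock);

    /* Completion side: decrement atomically and only broadcast when the
     * count actually reaches zero, so wakeups stay cheap. */
    count = OPAL_THREAD_ADD32(&module->p2p_num_pending_out, -1);
    if (0 == count) {
        opal_condition_broadcast(&module->p2p_cond);
    }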
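The combined polling type is the other recurring idiom. The new
osc_pt2pt_mpireq.h is added by this commit but not shown in this diff, so
the layout below is an assumption reconstructed from the fields the hunks
dereference (mpireq.request, mpireq.status, mpireq.cbfunc, mpireq.cbdata,
mpireq.super.super):

    /* One pollable request type shared by long messages and control
     * buffers (assumed layout; see osc_pt2pt_mpireq.h in the commit). */
    typedef struct ompi_osc_pt2pt_mpireq_t ompi_osc_pt2pt_mpireq_t;
    typedef void (*ompi_osc_pt2pt_mpireq_cb_fn_t)(ompi_osc_pt2pt_mpireq_t *mpireq);

    struct ompi_osc_pt2pt_mpireq_t {
        opal_free_list_item_t super;          /* list / free-list linkage */
        ompi_request_t *request;              /* outstanding PML request */
        ompi_status_public_t status;          /* status captured at completion */
        ompi_osc_pt2pt_mpireq_cb_fn_t cbfunc; /* completion callback */
        void *cbdata;                         /* callback context */
    };

Both ompi_osc_pt2pt_buffer_t and ompi_osc_pt2pt_longreq_t embed one of
these, so the component keeps a single p2p_c_pending_requests list, tests
each embedded request in one loop, and dispatches cbfunc on completion
(see ompi_osc_pt2pt_component_progress below).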
@@ -30,6 +30,8 @@ pt2pt_sources = \
    osc_pt2pt_header.h \
    osc_pt2pt_longreq.h \
    osc_pt2pt_longreq.c \
    osc_pt2pt_mpireq.h \
    osc_pt2pt_mpireq.c \
    osc_pt2pt_obj_convert.h \
    osc_pt2pt_obj_convert.c \
    osc_pt2pt_replyreq.h \
@@ -33,18 +33,17 @@ ompi_osc_pt2pt_module_free(ompi_win_t *win)
    int tmp;
    ompi_osc_pt2pt_module_t *module = P2P_MODULE(win);

    OPAL_THREAD_LOCK(&module->p2p_lock);
    while (OMPI_WIN_EXPOSE_EPOCH & ompi_win_get_mode(win)) {
        opal_progress();
        opal_condition_wait(&module->p2p_cond, &module->p2p_lock);
    }
    OPAL_THREAD_UNLOCK(&module->p2p_lock);

    /* finish with a barrier */
    if (ompi_group_size(win->w_group) > 1) {
        ret = module->p2p_comm->c_coll.coll_barrier(module->p2p_comm);
    }

    /* remove window information */
    win->w_osc_module = NULL;

    /* remove from component information */
    OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.p2p_c_lock);
    tmp = opal_hash_table_remove_value_uint32(&mca_osc_pt2pt_component.p2p_c_modules,
@@ -53,40 +52,47 @@ ompi_osc_pt2pt_module_free(ompi_win_t *win)
    ret = (ret != OMPI_SUCCESS) ? ret : tmp;

    if (0 == opal_hash_table_get_size(&mca_osc_pt2pt_component.p2p_c_modules)) {
        /* stop progress thread */
        opal_progress_unregister(ompi_osc_pt2pt_progress);
#if OMPI_ENABLE_PROGRESS_THREADS
        mca_osc_pt2pt_component.p2p_c_thread_run = false;
        opal_condition_broadcast(&ompi_request_cond);
        opal_thread_join(&mca_osc_pt2pt_component.p2p_c_thread, &ret);
#else
        opal_progress_unregister(ompi_osc_pt2pt_component_progress);
#endif
    }

    OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);

    OBJ_DESTRUCT(&(module->p2p_locks_pending));
    win->w_osc_module = NULL;

    free(module->p2p_sc_remote_ranks);
    free(module->p2p_sc_remote_active_ranks);
    assert(module->p2p_sc_group == NULL);
    assert(module->p2p_pw_group == NULL);
    free(module->p2p_fence_coll_counts);
    OBJ_DESTRUCT(&module->p2p_unlocks_pending);
    OBJ_DESTRUCT(&module->p2p_locks_pending);
    OBJ_DESTRUCT(&module->p2p_copy_pending_sendreqs);
    OBJ_DESTRUCT(&module->p2p_pending_sendreqs);
    OBJ_DESTRUCT(&module->p2p_acc_lock);
    OBJ_DESTRUCT(&module->p2p_cond);
    OBJ_DESTRUCT(&module->p2p_lock);

    free(module->p2p_copy_num_pending_sendreqs);
    OBJ_DESTRUCT(&(module->p2p_copy_pending_sendreqs));
    if (NULL != module->p2p_sc_remote_ranks) {
        free(module->p2p_sc_remote_ranks);
    }
    if (NULL != module->p2p_sc_remote_active_ranks) {
        free(module->p2p_sc_remote_active_ranks);
    }
    if (NULL != module->p2p_fence_coll_counts) {
        free(module->p2p_fence_coll_counts);
    }
    if (NULL != module->p2p_copy_num_pending_sendreqs) {
        free(module->p2p_copy_num_pending_sendreqs);
    }
    if (NULL != module->p2p_num_pending_sendreqs) {
        free(module->p2p_num_pending_sendreqs);
    }
    if (NULL != module->p2p_comm) ompi_comm_free(&module->p2p_comm);

    OBJ_DESTRUCT(&(module->p2p_long_msgs));

    free(module->p2p_num_pending_sendreqs);

    OBJ_DESTRUCT(&(module->p2p_pending_sendreqs));

    OBJ_DESTRUCT(&(module->p2p_pending_control_sends));

    ompi_comm_free(&(module->p2p_comm));
    module->p2p_comm = NULL;

    module->p2p_win = NULL;

    OBJ_DESTRUCT(&(module->p2p_acc_lock));
    OBJ_DESTRUCT(&(module->p2p_lock));

    free(module);
#if OMPI_ENABLE_DEBUG
    memset(module, 0, sizeof(ompi_osc_base_module_t));
#endif
    if (NULL != module) free(module);

    return ret;
}
@@ -26,9 +26,7 @@
#include "ompi/communicator/communicator.h"
#include "ompi/request/request.h"

#if defined(__cplusplus) || defined(c_plusplus)
extern "C" {
#endif
BEGIN_C_DECLS

#define CONTROL_MSG_TAG (-200)

@@ -42,14 +40,18 @@ struct ompi_osc_pt2pt_component_t {
    /** lock access to datastructures in the component structure */
    opal_mutex_t p2p_c_lock;

    /** List of ompi_osc_pt2pt_module_ts currently in existence.
        Needed so that received fragments can be dispatched to the
        correct module */
    /** List of active modules (windows) */
    opal_hash_table_t p2p_c_modules;

    /** max size of eager message */
    size_t p2p_c_eager_size;

    /** Lock for request management */
    opal_mutex_t p2p_c_request_lock;

    /** Condition variable for request management */
    opal_condition_t p2p_c_request_cond;

    /** free list of ompi_osc_pt2pt_sendreq_t structures */
    opal_free_list_t p2p_c_sendreqs;
    /** free list of ompi_osc_pt2pt_replyreq_t structures */
@@ -58,6 +60,14 @@ struct ompi_osc_pt2pt_component_t {
    opal_free_list_t p2p_c_longreqs;
    /** free list for eager / control messages */
    opal_free_list_t p2p_c_buffers;

    /** list of outstanding requests, of type ompi_osc_pt2pt_mpireq_t */
    opal_list_t p2p_c_pending_requests;

#if OMPI_ENABLE_PROGRESS_THREADS
    opal_thread_t p2p_c_thread;
    bool p2p_c_thread_run;
#endif
};
typedef struct ompi_osc_pt2pt_component_t ompi_osc_pt2pt_component_t;

@@ -69,6 +79,8 @@ struct ompi_osc_pt2pt_module_t {
    /** lock access to data structures in the current module */
    opal_mutex_t p2p_lock;

    opal_condition_t p2p_cond;

    /** lock for "atomic" window updates from reductions */
    opal_mutex_t p2p_acc_lock;

@@ -78,11 +90,6 @@ struct ompi_osc_pt2pt_module_t {
    /** communicator created with this window */
    ompi_communicator_t *p2p_comm;

    /** control message receive request */
    struct ompi_request_t *p2p_cb_request;

    opal_list_t p2p_pending_control_sends;

    /** list of ompi_osc_pt2pt_sendreq_t structures, and includes all
        requests for this access epoch that have not already been
        started. p2p_lock must be held when modifying this field. */
@@ -120,10 +127,6 @@ struct ompi_osc_pt2pt_module_t {
        create a send tag */
    volatile int32_t p2p_tag_counter;

    /** list of outstanding long messages that must be processed
        (ompi_osc_pt2pt_request_long). Protected by p2p_lock. */
    opal_list_t p2p_long_msgs;

    opal_list_t p2p_copy_pending_sendreqs;
    unsigned int *p2p_copy_num_pending_sendreqs;

@@ -131,9 +134,6 @@ struct ompi_osc_pt2pt_module_t {
    /* an array of <sizeof(p2p_comm)> ints, each containing the value
       1. */
    int *p2p_fence_coll_counts;
    /* an array of <sizeof(p2p_comm)> unsigned ints, for use in experimenting
       with different synchronization costs */
    unsigned int *p2p_fence_coll_results;

    /* ********************* PWSC data ************************ */

@@ -146,30 +146,17 @@ struct ompi_osc_pt2pt_module_t {
    int32_t p2p_lock_status; /* one of 0, MPI_LOCK_EXCLUSIVE, MPI_LOCK_SHARED */
    int32_t p2p_shared_count;
    opal_list_t p2p_locks_pending;
    opal_list_t p2p_unlocks_pending;
    int32_t p2p_lock_received_ack;
};
typedef struct ompi_osc_pt2pt_module_t ompi_osc_pt2pt_module_t;

OMPI_MODULE_DECLSPEC extern ompi_osc_pt2pt_component_t mca_osc_pt2pt_component;

/*

/**
 * Helper macro for grabbing the module structure from a window instance
 */
#if OMPI_ENABLE_DEBUG

static inline ompi_osc_pt2pt_module_t* P2P_MODULE(struct ompi_win_t* win)
{
    ompi_osc_pt2pt_module_t *module =
        (ompi_osc_pt2pt_module_t*) win->w_osc_module;

    assert(module->p2p_win == win);

    return module;
}

#else
#define P2P_MODULE(win) ((ompi_osc_pt2pt_module_t*) win->w_osc_module)
#endif

/*
 * Component functions
@@ -188,7 +175,7 @@ int ompi_osc_pt2pt_component_select(struct ompi_win_t *win,
                                    struct ompi_info_t *info,
                                    struct ompi_communicator_t *comm);

int ompi_osc_pt2pt_progress(void);
int ompi_osc_pt2pt_component_progress(void);

int ompi_osc_pt2pt_request_test(ompi_request_t ** rptr,
                                int *completed,
@@ -261,10 +248,8 @@ int ompi_osc_pt2pt_passive_lock(ompi_osc_pt2pt_module_t *module,
int ompi_osc_pt2pt_passive_unlock(ompi_osc_pt2pt_module_t *module,
                                  int32_t origin,
                                  int32_t count);
int ompi_osc_pt2pt_passive_unlock_complete(ompi_osc_pt2pt_module_t *module);


#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
END_C_DECLS

#endif /* OMPI_OSC_PT2PT_H */
@@ -16,11 +16,9 @@

#include "ompi_config.h"

#include "osc_pt2pt.h"
#include "osc_pt2pt_mpireq.h"
#include "osc_pt2pt_buffer.h"

#include "opal/class/opal_free_list.h"

static void ompi_osc_pt2pt_buffer_construct(ompi_osc_pt2pt_buffer_t *buf)
{
    buf->payload = buf + 1;
@@ -33,7 +31,7 @@ static void ompi_osc_pt2pt_buffer_destruct(ompi_osc_pt2pt_buffer_t *buf)
}


OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_buffer_t, opal_free_list_item_t,
OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_buffer_t, ompi_osc_pt2pt_mpireq_t,
                   ompi_osc_pt2pt_buffer_construct,
                   ompi_osc_pt2pt_buffer_destruct);

@@ -17,32 +17,22 @@
#ifndef OMPI_OSC_PT2PT_BUFFER_H
#define OMPI_OSC_PT2PT_BUFFER_H

#include "opal/class/opal_free_list.h"
#include "ompi/request/request.h"
#include "osc_pt2pt_mpireq.h"

BEGIN_C_DECLS

#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif

struct ompi_osc_pt2pt_buffer_t;

typedef void (*ompi_osc_pt2pt_buffer_completion_fn_t)(
    struct ompi_osc_pt2pt_buffer_t *buffer);

struct ompi_osc_pt2pt_buffer_t {
    opal_free_list_item_t super;
    ompi_request_t *request;
    ompi_status_public_t status;
    ompi_osc_pt2pt_buffer_completion_fn_t cbfunc;
    void *cbdata;
    ompi_osc_pt2pt_mpireq_t mpireq;
    void *payload;
    size_t len;
};
typedef struct ompi_osc_pt2pt_buffer_t ompi_osc_pt2pt_buffer_t;
OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_buffer_t);

#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
END_C_DECLS

#endif
@@ -34,8 +34,11 @@
#include "ompi/mca/pml/pml.h"
#include "ompi/datatype/dt_arch.h"

static int ompi_osc_pt2pt_component_open(void);
static void ompi_osc_pt2pt_component_fragment_cb(struct ompi_osc_pt2pt_buffer_t *buffer);
static int component_open(void);
static void component_fragment_cb(ompi_osc_pt2pt_mpireq_t *mpireq);
#if OMPI_ENABLE_PROGRESS_THREADS
static void* component_thread_fn(opal_object_t *obj);
#endif

ompi_osc_pt2pt_component_t mca_osc_pt2pt_component = {
    { /* ompi_osc_base_component_t */
@@ -45,7 +48,7 @@ ompi_osc_pt2pt_component_t mca_osc_pt2pt_component = {
        OMPI_MAJOR_VERSION, /* MCA component major version */
        OMPI_MINOR_VERSION, /* MCA component minor version */
        OMPI_RELEASE_VERSION, /* MCA component release version */
        ompi_osc_pt2pt_component_open,
        component_open,
        NULL
    },
    { /* mca_base_component_data */
@@ -81,6 +84,7 @@ ompi_osc_pt2pt_module_t ompi_osc_pt2pt_module_template = {
    }
};


/* look up parameters for configuring this window. The code first
   looks in the info structure passed by the user, then through mca
   parameters. */
@@ -122,7 +126,7 @@ check_config_value_bool(char *key, ompi_info_t *info)


static int
ompi_osc_pt2pt_component_open(void)
component_open(void)
{
    int tmp;

@@ -158,6 +162,11 @@ ompi_osc_pt2pt_component_init(bool enable_progress_threads,
                  opal_hash_table_t);
    opal_hash_table_init(&mca_osc_pt2pt_component.p2p_c_modules, 2);

    OBJ_CONSTRUCT(&mca_osc_pt2pt_component.p2p_c_request_lock,
                  opal_mutex_t);
    OBJ_CONSTRUCT(&mca_osc_pt2pt_component.p2p_c_request_cond,
                  opal_condition_t);

    OBJ_CONSTRUCT(&mca_osc_pt2pt_component.p2p_c_sendreqs, opal_free_list_t);
    opal_free_list_init(&mca_osc_pt2pt_component.p2p_c_sendreqs,
                        sizeof(ompi_osc_pt2pt_sendreq_t),
@@ -183,6 +192,9 @@ ompi_osc_pt2pt_component_init(bool enable_progress_threads,
                        OBJ_CLASS(ompi_osc_pt2pt_buffer_t),
                        1, -1, 1);

    OBJ_CONSTRUCT(&mca_osc_pt2pt_component.p2p_c_pending_requests,
                  opal_list_t);

    return OMPI_SUCCESS;
}

@@ -191,19 +203,32 @@ int
ompi_osc_pt2pt_component_finalize(void)
{
    size_t num_modules;
#if OMPI_ENABLE_PROGRESS_THREADS
    void* ret;
#endif

    if (0 !=
        (num_modules = opal_hash_table_get_size(&mca_osc_pt2pt_component.p2p_c_modules))) {
        opal_output(ompi_osc_base_output,
                    "WARNING: There were %d Windows created but not freed.",
                    (int) num_modules);
        opal_progress_unregister(ompi_osc_pt2pt_progress);
                    (int) num_modules);
#if OMPI_ENABLE_PROGRESS_THREADS
        mca_osc_pt2pt_component.p2p_c_thread_run = false;
        opal_condition_broadcast(&ompi_request_cond);
        opal_thread_join(&mca_osc_pt2pt_component.p2p_c_thread, &ret);
#else
        opal_progress_unregister(ompi_osc_pt2pt_component_progress);
#endif
    }

    OBJ_DESTRUCT(&mca_osc_pt2pt_component.p2p_c_pending_requests);
    OBJ_DESTRUCT(&mca_osc_pt2pt_component.p2p_c_buffers);
    OBJ_DESTRUCT(&mca_osc_pt2pt_component.p2p_c_longreqs);
    OBJ_DESTRUCT(&mca_osc_pt2pt_component.p2p_c_replyreqs);
    OBJ_DESTRUCT(&mca_osc_pt2pt_component.p2p_c_sendreqs);
    OBJ_DESTRUCT(&mca_osc_pt2pt_component.p2p_c_request_lock);
    OBJ_DESTRUCT(&mca_osc_pt2pt_component.p2p_c_request_cond);

    OBJ_DESTRUCT(&mca_osc_pt2pt_component.p2p_c_modules);
    OBJ_DESTRUCT(&mca_osc_pt2pt_component.p2p_c_lock);

@@ -226,14 +251,15 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win,
                                ompi_info_t *info,
                                ompi_communicator_t *comm)
{
    ompi_osc_pt2pt_module_t *module;
    ompi_osc_pt2pt_module_t *module = NULL;
    int ret, i;
    ompi_osc_pt2pt_buffer_t *buffer;
    opal_free_list_item_t *item;
    ompi_osc_pt2pt_buffer_t *buffer = NULL;
    opal_free_list_item_t *item = NULL;

    /* create module structure */
    module = (ompi_osc_pt2pt_module_t*)malloc(sizeof(ompi_osc_pt2pt_module_t));
    if (NULL == module) return OMPI_ERROR;
    module = (ompi_osc_pt2pt_module_t*)
        calloc(1, sizeof(ompi_osc_pt2pt_module_t));
    if (NULL == module) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;

    /* fill in the function pointer part */
    memcpy(module, &ompi_osc_pt2pt_module_template,
@@ -241,33 +267,21 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win,

    /* initialize the p2p part */
    OBJ_CONSTRUCT(&(module->p2p_lock), opal_mutex_t);
    OBJ_CONSTRUCT(&(module->p2p_cond), opal_condition_t);
    OBJ_CONSTRUCT(&(module->p2p_acc_lock), opal_mutex_t);
    OBJ_CONSTRUCT(&module->p2p_pending_sendreqs, opal_list_t);
    OBJ_CONSTRUCT(&(module->p2p_copy_pending_sendreqs), opal_list_t);
    OBJ_CONSTRUCT(&(module->p2p_locks_pending), opal_list_t);
    OBJ_CONSTRUCT(&(module->p2p_unlocks_pending), opal_list_t);

    module->p2p_win = win;

    ret = ompi_comm_dup(comm, &(module->p2p_comm), 0);
    if (ret != OMPI_SUCCESS) {
        OBJ_DESTRUCT(&(module->p2p_acc_lock));
        OBJ_DESTRUCT(&(module->p2p_lock));
        free(module);
        return ret;
    }
    if (ret != OMPI_SUCCESS) goto cleanup;

    module->p2p_cb_request = NULL;

    OBJ_CONSTRUCT(&module->p2p_pending_control_sends, opal_list_t);

    OBJ_CONSTRUCT(&module->p2p_pending_sendreqs, opal_list_t);
    module->p2p_num_pending_sendreqs = (unsigned int*)malloc(sizeof(unsigned int) *
                                                             ompi_comm_size(module->p2p_comm));
    if (NULL == module->p2p_num_pending_sendreqs) {
        OBJ_DESTRUCT(&module->p2p_pending_sendreqs);
        ompi_comm_free(&comm);
        OBJ_DESTRUCT(&(module->p2p_acc_lock));
        OBJ_DESTRUCT(&(module->p2p_lock));
        free(module);
        return ret;
    }
    module->p2p_num_pending_sendreqs = (unsigned int*)
        malloc(sizeof(unsigned int) * ompi_comm_size(module->p2p_comm));
    if (NULL == module->p2p_num_pending_sendreqs) goto cleanup;
    memset(module->p2p_num_pending_sendreqs, 0,
           sizeof(unsigned int) * ompi_comm_size(module->p2p_comm));

@@ -277,101 +291,46 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win,
    module->p2p_num_complete_msgs = 0;
    module->p2p_tag_counter = 0;

    OBJ_CONSTRUCT(&(module->p2p_long_msgs), opal_list_t);

    OBJ_CONSTRUCT(&(module->p2p_copy_pending_sendreqs), opal_list_t);
    module->p2p_copy_num_pending_sendreqs = (unsigned int*)malloc(sizeof(unsigned int) *
                                                                  ompi_comm_size(module->p2p_comm));
    module->p2p_copy_num_pending_sendreqs = (unsigned int*)
        malloc(sizeof(unsigned int) * ompi_comm_size(module->p2p_comm));
    if (NULL == module->p2p_copy_num_pending_sendreqs) {
        OBJ_DESTRUCT(&module->p2p_copy_pending_sendreqs);
        OBJ_DESTRUCT(&module->p2p_long_msgs);
        free(module->p2p_num_pending_sendreqs);
        OBJ_DESTRUCT(&module->p2p_pending_sendreqs);
        ompi_comm_free(&comm);
        OBJ_DESTRUCT(&(module->p2p_acc_lock));
        OBJ_DESTRUCT(&(module->p2p_lock));
        free(module);
        return ret;
        ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
        goto cleanup;
    }
    memset(module->p2p_num_pending_sendreqs, 0,
           sizeof(unsigned int) * ompi_comm_size(module->p2p_comm));

    /* fence data */
    module->p2p_fence_coll_counts = (int*)malloc(sizeof(int) *
                                                 ompi_comm_size(module->p2p_comm));
    module->p2p_fence_coll_counts = (int*)
        malloc(sizeof(int) * ompi_comm_size(module->p2p_comm));
    if (NULL == module->p2p_fence_coll_counts) {
        free(module->p2p_copy_num_pending_sendreqs);
        OBJ_DESTRUCT(&module->p2p_copy_pending_sendreqs);
        OBJ_DESTRUCT(&module->p2p_long_msgs);
        free(module->p2p_num_pending_sendreqs);
        OBJ_DESTRUCT(&module->p2p_pending_sendreqs);
        ompi_comm_free(&comm);
        OBJ_DESTRUCT(&(module->p2p_acc_lock));
        OBJ_DESTRUCT(&(module->p2p_lock));
        free(module);
        return ret;
        ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
        goto cleanup;
    }
    for (i = 0 ; i < ompi_comm_size(module->p2p_comm) ; ++i) {
        module->p2p_fence_coll_counts[i] = 1;
    }

    module->p2p_fence_coll_results = (unsigned int*)malloc(sizeof(unsigned short) *
                                                           ompi_comm_size(module->p2p_comm));
    if (NULL == module->p2p_fence_coll_results) {
        free(module->p2p_fence_coll_counts);
        free(module->p2p_copy_num_pending_sendreqs);
        OBJ_DESTRUCT(&module->p2p_copy_pending_sendreqs);
        OBJ_DESTRUCT(&module->p2p_long_msgs);
        free(module->p2p_num_pending_sendreqs);
        OBJ_DESTRUCT(&module->p2p_pending_sendreqs);
        ompi_comm_free(&comm);
        OBJ_DESTRUCT(&(module->p2p_acc_lock));
        OBJ_DESTRUCT(&(module->p2p_lock));
        free(module);
        return ret;
    }

    /* pwsc data */
    module->p2p_pw_group = NULL;
    module->p2p_sc_group = NULL;
    module->p2p_sc_remote_active_ranks =
        (bool*)malloc(sizeof(bool) * ompi_comm_size(module->p2p_comm));
    module->p2p_sc_remote_active_ranks = (bool*)
        malloc(sizeof(bool) * ompi_comm_size(module->p2p_comm));
    if (NULL == module->p2p_sc_remote_active_ranks) {
        free(module->p2p_fence_coll_results);
        free(module->p2p_fence_coll_counts);
        free(module->p2p_copy_num_pending_sendreqs);
        OBJ_DESTRUCT(&module->p2p_copy_pending_sendreqs);
        OBJ_DESTRUCT(&module->p2p_long_msgs);
        free(module->p2p_num_pending_sendreqs);
        OBJ_DESTRUCT(&module->p2p_pending_sendreqs);
        ompi_comm_free(&comm);
        OBJ_DESTRUCT(&(module->p2p_acc_lock));
        OBJ_DESTRUCT(&(module->p2p_lock));
        free(module);
        return OMPI_ERROR;
        ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
        goto cleanup;
    }
    module->p2p_sc_remote_ranks =
        (int*)malloc(sizeof(int) * ompi_comm_size(module->p2p_comm));

    module->p2p_sc_remote_ranks = (int*)
        malloc(sizeof(int) * ompi_comm_size(module->p2p_comm));
    if (NULL == module->p2p_sc_remote_ranks) {
        free(module->p2p_sc_remote_active_ranks);
        free(module->p2p_fence_coll_results);
        free(module->p2p_fence_coll_counts);
        free(module->p2p_copy_num_pending_sendreqs);
        OBJ_DESTRUCT(&module->p2p_copy_pending_sendreqs);
        OBJ_DESTRUCT(&module->p2p_long_msgs);
        free(module->p2p_num_pending_sendreqs);
        OBJ_DESTRUCT(&module->p2p_pending_sendreqs);
        ompi_comm_free(&comm);
        OBJ_DESTRUCT(&(module->p2p_acc_lock));
        OBJ_DESTRUCT(&(module->p2p_lock));
        free(module);
        return OMPI_ERROR;
        ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
        goto cleanup;
    }

    /* lock data */
    module->p2p_lock_status = 0;
    module->p2p_shared_count = 0;
    OBJ_CONSTRUCT(&(module->p2p_locks_pending), opal_list_t);
    module->p2p_lock_received_ack = 0;

    /* update component data */
@@ -379,12 +338,19 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win,
    opal_hash_table_set_value_uint32(&mca_osc_pt2pt_component.p2p_c_modules,
                                     module->p2p_comm->c_contextid,
                                     module);
    ret = opal_hash_table_get_size(&mca_osc_pt2pt_component.p2p_c_modules);

    if (1 == opal_hash_table_get_size(&mca_osc_pt2pt_component.p2p_c_modules)) {
        /* start progress thread */
        opal_progress_register(ompi_osc_pt2pt_progress);
    if (ret == 1) {
#if OMPI_ENABLE_PROGRESS_THREADS
        mca_osc_pt2pt_component.p2p_c_thread_run = true;
        mca_osc_pt2pt_component.p2p_c_thread.t_run = component_thread_fn;
        mca_osc_pt2pt_component.p2p_c_thread.t_arg = NULL;
        ret = opal_thread_start(&mca_osc_pt2pt_component.p2p_c_thread);
#else
        ret = opal_progress_register(ompi_osc_pt2pt_component_progress);
#endif
    }
    OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);
    OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);

    /* fill in window information */
    win->w_osc_module = (ompi_osc_base_module_t*) module;
@@ -396,27 +362,13 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win,
    opal_atomic_mb();

    /* start up receive for protocol headers */
    OPAL_FREE_LIST_GET(&mca_osc_pt2pt_component.p2p_c_buffers,
                       item, ret);
    if (NULL == item) {
        free(module->p2p_sc_remote_ranks);
        free(module->p2p_sc_remote_active_ranks);
        free(module->p2p_fence_coll_results);
        free(module->p2p_fence_coll_counts);
        free(module->p2p_copy_num_pending_sendreqs);
        OBJ_DESTRUCT(&module->p2p_copy_pending_sendreqs);
        OBJ_DESTRUCT(&module->p2p_long_msgs);
        free(module->p2p_num_pending_sendreqs);
        OBJ_DESTRUCT(&module->p2p_pending_sendreqs);
        ompi_comm_free(&comm);
        OBJ_DESTRUCT(&(module->p2p_acc_lock));
        OBJ_DESTRUCT(&(module->p2p_lock));
        free(module);
        return OMPI_ERROR;
    }
    OPAL_FREE_LIST_WAIT(&mca_osc_pt2pt_component.p2p_c_buffers,
                        item, ret);
    if (OMPI_SUCCESS != ret) goto cleanup;

    buffer = (ompi_osc_pt2pt_buffer_t*) item;
    buffer->cbfunc = ompi_osc_pt2pt_component_fragment_cb;
    buffer->cbdata = (void*) module;
    buffer->mpireq.cbfunc = component_fragment_cb;
    buffer->mpireq.cbdata = (void*) module;

    ret = MCA_PML_CALL(irecv(buffer->payload,
                             mca_osc_pt2pt_component.p2p_c_eager_size,
@@ -424,9 +376,49 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win,
                             MPI_ANY_SOURCE,
                             CONTROL_MSG_TAG,
                             module->p2p_comm,
                             &buffer->request));
    opal_list_append(&module->p2p_pending_control_sends,
                     &buffer->super.super);
                             &buffer->mpireq.request));
    if (OMPI_SUCCESS != ret) goto cleanup;

    OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.p2p_c_lock);
    opal_list_append(&mca_osc_pt2pt_component.p2p_c_pending_requests,
                     &buffer->mpireq.super.super);
    OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);

    return OMPI_SUCCESS;

 cleanup:
    OBJ_DESTRUCT(&module->p2p_unlocks_pending);
    OBJ_DESTRUCT(&module->p2p_locks_pending);
    OBJ_DESTRUCT(&module->p2p_copy_pending_sendreqs);
    OBJ_DESTRUCT(&module->p2p_pending_sendreqs);
    OBJ_DESTRUCT(&module->p2p_acc_lock);
    OBJ_DESTRUCT(&module->p2p_cond);
    OBJ_DESTRUCT(&module->p2p_lock);

    if (NULL != buffer) {
        OPAL_FREE_LIST_RETURN(&mca_osc_pt2pt_component.p2p_c_buffers, item);
    }
    if (NULL != module->p2p_sc_remote_ranks) {
        free(module->p2p_sc_remote_ranks);
    }
    if (NULL != module->p2p_sc_remote_active_ranks) {
        free(module->p2p_sc_remote_active_ranks);
    }
    if (NULL != module->p2p_fence_coll_counts) {
        free(module->p2p_fence_coll_counts);
    }
    if (NULL != module->p2p_copy_num_pending_sendreqs) {
        free(module->p2p_copy_num_pending_sendreqs);
    }
    if (NULL != module->p2p_num_pending_sendreqs) {
        free(module->p2p_num_pending_sendreqs);
    }
    if (NULL != module->p2p_comm) ompi_comm_free(&module->p2p_comm);

#if OMPI_ENABLE_DEBUG
    memset(module, 0, sizeof(ompi_osc_base_module_t));
#endif
    if (NULL != module) free(module);

    return ret;
}
@@ -434,53 +426,25 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win,

/* dispatch for callback on message completion */
static void
ompi_osc_pt2pt_component_fragment_cb(struct ompi_osc_pt2pt_buffer_t *pt2pt_buffer)
component_fragment_cb(ompi_osc_pt2pt_mpireq_t *mpireq)
{
    int ret;
    void *payload, *buffer;
    size_t buffer_len;
    ompi_osc_pt2pt_module_t *module;
    ompi_osc_pt2pt_buffer_t *new_pt2pt_buffer;
    opal_free_list_item_t *item;
    ompi_osc_pt2pt_buffer_t *buffer =
        (ompi_osc_pt2pt_buffer_t*) mpireq;
    ompi_osc_pt2pt_module_t *module =
        (ompi_osc_pt2pt_module_t*) mpireq->cbdata;

    buffer = pt2pt_buffer->payload;
    buffer_len = pt2pt_buffer->status._count;
    module = (ompi_osc_pt2pt_module_t*)pt2pt_buffer->cbdata;

    /* post a new receive message */

    /* start up receive for protocol headers */
    OPAL_FREE_LIST_GET(&mca_osc_pt2pt_component.p2p_c_buffers,
                       item, ret);
    assert(NULL != item);
    new_pt2pt_buffer = (ompi_osc_pt2pt_buffer_t*) item;
    new_pt2pt_buffer->cbfunc = ompi_osc_pt2pt_component_fragment_cb;
    new_pt2pt_buffer->cbdata = (void*) module;

    ret = MCA_PML_CALL(irecv(new_pt2pt_buffer->payload,
                             mca_osc_pt2pt_component.p2p_c_eager_size,
                             MPI_BYTE,
                             MPI_ANY_SOURCE,
                             CONTROL_MSG_TAG,
                             module->p2p_comm,
                             &new_pt2pt_buffer->request));
    assert(OMPI_SUCCESS == ret);
    opal_list_append(&module->p2p_pending_control_sends,
                     &new_pt2pt_buffer->super.super);

    assert(buffer_len >=
           sizeof(ompi_osc_pt2pt_base_header_t));
    assert(mpireq->status._count >= (int) sizeof(ompi_osc_pt2pt_base_header_t));

    /* handle message */
    switch (((ompi_osc_pt2pt_base_header_t*) buffer)->hdr_type) {
    switch (((ompi_osc_pt2pt_base_header_t*) buffer->payload)->hdr_type) {
    case OMPI_OSC_PT2PT_HDR_PUT:
        {
            ompi_osc_pt2pt_send_header_t *header;

            /* get our header and payload */
            header = (ompi_osc_pt2pt_send_header_t*)
                buffer;
            payload = (void*) (header + 1);
            ompi_osc_pt2pt_send_header_t *header =
                (ompi_osc_pt2pt_send_header_t*) buffer->payload;
            void *payload = (void*) (header + 1);

#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
            if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
@@ -488,11 +452,8 @@ ompi_osc_pt2pt_component_fragment_cb(struct ompi_osc_pt2pt_buffe
            }
#endif

            assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx));

            if (!ompi_win_exposure_epoch(module->p2p_win)) {
                if (OMPI_WIN_FENCE & ompi_win_get_mode(module->p2p_win)) {
                    /* well, we're definitely in an access epoch now */
                    ompi_win_set_mode(module->p2p_win,
                                      OMPI_WIN_FENCE |
                                      OMPI_WIN_ACCESS_EPOCH |
@@ -506,12 +467,10 @@ ompi_osc_pt2pt_component_fragment_cb(struct ompi_osc_pt2pt_buffe

    case OMPI_OSC_PT2PT_HDR_ACC:
        {
            ompi_osc_pt2pt_send_header_t *header;

            /* get our header and payload */
            header = (ompi_osc_pt2pt_send_header_t*)
                buffer;
            payload = (void*) (header + 1);
            ompi_osc_pt2pt_send_header_t *header =
                (ompi_osc_pt2pt_send_header_t*) buffer->payload;
            void *payload = (void*) (header + 1);

#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
            if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
@@ -519,11 +478,8 @@ ompi_osc_pt2pt_component_fragment_cb(struct ompi_osc_pt2pt_buffe
            }
#endif

            assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx));

            if (!ompi_win_exposure_epoch(module->p2p_win)) {
                if (OMPI_WIN_FENCE & ompi_win_get_mode(module->p2p_win)) {
                    /* well, we're definitely in an access epoch now */
                    ompi_win_set_mode(module->p2p_win,
                                      OMPI_WIN_FENCE |
                                      OMPI_WIN_ACCESS_EPOCH |
@@ -538,27 +494,22 @@ ompi_osc_pt2pt_component_fragment_cb(struct ompi_osc_pt2pt_buffe

    case OMPI_OSC_PT2PT_HDR_GET:
        {
            /* get our header and payload */
            ompi_osc_pt2pt_send_header_t *header =
                (ompi_osc_pt2pt_send_header_t*) buffer->payload;
            void *payload = (void*) (header + 1);
            ompi_datatype_t *datatype;
            ompi_osc_pt2pt_send_header_t *header;
            ompi_osc_pt2pt_replyreq_t *replyreq;
            ompi_proc_t *proc;

            /* get our header and payload */
            header = (ompi_osc_pt2pt_send_header_t*)
                buffer;
            payload = (void*) (header + 1);

#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
            if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
                OMPI_OSC_PT2PT_SEND_HDR_NTOH(*header);
            }
#endif

            assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx));

            if (!ompi_win_exposure_epoch(module->p2p_win)) {
                if (OMPI_WIN_FENCE & ompi_win_get_mode(module->p2p_win)) {
                    /* well, we're definitely in an access epoch now */
                    ompi_win_set_mode(module->p2p_win,
                                      OMPI_WIN_FENCE |
                                      OMPI_WIN_ACCESS_EPOCH |
@@ -589,14 +540,11 @@ ompi_osc_pt2pt_component_fragment_cb(struct ompi_osc_pt2pt_buffe

    case OMPI_OSC_PT2PT_HDR_REPLY:
        {
            ompi_osc_pt2pt_reply_header_t *header;
            ompi_osc_pt2pt_reply_header_t *header =
                (ompi_osc_pt2pt_reply_header_t*) buffer->payload;
            void *payload = (void*) (header + 1);
            ompi_osc_pt2pt_sendreq_t *sendreq;

            /* get our header and payload */
            header = (ompi_osc_pt2pt_reply_header_t*)
                buffer;
            payload = (void*) (header + 1);

#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
            if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
                OMPI_OSC_PT2PT_REPLY_HDR_NTOH(*header);
@@ -611,28 +559,23 @@ ompi_osc_pt2pt_component_fragment_cb(struct ompi_osc_pt2pt_buffe
            ompi_osc_pt2pt_replyreq_recv(module, sendreq, header, payload);
        }
        break;

    case OMPI_OSC_PT2PT_HDR_POST:
        {
            ompi_osc_pt2pt_control_header_t *header =
                (ompi_osc_pt2pt_control_header_t*)
                buffer;
            int32_t count;

#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
            if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
                OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(*header);
            count = OPAL_THREAD_ADD32(&(module->p2p_num_post_msgs), -1);
            if (count == 0) {
                opal_condition_broadcast(&module->p2p_cond);
            }
#endif

            assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx));

            OPAL_THREAD_ADD32(&(module->p2p_num_post_msgs), -1);
        }
        break;

    case OMPI_OSC_PT2PT_HDR_COMPLETE:
        {
            ompi_osc_pt2pt_control_header_t *header =
                (ompi_osc_pt2pt_control_header_t*)
                buffer;
                (ompi_osc_pt2pt_control_header_t*) buffer->payload;
            int32_t count;

#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
            if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
@@ -640,20 +583,21 @@ ompi_osc_pt2pt_component_fragment_cb(struct ompi_osc_pt2pt_buffe
            }
#endif

            assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx));

            /* we've heard from one more place, and have value reqs to
               process */
            OPAL_THREAD_ADD32(&(module->p2p_num_complete_msgs), -1);
            OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), header->hdr_value[0]);
            /* BWB -- need to check to see if we can do this better */
            count = OPAL_THREAD_ADD32(&(module->p2p_num_complete_msgs), -1);
            if (count == 0) opal_condition_broadcast(&module->p2p_cond);
            count = OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), header->hdr_value[0]);
            if (count == 0) opal_condition_broadcast(&module->p2p_cond);
        }
        break;

    case OMPI_OSC_PT2PT_HDR_LOCK_REQ:
        {
            ompi_osc_pt2pt_control_header_t *header =
                (ompi_osc_pt2pt_control_header_t*)
                buffer;
                (ompi_osc_pt2pt_control_header_t*) buffer->payload;
            int32_t count;

#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
            if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
@@ -661,13 +605,14 @@ ompi_osc_pt2pt_component_fragment_cb(struct ompi_osc_pt2pt_buffe
            }
#endif

            assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx));

            if (header->hdr_value[1] > 0) {
                ompi_osc_pt2pt_passive_lock(module, header->hdr_value[0],
                                            header->hdr_value[1]);
            } else {
                OPAL_THREAD_ADD32(&(module->p2p_lock_received_ack), 1);
                count = OPAL_THREAD_ADD32(&(module->p2p_lock_received_ack), 1);
                OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
                                     "received lock request ack, count %d", count));
                if (count != 0) opal_condition_broadcast(&module->p2p_cond);
            }
        }
        break;
@@ -675,8 +620,7 @@ ompi_osc_pt2pt_component_fragment_cb(struct ompi_osc_pt2pt_buffe
    case OMPI_OSC_PT2PT_HDR_UNLOCK_REQ:
        {
            ompi_osc_pt2pt_control_header_t *header =
                (ompi_osc_pt2pt_control_header_t*)
                buffer;
                (ompi_osc_pt2pt_control_header_t*) buffer->payload;

#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
            if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
@@ -684,107 +628,100 @@ ompi_osc_pt2pt_component_fragment_cb(struct ompi_osc_pt2pt_buffe
            }
#endif

            assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx));

            ompi_osc_pt2pt_passive_unlock(module, header->hdr_value[0],
                                          header->hdr_value[1]);
        }
        break;

    case OMPI_OSC_PT2PT_HDR_UNLOCK_REPLY:
        {
            ompi_osc_pt2pt_control_header_t *header =
                (ompi_osc_pt2pt_control_header_t*)
                buffer;
            int32_t count;

#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
            if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
                OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(*header);
            count = OPAL_THREAD_ADD32(&(module->p2p_num_pending_out), -1);
            if (count == 0) {
                opal_condition_broadcast(&module->p2p_cond);
            }
#endif

            assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx));
            OPAL_THREAD_ADD32(&(module->p2p_num_pending_out), -1);
        }
        break;

    default:
        opal_output_verbose(5, ompi_osc_base_output,
                            "received packet for Window with unknown type");
    }

    item = &(pt2pt_buffer->super);
    OPAL_FREE_LIST_RETURN(&mca_osc_pt2pt_component.p2p_c_buffers,
                          item);
}



int
ompi_osc_pt2pt_request_test(ompi_request_t ** rptr,
                            int *completed,
                            ompi_status_public_t * status )
{
#if OMPI_ENABLE_PROGRESS_THREADS == 0
    ompi_request_t *request = *rptr;
#endif
    int ret = OMPI_SUCCESS;

#if OMPI_ENABLE_PROGRESS_THREADS == 0
    if (request->req_state == OMPI_REQUEST_INACTIVE ||
        request->req_complete) {
        ret = ompi_request_test(rptr, completed, status);
    } else {
        *completed = 0;
"received one-sided packet for with unknown type");
|
||||
    }
#else
    ret = ompi_request_test(rptr, completed, status);
#endif

    return ret;
    ret = MCA_PML_CALL(irecv(buffer->payload,
                             mca_osc_pt2pt_component.p2p_c_eager_size,
                             MPI_BYTE,
                             MPI_ANY_SOURCE,
                             CONTROL_MSG_TAG,
                             module->p2p_comm,
                             &buffer->mpireq.request));
    /* BWB -- FIX ME -- handle irecv errors */
    OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.p2p_c_lock);
    opal_list_append(&mca_osc_pt2pt_component.p2p_c_pending_requests,
                     &buffer->mpireq.super.super);
    OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);
}


int
ompi_osc_pt2pt_progress(void)
ompi_osc_pt2pt_component_progress(void)
{
    int ret, done, count = 0;
    void *node;
    uint32_t key;
    ompi_osc_pt2pt_module_t *module;
    opal_list_item_t *item;
    int ret, done = 0;

    ret = opal_hash_table_get_first_key_uint32(&mca_osc_pt2pt_component.p2p_c_modules,
                                               &key,
                                               (void**) &module,
                                               &node);
    if (OMPI_SUCCESS != ret) return 0;
    ret = OPAL_THREAD_TRYLOCK(&mca_osc_pt2pt_component.p2p_c_lock);
    if (ret != 0) return 0;

    do {
        /* loop through pending requests */
        for (item = opal_list_get_first(&module->p2p_pending_control_sends) ;
             item != opal_list_get_end(&module->p2p_pending_control_sends) ;
             item = opal_list_get_next(item)) {
            ompi_osc_pt2pt_buffer_t *buffer =
                (ompi_osc_pt2pt_buffer_t*) item;
    for (item = opal_list_get_first(&mca_osc_pt2pt_component.p2p_c_pending_requests) ;
         item != opal_list_get_end(&mca_osc_pt2pt_component.p2p_c_pending_requests) ;
         item = opal_list_get_next(item)) {
        ompi_osc_pt2pt_mpireq_t *buffer =
            (ompi_osc_pt2pt_mpireq_t*) item;

            ret = ompi_osc_pt2pt_request_test(&buffer->request, &done, &buffer->status);
            if (OMPI_SUCCESS == ret && done) {
                item = opal_list_remove_item(&module->p2p_pending_control_sends,
                                             item);
                buffer->cbfunc(buffer);
                /* it's possible that cbfunc is going to do something
                   that calls progress, which means our loop is
                   probably hosed up because it's possible that the
                   list changed under us. It's either exit the loop
                   through the list or start all over again. I'm
                   going with exit. */
                break;
            }
        /* BWB - FIX ME */
#if OMPI_ENABLE_PROGRESS_THREADS == 0
        if (buffer->request->req_state == OMPI_REQUEST_INACTIVE ||
            buffer->request->req_complete) {
            ret = ompi_request_test(&buffer->request,
                                    &done,
                                    &buffer->status);
        } else {
            done = 0;
            ret = OMPI_SUCCESS;
        }
    } while (OMPI_SUCCESS ==
             opal_hash_table_get_next_key_uint32(&mca_osc_pt2pt_component.p2p_c_modules,
                                                 &key,
                                                 (void**) &module,
                                                 node,
                                                 &node));
#else
        ret = ompi_request_test(&buffer->request,
                                &done,
                                &buffer->status);
#endif
        if (OMPI_SUCCESS == ret && 0 != done) {
            opal_list_remove_item(&mca_osc_pt2pt_component.p2p_c_pending_requests,
                                  item);
            OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);
            buffer->cbfunc(buffer);
            OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.p2p_c_lock);
            break;
        }
    }

    OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);

    return count;
    return done;
}


#if OMPI_ENABLE_PROGRESS_THREADS
static void*
component_thread_fn(opal_object_t *obj)
{
    while (mca_osc_pt2pt_component.p2p_c_thread_run) {
        /* wake up whenever a request completes, to make sure it's not
           for us */
        OPAL_THREAD_LOCK(&ompi_request_lock);
        opal_condition_wait(&ompi_request_cond, &ompi_request_lock);
        OPAL_THREAD_UNLOCK(&ompi_request_lock);
        ompi_osc_pt2pt_component_progress();
    }

    return NULL;
}
#endif
@ -41,7 +41,7 @@ create_send_tag(ompi_osc_pt2pt_module_t *module)
|
||||
newval = (oldval + 1) % mca_pml.pml_max_tag;
|
||||
} while (0 == opal_atomic_cmpset_32(&module->p2p_tag_counter, oldval, newval));
|
||||
return newval;
|
||||
#elif OMPI_HAVE_THREAD_SUPPORT
|
||||
#else
|
||||
int32_t ret;
|
||||
/* no compare and swap - have to lock the module */
|
||||
OPAL_THREAD_LOCK(&module->p2p_lock);
|
||||
@ -49,46 +49,62 @@ create_send_tag(ompi_osc_pt2pt_module_t *module)
|
||||
ret = module->p2p_tag_counter;
|
||||
OPAL_THREAD_UNLOCK(&module->p2p_lock);
|
||||
return ret;
|
||||
#else
|
||||
module->p2p_tag_counter = (module->p2p_tag_counter + 1) % mca_pml.pml_max_tag;
|
||||
return module->p2p_tag_counter;
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
static inline void
|
||||
inmsg_mark_complete(ompi_osc_pt2pt_module_t *module)
|
||||
{
|
||||
int32_t count = OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), -1);
|
||||
if (0 == count) {
|
||||
opal_condition_broadcast(&module->p2p_cond);
|
||||
if ((0 != module->p2p_lock_status) &&
|
||||
(opal_list_get_size(&module->p2p_unlocks_pending) != 0)) {
|
||||
ompi_osc_pt2pt_passive_unlock_complete(module);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**********************************************************************
|
||||
*
|
||||
* Sending a sendreq to target
|
||||
*
|
||||
**********************************************************************/
|
||||
static void
|
||||
ompi_osc_pt2pt_sendreq_send_long_cb(ompi_osc_pt2pt_longreq_t *longreq)
|
||||
ompi_osc_pt2pt_sendreq_send_long_cb(ompi_osc_pt2pt_mpireq_t *mpireq)
|
||||
{
|
||||
ompi_osc_pt2pt_longreq_t *longreq =
|
||||
(ompi_osc_pt2pt_longreq_t*) mpireq;
|
||||
ompi_osc_pt2pt_sendreq_t *sendreq =
|
||||
(ompi_osc_pt2pt_sendreq_t*) longreq->req_comp_cbdata;
|
||||
(ompi_osc_pt2pt_sendreq_t*) longreq->mpireq.cbdata;
|
||||
int32_t count;
|
||||
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
|
||||
"%d completed long sendreq to %d",
|
||||
sendreq->req_module->p2p_comm->c_my_rank,
|
||||
sendreq->req_target_rank));
|
||||
|
||||
opal_list_remove_item(&(sendreq->req_module->p2p_long_msgs),
|
||||
&(longreq->super.super));
|
||||
count = OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1);
|
||||
|
||||
ompi_osc_pt2pt_longreq_free(longreq);
|
||||
|
||||
OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1);
|
||||
ompi_osc_pt2pt_sendreq_free(sendreq);
|
||||
|
||||
if (0 == count) opal_condition_broadcast(&sendreq->req_module->p2p_cond);
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
ompi_osc_pt2pt_sendreq_send_cb(ompi_osc_pt2pt_buffer_t *buffer)
|
||||
ompi_osc_pt2pt_sendreq_send_cb(ompi_osc_pt2pt_mpireq_t *mpireq)
|
||||
{
|
||||
ompi_osc_pt2pt_buffer_t *buffer =
|
||||
(ompi_osc_pt2pt_buffer_t*) mpireq;
|
||||
ompi_osc_pt2pt_sendreq_t *sendreq =
|
||||
(ompi_osc_pt2pt_sendreq_t*) buffer->cbdata;
|
||||
(ompi_osc_pt2pt_sendreq_t*) mpireq->cbdata;
|
||||
ompi_osc_pt2pt_send_header_t *header =
|
||||
(ompi_osc_pt2pt_send_header_t*) buffer->payload;
|
||||
int32_t count;
|
||||
|
||||
/* have to look at header, and not the sendreq because in the case
|
||||
of get, it's possible that the sendreq has been freed already
|
||||
@ -106,14 +122,15 @@ ompi_osc_pt2pt_sendreq_send_cb(ompi_osc_pt2pt_buffer_t *buffer)
|
||||
/* do we need to post a send? */
|
||||
if (header->hdr_msg_length != 0) {
|
||||
/* sendreq is done. Mark it as so and get out of here */
|
||||
OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1);
|
||||
count = OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1);
|
||||
ompi_osc_pt2pt_sendreq_free(sendreq);
|
||||
if (0 == count) opal_condition_broadcast(&sendreq->req_module->p2p_cond);
|
||||
}
|
||||
}
|
||||
|
||||
/* release the buffer */
|
||||
OPAL_FREE_LIST_RETURN(&mca_osc_pt2pt_component.p2p_c_buffers,
|
||||
&buffer->super);
|
||||
&mpireq->super);
|
||||
}
|
||||
|
||||
|
||||
@ -154,14 +171,13 @@ ompi_osc_pt2pt_sendreq_send(ompi_osc_pt2pt_module_t *module,
|
||||
}
|
||||
|
||||
/* setup buffer */
|
||||
buffer->cbfunc = ompi_osc_pt2pt_sendreq_send_cb;
|
||||
buffer->cbdata = (void*) sendreq;
|
||||
buffer->mpireq.cbfunc = ompi_osc_pt2pt_sendreq_send_cb;
|
||||
buffer->mpireq.cbdata = (void*) sendreq;
|
||||
|
||||
/* pack header */
|
||||
header = (ompi_osc_pt2pt_send_header_t*) buffer->payload;
|
||||
written_data += sizeof(ompi_osc_pt2pt_send_header_t);
|
||||
header->hdr_base.hdr_flags = 0;
|
||||
header->hdr_windx = sendreq->req_module->p2p_comm->c_contextid;
|
||||
header->hdr_origin = sendreq->req_module->p2p_comm->c_my_rank;
|
||||
header->hdr_origin_sendreq.pval = (void*) sendreq;
|
||||
header->hdr_origin_tag = 0;
|
||||
@ -250,17 +266,19 @@ ompi_osc_pt2pt_sendreq_send(ompi_osc_pt2pt_module_t *module,
|
||||
CONTROL_MSG_TAG,
|
||||
MCA_PML_BASE_SEND_STANDARD,
|
||||
module->p2p_comm,
|
||||
&buffer->request));
|
||||
opal_list_append(&module->p2p_pending_control_sends,
|
||||
&buffer->super.super);
|
||||
&buffer->mpireq.request));
|
||||
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.p2p_c_lock);
|
||||
opal_list_append(&mca_osc_pt2pt_component.p2p_c_pending_requests,
|
||||
&buffer->mpireq.super.super);
|
||||
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);
|
||||
|
||||
if (OMPI_OSC_PT2PT_GET != sendreq->req_type &&
|
||||
header->hdr_msg_length == 0) {
|
||||
ompi_osc_pt2pt_longreq_t *longreq;
|
||||
ompi_osc_pt2pt_longreq_alloc(&longreq);
|
||||
|
||||
longreq->req_comp_cb = ompi_osc_pt2pt_sendreq_send_long_cb;
|
||||
longreq->req_comp_cbdata = sendreq;
|
||||
longreq->mpireq.cbfunc = ompi_osc_pt2pt_sendreq_send_long_cb;
|
||||
longreq->mpireq.cbdata = sendreq;
|
||||
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
|
||||
"%d starting long sendreq to %d (%d)",
|
||||
sendreq->req_module->p2p_comm->c_my_rank,
|
||||
@ -274,13 +292,13 @@ ompi_osc_pt2pt_sendreq_send(ompi_osc_pt2pt_module_t *module,
|
||||
header->hdr_origin_tag,
|
||||
MCA_PML_BASE_SEND_STANDARD,
|
||||
sendreq->req_module->p2p_comm,
|
||||
&(longreq->req_pml_req));
|
||||
&(longreq->mpireq.request));
|
||||
|
||||
/* put the send request in the waiting list */
|
||||
OPAL_THREAD_LOCK(&(sendreq->req_module->p2p_lock));
|
||||
opal_list_append(&(sendreq->req_module->p2p_long_msgs),
|
||||
&(longreq->super.super));
|
||||
OPAL_THREAD_UNLOCK(&(sendreq->req_module->p2p_lock));
|
||||
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.p2p_c_lock);
|
||||
opal_list_append(&mca_osc_pt2pt_component.p2p_c_pending_requests,
|
||||
&(longreq->mpireq.super.super));
|
||||
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);
|
||||
}
|
||||
|
||||
goto done;
|
||||
@ -302,26 +320,27 @@ ompi_osc_pt2pt_sendreq_send(ompi_osc_pt2pt_module_t *module,
|
||||
*
|
||||
**********************************************************************/
|
||||
static void
|
||||
ompi_osc_pt2pt_replyreq_send_long_cb(ompi_osc_pt2pt_longreq_t *longreq)
|
||||
ompi_osc_pt2pt_replyreq_send_long_cb(ompi_osc_pt2pt_mpireq_t *mpireq)
|
||||
{
|
||||
ompi_osc_pt2pt_longreq_t *longreq =
|
||||
(ompi_osc_pt2pt_longreq_t*) mpireq;
|
||||
ompi_osc_pt2pt_replyreq_t *replyreq =
|
||||
(ompi_osc_pt2pt_replyreq_t*) longreq->req_comp_cbdata;
|
||||
(ompi_osc_pt2pt_replyreq_t*) mpireq->cbdata;
|
||||
|
||||
opal_list_remove_item(&(replyreq->rep_module->p2p_long_msgs),
|
||||
&(longreq->super.super));
|
||||
inmsg_mark_complete(replyreq->rep_module);
|
||||
|
||||
ompi_osc_pt2pt_longreq_free(longreq);
|
||||
|
||||
OPAL_THREAD_ADD32(&(replyreq->rep_module->p2p_num_pending_in), -1);
|
||||
ompi_osc_pt2pt_replyreq_free(replyreq);
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
ompi_osc_pt2pt_replyreq_send_cb(ompi_osc_pt2pt_buffer_t *buffer)
|
||||
ompi_osc_pt2pt_replyreq_send_cb(ompi_osc_pt2pt_mpireq_t *mpireq)
|
||||
{
|
||||
ompi_osc_pt2pt_buffer_t *buffer =
|
||||
(ompi_osc_pt2pt_buffer_t*) mpireq;
|
||||
ompi_osc_pt2pt_replyreq_t *replyreq =
|
||||
(ompi_osc_pt2pt_replyreq_t*) buffer->cbdata;
|
||||
(ompi_osc_pt2pt_replyreq_t*) mpireq->cbdata;
|
||||
ompi_osc_pt2pt_reply_header_t *header =
|
||||
(ompi_osc_pt2pt_reply_header_t*) buffer->payload;
|
||||
|
||||
@ -334,13 +353,13 @@ ompi_osc_pt2pt_replyreq_send_cb(ompi_osc_pt2pt_buffer_t *buffer)
|
||||
/* do we need to post a send? */
|
||||
if (header->hdr_msg_length != 0) {
|
||||
/* sendreq is done. Mark it as so and get out of here */
|
||||
OPAL_THREAD_ADD32(&(replyreq->rep_module->p2p_num_pending_in), -1);
|
||||
inmsg_mark_complete(replyreq->rep_module);
|
||||
ompi_osc_pt2pt_replyreq_free(replyreq);
|
||||
}
|
||||
|
||||
/* release the descriptor and replyreq */
|
||||
OPAL_FREE_LIST_RETURN(&mca_osc_pt2pt_component.p2p_c_buffers,
|
||||
&buffer->super);
|
||||
&mpireq->super);
|
||||
}
|
||||
|
||||
|
||||
@ -370,8 +389,8 @@ ompi_osc_pt2pt_replyreq_send(ompi_osc_pt2pt_module_t *module,
|
||||
}
|
||||
|
||||
/* setup buffer */
|
||||
buffer->cbfunc = ompi_osc_pt2pt_replyreq_send_cb;
|
||||
buffer->cbdata = (void*) replyreq;
|
||||
buffer->mpireq.cbfunc = ompi_osc_pt2pt_replyreq_send_cb;
|
||||
buffer->mpireq.cbdata = (void*) replyreq;
|
||||
|
||||
/* pack header */
|
||||
header = (ompi_osc_pt2pt_reply_header_t*) buffer->payload;
|
||||
@ -426,16 +445,18 @@ ompi_osc_pt2pt_replyreq_send(ompi_osc_pt2pt_module_t *module,
|
||||
CONTROL_MSG_TAG,
|
||||
MCA_PML_BASE_SEND_STANDARD,
|
||||
module->p2p_comm,
|
||||
&buffer->request));
|
||||
opal_list_append(&module->p2p_pending_control_sends,
|
||||
&buffer->super.super);
|
||||
&buffer->mpireq.request));
|
||||
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.p2p_c_lock);
|
||||
opal_list_append(&mca_osc_pt2pt_component.p2p_c_pending_requests,
|
||||
&buffer->mpireq.super.super);
|
||||
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);
|
||||
|
||||
if (header->hdr_msg_length == 0) {
|
||||
ompi_osc_pt2pt_longreq_t *longreq;
|
||||
ompi_osc_pt2pt_longreq_alloc(&longreq);
|
||||
|
||||
longreq->req_comp_cb = ompi_osc_pt2pt_replyreq_send_long_cb;
|
||||
longreq->req_comp_cbdata = replyreq;
|
||||
longreq->mpireq.cbfunc = ompi_osc_pt2pt_replyreq_send_long_cb;
|
||||
longreq->mpireq.cbdata = replyreq;
|
||||
|
||||
mca_pml.pml_isend(replyreq->rep_target_convertor.pBaseBuf,
|
||||
replyreq->rep_target_convertor.count,
|
||||
@ -443,14 +464,14 @@ ompi_osc_pt2pt_replyreq_send(ompi_osc_pt2pt_module_t *module,
|
||||
replyreq->rep_origin_rank,
|
||||
header->hdr_target_tag,
|
||||
MCA_PML_BASE_SEND_STANDARD,
|
||||
replyreq->rep_module->p2p_comm,
|
||||
&(longreq->req_pml_req));
|
||||
module->p2p_comm,
|
||||
&(longreq->mpireq.request));
|
||||
|
||||
/* put the send request in the waiting list */
|
||||
OPAL_THREAD_LOCK(&(replyreq->rep_module->p2p_lock));
|
||||
opal_list_append(&(replyreq->rep_module->p2p_long_msgs),
|
||||
&(longreq->super.super));
|
||||
OPAL_THREAD_UNLOCK(&(replyreq->rep_module->p2p_lock));
|
||||
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.p2p_c_lock);
|
||||
opal_list_append(&mca_osc_pt2pt_component.p2p_c_pending_requests,
|
||||
&longreq->mpireq.super.super);
|
||||
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);
|
||||
}
|
||||
goto done;
|
||||
|
||||
@ -471,15 +492,15 @@ ompi_osc_pt2pt_replyreq_send(ompi_osc_pt2pt_module_t *module,
|
||||
*
|
||||
**********************************************************************/
|
||||
static void
|
||||
ompi_osc_pt2pt_sendreq_recv_put_long_cb(ompi_osc_pt2pt_longreq_t *longreq)
|
||||
ompi_osc_pt2pt_sendreq_recv_put_long_cb(ompi_osc_pt2pt_mpireq_t *mpireq)
|
||||
{
|
||||
opal_list_remove_item(&(longreq->req_module->p2p_long_msgs),
|
||||
&(longreq->super.super));
|
||||
ompi_osc_pt2pt_longreq_t *longreq =
|
||||
(ompi_osc_pt2pt_longreq_t*) mpireq;
|
||||
|
||||
OBJ_RELEASE(longreq->req_datatype);
|
||||
ompi_osc_pt2pt_longreq_free(longreq);
|
||||
|
||||
OPAL_THREAD_ADD32(&(longreq->req_module->p2p_num_pending_in), -1);
|
||||
inmsg_mark_complete(longreq->req_module);
|
||||
}
|
||||
|
||||
|
||||
@ -522,14 +543,13 @@ ompi_osc_pt2pt_sendreq_recv_put(ompi_osc_pt2pt_module_t *module,
|
||||
&max_data );
|
||||
OBJ_DESTRUCT(&convertor);
|
||||
OBJ_RELEASE(datatype);
|
||||
OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), -1);
|
||||
|
||||
inmsg_mark_complete(module);
|
||||
} else {
|
||||
ompi_osc_pt2pt_longreq_t *longreq;
|
||||
ompi_osc_pt2pt_longreq_alloc(&longreq);
|
||||
|
||||
longreq->req_comp_cb = ompi_osc_pt2pt_sendreq_recv_put_long_cb;
|
||||
longreq->req_comp_cbdata = NULL;
|
||||
longreq->mpireq.cbfunc = ompi_osc_pt2pt_sendreq_recv_put_long_cb;
|
||||
longreq->mpireq.cbdata = NULL;
|
||||
longreq->req_datatype = datatype;
|
||||
longreq->req_module = module;
|
||||
|
||||
@ -539,40 +559,37 @@ ompi_osc_pt2pt_sendreq_recv_put(ompi_osc_pt2pt_module_t *module,
header->hdr_origin,
header->hdr_origin_tag,
module->p2p_comm,
&(longreq->req_pml_req));
&(longreq->mpireq.request));

/* put the send request in the waiting list */
OPAL_THREAD_LOCK(&(module->p2p_lock));
opal_list_append(&(module->p2p_long_msgs),
&(longreq->super.super));
OPAL_THREAD_UNLOCK(&(module->p2p_lock));
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.p2p_c_lock);
opal_list_append(&mca_osc_pt2pt_component.p2p_c_pending_requests,
&(longreq->mpireq.super.super));
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);
}

return ret;
}


/**********************************************************************
*
* Receive an accumulate on the target side
*
**********************************************************************/
static void
ompi_osc_pt2pt_sendreq_recv_accum_long_cb(ompi_osc_pt2pt_longreq_t *longreq)
ompi_osc_pt2pt_sendreq_recv_accum_long_cb(ompi_osc_pt2pt_mpireq_t *mpireq)
{
ompi_osc_pt2pt_longreq_t *longreq =
(ompi_osc_pt2pt_longreq_t*) mpireq;
ompi_osc_pt2pt_send_header_t *header =
(ompi_osc_pt2pt_send_header_t*) longreq->req_comp_cbdata;
(ompi_osc_pt2pt_send_header_t*) mpireq->cbdata;
void *payload = (void*) (header + 1);
int ret;

/* lock the window for accumulates */
OPAL_THREAD_LOCK(&longreq->req_module->p2p_acc_lock);

opal_list_remove_item(&(longreq->req_module->p2p_long_msgs),
&(longreq->super.super));

/* copy the data from the temporary buffer into the user window */
ret = ompi_osc_pt2pt_process_op(longreq->req_module,
header,
@ -590,13 +607,13 @@ ompi_osc_pt2pt_sendreq_recv_accum_long_cb(ompi_osc_pt2pt_longreq_t *longreq)
header->hdr_origin));

/* free the temp buffer */
free(longreq->req_comp_cbdata);
free(mpireq->cbdata);

/* Release datatype & op */
OBJ_RELEASE(longreq->req_datatype);
OBJ_RELEASE(longreq->req_op);

OPAL_THREAD_ADD32(&(longreq->req_module->p2p_num_pending_in), -1);
inmsg_mark_complete(longreq->req_module);

ompi_osc_pt2pt_longreq_free(longreq);
}
@ -628,13 +645,12 @@ ompi_osc_pt2pt_sendreq_recv_accum(ompi_osc_pt2pt_module_t *module,
OBJ_RELEASE(datatype);
OBJ_RELEASE(op);

OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), -1);
inmsg_mark_complete(module);

OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
"%d received accum message from %d",
module->p2p_comm->c_my_rank,
header->hdr_origin));

} else {
ompi_osc_pt2pt_longreq_t *longreq;
ptrdiff_t lb, extent, true_lb, true_extent;
@ -648,27 +664,27 @@ ompi_osc_pt2pt_sendreq_recv_accum(ompi_osc_pt2pt_module_t *module,
/* get a longreq and fill it in */
ompi_osc_pt2pt_longreq_alloc(&longreq);

longreq->req_comp_cb = ompi_osc_pt2pt_sendreq_recv_accum_long_cb;
longreq->mpireq.cbfunc = ompi_osc_pt2pt_sendreq_recv_accum_long_cb;
longreq->req_datatype = datatype;
longreq->req_op = op;
longreq->req_module = module;

/* allocate a buffer to receive into ... */
longreq->req_comp_cbdata = malloc(buflen + sizeof(ompi_osc_pt2pt_send_header_t));
longreq->mpireq.cbdata = malloc(buflen + sizeof(ompi_osc_pt2pt_send_header_t));

if (NULL == longreq->req_comp_cbdata) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
if (NULL == longreq->mpireq.cbdata) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
/* fill in tmp header */
memcpy(longreq->req_comp_cbdata, header,
memcpy(longreq->mpireq.cbdata, header,
sizeof(ompi_osc_pt2pt_send_header_t));
((ompi_osc_pt2pt_send_header_t*) longreq->req_comp_cbdata)->hdr_msg_length = buflen;
((ompi_osc_pt2pt_send_header_t*) longreq->mpireq.cbdata)->hdr_msg_length = buflen;

ret = mca_pml.pml_irecv(((char*) longreq->req_comp_cbdata) + sizeof(ompi_osc_pt2pt_send_header_t),
ret = mca_pml.pml_irecv(((char*) longreq->mpireq.cbdata) + sizeof(ompi_osc_pt2pt_send_header_t),
header->hdr_target_count,
datatype,
header->hdr_origin,
header->hdr_origin_tag,
module->p2p_comm,
&(longreq->req_pml_req));
&(longreq->mpireq.request));

OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
"%d started long recv accum message from %d (%d)",
@ -677,10 +693,10 @@ ompi_osc_pt2pt_sendreq_recv_accum(ompi_osc_pt2pt_module_t *module,
header->hdr_origin_tag));

/* put the send request in the waiting list */
OPAL_THREAD_LOCK(&(module->p2p_lock));
opal_list_append(&(module->p2p_long_msgs),
&(longreq->super.super));
OPAL_THREAD_UNLOCK(&(module->p2p_lock));
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.p2p_c_lock);
opal_list_append(&mca_osc_pt2pt_component.p2p_c_pending_requests,
&(longreq->mpireq.super.super));
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);
}

return ret;
@ -693,18 +709,19 @@ ompi_osc_pt2pt_sendreq_recv_accum(ompi_osc_pt2pt_module_t *module,
*
**********************************************************************/
static void
ompi_osc_pt2pt_replyreq_recv_long_cb(ompi_osc_pt2pt_longreq_t *longreq)
ompi_osc_pt2pt_replyreq_recv_long_cb(ompi_osc_pt2pt_mpireq_t *mpireq)
{
ompi_osc_pt2pt_longreq_t *longreq =
(ompi_osc_pt2pt_longreq_t*) mpireq;
ompi_osc_pt2pt_sendreq_t *sendreq =
(ompi_osc_pt2pt_sendreq_t*) longreq->req_comp_cbdata;

opal_list_remove_item(&(longreq->req_module->p2p_long_msgs),
&(longreq->super.super));
(ompi_osc_pt2pt_sendreq_t*) longreq->mpireq.cbdata;
int32_t count;

ompi_osc_pt2pt_longreq_free(longreq);

OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1);
count = OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1);
ompi_osc_pt2pt_sendreq_free(sendreq);
if (0 == count) opal_condition_broadcast(&sendreq->req_module->p2p_cond);
}

int
@ -714,6 +731,7 @@ ompi_osc_pt2pt_replyreq_recv(ompi_osc_pt2pt_module_t *module,
void *payload)
{
int ret = OMPI_SUCCESS;
int32_t count;

/* receive into user buffer */
if (header->hdr_msg_length > 0) {
@ -731,14 +749,16 @@ ompi_osc_pt2pt_replyreq_recv(ompi_osc_pt2pt_module_t *module,
&iov_count,
&max_data );

OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1);
count = OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1);
ompi_osc_pt2pt_sendreq_free(sendreq);
if (0 == count) opal_condition_broadcast(&module->p2p_cond);

} else {
ompi_osc_pt2pt_longreq_t *longreq;
ompi_osc_pt2pt_longreq_alloc(&longreq);

longreq->req_comp_cb = ompi_osc_pt2pt_replyreq_recv_long_cb;
longreq->req_comp_cbdata = sendreq;
longreq->mpireq.cbfunc = ompi_osc_pt2pt_replyreq_recv_long_cb;
longreq->mpireq.cbdata = sendreq;
longreq->req_module = module;

/* BWB - FIX ME - George is going to kill me for this */
@ -748,13 +768,13 @@ ompi_osc_pt2pt_replyreq_recv(ompi_osc_pt2pt_module_t *module,
sendreq->req_target_rank,
header->hdr_target_tag,
module->p2p_comm,
&(longreq->req_pml_req));

&(longreq->mpireq.request));

/* put the send request in the waiting list */
OPAL_THREAD_LOCK(&(module->p2p_lock));
opal_list_append(&(module->p2p_long_msgs),
&(longreq->super.super));
OPAL_THREAD_UNLOCK(&(module->p2p_lock));
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.p2p_c_lock);
opal_list_append(&mca_osc_pt2pt_component.p2p_c_pending_requests,
&(longreq->mpireq.super.super));
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);
}

return ret;
@ -767,11 +787,11 @@ ompi_osc_pt2pt_replyreq_recv(ompi_osc_pt2pt_module_t *module,
*
**********************************************************************/
static void
ompi_osc_pt2pt_control_send_cb(ompi_osc_pt2pt_buffer_t *buffer)
ompi_osc_pt2pt_control_send_cb(ompi_osc_pt2pt_mpireq_t *mpireq)
{
/* release the descriptor and sendreq */
OPAL_FREE_LIST_RETURN(&mca_osc_pt2pt_component.p2p_c_buffers,
&buffer->super);
&mpireq->super);
}


@ -809,8 +829,8 @@ ompi_osc_pt2pt_control_send(ompi_osc_pt2pt_module_t *module,
}

/* setup buffer */
buffer->cbfunc = ompi_osc_pt2pt_control_send_cb;
buffer->cbdata = NULL;
buffer->mpireq.cbfunc = ompi_osc_pt2pt_control_send_cb;
buffer->mpireq.cbdata = NULL;
buffer->len = sizeof(ompi_osc_pt2pt_control_header_t);

/* pack header */
@ -819,7 +839,6 @@ ompi_osc_pt2pt_control_send(ompi_osc_pt2pt_module_t *module,
header->hdr_base.hdr_flags = 0;
header->hdr_value[0] = value0;
header->hdr_value[1] = value1;
header->hdr_windx = module->p2p_comm->c_contextid;

#ifdef WORDS_BIGENDIAN
header->hdr_base.hdr_flags |= OMPI_OSC_PT2PT_HDR_FLAG_NBO;
@ -838,9 +857,12 @@ ompi_osc_pt2pt_control_send(ompi_osc_pt2pt_module_t *module,
CONTROL_MSG_TAG,
MCA_PML_BASE_SEND_STANDARD,
module->p2p_comm,
&buffer->request));
opal_list_append(&module->p2p_pending_control_sends,
&buffer->super.super);
&buffer->mpireq.request));
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.p2p_c_lock);
opal_list_append(&mca_osc_pt2pt_component.p2p_c_pending_requests,
&(buffer->mpireq.super.super));
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);

goto done;

cleanup:

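With this hunk, control-message sends join the same component-wide p2p_c_pending_requests list as the long data transfers, so one poller can retire every outstanding MPI request. The poller itself (the component progress routine in osc_pt2pt_component.c) is outside this excerpt; a hedged sketch, mirroring the per-module loop that a later hunk deletes:

    /* hypothetical shape of the component-level poller */
    static int
    component_progress_sketch(void)
    {
        int count = 0;
        opal_list_item_t *item, *next;

        OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.p2p_c_lock);
        /* while(), not for(): the callback usually frees the item */
        next = opal_list_get_first(&mca_osc_pt2pt_component.p2p_c_pending_requests);
        while (opal_list_get_end(&mca_osc_pt2pt_component.p2p_c_pending_requests)
               != (item = next)) {
            ompi_osc_pt2pt_mpireq_t *mpireq = (ompi_osc_pt2pt_mpireq_t*) item;
            int completed = 0;
            next = opal_list_get_next(item);

            ompi_osc_pt2pt_request_test(&mpireq->request, &completed,
                                        &mpireq->status);
            if (completed) {
                opal_list_remove_item(&mca_osc_pt2pt_component.p2p_c_pending_requests,
                                      item);
                mpireq->cbfunc(mpireq);   /* type-specific completion */
                ++count;
            }
        }
        OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);
        return count;
    }
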
@ -7,6 +7,8 @@
* University of Stuttgart.  All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006-2007 Los Alamos National Security, LLC.  All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -37,8 +39,10 @@

struct ompi_osc_pt2pt_base_header_t {
uint8_t hdr_type;
/* eventually, this will include endian information */
uint8_t hdr_flags;
#if OMPI_ENABLE_HETEROGENEOUS_SUPPORT
uint8_t padding[2];
#endif
};
typedef struct ompi_osc_pt2pt_base_header_t ompi_osc_pt2pt_base_header_t;

@ -47,7 +51,6 @@ typedef struct ompi_osc_pt2pt_base_header_t ompi_osc_pt2pt_base_header_t;

struct ompi_osc_pt2pt_send_header_t {
ompi_osc_pt2pt_base_header_t hdr_base;
uint16_t hdr_windx;

int32_t hdr_origin;
ompi_ptr_t hdr_origin_sendreq;
@ -64,7 +67,6 @@ typedef struct ompi_osc_pt2pt_send_header_t ompi_osc_pt2pt_send_header_t;
#define OMPI_OSC_PT2PT_SEND_HDR_HTON(hdr) \
do { \
OMPI_OSC_PT2PT_BASE_HDR_HTON((hdr).hdr_base) \
(hdr).hdr_windx = htons((hdr).hdr_windx); \
(hdr).hdr_origin = htonl((hdr).hdr_origin); \
(hdr).hdr_origin_tag = htonl((hdr).hdr_origin_tag); \
(hdr).hdr_target_disp = htonl((hdr).hdr_target_disp); \
@ -76,7 +78,6 @@ typedef struct ompi_osc_pt2pt_send_header_t ompi_osc_pt2pt_send_header_t;
#define OMPI_OSC_PT2PT_SEND_HDR_NTOH(hdr) \
do { \
OMPI_OSC_PT2PT_BASE_HDR_NTOH((hdr).hdr_base) \
(hdr).hdr_windx = ntohs((hdr).hdr_windx); \
(hdr).hdr_origin = ntohl((hdr).hdr_origin); \
(hdr).hdr_origin_tag = ntohl((hdr).hdr_origin_tag); \
(hdr).hdr_target_disp = ntohl((hdr).hdr_target_disp); \
@ -113,7 +114,6 @@ typedef struct ompi_osc_pt2pt_reply_header_t ompi_osc_pt2pt_reply_header_t;

struct ompi_osc_pt2pt_control_header_t {
ompi_osc_pt2pt_base_header_t hdr_base;
int16_t hdr_windx;
int32_t hdr_value[2];
};
typedef struct ompi_osc_pt2pt_control_header_t ompi_osc_pt2pt_control_header_t;
@ -121,7 +121,6 @@ typedef struct ompi_osc_pt2pt_control_header_t ompi_osc_pt2pt_control_header_t;
#define OMPI_OSC_PT2PT_CONTROL_HDR_HTON(hdr) \
do { \
OMPI_OSC_PT2PT_BASE_HDR_HTON((hdr).hdr_base) \
(hdr).hdr_windx = htons((hdr).hdr_windx); \
(hdr).hdr_value[0] = htonl((hdr).hdr_value[0]); \
(hdr).hdr_value[1] = htonl((hdr).hdr_value[1]); \
} while (0)
@ -129,7 +128,6 @@ typedef struct ompi_osc_pt2pt_control_header_t ompi_osc_pt2pt_control_header_t;
#define OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(hdr) \
do { \
OMPI_OSC_PT2PT_BASE_HDR_NTOH((hdr).hdr_base) \
(hdr).hdr_windx = ntohs((hdr).hdr_windx); \
(hdr).hdr_value[0] = ntohl((hdr).hdr_value[0]); \
(hdr).hdr_value[1] = ntohl((hdr).hdr_value[1]); \
} while (0)

@ -16,11 +16,10 @@

#include "ompi_config.h"

#include "osc_pt2pt.h"
#include "osc_pt2pt_mpireq.h"
#include "osc_pt2pt_longreq.h"

#include "opal/class/opal_list.h"


OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_longreq_t, opal_free_list_item_t,
OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_longreq_t, ompi_osc_pt2pt_mpireq_t,
NULL, NULL);

@ -18,33 +18,18 @@
#define OSC_PT2PT_LONGREQ_H

#include "osc_pt2pt.h"

#include "opal/class/opal_list.h"
#include "opal/class/opal_free_list.h"
#include "ompi/request/request.h"

struct ompi_osc_pt2pt_longreq_t;
typedef struct ompi_osc_pt2pt_longreq_t ompi_osc_pt2pt_longreq_t;

typedef void (*ompi_osc_pt2pt_longreq_comp_cb_t)(ompi_osc_pt2pt_longreq_t *longreq);
#include "osc_pt2pt_mpireq.h"

struct ompi_osc_pt2pt_longreq_t {
opal_free_list_item_t super;
ompi_osc_pt2pt_mpireq_t mpireq;

/* warning - this doesn't always have a sane value */
ompi_osc_pt2pt_module_t *req_module;

ompi_request_t *req_pml_req;
ompi_osc_pt2pt_longreq_comp_cb_t req_comp_cb;

/* general storage place - usually holds a request of some type */
void *req_comp_cbdata;

/* for long receives, to avoid a longrecvreq type */
/* BWB - I don't like this, but I don't want another free list.  What to do? */
struct ompi_op_t *req_op;
struct ompi_datatype_t *req_datatype;
};
typedef struct ompi_osc_pt2pt_longreq_t ompi_osc_pt2pt_longreq_t;
OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_longreq_t);

static inline int
@ -64,7 +49,7 @@ static inline int
ompi_osc_pt2pt_longreq_free(ompi_osc_pt2pt_longreq_t *longreq)
{
OPAL_FREE_LIST_RETURN(&mca_osc_pt2pt_component.p2p_c_longreqs,
&longreq->super.super);
&longreq->mpireq.super.super);
return OMPI_SUCCESS;
}

ompi/mca/osc/pt2pt/osc_pt2pt_mpireq.c (new file, 27 lines)
@ -0,0 +1,27 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart.  All rights reserved.
* Copyright (c) 2004-2006 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/

#include "ompi_config.h"

#include "osc_pt2pt.h"
#include "osc_pt2pt_mpireq.h"

#include "opal/class/opal_free_list.h"

OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_mpireq_t, opal_free_list_item_t,
NULL, NULL);

ompi/mca/osc/pt2pt/osc_pt2pt_mpireq.h (new file, 44 lines)
@ -0,0 +1,44 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2006 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart.  All rights reserved.
* Copyright (c) 2004-2006 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2007      Los Alamos National Security, LLC.  All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/

#ifndef OMPI_OSC_PT2PT_MPIREQ_H
#define OMPI_OSC_PT2PT_MPIREQ_H

#include "opal/class/opal_free_list.h"
#include "ompi/request/request.h"

BEGIN_C_DECLS

struct ompi_osc_pt2pt_mpireq_t;

typedef void (*ompi_osc_pt2pt_mpireq_cb_fn_t)(
struct ompi_osc_pt2pt_mpireq_t *mpireq);

struct ompi_osc_pt2pt_mpireq_t {
opal_free_list_item_t super;
ompi_request_t *request;
ompi_status_public_t status;
ompi_osc_pt2pt_mpireq_cb_fn_t cbfunc;
void *cbdata;
};
typedef struct ompi_osc_pt2pt_mpireq_t ompi_osc_pt2pt_mpireq_t;
OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_mpireq_t);

END_C_DECLS

#endif

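Every pending MPI request in the component is now one of these: the request handle, a slot for its status, and a completion hook. With progress threads enabled, a dedicated thread can drive the poller instead of piggybacking on opal_progress(); a rough sketch under that assumption (the flag name and thread body are hypothetical -- the real code is in osc_pt2pt_component.c):

    /* hypothetical progress-thread body: sleep on the global request
       condition variable (signaled on any PML request completion),
       then poll the component's pending list */
    static volatile bool thread_run_sketch = true;   /* hypothetical flag */

    static void*
    progress_thread_sketch(opal_object_t *obj)
    {
        while (thread_run_sketch) {
            OPAL_THREAD_LOCK(&ompi_request_lock);
            opal_condition_wait(&ompi_request_cond, &ompi_request_lock);
            OPAL_THREAD_UNLOCK(&ompi_request_lock);
            component_progress_sketch();   /* see the earlier sketch */
        }
        return NULL;
    }
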
@ -48,25 +48,3 @@ int ompi_osc_pt2pt_process_op(ompi_osc_pt2pt_module_t *module,
ompi_op_t *op,
void *inbuf,
size_t inbuflen);

/**
* Convert a window index number into a module instance.
*/
static inline ompi_osc_pt2pt_module_t*
ompi_osc_pt2pt_windx_to_module(uint32_t windx)
{
int ret;
ompi_osc_pt2pt_module_t *module;

/* find the right module and dispatch */
ret = opal_hash_table_get_value_uint32(&mca_osc_pt2pt_component.p2p_c_modules,
windx,
(void**) (&module));
if (OMPI_SUCCESS != ret) {
opal_output(0, "Could not translate windx %d to a local MPI_Win instance",
windx);
return NULL;
}

return module;
}

@ -28,37 +28,6 @@
#include "ompi/communicator/communicator.h"
#include "ompi/mca/osc/base/base.h"

/* should have p2p_lock before calling */
static inline void
ompi_osc_pt2pt_progress_long(ompi_osc_pt2pt_module_t *module)
{
if (0 != opal_list_get_size(&(module->p2p_long_msgs))) {
opal_list_item_t *item, *next;

OPAL_THREAD_LOCK(&(module->p2p_lock));

/* Have to go the convoluted while() route instead of a for()
loop because the callback will likely remove the request
from the list and free it, and that would lead to much
badness. */
next = opal_list_get_first(&(module->p2p_long_msgs));
while (opal_list_get_end(&(module->p2p_long_msgs)) != (item = next)) {
ompi_osc_pt2pt_longreq_t *longreq =
(ompi_osc_pt2pt_longreq_t*) item;
int ret, completed;
next = opal_list_get_next(item);

ret = ompi_osc_pt2pt_request_test(&(longreq->req_pml_req), &completed, NULL);
/* BWB - FIX ME - error handling */
if (completed > 0) {
longreq->req_comp_cb(longreq);
}
}

OPAL_THREAD_UNLOCK(&(module->p2p_lock));
}
opal_progress();
}

static inline void
ompi_osc_pt2pt_flip_sendreqs(ompi_osc_pt2pt_module_t *module)
@ -88,89 +57,89 @@ ompi_osc_pt2pt_module_fence(int assert, ompi_win_t *win)
{
unsigned int incoming_reqs;
int ret = OMPI_SUCCESS, i;
ompi_osc_pt2pt_module_t *module = P2P_MODULE(win);

if (0 != (assert & MPI_MODE_NOPRECEDE)) {
int num_pending;

/* check that the user didn't lie to us - since NOPRECEDED
must be specified by all processes if it is specified by
any process, if we see this it is safe to assume that there
are no pending operations anywhere needed to close out this
epoch. */
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
num_pending = opal_list_get_size(&(P2P_MODULE(win)->p2p_pending_sendreqs));
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
if (0 != num_pending) {
epoch.  No need to lock, since it's a lookup and any
pending modification of the pending_sendreqs during this
time is an erroneous program. */
if (0 != opal_list_get_size(&(module->p2p_pending_sendreqs))) {
return MPI_ERR_RMA_SYNC;
}

} else {
opal_list_item_t *item;

ompi_osc_pt2pt_flip_sendreqs(P2P_MODULE(win));
/* "atomically" copy all the data we're going to be modifying
into the copy... */
ompi_osc_pt2pt_flip_sendreqs(module);

/* find out how much data everyone is going to send us.  Need
to have the lock during this period so that we have a sane
view of the number of sendreqs */
ret = P2P_MODULE(win)->p2p_comm->
c_coll.coll_reduce_scatter(P2P_MODULE(win)->p2p_copy_num_pending_sendreqs,
/* find out how much data everyone is going to send us. */
ret = module->p2p_comm->
c_coll.coll_reduce_scatter(module->p2p_copy_num_pending_sendreqs,
&incoming_reqs,
P2P_MODULE(win)->p2p_fence_coll_counts,
module->p2p_fence_coll_counts,
MPI_UNSIGNED,
MPI_SUM,
P2P_MODULE(win)->p2p_comm);
module->p2p_comm);

if (OMPI_SUCCESS != ret) {
/* put the stupid data back for the user.  This is not
cheap, but the user lost his data if we don't. */
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
opal_list_join(&P2P_MODULE(win)->p2p_pending_sendreqs,
opal_list_get_end(&P2P_MODULE(win)->p2p_pending_sendreqs),
&P2P_MODULE(win)->p2p_copy_pending_sendreqs);
OPAL_THREAD_LOCK(&(module->p2p_lock));
opal_list_join(&module->p2p_pending_sendreqs,
opal_list_get_end(&module->p2p_pending_sendreqs),
&module->p2p_copy_pending_sendreqs);

for (i = 0 ; i < ompi_comm_size(P2P_MODULE(win)->p2p_comm) ; ++i) {
P2P_MODULE(win)->p2p_num_pending_sendreqs[i] +=
P2P_MODULE(win)->p2p_copy_num_pending_sendreqs[i];
for (i = 0 ; i < ompi_comm_size(module->p2p_comm) ; ++i) {
module->p2p_num_pending_sendreqs[i] +=
module->p2p_copy_num_pending_sendreqs[i];
}

OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
OPAL_THREAD_UNLOCK(&(module->p2p_lock));
return ret;
}

/* possible we've already received a couple in messages, so
atomicall add however many we're going to wait for */
OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_pending_in), incoming_reqs);
OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_pending_out),
opal_list_get_size(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs)));
atomically add however many we're going to wait for */
OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), incoming_reqs);
OPAL_THREAD_ADD32(&(module->p2p_num_pending_out),
opal_list_get_size(&(module->p2p_copy_pending_sendreqs)));

OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
"fence: waiting on %d in and %d out",
P2P_MODULE(win)->p2p_num_pending_in,
P2P_MODULE(win)->p2p_num_pending_out));
module->p2p_num_pending_in,
module->p2p_num_pending_out));

/* try to start all the requests.  We've copied everything we
need out of pending_sendreqs, so don't need the lock
here */
while (NULL !=
(item = opal_list_remove_first(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs)))) {
(item = opal_list_remove_first(&(module->p2p_copy_pending_sendreqs)))) {
ompi_osc_pt2pt_sendreq_t *req =
(ompi_osc_pt2pt_sendreq_t*) item;

ret = ompi_osc_pt2pt_sendreq_send(P2P_MODULE(win), req);
ret = ompi_osc_pt2pt_sendreq_send(module, req);

if (OMPI_SUCCESS != ret) {
opal_output_verbose(5, ompi_osc_base_output,
"fence: failure in starting sendreq (%d).  Will try later.",
ret);
opal_list_append(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs), item);
opal_list_append(&(module->p2p_copy_pending_sendreqs), item);
}
}

OPAL_THREAD_LOCK(&module->p2p_lock);
/* now we know how many things we're waiting for - wait for them... */
while (P2P_MODULE(win)->p2p_num_pending_in > 0 ||
0 != P2P_MODULE(win)->p2p_num_pending_out) {
ompi_osc_pt2pt_progress_long(P2P_MODULE(win));
while (module->p2p_num_pending_in > 0 ||
0 != module->p2p_num_pending_out) {
opal_condition_wait(&module->p2p_cond, &module->p2p_lock);
}
OPAL_THREAD_UNLOCK(&module->p2p_lock);
}

/* all transfers are done - back to the real world we go */

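For orientation, the epoch this function implements, as an application would drive it (a hypothetical caller; ranks and buffer are illustrative):

    #include <mpi.h>

    /* one fence epoch exercising the path above */
    void fence_epoch(MPI_Win win, int *buf, int n, int target)
    {
        MPI_Win_fence(MPI_MODE_NOPRECEDE, win); /* checked above: no pending
                                                   sendreqs may exist */
        MPI_Put(buf, n, MPI_INT, target, 0, n, MPI_INT, win);
        MPI_Win_fence(0, win);  /* reduce_scatter exchanges request counts,
                                   then block on p2p_cond until the in/out
                                   counters drain to zero */
    }
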
@ -189,19 +158,23 @@ ompi_osc_pt2pt_module_start(ompi_group_t *group,
int assert,
ompi_win_t *win)
{
int i;
int i, ret = OMPI_SUCCESS;
ompi_osc_pt2pt_module_t *module = P2P_MODULE(win);

OBJ_RETAIN(group);
/* BWB - do I need this? */
ompi_group_increment_proc_count(group);

OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
assert(NULL == P2P_MODULE(win)->p2p_sc_group);
P2P_MODULE(win)->p2p_sc_group = group;
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
OPAL_THREAD_LOCK(&(module->p2p_lock));
if (NULL != module->p2p_sc_group) {
OPAL_THREAD_UNLOCK(&module->p2p_lock);
ret = MPI_ERR_RMA_SYNC;
goto cleanup;
}
module->p2p_sc_group = group;
OPAL_THREAD_UNLOCK(&(module->p2p_lock));

memset(P2P_MODULE(win)->p2p_sc_remote_active_ranks, 0,
sizeof(bool) * ompi_comm_size(P2P_MODULE(win)->p2p_comm));
memset(module->p2p_sc_remote_active_ranks, 0,
sizeof(bool) * ompi_comm_size(module->p2p_comm));

/* for each process in the specified group, find it's rank in our
communicator, store those indexes, and set the true / false in
@ -211,24 +184,25 @@ ompi_osc_pt2pt_module_start(ompi_group_t *group,

/* no need to increment ref count - the communicator isn't
going anywhere while we're here */
ompi_group_t *comm_group = P2P_MODULE(win)->p2p_comm->c_local_group;
ompi_group_t *comm_group = module->p2p_comm->c_local_group;

/* find the rank in the communicator associated with this windows */
for (j = 0 ;
j < ompi_group_size(comm_group) ;
++j) {
if (P2P_MODULE(win)->p2p_sc_group->grp_proc_pointers[i] ==
if (module->p2p_sc_group->grp_proc_pointers[i] ==
comm_group->grp_proc_pointers[j]) {
comm_rank = j;
break;
}
}
if (comm_rank == -1) {
return MPI_ERR_RMA_SYNC;
ret = MPI_ERR_RMA_SYNC;
goto cleanup;
}

P2P_MODULE(win)->p2p_sc_remote_active_ranks[comm_rank] = true;
P2P_MODULE(win)->p2p_sc_remote_ranks[i] = comm_rank;
module->p2p_sc_remote_active_ranks[comm_rank] = true;
module->p2p_sc_remote_ranks[i] = comm_rank;
}

/* Set our mode to access w/ start */
@ -237,10 +211,15 @@ ompi_osc_pt2pt_module_start(ompi_group_t *group,

/* possible we've already received a couple in messages, so
atomicall add however many we're going to wait for */
OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_post_msgs),
ompi_group_size(P2P_MODULE(win)->p2p_sc_group));
OPAL_THREAD_ADD32(&(module->p2p_num_post_msgs),
ompi_group_size(module->p2p_sc_group));

return OMPI_SUCCESS;

cleanup:
ompi_group_decrement_proc_count(group);
OBJ_RELEASE(group);
return ret;
}


@ -251,25 +230,28 @@ ompi_osc_pt2pt_module_complete(ompi_win_t *win)
int ret = OMPI_SUCCESS;
ompi_group_t *group;
opal_list_item_t *item;
ompi_osc_pt2pt_module_t *module = P2P_MODULE(win);

/* wait for all the post messages */
while (0 != P2P_MODULE(win)->p2p_num_post_msgs) {
ompi_osc_pt2pt_progress_long(P2P_MODULE(win));
OPAL_THREAD_LOCK(&module->p2p_lock);
while (0 != module->p2p_num_post_msgs) {
opal_condition_wait(&module->p2p_cond, &module->p2p_lock);
}
OPAL_THREAD_UNLOCK(&module->p2p_lock);

ompi_osc_pt2pt_flip_sendreqs(P2P_MODULE(win));
ompi_osc_pt2pt_flip_sendreqs(module);

/* for each process in group, send a control message with number
of updates coming, then start all the requests */
for (i = 0 ; i < ompi_group_size(P2P_MODULE(win)->p2p_sc_group) ; ++i) {
int comm_rank = P2P_MODULE(win)->p2p_sc_remote_ranks[i];
for (i = 0 ; i < ompi_group_size(module->p2p_sc_group) ; ++i) {
int comm_rank = module->p2p_sc_remote_ranks[i];

OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_pending_out),
P2P_MODULE(win)->p2p_copy_num_pending_sendreqs[comm_rank]);
ret = ompi_osc_pt2pt_control_send(P2P_MODULE(win),
P2P_MODULE(win)->p2p_sc_group->grp_proc_pointers[i],
OPAL_THREAD_ADD32(&(module->p2p_num_pending_out),
module->p2p_copy_num_pending_sendreqs[comm_rank]);
ret = ompi_osc_pt2pt_control_send(module,
module->p2p_sc_group->grp_proc_pointers[i],
OMPI_OSC_PT2PT_HDR_COMPLETE,
P2P_MODULE(win)->p2p_copy_num_pending_sendreqs[comm_rank],
module->p2p_copy_num_pending_sendreqs[comm_rank],
0);
assert(ret == OMPI_SUCCESS);
}
@ -278,67 +260,68 @@ ompi_osc_pt2pt_module_complete(ompi_win_t *win)
need out of pending_sendreqs, so don't need the lock
here */
while (NULL !=
(item = opal_list_remove_first(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs)))) {
(item = opal_list_remove_first(&(module->p2p_copy_pending_sendreqs)))) {
ompi_osc_pt2pt_sendreq_t *req =
(ompi_osc_pt2pt_sendreq_t*) item;

ret = ompi_osc_pt2pt_sendreq_send(P2P_MODULE(win), req);
ret = ompi_osc_pt2pt_sendreq_send(module, req);

if (OMPI_SUCCESS != ret) {
opal_output_verbose(5, ompi_osc_base_output,
"complete: failure in starting sendreq (%d).  Will try later.",
ret);
opal_list_append(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs), item);
opal_list_append(&(module->p2p_copy_pending_sendreqs), item);
}
}

/* wait for all the requests */
while (0 != P2P_MODULE(win)->p2p_num_pending_out) {
ompi_osc_pt2pt_progress_long(P2P_MODULE(win));
OPAL_THREAD_LOCK(&module->p2p_lock);
while (0 != module->p2p_num_pending_out) {
opal_condition_wait(&module->p2p_cond, &module->p2p_lock);
}

group = module->p2p_sc_group;
module->p2p_sc_group = NULL;

OPAL_THREAD_UNLOCK(&module->p2p_lock);

/* remove WIN_POSTED from our mode */
ompi_win_remove_mode(win, OMPI_WIN_ACCESS_EPOCH | OMPI_WIN_STARTED);

OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
group = P2P_MODULE(win)->p2p_sc_group;
P2P_MODULE(win)->p2p_sc_group = NULL;
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));

/* BWB - do I need this? */
ompi_group_decrement_proc_count(group);
OBJ_RELEASE(group);

return ret;
}


int
ompi_osc_pt2pt_module_post(ompi_group_t *group,
int assert,
ompi_win_t *win)
{
int i;
ompi_osc_pt2pt_module_t *module = P2P_MODULE(win);

OBJ_RETAIN(group);
/* BWB - do I need this? */
ompi_group_increment_proc_count(group);

OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
assert(NULL == P2P_MODULE(win)->p2p_pw_group);
P2P_MODULE(win)->p2p_pw_group = group;
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
OPAL_THREAD_LOCK(&(module->p2p_lock));
assert(NULL == module->p2p_pw_group);
module->p2p_pw_group = group;
OPAL_THREAD_UNLOCK(&(module->p2p_lock));

/* Set our mode to expose w/ post */
ompi_win_remove_mode(win, OMPI_WIN_FENCE);
ompi_win_append_mode(win, OMPI_WIN_EXPOSE_EPOCH | OMPI_WIN_POSTED);

/* list how many complete counters we're still waiting on */
OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_complete_msgs),
ompi_group_size(P2P_MODULE(win)->p2p_pw_group));
OPAL_THREAD_ADD32(&(module->p2p_num_complete_msgs),
ompi_group_size(module->p2p_pw_group));

/* send a hello counter to everyone in group */
for (i = 0 ; i < ompi_group_size(P2P_MODULE(win)->p2p_pw_group) ; ++i) {
ompi_osc_pt2pt_control_send(P2P_MODULE(win),
for (i = 0 ; i < ompi_group_size(module->p2p_pw_group) ; ++i) {
ompi_osc_pt2pt_control_send(module,
group->grp_proc_pointers[i],
OMPI_OSC_PT2PT_HDR_POST, 1, 0);
}
@ -351,20 +334,20 @@ int
ompi_osc_pt2pt_module_wait(ompi_win_t *win)
{
ompi_group_t *group;
ompi_osc_pt2pt_module_t *module = P2P_MODULE(win);

while (0 != (P2P_MODULE(win)->p2p_num_pending_in) ||
0 != (P2P_MODULE(win)->p2p_num_complete_msgs)) {
ompi_osc_pt2pt_progress_long(P2P_MODULE(win));
OPAL_THREAD_LOCK(&module->p2p_lock);
while (0 != (module->p2p_num_pending_in) ||
0 != (module->p2p_num_complete_msgs)) {
opal_condition_wait(&module->p2p_cond, &module->p2p_lock);
}

group = module->p2p_pw_group;
module->p2p_pw_group = NULL;
OPAL_THREAD_UNLOCK(&module->p2p_lock);

ompi_win_remove_mode(win, OMPI_WIN_EXPOSE_EPOCH | OMPI_WIN_POSTED);

OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
group = P2P_MODULE(win)->p2p_pw_group;
P2P_MODULE(win)->p2p_pw_group = NULL;
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));

/* BWB - do I need this? */
ompi_group_decrement_proc_count(group);
OBJ_RELEASE(group);

@ -377,27 +360,27 @@ ompi_osc_pt2pt_module_test(ompi_win_t *win,
int *flag)
{
ompi_group_t *group;
ompi_osc_pt2pt_module_t *module = P2P_MODULE(win);

if (0 != (P2P_MODULE(win)->p2p_num_pending_in) ||
0 != (P2P_MODULE(win)->p2p_num_complete_msgs)) {
ompi_osc_pt2pt_progress_long(P2P_MODULE(win));
if (0 != (P2P_MODULE(win)->p2p_num_pending_in) ||
0 != (P2P_MODULE(win)->p2p_num_complete_msgs)) {
*flag = 0;
return OMPI_SUCCESS;
}
#if !OMPI_ENABLE_PROGRESS_THREADS
opal_progress();
#endif

if (0 != (module->p2p_num_pending_in) ||
0 != (module->p2p_num_complete_msgs)) {
*flag = 0;
return OMPI_SUCCESS;
}

*flag = 1;

ompi_win_remove_mode(win, OMPI_WIN_EXPOSE_EPOCH | OMPI_WIN_POSTED);

OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
group = P2P_MODULE(win)->p2p_pw_group;
P2P_MODULE(win)->p2p_pw_group = NULL;
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
OPAL_THREAD_LOCK(&(module->p2p_lock));
group = module->p2p_pw_group;
module->p2p_pw_group = NULL;
OPAL_THREAD_UNLOCK(&(module->p2p_lock));

/* BWB - do I need this? */
ompi_group_decrement_proc_count(group);
OBJ_RELEASE(group);

@ -421,7 +404,8 @@ ompi_osc_pt2pt_module_lock(int lock_type,
int assert,
ompi_win_t *win)
{
ompi_proc_t *proc = ompi_comm_peer_lookup( P2P_MODULE(win)->p2p_comm, target );
ompi_osc_pt2pt_module_t *module = P2P_MODULE(win);
ompi_proc_t *proc = ompi_comm_peer_lookup( module->p2p_comm, target );

assert(lock_type != 0);

@ -431,12 +415,12 @@ ompi_osc_pt2pt_module_lock(int lock_type,

OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
"%d sending lock request to %d",
P2P_MODULE(win)->p2p_comm->c_my_rank, target));
module->p2p_comm->c_my_rank, target));
/* generate a lock request */
ompi_osc_pt2pt_control_send(P2P_MODULE(win),
ompi_osc_pt2pt_control_send(module,
proc,
OMPI_OSC_PT2PT_HDR_LOCK_REQ,
P2P_MODULE(win)->p2p_comm->c_my_rank,
module->p2p_comm->c_my_rank,
lock_type);

/* return */
@ -451,54 +435,60 @@ ompi_osc_pt2pt_module_unlock(int target,
int32_t out_count;
opal_list_item_t *item;
int ret;
ompi_proc_t *proc = ompi_comm_peer_lookup( P2P_MODULE(win)->p2p_comm, target );
ompi_osc_pt2pt_module_t *module = P2P_MODULE(win);
ompi_proc_t *proc = ompi_comm_peer_lookup( module->p2p_comm, target );

while (0 == P2P_MODULE(win)->p2p_lock_received_ack) {
ompi_osc_pt2pt_progress_long(P2P_MODULE(win));
OPAL_THREAD_LOCK(&module->p2p_lock);
while (0 == module->p2p_lock_received_ack) {
opal_condition_wait(&module->p2p_cond, &module->p2p_lock);
}
P2P_MODULE(win)->p2p_lock_received_ack = 0;
OPAL_THREAD_UNLOCK(&module->p2p_lock);

OPAL_THREAD_ADD32(&(module->p2p_lock_received_ack), -1);

/* start all the requests */
ompi_osc_pt2pt_flip_sendreqs(P2P_MODULE(win));
ompi_osc_pt2pt_flip_sendreqs(module);

/* try to start all the requests.  We've copied everything we need
out of pending_sendreqs, so don't need the lock here */
out_count = opal_list_get_size(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs));
out_count = opal_list_get_size(&(module->p2p_copy_pending_sendreqs));

/* we want to send all the requests, plus we wait for one more
completion event for the control message ack from the unlocker
saying we're done */
OPAL_THREAD_ADD32(&(P2P_MODULE(win)->p2p_num_pending_out), out_count + 1);
OPAL_THREAD_ADD32(&(module->p2p_num_pending_out), out_count + 1);

/* send the unlock request */
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
"%d sending unlock request to %d",
P2P_MODULE(win)->p2p_comm->c_my_rank, target));
ompi_osc_pt2pt_control_send(P2P_MODULE(win),
module->p2p_comm->c_my_rank, target));
ompi_osc_pt2pt_control_send(module,
proc,
OMPI_OSC_PT2PT_HDR_UNLOCK_REQ,
P2P_MODULE(win)->p2p_comm->c_my_rank,
module->p2p_comm->c_my_rank,
out_count);

while (NULL !=
(item = opal_list_remove_first(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs)))) {
(item = opal_list_remove_first(&(module->p2p_copy_pending_sendreqs)))) {
ompi_osc_pt2pt_sendreq_t *req =
(ompi_osc_pt2pt_sendreq_t*) item;

ret = ompi_osc_pt2pt_sendreq_send(P2P_MODULE(win), req);
ret = ompi_osc_pt2pt_sendreq_send(module, req);

if (OMPI_SUCCESS != ret) {
opal_output_verbose(5, ompi_osc_base_output,
"unlock: failure in starting sendreq (%d).  Will try later.",
ret);
opal_list_append(&(P2P_MODULE(win)->p2p_copy_pending_sendreqs), item);
opal_list_append(&(module->p2p_copy_pending_sendreqs), item);
}
}

/* wait for all the requests */
while (0 != P2P_MODULE(win)->p2p_num_pending_out) {
ompi_osc_pt2pt_progress_long(P2P_MODULE(win));
OPAL_THREAD_LOCK(&module->p2p_lock);
while (0 != module->p2p_num_pending_out) {
opal_condition_wait(&module->p2p_cond, &module->p2p_lock);
}
OPAL_THREAD_UNLOCK(&module->p2p_lock);

/* set our mode on the window */
ompi_win_remove_mode(win, OMPI_WIN_ACCESS_EPOCH | OMPI_WIN_LOCK_ACCESS);

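The origin-side passive-target protocol above, seen from the application (hypothetical target rank and buffer):

    #include <mpi.h>

    /* caller for the lock/unlock path above */
    void locked_put(MPI_Win win, int *buf, int n)
    {
        MPI_Win_lock(MPI_LOCK_SHARED, 1, 0, win); /* sends LOCK_REQ; data
                                                     ops queue until the
                                                     lock ack arrives */
        MPI_Put(buf, n, MPI_INT, 1, 0, n, MPI_INT, win);
        MPI_Win_unlock(1, win);  /* waits for the lock ack, sends UNLOCK_REQ
                                    carrying out_count, starts the queued
                                    sendreqs, then waits for UNLOCK_REPLY */
    }
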
@ -571,32 +561,57 @@ ompi_osc_pt2pt_passive_unlock(ompi_osc_pt2pt_module_t *module,
int32_t origin,
int32_t count)
{
ompi_osc_pt2pt_pending_lock_t *new_pending = NULL;
ompi_proc_t *proc = ompi_comm_peer_lookup( module->p2p_comm, origin );
ompi_osc_pt2pt_pending_lock_t *new_pending = NULL;

assert(module->p2p_lock_status != 0);

OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), count);
opal_output_verbose(50, ompi_osc_base_output,
"received unlock request from %d with %d requests\n",
origin, count);

while (0 != module->p2p_num_pending_in) {
ompi_osc_pt2pt_progress_long(module);
}
new_pending = OBJ_NEW(ompi_osc_pt2pt_pending_lock_t);
new_pending->proc = proc;
new_pending->lock_type = 0;
OPAL_THREAD_LOCK(&(module->p2p_lock));
OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), count);
opal_list_append(&module->p2p_unlocks_pending, &(new_pending->super));
OPAL_THREAD_UNLOCK(&(module->p2p_lock));

return ompi_osc_pt2pt_passive_unlock_complete(module);
}


int
ompi_osc_pt2pt_passive_unlock_complete(ompi_osc_pt2pt_module_t *module)
{
ompi_osc_pt2pt_pending_lock_t *new_pending = NULL;

if (module->p2p_num_pending_in != 0) return OMPI_SUCCESS;

OPAL_THREAD_LOCK(&(module->p2p_lock));
if (module->p2p_lock_status == MPI_LOCK_EXCLUSIVE) {
ompi_win_remove_mode(module->p2p_win, OMPI_WIN_EXPOSE_EPOCH);
module->p2p_lock_status = 0;
} else {
module->p2p_shared_count--;
module->p2p_shared_count -= opal_list_get_size(&module->p2p_unlocks_pending);
if (module->p2p_shared_count == 0) {
ompi_win_remove_mode(module->p2p_win, OMPI_WIN_EXPOSE_EPOCH);
module->p2p_lock_status = 0;
}
}

ompi_osc_pt2pt_control_send(module, proc,
OMPI_OSC_PT2PT_HDR_UNLOCK_REPLY,
OMPI_SUCCESS, OMPI_SUCCESS);
/* issue whichever unlock acks we should issue */
while (NULL != (new_pending = (ompi_osc_pt2pt_pending_lock_t*)
opal_list_remove_first(&module->p2p_unlocks_pending))) {
opal_output_verbose(50, ompi_osc_base_output,
"sending unlock reply to proc");
ompi_osc_pt2pt_control_send(module,
new_pending->proc,
OMPI_OSC_PT2PT_HDR_UNLOCK_REPLY,
OMPI_SUCCESS, OMPI_SUCCESS);
OBJ_DESTRUCT(new_pending);
}

/* if we were really unlocked, see if we have more to process */
new_pending = (ompi_osc_pt2pt_pending_lock_t*)
@ -604,8 +619,8 @@ ompi_osc_pt2pt_passive_unlock(ompi_osc_pt2pt_module_t *module,
OPAL_THREAD_UNLOCK(&(module->p2p_lock));

if (NULL != new_pending) {
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_output,
"sending lock request to proc"));
opal_output_verbose(50, ompi_osc_base_output,
"sending lock request to proc");
ompi_win_append_mode(module->p2p_win, OMPI_WIN_EXPOSE_EPOCH);
/* set lock state and generate a lock request */
module->p2p_lock_status = new_pending->lock_type;

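Note what the split buys: the old loop above called ompi_osc_pt2pt_progress_long() from inside the unlock handler, and that dispatches callbacks that can land right back in this handler. Now ompi_osc_pt2pt_passive_unlock() only records the pending ack and returns, while ompi_osc_pt2pt_passive_unlock_complete() bails out until p2p_num_pending_in reaches zero. Something in the data-movement path must therefore trip the second phase when the last expected message is retired; a hedged sketch of that trip point, extending the inmsg_mark_complete() sketch given earlier:

    /* hypothetical: where the incoming data path could trip phase two */
    static inline void
    inmsg_mark_complete_sketch(ompi_osc_pt2pt_module_t *module)
    {
        if (0 == OPAL_THREAD_ADD32(&module->p2p_num_pending_in, -1)) {
            opal_condition_broadcast(&module->p2p_cond);
            ompi_osc_pt2pt_passive_unlock_complete(module); /* queued acks */
        }
    }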