
* Merge in new version of the pt2pt one-sided communication component,
  implemented entirely on top of the PML.  This allows us to have a
  one-sided interface even when we are using the CM PML and MTLs for
  point-to-point transport (and therefore not using the BML/BTLs);
  a sketch of the approach follows this list.
* Old pt2pt component was renamed "rdma", as it will soon have real
  RDMA support added to it.
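
The basic pattern is that every one-sided operation travels as an ordinary
two-sided message: the origin packs a header (and any eager data) into a
buffer and isends it, while the target keeps a receive posted on a control
buffer and polls it from a callback registered with opal_progress_register().
Below is a minimal, stand-alone MPI-level sketch of that idea; the tag value
CTRL_TAG is made up for illustration, and the component itself goes through
MCA_PML_CALL(isend) and MCA_PML_CALL(irecv), as the diff below shows:

  /* Not the component's code: a stand-alone, MPI-level sketch of emulating a
   * one-sided "put" with plain point-to-point messages.  CTRL_TAG is a
   * made-up tag value.  Build with mpicc, run with 2 processes. */
  #include <mpi.h>
  #include <stdio.h>

  #define CTRL_TAG 42

  int main(int argc, char **argv)
  {
      int rank, value = 0;
      MPI_Request req;

      MPI_Init(&argc, &argv);
      MPI_Comm_rank(MPI_COMM_WORLD, &rank);

      if (1 == rank) {
          /* "target": keep a receive posted on the exposed buffer and poll
             it, like the component's control-buffer irecv + progress loop */
          int done = 0;
          MPI_Irecv(&value, 1, MPI_INT, 0, CTRL_TAG, MPI_COMM_WORLD, &req);
          while (!done) {
              MPI_Test(&req, &done, MPI_STATUS_IGNORE);
          }
          printf("target received %d\n", value);
      } else if (0 == rank) {
          /* "origin": the put is just a send into the target's buffer */
          int data = 17;
          MPI_Isend(&data, 1, MPI_INT, 1, CTRL_TAG, MPI_COMM_WORLD, &req);
          MPI_Wait(&req, MPI_STATUS_IGNORE);
      }

      MPI_Finalize();
      return 0;
  }

In the component itself the same polling lives in ompi_osc_pt2pt_progress(),
which tests the posted control receive and any pending control sends each
time the progress engine runs.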

Work was done in a temporary branch.  Commit is the result of the
merge command:

  svn merge -r10862:11099 https://svn.open-mpi.org/svn/ompi/tmp/bwb-osc-pt2pt

This commit was SVN r11100.

The following SVN revisions from the original message are invalid or
inconsistent and therefore were not cross-referenced:
  r10862
  r11099
This commit is contained in:
Brian Barrett 2006-08-03 00:10:19 +00:00
parent a21769bbfb
commit 0ba0a60ada
22 changed files with 548 additions and 442 deletions

View file

@ -52,6 +52,8 @@ OMPI_DECLSPEC int mca_bml_base_open(void);
OMPI_DECLSPEC int mca_bml_base_init(bool enable_progress_threads,
bool enable_mpi_threads);
OMPI_DECLSPEC int mca_bml_base_close(void);
OMPI_DECLSPEC bool mca_bml_base_inited(void);
/*

View file

@ -37,6 +37,13 @@ OMPI_DECLSPEC mca_bml_base_module_t mca_bml = {
};
mca_bml_base_component_t mca_bml_component;
static bool init_called = false;
bool
mca_bml_base_inited(void)
{
return init_called;
}
int mca_bml_base_init( bool enable_progress_threads,
bool enable_mpi_threads) {
@ -46,6 +53,8 @@ int mca_bml_base_init( bool enable_progress_threads,
int priority = 0, best_priority = -1;
mca_base_component_list_item_t *cli = NULL;
init_called = true;
for (item = opal_list_get_first(&mca_bml_base_components_available);
opal_list_get_end(&mca_bml_base_components_available) != item;
item = opal_list_get_next(item)) {

View file

@ -131,7 +131,7 @@ typedef uint8_t mca_btl_base_tag_t;
/* reserved tag values */
#define MCA_BTL_TAG_BTL 0
#define MCA_BTL_TAG_PML 1
#define MCA_BTL_TAG_OSC_PT2PT 2
#define MCA_BTL_TAG_OSC_RDMA 2
#define MCA_BTL_TAG_USR 3
#define MCA_BTL_TAG_MAX 255 /* 1 + highest allowed tag num */

View file

@ -21,6 +21,8 @@ include $(top_ompi_srcdir)/config/Makefile.options
pt2pt_sources = \
osc_pt2pt.h \
osc_pt2pt.c \
osc_pt2pt_buffer.h \
osc_pt2pt_buffer.c \
osc_pt2pt_comm.c \
osc_pt2pt_component.c \
osc_pt2pt_data_move.h \

View file

@ -22,7 +22,6 @@
#include "opal/threads/mutex.h"
#include "ompi/win/win.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/btl/btl.h"
#include "mpi.h"
@ -47,6 +46,12 @@ ompi_osc_pt2pt_module_free(ompi_win_t *win)
module->p2p_comm->c_contextid);
/* only take the output of hash_table_remove if there wasn't already an error */
ret = (ret != OMPI_SUCCESS) ? ret : tmp;
if (0 == opal_hash_table_get_size(&mca_osc_pt2pt_component.p2p_c_modules)) {
/* stop progress thread */
opal_progress_unregister(ompi_osc_pt2pt_progress);
}
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);
OBJ_DESTRUCT(&(module->p2p_locks_pending));
@ -64,6 +69,9 @@ ompi_osc_pt2pt_module_free(ompi_win_t *win)
OBJ_DESTRUCT(&(module->p2p_pending_sendreqs));
free(module->p2p_control_buffer);
OBJ_DESTRUCT(&(module->p2p_pending_control_sends));
ompi_comm_free(&(module->p2p_comm));
module->p2p_comm = NULL;

View file

@ -22,7 +22,6 @@
#include "opal/class/opal_hash_table.h"
#include "ompi/mca/osc/osc.h"
#include "ompi/mca/btl/btl.h"
#include "ompi/win/win.h"
#include "ompi/communicator/communicator.h"
@ -33,13 +32,16 @@ struct ompi_osc_pt2pt_component_t {
/** store the state of progress threads for this instance of OMPI */
bool p2p_c_have_progress_threads;
/** lock access to datastructures in the component structure */
opal_mutex_t p2p_c_lock;
/** lock access to datastructures in the component structure */
opal_mutex_t p2p_c_lock;
/** List of ompi_osc_pt2pt_module_ts currently in existence.
Needed so that received fragments can be dispatched to the
correct module */
opal_hash_table_t p2p_c_modules;
/** List of ompi_osc_pt2pt_module_ts currently in existence.
Needed so that received fragments can be dispatched to the
correct module */
opal_hash_table_t p2p_c_modules;
/** max size of eager message */
size_t p2p_c_eager_size;
/** free list of ompi_osc_pt2pt_sendreq_t structures */
opal_free_list_t p2p_c_sendreqs;
@ -47,13 +49,15 @@ struct ompi_osc_pt2pt_component_t {
opal_free_list_t p2p_c_replyreqs;
/** free list of ompi_osc_pt2pt_longreq_t structures */
opal_free_list_t p2p_c_longreqs;
/** free list for eager / control messages */
opal_free_list_t p2p_c_buffers;
};
typedef struct ompi_osc_pt2pt_component_t ompi_osc_pt2pt_component_t;
struct ompi_osc_pt2pt_module_t {
/** Extend the basic osc module interface */
ompi_osc_base_module_t super;
ompi_osc_base_module_t super;
/** lock access to data structures in the current module */
opal_mutex_t p2p_lock;
@ -67,6 +71,14 @@ struct ompi_osc_pt2pt_module_t {
/** communicator created with this window */
ompi_communicator_t *p2p_comm;
/** eager message / control message receive buffer */
void *p2p_control_buffer;
/** control message receive request */
struct ompi_request_t *p2p_cb_request;
opal_list_t p2p_pending_control_sends;
/** list of ompi_osc_pt2pt_sendreq_t structures, and includes all
requests for this access epoch that have not already been
started. p2p_lock must be held when modifying this field. */
@ -93,7 +105,7 @@ struct ompi_osc_pt2pt_module_t {
atomic counter operations. */
volatile int32_t p2p_num_pending_in;
/** cyclic counter for a unique tage for long messages. Not
/** cyclic counter for a unique tag for long messages. Not
protected by the p2p_lock - must use create_send_tag() to
create a send tag */
volatile int32_t p2p_tag_counter;
@ -105,8 +117,6 @@ struct ompi_osc_pt2pt_module_t {
opal_list_t p2p_copy_pending_sendreqs;
short *p2p_copy_num_pending_sendreqs;
bool p2p_eager_send;
/* ********************* FENCE data ************************ */
/* an array of <sizeof(p2p_comm)> ints, each containing the value
1. */
@ -115,8 +125,6 @@ struct ompi_osc_pt2pt_module_t {
with different synchronization costs */
short *p2p_fence_coll_results;
enum { OSC_SYNC_REDUCE_SCATTER, OSC_SYNC_ALLREDUCE, OSC_SYNC_ALLTOALL } p2p_fence_sync_type;
/* ********************* PWSC data ************************ */
struct ompi_group_t *p2p_pw_group;
@ -184,6 +192,7 @@ int ompi_osc_pt2pt_component_select(struct ompi_win_t *win,
struct ompi_info_t *info,
struct ompi_communicator_t *comm);
int ompi_osc_pt2pt_progress(void);
/*
* Module interface function types
@ -253,6 +262,7 @@ int ompi_osc_pt2pt_passive_unlock(ompi_osc_pt2pt_module_t *module,
int32_t origin,
int32_t count);
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif

ompi/mca/osc/pt2pt/osc_pt2pt_buffer.c (new file, 39 lines)
View file

@ -0,0 +1,39 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2006 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "osc_pt2pt.h"
#include "osc_pt2pt_buffer.h"
#include "opal/class/opal_free_list.h"
static void ompi_osc_pt2pt_buffer_construct(ompi_osc_pt2pt_buffer_t *buf)
{
buf->payload = buf + 1;
}
static void ompi_osc_pt2pt_buffer_destruct(ompi_osc_pt2pt_buffer_t *buf)
{
buf->payload = NULL;
}
OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_buffer_t, opal_free_list_item_t,
ompi_osc_pt2pt_buffer_construct,
ompi_osc_pt2pt_buffer_destruct);

ompi/mca/osc/pt2pt/osc_pt2pt_buffer.h (new file, 39 lines)
View file

@ -0,0 +1,39 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2006 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#ifndef OMPI_OSC_PT2PT_BUFFER_H
#define OMPI_OSC_PT2PT_BUFFER_H
#include "opal/class/opal_free_list.h"
#include "ompi/request/request.h"
struct ompi_osc_pt2pt_buffer_t;
typedef void (*ompi_osc_pt2pt_buffer_completion_fn_t)(
struct ompi_osc_pt2pt_buffer_t *buffer);
struct ompi_osc_pt2pt_buffer_t {
opal_free_list_item_t super;
ompi_request_t *request;
ompi_osc_pt2pt_buffer_completion_fn_t cbfunc;
void *cbdata;
void *payload;
size_t len;
};
typedef struct ompi_osc_pt2pt_buffer_t ompi_osc_pt2pt_buffer_t;
OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_buffer_t);
#endif

View file

@ -127,26 +127,8 @@ ompi_osc_pt2pt_module_get(void *origin_addr,
&sendreq);
if (OMPI_SUCCESS != ret) return ret;
/* if we're doing fence synchronization, try to actively send
right now */
if (P2P_MODULE(win)->p2p_eager_send &&
(OMPI_WIN_FENCE & ompi_win_get_mode(win))) {
OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), 1);
ret = ompi_osc_pt2pt_sendreq_send(P2P_MODULE(win), sendreq);
if (OMPI_SUCCESS == ret) {
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
P2P_MODULE(win)->p2p_num_pending_sendreqs[sendreq->req_target_rank]++;
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
} else {
OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1);
ret = enqueue_sendreq(P2P_MODULE(win), sendreq);
}
} else {
/* enqueue sendreq */
ret = enqueue_sendreq(P2P_MODULE(win), sendreq);
}
/* enqueue sendreq */
ret = enqueue_sendreq(P2P_MODULE(win), sendreq);
return ret;
}
@ -185,26 +167,8 @@ ompi_osc_pt2pt_module_put(void *origin_addr, int origin_count,
&sendreq);
if (OMPI_SUCCESS != ret) return ret;
/* if we're doing fence synchronization, try to actively send
right now */
if (P2P_MODULE(win)->p2p_eager_send &&
(OMPI_WIN_FENCE & ompi_win_get_mode(win))) {
OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), 1);
ret = ompi_osc_pt2pt_sendreq_send(P2P_MODULE(win), sendreq);
if (OMPI_SUCCESS == ret) {
OPAL_THREAD_LOCK(&(P2P_MODULE(win)->p2p_lock));
P2P_MODULE(win)->p2p_num_pending_sendreqs[sendreq->req_target_rank]++;
OPAL_THREAD_UNLOCK(&(P2P_MODULE(win)->p2p_lock));
} else {
OPAL_THREAD_ADD32(&(sendreq->req_module->p2p_num_pending_out), -1);
ret = enqueue_sendreq(P2P_MODULE(win), sendreq);
}
} else {
/* enqueue sendreq */
ret = enqueue_sendreq(P2P_MODULE(win), sendreq);
}
/* enqueue sendreq */
ret = enqueue_sendreq(P2P_MODULE(win), sendreq);
return ret;
}

View file

@ -24,14 +24,13 @@
#include "osc_pt2pt_header.h"
#include "osc_pt2pt_obj_convert.h"
#include "osc_pt2pt_data_move.h"
#include "osc_pt2pt_buffer.h"
#include "opal/threads/mutex.h"
#include "ompi/info/info.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/osc/osc.h"
#include "ompi/mca/btl/btl.h"
#include "ompi/mca/bml/bml.h"
#include "ompi/mca/bml/base/base.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/datatype/dt_arch.h"
static int ompi_osc_pt2pt_component_open(void);
@ -41,9 +40,9 @@ ompi_osc_pt2pt_component_t mca_osc_pt2pt_component = {
{ /* ompi_base_component_t */
OMPI_OSC_BASE_VERSION_1_0_0,
"pt2pt",
1,
0,
0,
OMPI_MAJOR_VERSION, /* MCA component major version */
OMPI_MINOR_VERSION, /* MCA component minor version */
OMPI_RELEASE_VERSION, /* MCA component release version */
ompi_osc_pt2pt_component_open,
NULL
},
@ -79,12 +78,6 @@ ompi_osc_pt2pt_module_t ompi_osc_pt2pt_module_template = {
}
};
void ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl,
mca_btl_base_tag_t tag,
mca_btl_base_descriptor_t *descriptor,
void *cbdata);
/* look up parameters for configuring this window. The code first
looks in the info structure passed by the user, then through mca
parameters. */
@ -127,30 +120,23 @@ check_config_value_bool(char *key, ompi_info_t *info)
}
static int fence_sync_index;
static int
ompi_osc_pt2pt_component_open(void)
{
fence_sync_index =
mca_base_param_reg_string(&mca_osc_pt2pt_component.super.osc_version,
"fence_sync_method",
"How to synchronize fence: reduce_scatter, allreduce, alltoall",
false, false, "reduce_scatter", NULL);
int tmp;
mca_base_param_reg_int(&mca_osc_pt2pt_component.super.osc_version,
"eager_send",
"Attempt to start data movement during communication call, "
"instead of at synchrnoization time. "
"Info key of same name overrides this value, "
"if info key given.",
false, false, 0, NULL);
mca_base_param_reg_int(&mca_osc_pt2pt_component.super.osc_version,
"no_locks",
"Enable optimizations available only if MPI_LOCK is not used.",
false, false, 0, NULL);
mca_base_param_reg_int(&mca_osc_pt2pt_component.super.osc_version,
"no_locks",
"Enable optimizations available only if MPI_LOCK is not used.",
false, false, 0, NULL);
mca_base_param_reg_int(&mca_osc_pt2pt_component.super.osc_version,
"eager_limit",
"Max size of eagerly sent data",
false, false, 16 * 1024,
&tmp);
mca_osc_pt2pt_component.p2p_c_eager_size = tmp;
return OMPI_SUCCESS;
}
@ -189,6 +175,13 @@ ompi_osc_pt2pt_component_init(bool enable_progress_threads,
OBJ_CLASS(ompi_osc_pt2pt_longreq_t),
1, -1, 1);
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.p2p_c_buffers, opal_free_list_t);
opal_free_list_init(&mca_osc_pt2pt_component.p2p_c_buffers,
mca_osc_pt2pt_component.p2p_c_eager_size +
sizeof(ompi_osc_pt2pt_buffer_t),
OBJ_CLASS(ompi_osc_pt2pt_buffer_t),
1, -1, 1);
return OMPI_SUCCESS;
}
@ -198,16 +191,13 @@ ompi_osc_pt2pt_component_finalize(void)
{
size_t num_modules;
if (0 !=
if (0 !=
(num_modules = opal_hash_table_get_size(&mca_osc_pt2pt_component.p2p_c_modules))) {
opal_output(0, "WARNING: There were %d Windows created but not freed.",
num_modules);
opal_output(0, "WARNING: There were %d Windows created but not freed.",- num_modules);
opal_progress_unregister(ompi_osc_pt2pt_progress);
}
#if 0
mca_bml.bml_register(MCA_BTL_TAG_OSC_PT2PT, NULL, NULL);
#endif
OBJ_DESTRUCT(&mca_osc_pt2pt_component.p2p_c_buffers);
OBJ_DESTRUCT(&mca_osc_pt2pt_component.p2p_c_longreqs);
OBJ_DESTRUCT(&mca_osc_pt2pt_component.p2p_c_replyreqs);
OBJ_DESTRUCT(&mca_osc_pt2pt_component.p2p_c_sendreqs);
@ -224,7 +214,7 @@ ompi_osc_pt2pt_component_query(ompi_win_t *win,
ompi_communicator_t *comm)
{
/* we can always run - return a low priority */
return 10;
return 5;
}
@ -235,7 +225,6 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win,
{
ompi_osc_pt2pt_module_t *module;
int ret, i;
char *sync_string;
/* create module structure */
module = malloc(sizeof(ompi_osc_pt2pt_module_t));
@ -259,6 +248,20 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win,
return ret;
}
module->p2p_control_buffer = malloc(mca_osc_pt2pt_component.p2p_c_eager_size);
if (NULL == module->p2p_control_buffer) {
OBJ_DESTRUCT(&module->p2p_pending_sendreqs);
ompi_comm_free(&comm);
OBJ_DESTRUCT(&(module->p2p_acc_lock));
OBJ_DESTRUCT(&(module->p2p_lock));
free(module);
return OMPI_ERROR;
}
module->p2p_cb_request = NULL;
OBJ_CONSTRUCT(&module->p2p_pending_control_sends, opal_list_t);
OBJ_CONSTRUCT(&module->p2p_pending_sendreqs, opal_list_t);
module->p2p_num_pending_sendreqs = malloc(sizeof(short) *
ompi_comm_size(module->p2p_comm));
@ -296,8 +299,6 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win,
memset(module->p2p_num_pending_sendreqs, 0,
sizeof(short) * ompi_comm_size(module->p2p_comm));
module->p2p_eager_send = check_config_value_bool("eager_send", info);
/* fence data */
module->p2p_fence_coll_counts = malloc(sizeof(int) *
ompi_comm_size(module->p2p_comm));
@ -333,19 +334,6 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win,
return ret;
}
/* figure out what sync method to use */
mca_base_param_lookup_string(fence_sync_index, &sync_string);
if (0 == strcmp(sync_string, "reduce_scatter")) {
module->p2p_fence_sync_type = OSC_SYNC_REDUCE_SCATTER;
} else if (0 == strcmp(sync_string, "allreduce")) {
module->p2p_fence_sync_type = OSC_SYNC_ALLREDUCE;
} else if (0 == strcmp(sync_string, "alltoall")) {
module->p2p_fence_sync_type = OSC_SYNC_ALLTOALL;
} else {
opal_output(0, "invalid value for fence_sync_method parameter: %s\n", sync_string);
return OMPI_ERROR;
}
/* pwsc data */
module->p2p_pw_group = NULL;
module->p2p_sc_group = NULL;
@ -361,6 +349,11 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win,
opal_hash_table_set_value_uint32(&mca_osc_pt2pt_component.p2p_c_modules,
module->p2p_comm->c_contextid,
module);
if (1 == opal_hash_table_get_size(&mca_osc_pt2pt_component.p2p_c_modules)) {
/* start progress thread */
opal_progress_register(ompi_osc_pt2pt_progress);
}
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.p2p_c_lock);
/* fill in window information */
@ -372,47 +365,40 @@ ompi_osc_pt2pt_component_select(ompi_win_t *win,
/* sync memory - make sure all initialization completed */
opal_atomic_mb();
/* register to receive fragment callbacks */
ret = mca_bml.bml_register(MCA_BTL_TAG_OSC_PT2PT,
ompi_osc_pt2pt_component_fragment_cb,
NULL);
if (module->p2p_eager_send) {
/* need to barrier if eager sending or we can receive before the
other side has been fully setup, causing much gnashing of
teeth. */
module->p2p_comm->c_coll.coll_barrier(module->p2p_comm);
}
/* start up receive for protocol headers */
ret = MCA_PML_CALL(irecv(module->p2p_control_buffer,
mca_osc_pt2pt_component.p2p_c_eager_size,
MPI_BYTE,
MPI_ANY_SOURCE,
-200,
module->p2p_comm,
&module->p2p_cb_request));
return ret;
}
/* dispatch for callback on message completion */
void
ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl,
mca_btl_base_tag_t tag,
mca_btl_base_descriptor_t *descriptor,
void *cbdata)
static void
ompi_osc_pt2pt_component_fragment_cb(ompi_osc_pt2pt_module_t *module,
void *buffer,
size_t buffer_len)
{
int ret;
ompi_osc_pt2pt_module_t *module;
void *payload;
assert(descriptor->des_dst[0].seg_len >=
assert(buffer_len >=
sizeof(ompi_osc_pt2pt_base_header_t));
/* handle message */
switch (((ompi_osc_pt2pt_base_header_t*) descriptor->des_dst[0].seg_addr.pval)->hdr_type) {
switch (((ompi_osc_pt2pt_base_header_t*) buffer)->hdr_type) {
case OMPI_OSC_PT2PT_HDR_PUT:
{
ompi_osc_pt2pt_send_header_t *header;
/* get our header and payload */
header = (ompi_osc_pt2pt_send_header_t*)
descriptor->des_dst[0].seg_addr.pval;
buffer;
payload = (void*) (header + 1);
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
@ -421,9 +407,7 @@ ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl,
}
#endif
/* get our module pointer */
module = ompi_osc_pt2pt_windx_to_module(header->hdr_windx);
if (NULL == module) return;
assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx));
ret = ompi_osc_pt2pt_sendreq_recv_put(module, header, payload);
}
@ -435,7 +419,7 @@ ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl,
/* get our header and payload */
header = (ompi_osc_pt2pt_send_header_t*)
descriptor->des_dst[0].seg_addr.pval;
buffer;
payload = (void*) (header + 1);
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
@ -444,9 +428,7 @@ ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl,
}
#endif
/* get our module pointer */
module = ompi_osc_pt2pt_windx_to_module(header->hdr_windx);
if (NULL == module) return;
assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx));
/* receive into temporary buffer */
ret = ompi_osc_pt2pt_sendreq_recv_accum(module, header, payload);
@ -462,7 +444,7 @@ ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl,
/* get our header and payload */
header = (ompi_osc_pt2pt_send_header_t*)
descriptor->des_dst[0].seg_addr.pval;
buffer;
payload = (void*) (header + 1);
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
@ -471,9 +453,7 @@ ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl,
}
#endif
/* get our module pointer */
module = ompi_osc_pt2pt_windx_to_module(header->hdr_windx);
if (NULL == module) return;
assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx));
/* create or get a pointer to our datatype */
proc = module->p2p_comm->c_pml_procs[header->hdr_origin]->proc_ompi;
@ -503,7 +483,7 @@ ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl,
/* get our header and payload */
header = (ompi_osc_pt2pt_reply_header_t*)
descriptor->des_dst[0].seg_addr.pval;
buffer;
payload = (void*) (header + 1);
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
@ -524,7 +504,7 @@ ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl,
{
ompi_osc_pt2pt_control_header_t *header =
(ompi_osc_pt2pt_control_header_t*)
descriptor->des_dst[0].seg_addr.pval;
buffer;
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
@ -532,9 +512,7 @@ ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl,
}
#endif
/* get our module pointer */
module = ompi_osc_pt2pt_windx_to_module(header->hdr_windx);
if (NULL == module) return;
assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx));
OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), -1);
}
@ -543,7 +521,7 @@ ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl,
{
ompi_osc_pt2pt_control_header_t *header =
(ompi_osc_pt2pt_control_header_t*)
descriptor->des_dst[0].seg_addr.pval;
buffer;
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
@ -551,9 +529,7 @@ ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl,
}
#endif
/* get our module pointer */
module = ompi_osc_pt2pt_windx_to_module(header->hdr_windx);
if (NULL == module) return;
assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx));
/* we've heard from one more place, and have value reqs to
process */
@ -566,7 +542,7 @@ ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl,
{
ompi_osc_pt2pt_control_header_t *header =
(ompi_osc_pt2pt_control_header_t*)
descriptor->des_dst[0].seg_addr.pval;
buffer;
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
@ -574,9 +550,7 @@ ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl,
}
#endif
/* get our module pointer */
module = ompi_osc_pt2pt_windx_to_module(header->hdr_windx);
if (NULL == module) return;
assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx));
if (header->hdr_value[1] > 0) {
ompi_osc_pt2pt_passive_lock(module, header->hdr_value[0],
@ -591,7 +565,7 @@ ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl,
{
ompi_osc_pt2pt_control_header_t *header =
(ompi_osc_pt2pt_control_header_t*)
descriptor->des_dst[0].seg_addr.pval;
buffer;
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
@ -599,9 +573,7 @@ ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl,
}
#endif
/* get our module pointer */
module = ompi_osc_pt2pt_windx_to_module(header->hdr_windx);
if (NULL == module) return;
assert(module == ompi_osc_pt2pt_windx_to_module(header->hdr_windx));
ompi_osc_pt2pt_passive_unlock(module, header->hdr_value[0],
header->hdr_value[1]);
@ -614,3 +586,90 @@ ompi_osc_pt2pt_component_fragment_cb(struct mca_btl_base_module_t *btl,
abort();
}
}
static int
ompi_osc_pt2pt_request_test(ompi_request_t ** rptr,
int *completed,
ompi_status_public_t * status )
{
ompi_request_t *request = *rptr;
int ret = OMPI_SUCCESS;
#if OMPI_ENABLE_PROGRESS_THREADS == 0
if (request->req_state == OMPI_REQUEST_INACTIVE ||
request->req_complete) {
ret = ompi_request_test(rptr, completed, status);
} else {
*completed = 0;
}
#else
ret = ompi_request_test(rptr, completed, status);
#endif
return ret;
}
int
ompi_osc_pt2pt_progress(void)
{
int ret, done, count = 0;
ompi_status_public_t status;
void *node;
uint32_t key;
ompi_osc_pt2pt_module_t *module;
opal_list_item_t *item;
ret = opal_hash_table_get_first_key_uint32(&mca_osc_pt2pt_component.p2p_c_modules,
&key,
(void**) &module,
&node);
if (OMPI_SUCCESS != ret) return 0;
do {
ret = ompi_osc_pt2pt_request_test(&module->p2p_cb_request, &done, &status);
if (OMPI_SUCCESS == ret && done) {
/* process message */
ompi_osc_pt2pt_component_fragment_cb(module,
module->p2p_control_buffer,
status._count);
/* repost receive */
ret = MCA_PML_CALL(irecv(module->p2p_control_buffer,
mca_osc_pt2pt_component.p2p_c_eager_size,
MPI_BYTE,
MPI_ANY_SOURCE,
-200,
module->p2p_comm,
&module->p2p_cb_request));
assert(OMPI_SUCCESS == ret);
count++;
}
/* loop through sends */
for (item = opal_list_get_first(&module->p2p_pending_control_sends) ;
item != opal_list_get_end(&module->p2p_pending_control_sends) ;
item = opal_list_get_next(item)) {
ompi_osc_pt2pt_buffer_t *buffer =
(ompi_osc_pt2pt_buffer_t*) item;
ret = ompi_osc_pt2pt_request_test(&buffer->request, &done, &status);
if (OMPI_SUCCESS == ret && done) {
item = opal_list_remove_item(&module->p2p_pending_control_sends,
item);
buffer->cbfunc(buffer);
}
}
} while (OMPI_SUCCESS ==
opal_hash_table_get_next_key_uint32(&mca_osc_pt2pt_component.p2p_c_modules,
&key,
(void**) &module,
node,
&node));
return count;
}

View file

@ -21,15 +21,15 @@
#include "osc_pt2pt_header.h"
#include "osc_pt2pt_data_move.h"
#include "osc_pt2pt_obj_convert.h"
#include "osc_pt2pt_buffer.h"
#include "opal/util/output.h"
#include "opal/sys/atomic.h"
#include "ompi/mca/bml/bml.h"
#include "ompi/mca/bml/base/base.h"
#include "ompi/mca/btl/btl.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/datatype/datatype.h"
#include "ompi/datatype/dt_arch.h"
static inline int32_t
create_send_tag(ompi_osc_pt2pt_module_t *module)
{
@ -81,22 +81,12 @@ ompi_osc_pt2pt_sendreq_send_long_cb(ompi_osc_pt2pt_longreq_t *longreq)
static void
ompi_osc_pt2pt_sendreq_send_cb(struct mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t *endpoint,
struct mca_btl_base_descriptor_t* descriptor,
int status)
ompi_osc_pt2pt_sendreq_send_cb(ompi_osc_pt2pt_buffer_t *buffer)
{
ompi_osc_pt2pt_sendreq_t *sendreq =
(ompi_osc_pt2pt_sendreq_t*) descriptor->des_cbdata;
(ompi_osc_pt2pt_sendreq_t*) buffer->cbdata;
ompi_osc_pt2pt_send_header_t *header =
(ompi_osc_pt2pt_send_header_t*) descriptor->des_src[0].seg_addr.pval;
if (OMPI_SUCCESS != status) {
/* requeue and return */
/* BWB - FIX ME - figure out where to put this bad boy */
abort();
return;
}
(ompi_osc_pt2pt_send_header_t*) buffer->payload;
/* have to look at header, and not the sendreq because in the case
of get, it's possible that the sendreq has been freed already
@ -144,11 +134,9 @@ ompi_osc_pt2pt_sendreq_send_cb(struct mca_btl_base_module_t* btl,
}
}
/* release the descriptor and sendreq */
btl->btl_free(btl, descriptor);
/* any other sendreqs to restart? */
/* BWB - FIX ME - implement sending the next sendreq here */
/* release the buffer */
OPAL_FREE_LIST_RETURN(&mca_osc_pt2pt_component.p2p_c_buffers,
&buffer->super);
}
@ -159,10 +147,9 @@ ompi_osc_pt2pt_sendreq_send(ompi_osc_pt2pt_module_t *module,
ompi_osc_pt2pt_sendreq_t *sendreq)
{
int ret = OMPI_SUCCESS;
mca_bml_base_endpoint_t *endpoint = NULL;
mca_bml_base_btl_t *bml_btl = NULL;
mca_btl_base_descriptor_t *descriptor = NULL;
opal_free_list_item_t *item;
ompi_osc_pt2pt_send_header_t *header = NULL;
ompi_osc_pt2pt_buffer_t *buffer = NULL;
size_t written_data = 0;
size_t needed_len = sizeof(ompi_osc_pt2pt_send_header_t);
const void *packed_ddt;
@ -174,30 +161,27 @@ ompi_osc_pt2pt_sendreq_send(ompi_osc_pt2pt_module_t *module,
needed_len += sendreq->req_origin_bytes_packed;
}
/* Get a BTL so we have the eager limit */
endpoint = (mca_bml_base_endpoint_t*) sendreq->req_target_proc->proc_bml;
bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager);
descriptor = bml_btl->btl_alloc(bml_btl->btl,
needed_len < bml_btl->btl_eager_limit ? needed_len :
bml_btl->btl_eager_limit);
if (NULL == descriptor) {
/* Get a buffer */
OPAL_FREE_LIST_GET(&mca_osc_pt2pt_component.p2p_c_buffers,
item, ret);
if (NULL == item) {
ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
goto cleanup;
}
buffer = (ompi_osc_pt2pt_buffer_t*) item;
/* verify at least enough space for header */
if (descriptor->des_src[0].seg_len < sizeof(ompi_osc_pt2pt_send_header_t)) {
if (mca_osc_pt2pt_component.p2p_c_eager_size < sizeof(ompi_osc_pt2pt_send_header_t)) {
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto cleanup;
}
/* setup descriptor */
descriptor->des_cbfunc = ompi_osc_pt2pt_sendreq_send_cb;
descriptor->des_cbdata = (void*) sendreq;
descriptor->des_flags = MCA_BTL_DES_FLAGS_PRIORITY;
/* setup buffer */
buffer->cbfunc = ompi_osc_pt2pt_sendreq_send_cb;
buffer->cbdata = (void*) sendreq;
/* pack header */
header = (ompi_osc_pt2pt_send_header_t*) descriptor->des_src[0].seg_addr.pval;
header = (ompi_osc_pt2pt_send_header_t*) buffer->payload;
written_data += sizeof(ompi_osc_pt2pt_send_header_t);
header->hdr_base.hdr_flags = 0;
header->hdr_windx = sendreq->req_module->p2p_comm->c_contextid;
@ -231,13 +215,13 @@ ompi_osc_pt2pt_sendreq_send(ompi_osc_pt2pt_module_t *module,
/* Set datatype id and / or pack datatype */
ret = ompi_ddt_get_pack_description(sendreq->req_target_datatype, &packed_ddt);
if (OMPI_SUCCESS != ret) goto cleanup;
memcpy((unsigned char*) descriptor->des_src[0].seg_addr.pval + written_data,
memcpy((unsigned char*) buffer->payload + written_data,
packed_ddt, packed_ddt_len);
written_data += packed_ddt_len;
if (OMPI_OSC_PT2PT_GET != sendreq->req_type) {
/* if sending data and it fits, pack payload */
if (descriptor->des_src[0].seg_len >=
if (mca_osc_pt2pt_component.p2p_c_eager_size >=
written_data + sendreq->req_origin_bytes_packed) {
struct iovec iov;
uint32_t iov_count = 1;
@ -245,7 +229,7 @@ ompi_osc_pt2pt_sendreq_send(ompi_osc_pt2pt_module_t *module,
size_t max_data = sendreq->req_origin_bytes_packed;
iov.iov_len = max_data;
iov.iov_base = (void*) ((unsigned char*) descriptor->des_src[0].seg_addr.pval + written_data);
iov.iov_base = (void*) ((unsigned char*) buffer->payload + written_data);
ret = ompi_convertor_pack(&sendreq->req_origin_convertor, &iov, &iov_count,
&max_data, &free_after);
@ -256,7 +240,6 @@ ompi_osc_pt2pt_sendreq_send(ompi_osc_pt2pt_module_t *module,
assert(max_data == sendreq->req_origin_bytes_packed);
written_data += max_data;
descriptor->des_src[0].seg_len = written_data;
header->hdr_msg_length = sendreq->req_origin_bytes_packed;
} else {
@ -264,10 +247,11 @@ ompi_osc_pt2pt_sendreq_send(ompi_osc_pt2pt_module_t *module,
header->hdr_origin_tag = create_send_tag(module);
}
} else {
descriptor->des_src[0].seg_len = written_data;
header->hdr_msg_length = 0;
}
buffer->len = written_data;
#ifdef WORDS_BIGENDIAN
header->hdr_base.hdr_flags |= OMPI_OSC_PT2PT_HDR_FLAG_NBO;
#elif OMPI_ENABLE_HETEROGENEOUS_SUPPORT
@ -281,13 +265,23 @@ ompi_osc_pt2pt_sendreq_send(ompi_osc_pt2pt_module_t *module,
opal_output(-1, "%d sending sendreq to %d",
sendreq->req_module->p2p_comm->c_my_rank,
sendreq->req_target_rank);
ret = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_OSC_PT2PT);
ret = MCA_PML_CALL(isend(buffer->payload,
buffer->len,
MPI_BYTE,
sendreq->req_target_rank,
-200,
MCA_PML_BASE_SEND_STANDARD,
module->p2p_comm,
&buffer->request));
opal_list_append(&module->p2p_pending_control_sends,
&buffer->super.super);
goto done;
cleanup:
if (descriptor != NULL) {
mca_bml_base_free(bml_btl, descriptor);
if (item != NULL) {
OPAL_FREE_LIST_RETURN(&mca_osc_pt2pt_component.p2p_c_buffers,
item);
}
done:
@ -317,22 +311,12 @@ ompi_osc_pt2pt_replyreq_send_long_cb(ompi_osc_pt2pt_longreq_t *longreq)
static void
ompi_osc_pt2pt_replyreq_send_cb(struct mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t *endpoint,
struct mca_btl_base_descriptor_t* descriptor,
int status)
ompi_osc_pt2pt_replyreq_send_cb(ompi_osc_pt2pt_buffer_t *buffer)
{
ompi_osc_pt2pt_replyreq_t *replyreq =
(ompi_osc_pt2pt_replyreq_t*) descriptor->des_cbdata;
(ompi_osc_pt2pt_replyreq_t*) buffer->cbdata;
ompi_osc_pt2pt_reply_header_t *header =
(ompi_osc_pt2pt_reply_header_t*) descriptor->des_src[0].seg_addr.pval;
if (OMPI_SUCCESS != status) {
/* requeue and return */
/* BWB - FIX ME - figure out where to put this bad boy */
abort();
return;
}
(ompi_osc_pt2pt_reply_header_t*) buffer->payload;
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
@ -369,9 +353,8 @@ ompi_osc_pt2pt_replyreq_send_cb(struct mca_btl_base_module_t* btl,
}
/* release the descriptor and replyreq */
btl->btl_free(btl, descriptor);
/* any other replyreqs to restart? */
OPAL_FREE_LIST_RETURN(&mca_osc_pt2pt_component.p2p_c_buffers,
&buffer->super);
}
@ -380,35 +363,32 @@ ompi_osc_pt2pt_replyreq_send(ompi_osc_pt2pt_module_t *module,
ompi_osc_pt2pt_replyreq_t *replyreq)
{
int ret = OMPI_SUCCESS;
mca_bml_base_endpoint_t *endpoint = NULL;
mca_bml_base_btl_t *bml_btl = NULL;
mca_btl_base_descriptor_t *descriptor = NULL;
opal_free_list_item_t *item;
ompi_osc_pt2pt_buffer_t *buffer = NULL;
ompi_osc_pt2pt_reply_header_t *header = NULL;
size_t written_data = 0;
/* Get a BTL and a fragment to go with it */
endpoint = (mca_bml_base_endpoint_t*) replyreq->rep_origin_proc->proc_bml;
bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager);
descriptor = bml_btl->btl_alloc(bml_btl->btl,
bml_btl->btl_eager_limit);
if (NULL == descriptor) {
/* Get a buffer */
OPAL_FREE_LIST_GET(&mca_osc_pt2pt_component.p2p_c_buffers,
item, ret);
if (NULL == item) {
ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
goto cleanup;
}
buffer = (ompi_osc_pt2pt_buffer_t*) item;
/* verify at least enough space for header */
if (descriptor->des_src[0].seg_len < sizeof(ompi_osc_pt2pt_reply_header_t)) {
if (mca_osc_pt2pt_component.p2p_c_eager_size < sizeof(ompi_osc_pt2pt_reply_header_t)) {
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto cleanup;
}
/* setup descriptor */
descriptor->des_cbfunc = ompi_osc_pt2pt_replyreq_send_cb;
descriptor->des_cbdata = (void*) replyreq;
descriptor->des_flags = MCA_BTL_DES_FLAGS_PRIORITY;
/* setup buffer */
buffer->cbfunc = ompi_osc_pt2pt_replyreq_send_cb;
buffer->cbdata = (void*) replyreq;
/* pack header */
header = (ompi_osc_pt2pt_reply_header_t*) descriptor->des_src[0].seg_addr.pval;
header = (ompi_osc_pt2pt_reply_header_t*) buffer->payload;
written_data += sizeof(ompi_osc_pt2pt_reply_header_t);
header->hdr_base.hdr_type = OMPI_OSC_PT2PT_HDR_REPLY;
header->hdr_base.hdr_flags = 0;
@ -416,7 +396,7 @@ ompi_osc_pt2pt_replyreq_send(ompi_osc_pt2pt_module_t *module,
header->hdr_target_tag = 0;
/* if sending data fits, pack payload */
if (descriptor->des_src[0].seg_len >=
if (mca_osc_pt2pt_component.p2p_c_eager_size >=
written_data + replyreq->rep_target_bytes_packed) {
struct iovec iov;
uint32_t iov_count = 1;
@ -424,7 +404,7 @@ ompi_osc_pt2pt_replyreq_send(ompi_osc_pt2pt_module_t *module,
size_t max_data = replyreq->rep_target_bytes_packed;
iov.iov_len = max_data;
iov.iov_base = (void*) ((unsigned char*) descriptor->des_src[0].seg_addr.pval + written_data);
iov.iov_base = (void*) ((unsigned char*) buffer->payload + written_data);
ret = ompi_convertor_pack(&replyreq->rep_target_convertor, &iov, &iov_count,
&max_data, &free_after);
@ -435,7 +415,6 @@ ompi_osc_pt2pt_replyreq_send(ompi_osc_pt2pt_module_t *module,
assert(max_data == replyreq->rep_target_bytes_packed);
written_data += max_data;
descriptor->des_src[0].seg_len = written_data;
header->hdr_msg_length = replyreq->rep_target_bytes_packed;
} else {
@ -443,6 +422,8 @@ ompi_osc_pt2pt_replyreq_send(ompi_osc_pt2pt_module_t *module,
header->hdr_target_tag = create_send_tag(module);
}
buffer->len = written_data;
#ifdef WORDS_BIGENDIAN
header->hdr_base.hdr_flags |= OMPI_OSC_PT2PT_HDR_FLAG_NBO;
#elif OMPI_ENABLE_HETEROGENEOUS_SUPPORT
@ -453,12 +434,23 @@ ompi_osc_pt2pt_replyreq_send(ompi_osc_pt2pt_module_t *module,
#endif
/* send fragment */
ret = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_OSC_PT2PT);
ret = MCA_PML_CALL(isend(buffer->payload,
buffer->len,
MPI_BYTE,
replyreq->rep_origin_rank,
-200,
MCA_PML_BASE_SEND_STANDARD,
module->p2p_comm,
&buffer->request));
opal_list_append(&module->p2p_pending_control_sends,
&buffer->super.super);
goto done;
cleanup:
if (descriptor != NULL) {
mca_bml_base_free(bml_btl, descriptor);
if (item != NULL) {
OPAL_FREE_LIST_RETURN(&mca_osc_pt2pt_component.p2p_c_buffers,
item);
}
done:
@ -769,13 +761,11 @@ ompi_osc_pt2pt_replyreq_recv(ompi_osc_pt2pt_module_t *module,
*
**********************************************************************/
static void
ompi_osc_pt2pt_control_send_cb(struct mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t *endpoint,
struct mca_btl_base_descriptor_t* descriptor,
int status)
ompi_osc_pt2pt_control_send_cb(ompi_osc_pt2pt_buffer_t *buffer)
{
/* release the descriptor and sendreq */
btl->btl_free(btl, descriptor);
OPAL_FREE_LIST_RETURN(&mca_osc_pt2pt_component.p2p_c_buffers,
&buffer->super);
}
@ -785,35 +775,40 @@ ompi_osc_pt2pt_control_send(ompi_osc_pt2pt_module_t *module,
uint8_t type, int32_t value0, int32_t value1)
{
int ret = OMPI_SUCCESS;
mca_bml_base_endpoint_t *endpoint = NULL;
mca_bml_base_btl_t *bml_btl = NULL;
mca_btl_base_descriptor_t *descriptor = NULL;
opal_free_list_item_t *item;
ompi_osc_pt2pt_buffer_t *buffer = NULL;
ompi_osc_pt2pt_control_header_t *header = NULL;
/* Get a BTL and a fragment to go with it */
endpoint = (mca_bml_base_endpoint_t*) proc->proc_bml;
bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager);
descriptor = bml_btl->btl_alloc(bml_btl->btl,
sizeof(ompi_osc_pt2pt_control_header_t));
if (NULL == descriptor) {
int rank, i;
/* find the rank */
for (i = 0 ; i < module->p2p_comm->c_remote_group->grp_proc_count ; ++i) {
if (proc == module->p2p_comm->c_remote_group->grp_proc_pointers[i]) {
rank = i;
}
}
/* Get a buffer */
OPAL_FREE_LIST_GET(&mca_osc_pt2pt_component.p2p_c_buffers,
item, ret);
if (NULL == item) {
ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
goto cleanup;
}
buffer = (ompi_osc_pt2pt_buffer_t*) item;
/* verify at least enough space for header */
if (descriptor->des_src[0].seg_len < sizeof(ompi_osc_pt2pt_control_header_t)) {
if (mca_osc_pt2pt_component.p2p_c_eager_size < sizeof(ompi_osc_pt2pt_control_header_t)) {
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto cleanup;
}
/* setup descriptor */
descriptor->des_cbfunc = ompi_osc_pt2pt_control_send_cb;
descriptor->des_cbdata = NULL;
descriptor->des_flags = MCA_BTL_DES_FLAGS_PRIORITY;
descriptor->des_src[0].seg_len = sizeof(ompi_osc_pt2pt_control_header_t);
/* setup buffer */
buffer->cbfunc = ompi_osc_pt2pt_control_send_cb;
buffer->cbdata = NULL;
buffer->len = sizeof(ompi_osc_pt2pt_control_header_t);
/* pack header */
header = (ompi_osc_pt2pt_control_header_t*) descriptor->des_src[0].seg_addr.pval;
header = (ompi_osc_pt2pt_control_header_t*) buffer->payload;
header->hdr_base.hdr_type = type;
header->hdr_value[0] = value0;
header->hdr_value[1] = value1;
@ -829,12 +824,22 @@ ompi_osc_pt2pt_control_send(ompi_osc_pt2pt_module_t *module,
#endif
/* send fragment */
ret = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_OSC_PT2PT);
ret = MCA_PML_CALL(isend(buffer->payload,
buffer->len,
MPI_BYTE,
rank,
-200,
MCA_PML_BASE_SEND_STANDARD,
module->p2p_comm,
&buffer->request));
opal_list_append(&module->p2p_pending_control_sends,
&buffer->super.super);
goto done;
cleanup:
if (descriptor != NULL) {
mca_bml_base_free(bml_btl, descriptor);
if (item != NULL) {
OPAL_FREE_LIST_RETURN(&mca_osc_pt2pt_component.p2p_c_buffers,
item);
}
done:

View file

@ -30,7 +30,7 @@
/* should have p2p_lock before calling */
static inline void
ompi_osc_pt2pt_progress(ompi_osc_pt2pt_module_t *module)
ompi_osc_pt2pt_progress_long(ompi_osc_pt2pt_module_t *module)
{
if (0 != opal_list_get_size(&(module->p2p_long_msgs))) {
opal_list_item_t *item, *next;
@ -109,50 +109,16 @@ ompi_osc_pt2pt_module_fence(int assert, ompi_win_t *win)
ompi_osc_pt2pt_flip_sendreqs(P2P_MODULE(win));
switch (P2P_MODULE(win)->p2p_fence_sync_type) {
/* find out how much data everyone is going to send us. Need
to have the lock during this period so that we have a sane
view of the number of sendreqs */
case OSC_SYNC_REDUCE_SCATTER:
ret = P2P_MODULE(win)->p2p_comm->
c_coll.coll_reduce_scatter(P2P_MODULE(win)->p2p_copy_num_pending_sendreqs,
&incoming_reqs,
P2P_MODULE(win)->p2p_fence_coll_counts,
MPI_SHORT,
MPI_SUM,
P2P_MODULE(win)->p2p_comm);
break;
case OSC_SYNC_ALLREDUCE:
ret = P2P_MODULE(win)->p2p_comm->
c_coll.coll_allreduce(P2P_MODULE(win)->p2p_copy_num_pending_sendreqs,
P2P_MODULE(win)->p2p_fence_coll_results,
ompi_comm_size(P2P_MODULE(win)->p2p_comm),
MPI_SHORT,
MPI_SUM,
P2P_MODULE(win)->p2p_comm);
incoming_reqs = P2P_MODULE(win)->
p2p_fence_coll_results[P2P_MODULE(win)->p2p_comm->c_my_rank];
break;
case OSC_SYNC_ALLTOALL:
ret = P2P_MODULE(win)->p2p_comm->
c_coll.coll_alltoall(P2P_MODULE(win)->p2p_copy_num_pending_sendreqs,
1,
MPI_SHORT,
P2P_MODULE(win)->p2p_fence_coll_results,
1,
MPI_SHORT,
P2P_MODULE(win)->p2p_comm);
incoming_reqs = 0;
for (i = 0 ; i < ompi_comm_size(P2P_MODULE(win)->p2p_comm) ; ++i) {
incoming_reqs += P2P_MODULE(win)->p2p_fence_coll_results[i];
}
break;
default:
assert(0 == 1);
}
ret = P2P_MODULE(win)->p2p_comm->
c_coll.coll_reduce_scatter(P2P_MODULE(win)->p2p_copy_num_pending_sendreqs,
&incoming_reqs,
P2P_MODULE(win)->p2p_fence_coll_counts,
MPI_SHORT,
MPI_SUM,
P2P_MODULE(win)->p2p_comm);
if (OMPI_SUCCESS != ret) {
/* put the stupid data back for the user. This is not
@ -201,7 +167,7 @@ ompi_osc_pt2pt_module_fence(int assert, ompi_win_t *win)
/* now we know how many things we're waiting for - wait for them... */
while (P2P_MODULE(win)->p2p_num_pending_in > 0 ||
0 != P2P_MODULE(win)->p2p_num_pending_out) {
ompi_osc_pt2pt_progress(P2P_MODULE(win));
ompi_osc_pt2pt_progress_long(P2P_MODULE(win));
}
}
@ -255,7 +221,7 @@ ompi_osc_pt2pt_module_complete(ompi_win_t *win)
/* wait for all the post messages */
while (0 != P2P_MODULE(win)->p2p_num_pending_in) {
ompi_osc_pt2pt_progress(P2P_MODULE(win));
ompi_osc_pt2pt_progress_long(P2P_MODULE(win));
}
ompi_osc_pt2pt_flip_sendreqs(P2P_MODULE(win));
@ -311,7 +277,7 @@ ompi_osc_pt2pt_module_complete(ompi_win_t *win)
/* wait for all the requests */
while (0 != P2P_MODULE(win)->p2p_num_pending_out) {
ompi_osc_pt2pt_progress(P2P_MODULE(win));
ompi_osc_pt2pt_progress_long(P2P_MODULE(win));
}
cleanup:
@ -374,7 +340,7 @@ ompi_osc_pt2pt_module_wait(ompi_win_t *win)
while (0 != (P2P_MODULE(win)->p2p_num_pending_in) ||
0 != (P2P_MODULE(win)->p2p_num_pending_out)) {
ompi_osc_pt2pt_progress(P2P_MODULE(win));
ompi_osc_pt2pt_progress_long(P2P_MODULE(win));
}
ompi_win_set_mode(win, 0);
@ -400,7 +366,7 @@ ompi_osc_pt2pt_module_test(ompi_win_t *win,
if (0 != (P2P_MODULE(win)->p2p_num_pending_in) ||
0 != (P2P_MODULE(win)->p2p_num_pending_out)) {
ompi_osc_pt2pt_progress(P2P_MODULE(win));
ompi_osc_pt2pt_progress_long(P2P_MODULE(win));
if (0 != (P2P_MODULE(win)->p2p_num_pending_in) ||
0 != (P2P_MODULE(win)->p2p_num_pending_out)) {
*flag = 0;
@ -472,7 +438,7 @@ ompi_osc_pt2pt_module_unlock(int target,
ompi_proc_t *proc = P2P_MODULE(win)->p2p_comm->c_pml_procs[target]->proc_ompi;
while (0 == P2P_MODULE(win)->p2p_lock_received_ack) {
ompi_osc_pt2pt_progress(P2P_MODULE(win));
ompi_osc_pt2pt_progress_long(P2P_MODULE(win));
}
P2P_MODULE(win)->p2p_lock_received_ack = 0;
@ -501,7 +467,7 @@ ompi_osc_pt2pt_module_unlock(int target,
/* wait for all the requests */
while (0 != P2P_MODULE(win)->p2p_num_pending_out) {
ompi_osc_pt2pt_progress(P2P_MODULE(win));
ompi_osc_pt2pt_progress_long(P2P_MODULE(win));
}
/* send the unlock request */
@ -586,7 +552,7 @@ ompi_osc_pt2pt_passive_unlock(ompi_osc_pt2pt_module_t *module,
OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), count);
while (0 != module->p2p_num_pending_in) {
ompi_osc_pt2pt_progress(module);
ompi_osc_pt2pt_progress_long(module);
}
OPAL_THREAD_LOCK(&(module->p2p_lock));

View file

@ -14,8 +14,8 @@
* $HEADER$
*/
#ifndef OMPI_OSC_PT2PT_H
#define OMPI_OSC_PT2PT_H
#ifndef OMPI_OSC_RDMA_H
#define OMPI_OSC_RDMA_H
#include "opal/class/opal_list.h"
#include "opal/class/opal_free_list.h"
@ -257,4 +257,4 @@ int ompi_osc_rdma_passive_unlock(ompi_osc_rdma_module_t *module,
}
#endif
#endif /* OMPI_OSC_PT2PT_H */
#endif /* OMPI_OSC_RDMA_H */

View file

@ -69,7 +69,7 @@ ompi_osc_rdma_module_accumulate(void *origin_addr, int origin_count,
}
/* create sendreq */
ret = ompi_osc_rdma_sendreq_alloc_init(OMPI_OSC_PT2PT_ACC,
ret = ompi_osc_rdma_sendreq_alloc_init(OMPI_OSC_RDMA_ACC,
origin_addr,
origin_count,
origin_dt,
@ -115,7 +115,7 @@ ompi_osc_rdma_module_get(void *origin_addr,
}
/* create sendreq */
ret = ompi_osc_rdma_sendreq_alloc_init(OMPI_OSC_PT2PT_GET,
ret = ompi_osc_rdma_sendreq_alloc_init(OMPI_OSC_RDMA_GET,
origin_addr,
origin_count,
origin_dt,
@ -173,7 +173,7 @@ ompi_osc_rdma_module_put(void *origin_addr, int origin_count,
}
/* create sendreq */
ret = ompi_osc_rdma_sendreq_alloc_init(OMPI_OSC_PT2PT_PUT,
ret = ompi_osc_rdma_sendreq_alloc_init(OMPI_OSC_RDMA_PUT,
origin_addr,
origin_count,
origin_dt,

View file

@ -40,10 +40,10 @@ ompi_osc_rdma_component_t mca_osc_rdma_component = {
{ /* ompi_osc_base_component_t */
{ /* ompi_base_component_t */
OMPI_OSC_BASE_VERSION_1_0_0,
"pt2pt",
1,
0,
0,
"rdma",
OMPI_MAJOR_VERSION, /* MCA component major version */
OMPI_MINOR_VERSION, /* MCA component minor version */
OMPI_RELEASE_VERSION, /* MCA component release version */
ompi_osc_rdma_component_open,
NULL
},
@ -115,7 +115,7 @@ check_config_value_bool(char *key, ompi_info_t *info)
return result;
info_not_found:
param = mca_base_param_find("osc", "pt2pt", key);
param = mca_base_param_find("osc", "rdma", key);
if (param == OPAL_ERROR) return false;
ret = mca_base_param_lookup_int(param, &flag);
@ -139,18 +139,18 @@ ompi_osc_rdma_component_open(void)
"How to synchronize fence: reduce_scatter, allreduce, alltoall",
false, false, "reduce_scatter", NULL);
mca_base_param_reg_int(&mca_osc_rdma_component.super.osc_version,
"eager_send",
"Attempt to start data movement during communication call, "
"instead of at synchrnoization time. "
"Info key of same name overrides this value, "
"if info key given.",
false, false, 0, NULL);
mca_base_param_reg_int(&mca_osc_rdma_component.super.osc_version,
"eager_send",
"Attempt to start data movement during communication call, "
"instead of at synchrnoization time. "
"Info key of same name overrides this value, "
"if info key given.",
false, false, 0, NULL);
mca_base_param_reg_int(&mca_osc_rdma_component.super.osc_version,
"no_locks",
"Enable optimizations available only if MPI_LOCK is not used.",
false, false, 0, NULL);
mca_base_param_reg_int(&mca_osc_rdma_component.super.osc_version,
"no_locks",
"Enable optimizations available only if MPI_LOCK is not used.",
false, false, 0, NULL);
return OMPI_SUCCESS;
}
@ -160,6 +160,8 @@ int
ompi_osc_rdma_component_init(bool enable_progress_threads,
bool enable_mpi_threads)
{
if (!mca_bml_base_inited()) return OMPI_ERROR;
/* we can run with either threads or not threads (may not be able
to do win locks)... */
mca_osc_rdma_component.p2p_c_have_progress_threads =
@ -204,9 +206,7 @@ ompi_osc_rdma_component_finalize(void)
num_modules);
}
#if 0
mca_bml.bml_register(MCA_BTL_TAG_OSC_PT2PT, NULL, NULL);
#endif
mca_bml.bml_register(MCA_BTL_TAG_OSC_RDMA, NULL, NULL);
OBJ_DESTRUCT(&mca_osc_rdma_component.p2p_c_longreqs);
OBJ_DESTRUCT(&mca_osc_rdma_component.p2p_c_replyreqs);
@ -223,7 +223,10 @@ ompi_osc_rdma_component_query(ompi_win_t *win,
ompi_info_t *info,
ompi_communicator_t *comm)
{
/* we can always run - return a low priority */
/* if we inited, then the BMLs are available and we have a path to
each peer. Return slightly higher priority than the
point-to-point code */
return 10;
}
@ -373,7 +376,7 @@ ompi_osc_rdma_component_select(ompi_win_t *win,
opal_atomic_mb();
/* register to receive fragment callbacks */
ret = mca_bml.bml_register(MCA_BTL_TAG_OSC_PT2PT,
ret = mca_bml.bml_register(MCA_BTL_TAG_OSC_RDMA,
ompi_osc_rdma_component_fragment_cb,
NULL);
@ -406,7 +409,7 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
/* handle message */
switch (((ompi_osc_rdma_base_header_t*) descriptor->des_dst[0].seg_addr.pval)->hdr_type) {
case OMPI_OSC_PT2PT_HDR_PUT:
case OMPI_OSC_RDMA_HDR_PUT:
{
ompi_osc_rdma_send_header_t *header;
@ -416,8 +419,8 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
payload = (void*) (header + 1);
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
OMPI_OSC_PT2PT_SEND_HDR_NTOH(*header);
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
OMPI_OSC_RDMA_SEND_HDR_NTOH(*header);
}
#endif
@ -429,7 +432,7 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
}
break;
case OMPI_OSC_PT2PT_HDR_ACC:
case OMPI_OSC_RDMA_HDR_ACC:
{
ompi_osc_rdma_send_header_t *header;
@ -439,8 +442,8 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
payload = (void*) (header + 1);
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
OMPI_OSC_PT2PT_SEND_HDR_NTOH(*header);
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
OMPI_OSC_RDMA_SEND_HDR_NTOH(*header);
}
#endif
@ -453,7 +456,7 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
}
break;
case OMPI_OSC_PT2PT_HDR_GET:
case OMPI_OSC_RDMA_HDR_GET:
{
ompi_datatype_t *datatype;
ompi_osc_rdma_send_header_t *header;
@ -466,8 +469,8 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
payload = (void*) (header + 1);
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
OMPI_OSC_PT2PT_SEND_HDR_NTOH(*header);
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
OMPI_OSC_RDMA_SEND_HDR_NTOH(*header);
}
#endif
@ -496,7 +499,7 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
}
break;
case OMPI_OSC_PT2PT_HDR_REPLY:
case OMPI_OSC_RDMA_HDR_REPLY:
{
ompi_osc_rdma_reply_header_t *header;
ompi_osc_rdma_sendreq_t *sendreq;
@ -507,8 +510,8 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
payload = (void*) (header + 1);
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
OMPI_OSC_PT2PT_REPLY_HDR_NTOH(*header);
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
OMPI_OSC_RDMA_REPLY_HDR_NTOH(*header);
}
#endif
@ -520,15 +523,15 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
ompi_osc_rdma_replyreq_recv(module, sendreq, header, payload);
}
break;
case OMPI_OSC_PT2PT_HDR_POST:
case OMPI_OSC_RDMA_HDR_POST:
{
ompi_osc_rdma_control_header_t *header =
(ompi_osc_rdma_control_header_t*)
descriptor->des_dst[0].seg_addr.pval;
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(*header);
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
OMPI_OSC_RDMA_CONTROL_HDR_NTOH(*header);
}
#endif
@ -539,15 +542,15 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
OPAL_THREAD_ADD32(&(module->p2p_num_pending_in), -1);
}
break;
case OMPI_OSC_PT2PT_HDR_COMPLETE:
case OMPI_OSC_RDMA_HDR_COMPLETE:
{
ompi_osc_rdma_control_header_t *header =
(ompi_osc_rdma_control_header_t*)
descriptor->des_dst[0].seg_addr.pval;
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(*header);
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
OMPI_OSC_RDMA_CONTROL_HDR_NTOH(*header);
}
#endif
@ -562,15 +565,15 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
}
break;
case OMPI_OSC_PT2PT_HDR_LOCK_REQ:
case OMPI_OSC_RDMA_HDR_LOCK_REQ:
{
ompi_osc_rdma_control_header_t *header =
(ompi_osc_rdma_control_header_t*)
descriptor->des_dst[0].seg_addr.pval;
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(*header);
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
OMPI_OSC_RDMA_CONTROL_HDR_NTOH(*header);
}
#endif
@ -587,15 +590,15 @@ ompi_osc_rdma_component_fragment_cb(struct mca_btl_base_module_t *btl,
}
break;
case OMPI_OSC_PT2PT_HDR_UNLOCK_REQ:
case OMPI_OSC_RDMA_HDR_UNLOCK_REQ:
{
ompi_osc_rdma_control_header_t *header =
(ompi_osc_rdma_control_header_t*)
descriptor->des_dst[0].seg_addr.pval;
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(*header);
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
OMPI_OSC_RDMA_CONTROL_HDR_NTOH(*header);
}
#endif

View file

@ -105,10 +105,10 @@ ompi_osc_rdma_sendreq_send_cb(struct mca_btl_base_module_t* btl,
wait for this completion before exiting a synchronization point
in the case of get, as we really don't care when it completes -
only when the data arrives. */
if (OMPI_OSC_PT2PT_HDR_GET != header->hdr_base.hdr_type) {
if (OMPI_OSC_RDMA_HDR_GET != header->hdr_base.hdr_type) {
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
OMPI_OSC_PT2PT_SEND_HDR_NTOH(*header);
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
OMPI_OSC_RDMA_SEND_HDR_NTOH(*header);
}
#endif
/* do we need to post a send? */
@ -170,7 +170,7 @@ ompi_osc_rdma_sendreq_send(ompi_osc_rdma_module_t *module,
/* we always need to send the ddt */
needed_len += packed_ddt_len;
if (OMPI_OSC_PT2PT_GET != sendreq->req_type) {
if (OMPI_OSC_RDMA_GET != sendreq->req_type) {
needed_len += sendreq->req_origin_bytes_packed;
}
@ -208,20 +208,20 @@ ompi_osc_rdma_sendreq_send(ompi_osc_rdma_module_t *module,
header->hdr_target_count = sendreq->req_target_count;
switch (sendreq->req_type) {
case OMPI_OSC_PT2PT_PUT:
header->hdr_base.hdr_type = OMPI_OSC_PT2PT_HDR_PUT;
case OMPI_OSC_RDMA_PUT:
header->hdr_base.hdr_type = OMPI_OSC_RDMA_HDR_PUT;
#if OMPI_ENABLE_MEM_DEBUG
header->hdr_target_op = 0;
#endif
break;
case OMPI_OSC_PT2PT_ACC:
header->hdr_base.hdr_type = OMPI_OSC_PT2PT_HDR_ACC;
case OMPI_OSC_RDMA_ACC:
header->hdr_base.hdr_type = OMPI_OSC_RDMA_HDR_ACC;
header->hdr_target_op = sendreq->req_op_id;
break;
case OMPI_OSC_PT2PT_GET:
header->hdr_base.hdr_type = OMPI_OSC_PT2PT_HDR_GET;
case OMPI_OSC_RDMA_GET:
header->hdr_base.hdr_type = OMPI_OSC_RDMA_HDR_GET;
#if OMPI_ENABLE_MEM_DEBUG
header->hdr_target_op = 0;
#endif
@ -235,7 +235,7 @@ ompi_osc_rdma_sendreq_send(ompi_osc_rdma_module_t *module,
packed_ddt, packed_ddt_len);
written_data += packed_ddt_len;
if (OMPI_OSC_PT2PT_GET != sendreq->req_type) {
if (OMPI_OSC_RDMA_GET != sendreq->req_type) {
/* if sending data and it fits, pack payload */
if (descriptor->des_src[0].seg_len >=
written_data + sendreq->req_origin_bytes_packed) {
@ -269,11 +269,11 @@ ompi_osc_rdma_sendreq_send(ompi_osc_rdma_module_t *module,
}
#ifdef WORDS_BIGENDIAN
header->hdr_base.hdr_flags |= OMPI_OSC_PT2PT_HDR_FLAG_NBO;
header->hdr_base.hdr_flags |= OMPI_OSC_RDMA_HDR_FLAG_NBO;
#elif OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (sendreq->req_target_proc->proc_arch & OMPI_ARCH_ISBIGENDIAN) {
header->hdr_base.hdr_flags |= OMPI_OSC_PT2PT_HDR_FLAG_NBO;
OMPI_OSC_PT2PT_SEND_HDR_HTON(*header);
header->hdr_base.hdr_flags |= OMPI_OSC_RDMA_HDR_FLAG_NBO;
OMPI_OSC_RDMA_SEND_HDR_HTON(*header);
}
#endif
@ -282,7 +282,7 @@ ompi_osc_rdma_sendreq_send(ompi_osc_rdma_module_t *module,
sendreq->req_module->p2p_comm->c_my_rank,
sendreq->req_target_rank);
ret = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_OSC_PT2PT);
ret = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_OSC_RDMA);
goto done;
cleanup:
@ -335,8 +335,8 @@ ompi_osc_rdma_replyreq_send_cb(struct mca_btl_base_module_t* btl,
}
#if !defined(WORDS_BIGENDIAN) && OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (header->hdr_base.hdr_flags & OMPI_OSC_PT2PT_HDR_FLAG_NBO) {
OMPI_OSC_PT2PT_REPLY_HDR_NTOH(*header);
if (header->hdr_base.hdr_flags & OMPI_OSC_RDMA_HDR_FLAG_NBO) {
OMPI_OSC_RDMA_REPLY_HDR_NTOH(*header);
}
#endif
@ -410,7 +410,7 @@ ompi_osc_rdma_replyreq_send(ompi_osc_rdma_module_t *module,
/* pack header */
header = (ompi_osc_rdma_reply_header_t*) descriptor->des_src[0].seg_addr.pval;
written_data += sizeof(ompi_osc_rdma_reply_header_t);
header->hdr_base.hdr_type = OMPI_OSC_PT2PT_HDR_REPLY;
header->hdr_base.hdr_type = OMPI_OSC_RDMA_HDR_REPLY;
header->hdr_base.hdr_flags = 0;
header->hdr_origin_sendreq = replyreq->rep_origin_sendreq;
header->hdr_target_tag = 0;
@ -444,16 +444,16 @@ ompi_osc_rdma_replyreq_send(ompi_osc_rdma_module_t *module,
}
#ifdef WORDS_BIGENDIAN
header->hdr_base.hdr_flags |= OMPI_OSC_PT2PT_HDR_FLAG_NBO;
header->hdr_base.hdr_flags |= OMPI_OSC_RDMA_HDR_FLAG_NBO;
#elif OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (replyreq->rep_origin_proc->proc_arch & OMPI_ARCH_ISBIGENDIAN) {
header->hdr_base.hdr_flags |= OMPI_OSC_PT2PT_HDR_FLAG_NBO;
OMPI_OSC_PT2PT_REPLY_HDR_HTON(*header);
header->hdr_base.hdr_flags |= OMPI_OSC_RDMA_HDR_FLAG_NBO;
OMPI_OSC_RDMA_REPLY_HDR_HTON(*header);
}
#endif
/* send fragment */
ret = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_OSC_PT2PT);
ret = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_OSC_RDMA);
goto done;
cleanup:
@ -820,16 +820,16 @@ ompi_osc_rdma_control_send(ompi_osc_rdma_module_t *module,
header->hdr_windx = module->p2p_comm->c_contextid;
#ifdef WORDS_BIGENDIAN
header->hdr_base.hdr_flags |= OMPI_OSC_PT2PT_HDR_FLAG_NBO;
header->hdr_base.hdr_flags |= OMPI_OSC_RDMA_HDR_FLAG_NBO;
#elif OMPI_ENABLE_HETEROGENEOUS_SUPPORT
if (proc->proc_arch & OMPI_ARCH_ISBIGENDIAN) {
header->hdr_base.hdr_flags |= OMPI_OSC_PT2PT_HDR_FLAG_NBO;
OMPI_OSC_PT2PT_CONTROL_HDR_HTON(*header);
header->hdr_base.hdr_flags |= OMPI_OSC_RDMA_HDR_FLAG_NBO;
OMPI_OSC_RDMA_CONTROL_HDR_HTON(*header);
}
#endif
/* send fragment */
ret = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_OSC_PT2PT);
ret = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_OSC_RDMA);
goto done;
cleanup:


@ -14,8 +14,8 @@
* $HEADER$
*/
#ifndef OMPI_MCA_OSC_PT2PT_DATA_MOVE_H
#define OMPI_MCA_OSC_PT2PT_DATA_MOVE_H
#ifndef OMPI_MCA_OSC_RDMA_DATA_MOVE_H
#define OMPI_MCA_OSC_RDMA_DATA_MOVE_H
#include "osc_rdma_sendreq.h"
#include "osc_rdma_replyreq.h"


@ -14,8 +14,8 @@
* $HEADER$
*/
#ifndef OMPI_MCA_OSC_PT2PT_HDR_H
#define OMPI_MCA_OSC_PT2PT_HDR_H
#ifndef OMPI_MCA_OSC_RDMA_HDR_H
#define OMPI_MCA_OSC_RDMA_HDR_H
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
@ -23,16 +23,16 @@
#include "opal/types.h"
#define OMPI_OSC_PT2PT_HDR_PUT 0x0001
#define OMPI_OSC_PT2PT_HDR_ACC 0x0002
#define OMPI_OSC_PT2PT_HDR_GET 0x0004
#define OMPI_OSC_PT2PT_HDR_REPLY 0x0008
#define OMPI_OSC_PT2PT_HDR_POST 0x0010
#define OMPI_OSC_PT2PT_HDR_COMPLETE 0x0020
#define OMPI_OSC_PT2PT_HDR_LOCK_REQ 0x0040
#define OMPI_OSC_PT2PT_HDR_UNLOCK_REQ 0x0080
#define OMPI_OSC_RDMA_HDR_PUT 0x0001
#define OMPI_OSC_RDMA_HDR_ACC 0x0002
#define OMPI_OSC_RDMA_HDR_GET 0x0004
#define OMPI_OSC_RDMA_HDR_REPLY 0x0008
#define OMPI_OSC_RDMA_HDR_POST 0x0010
#define OMPI_OSC_RDMA_HDR_COMPLETE 0x0020
#define OMPI_OSC_RDMA_HDR_LOCK_REQ 0x0040
#define OMPI_OSC_RDMA_HDR_UNLOCK_REQ 0x0080
#define OMPI_OSC_PT2PT_HDR_FLAG_NBO 0x0001
#define OMPI_OSC_RDMA_HDR_FLAG_NBO 0x0001
struct ompi_osc_rdma_base_header_t {
uint8_t hdr_type;
@ -41,8 +41,8 @@ struct ompi_osc_rdma_base_header_t {
};
typedef struct ompi_osc_rdma_base_header_t ompi_osc_rdma_base_header_t;
#define OMPI_OSC_PT2PT_BASE_HDR_NTOH(h)
#define OMPI_OSC_PT2PT_BASE_HDR_HTON(h)
#define OMPI_OSC_RDMA_BASE_HDR_NTOH(h)
#define OMPI_OSC_RDMA_BASE_HDR_HTON(h)
struct ompi_osc_rdma_send_header_t {
ompi_osc_rdma_base_header_t hdr_base;
@ -60,9 +60,9 @@ struct ompi_osc_rdma_send_header_t {
};
typedef struct ompi_osc_rdma_send_header_t ompi_osc_rdma_send_header_t;
#define OMPI_OSC_PT2PT_SEND_HDR_HTON(hdr) \
#define OMPI_OSC_RDMA_SEND_HDR_HTON(hdr) \
do { \
OMPI_OSC_PT2PT_BASE_HDR_HTON((hdr).hdr_base) \
OMPI_OSC_RDMA_BASE_HDR_HTON((hdr).hdr_base) \
(hdr).hdr_windx = htons((hdr).hdr_windx); \
(hdr).hdr_origin = htonl((hdr).hdr_origin); \
(hdr).hdr_origin_tag = htonl((hdr).hdr_origin_tag); \
@ -72,9 +72,9 @@ typedef struct ompi_osc_rdma_send_header_t ompi_osc_rdma_send_header_t;
(hdr).hdr_msg_length = htonl((hdr).hdr_msg_length); \
} while (0)
#define OMPI_OSC_PT2PT_SEND_HDR_NTOH(hdr) \
#define OMPI_OSC_RDMA_SEND_HDR_NTOH(hdr) \
do { \
OMPI_OSC_PT2PT_BASE_HDR_NTOH((hdr).hdr_base) \
OMPI_OSC_RDMA_BASE_HDR_NTOH((hdr).hdr_base) \
(hdr).hdr_windx = ntohs((hdr).hdr_windx); \
(hdr).hdr_origin = ntohl((hdr).hdr_origin); \
(hdr).hdr_origin_tag = ntohl((hdr).hdr_origin_tag); \
@ -95,16 +95,16 @@ struct ompi_osc_rdma_reply_header_t {
};
typedef struct ompi_osc_rdma_reply_header_t ompi_osc_rdma_reply_header_t;
#define OMPI_OSC_PT2PT_REPLY_HDR_HTON(hdr) \
#define OMPI_OSC_RDMA_REPLY_HDR_HTON(hdr) \
do { \
OMPI_OSC_PT2PT_BASE_HDR_HTON((hdr).hdr_base) \
OMPI_OSC_RDMA_BASE_HDR_HTON((hdr).hdr_base) \
(hdr).hdr_target_tag = htonl((hdr).hdr_target_tag); \
(hdr).hdr_msg_length = htonl((hdr).hdr_msg_length); \
} while (0)
#define OMPI_OSC_PT2PT_REPLY_HDR_NTOH(hdr) \
#define OMPI_OSC_RDMA_REPLY_HDR_NTOH(hdr) \
do { \
OMPI_OSC_PT2PT_BASE_HDR_NTOH((hdr).hdr_base) \
OMPI_OSC_RDMA_BASE_HDR_NTOH((hdr).hdr_base) \
(hdr).hdr_target_tag = ntohl((hdr).hdr_target_tag); \
(hdr).hdr_msg_length = ntohl((hdr).hdr_msg_length); \
} while (0)
@ -117,20 +117,20 @@ struct ompi_osc_rdma_control_header_t {
};
typedef struct ompi_osc_rdma_control_header_t ompi_osc_rdma_control_header_t;
#define OMPI_OSC_PT2PT_CONTROL_HDR_HTON(hdr) \
#define OMPI_OSC_RDMA_CONTROL_HDR_HTON(hdr) \
do { \
OMPI_OSC_PT2PT_BASE_HDR_HTON((hdr).hdr_base) \
OMPI_OSC_RDMA_BASE_HDR_HTON((hdr).hdr_base) \
(hdr).hdr_windx = htons((hdr).hdr_windx); \
(hdr).hdr_value[0] = htonl((hdr).hdr_value[0]); \
(hdr).hdr_value[1] = htonl((hdr).hdr_value[1]); \
} while (0)
#define OMPI_OSC_PT2PT_CONTROL_HDR_NTOH(hdr) \
#define OMPI_OSC_RDMA_CONTROL_HDR_NTOH(hdr) \
do { \
OMPI_OSC_PT2PT_BASE_HDR_NTOH((hdr).hdr_base) \
OMPI_OSC_RDMA_BASE_HDR_NTOH((hdr).hdr_base) \
(hdr).hdr_windx = ntohs((hdr).hdr_windx); \
(hdr).hdr_value[0] = ntohl((hdr).hdr_value[0]); \
(hdr).hdr_value[1] = ntohl((hdr).hdr_value[1]); \
} while (0)
#endif /* OMPI_MCA_OSC_PT2PT_HDR_H */
#endif /* OMPI_MCA_OSC_RDMA_HDR_H */
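For context (not part of this diff): the renamed header-conversion macros above are meant to be used in pairs — a little-endian sender converts with *_HDR_HTON and sets OMPI_OSC_RDMA_HDR_FLAG_NBO only when the peer is big-endian (a big-endian sender just sets the flag), and the receiver converts back with *_HDR_NTOH only when that flag is present. Below is a minimal, self-contained sketch of that pattern; the demo_* names and the reduced field set are stand-ins, not the real ompi_osc_rdma_send_header_t.

    #include <stdint.h>
    #include <stdio.h>
    #include <arpa/inet.h>   /* htons/ntohs, htonl/ntohl */

    #define DEMO_HDR_FLAG_NBO 0x0001   /* stand-in for OMPI_OSC_RDMA_HDR_FLAG_NBO */

    typedef struct {
        uint8_t  hdr_type;
        uint8_t  hdr_flags;
        uint16_t hdr_windx;            /* window/communicator context id */
        uint32_t hdr_msg_length;
    } demo_send_header_t;              /* simplified stand-in for ompi_osc_rdma_send_header_t */

    #define DEMO_SEND_HDR_HTON(h)                              \
        do {                                                   \
            (h).hdr_windx      = htons((h).hdr_windx);         \
            (h).hdr_msg_length = htonl((h).hdr_msg_length);    \
        } while (0)

    #define DEMO_SEND_HDR_NTOH(h)                              \
        do {                                                   \
            (h).hdr_windx      = ntohs((h).hdr_windx);         \
            (h).hdr_msg_length = ntohl((h).hdr_msg_length);    \
        } while (0)

    /* little-endian sender: convert and flag only when the peer is big-endian
     * (mirrors the OMPI_ENABLE_HETEROGENEOUS_SUPPORT branch in the diff) */
    static void demo_prepare_header(demo_send_header_t *hdr, int peer_is_big_endian)
    {
        if (peer_is_big_endian) {
            hdr->hdr_flags |= DEMO_HDR_FLAG_NBO;
            DEMO_SEND_HDR_HTON(*hdr);
        }
    }

    /* receiver: convert back only if the sender flagged network byte order */
    static void demo_receive_header(demo_send_header_t *hdr)
    {
        if (hdr->hdr_flags & DEMO_HDR_FLAG_NBO) {
            DEMO_SEND_HDR_NTOH(*hdr);
        }
    }

    int main(void)
    {
        demo_send_header_t hdr = { .hdr_type = 1, .hdr_flags = 0,
                                   .hdr_windx = 42, .hdr_msg_length = 1024 };
        demo_prepare_header(&hdr, 1);
        demo_receive_header(&hdr);
        printf("windx=%u len=%u\n", hdr.hdr_windx, (unsigned) hdr.hdr_msg_length);
        return 0;
    }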


@ -14,8 +14,8 @@
* $HEADER$
*/
#ifndef OSC_PT2PT_LONGREQ_H
#define OSC_PT2PT_LONGREQ_H
#ifndef OSC_RDMA_LONGREQ_H
#define OSC_RDMA_LONGREQ_H
#include "osc_rdma.h"


@ -14,8 +14,8 @@
* $HEADER$
*/
#ifndef OMPI_OSC_PT2PT_REPLYREQ_H
#define OMPI_OSC_PT2PT_REPLYREQ_H
#ifndef OMPI_OSC_RDMA_REPLYREQ_H
#define OMPI_OSC_RDMA_REPLYREQ_H
#include "osc_rdma.h"
#include "osc_rdma_longreq.h"
@ -136,4 +136,4 @@ ompi_osc_rdma_replyreq_free(ompi_osc_rdma_replyreq_t *replyreq)
return OMPI_SUCCESS;
}
#endif /* OMPI_OSC_PT2PT_REPLYREQ_H */
#endif /* OMPI_OSC_RDMA_REPLYREQ_H */


@ -14,8 +14,8 @@
* $HEADER$
*/
#ifndef OMPI_OSC_PT2PT_SENDREQ_H
#define OMPI_OSC_PT2PT_SENDREQ_H
#ifndef OMPI_OSC_RDMA_SENDREQ_H
#define OMPI_OSC_RDMA_SENDREQ_H
#include "osc_rdma.h"
#include "osc_rdma_longreq.h"
@ -30,9 +30,9 @@
#include "ompi/mca/pml/pml.h"
typedef enum {
OMPI_OSC_PT2PT_GET,
OMPI_OSC_PT2PT_ACC,
OMPI_OSC_PT2PT_PUT
OMPI_OSC_RDMA_GET,
OMPI_OSC_RDMA_ACC,
OMPI_OSC_RDMA_PUT
} ompi_osc_rdma_req_type_t;
@ -118,7 +118,7 @@ ompi_osc_rdma_sendreq_init_origin(ompi_osc_rdma_sendreq_t *sendreq,
sendreq->req_origin_datatype = origin_dt;
sendreq->req_type = req_type;
if (req_type != OMPI_OSC_PT2PT_GET) {
if (req_type != OMPI_OSC_RDMA_GET) {
ompi_convertor_copy_and_prepare_for_send(sendreq->req_target_proc->proc_convertor,
origin_dt,
origin_count,
@ -172,4 +172,4 @@ ompi_osc_rdma_sendreq_free(ompi_osc_rdma_sendreq_t *sendreq)
return OMPI_SUCCESS;
}
#endif /* OMPI_OSC_PT2PT_SENDREQ_H */
#endif /* OMPI_OSC_RDMA_SENDREQ_H */
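Not part of the diff: the enum above feeds the switch in osc_rdma_data_move.c shown earlier — each request type selects a wire header type, and only PUT and ACC pack origin data into the outgoing fragment; a GET sends just the packed datatype description and waits for a reply. A rough sketch of that mapping, with assumed demo_* names rather than the real constants:

    #include <stdbool.h>
    #include <stdint.h>
    #include <stdio.h>

    /* stand-ins for the real enum/header constants; values are illustrative only */
    typedef enum { DEMO_OSC_GET, DEMO_OSC_ACC, DEMO_OSC_PUT } demo_req_type_t;

    enum { DEMO_HDR_PUT = 0x01, DEMO_HDR_ACC = 0x02, DEMO_HDR_GET = 0x04 };

    /* map a one-sided request type to its wire header type */
    static uint8_t demo_header_type(demo_req_type_t t)
    {
        switch (t) {
        case DEMO_OSC_PUT: return DEMO_HDR_PUT;
        case DEMO_OSC_ACC: return DEMO_HDR_ACC;
        case DEMO_OSC_GET: return DEMO_HDR_GET;
        }
        return 0;
    }

    /* only PUT and ACC ship origin data with the request; a GET only
     * describes the target buffer and is answered by a reply fragment */
    static bool demo_sends_origin_payload(demo_req_type_t t)
    {
        return t != DEMO_OSC_GET;
    }

    int main(void)
    {
        printf("GET header type 0x%02x, sends payload: %d\n",
               (unsigned) demo_header_type(DEMO_OSC_GET),
               (int) demo_sends_origin_payload(DEMO_OSC_GET));
        return 0;
    }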


@ -287,7 +287,7 @@ ompi_osc_rdma_module_complete(ompi_win_t *win)
P2P_MODULE(win)->p2p_copy_num_pending_sendreqs[comm_rank]);
ompi_osc_rdma_control_send(P2P_MODULE(win),
P2P_MODULE(win)->p2p_sc_group->grp_proc_pointers[i],
OMPI_OSC_PT2PT_HDR_COMPLETE,
OMPI_OSC_RDMA_HDR_COMPLETE,
P2P_MODULE(win)->p2p_copy_num_pending_sendreqs[comm_rank],
0);
}
@ -360,7 +360,7 @@ ompi_osc_rdma_module_post(ompi_group_t *group,
for (i = 0 ; i < ompi_group_size(P2P_MODULE(win)->p2p_pw_group) ; ++i) {
ompi_osc_rdma_control_send(P2P_MODULE(win),
group->grp_proc_pointers[i],
OMPI_OSC_PT2PT_HDR_POST, 1, 0);
OMPI_OSC_RDMA_HDR_POST, 1, 0);
}
return OMPI_SUCCESS;
@ -453,7 +453,7 @@ ompi_osc_rdma_module_lock(int lock_type,
/* generate a lock request */
ompi_osc_rdma_control_send(P2P_MODULE(win),
proc,
OMPI_OSC_PT2PT_HDR_LOCK_REQ,
OMPI_OSC_RDMA_HDR_LOCK_REQ,
P2P_MODULE(win)->p2p_comm->c_my_rank,
lock_type);
@ -509,7 +509,7 @@ ompi_osc_rdma_module_unlock(int target,
P2P_MODULE(win)->p2p_comm->c_my_rank, target);
ompi_osc_rdma_control_send(P2P_MODULE(win),
proc,
OMPI_OSC_PT2PT_HDR_UNLOCK_REQ,
OMPI_OSC_RDMA_HDR_UNLOCK_REQ,
P2P_MODULE(win)->p2p_comm->c_my_rank,
out_count);
@ -565,7 +565,7 @@ ompi_osc_rdma_passive_lock(ompi_osc_rdma_module_t *module,
opal_output(-1, "%d sending lock ack to %d",
module->p2p_comm->c_my_rank, origin);
ompi_osc_rdma_control_send(module, proc,
OMPI_OSC_PT2PT_HDR_LOCK_REQ,
OMPI_OSC_RDMA_HDR_LOCK_REQ,
module->p2p_comm->c_my_rank,
OMPI_SUCCESS);
}
@ -610,7 +610,7 @@ ompi_osc_rdma_passive_unlock(ompi_osc_rdma_module_t *module,
module->p2p_lock_status = new_pending->lock_type;
ompi_osc_rdma_control_send(module,
new_pending->proc,
OMPI_OSC_PT2PT_HDR_LOCK_REQ,
OMPI_OSC_RDMA_HDR_LOCK_REQ,
module->p2p_comm->c_my_rank,
OMPI_SUCCESS);
OBJ_DESTRUCT(new_pending);
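Not part of the diff: the last two hunks implement the passive-target lock protocol — the origin sends an OMPI_OSC_RDMA_HDR_LOCK_REQ carrying its rank, and the target either acks immediately (by sending a LOCK_REQ back with OMPI_SUCCESS as the value) or queues the request until the current holder's UNLOCK_REQ arrives. A minimal, self-contained sketch of that target-side decision, with simplified stand-in types (exclusive locks only; the real module also tracks shared locks and uses ompi_osc_rdma_pending_lock_t list items):

    #include <stdio.h>
    #include <stddef.h>

    enum { DEMO_LOCK_NONE = 0, DEMO_LOCK_EXCLUSIVE = 1 };

    typedef struct demo_pending_lock {
        int origin_rank;
        struct demo_pending_lock *next;
    } demo_pending_lock_t;

    typedef struct {
        int lock_status;                   /* DEMO_LOCK_NONE or DEMO_LOCK_EXCLUSIVE */
        demo_pending_lock_t *pending_head; /* requests waiting for the lock */
    } demo_module_t;

    /* stand-in for ompi_osc_rdma_control_send(..., HDR_LOCK_REQ, my_rank, OMPI_SUCCESS) */
    static void demo_send_lock_ack(int origin_rank)
    {
        printf("ack lock to rank %d\n", origin_rank);
    }

    /* target side: handle an incoming lock request */
    static void demo_passive_lock(demo_module_t *m, int origin_rank, demo_pending_lock_t *entry)
    {
        if (m->lock_status == DEMO_LOCK_NONE) {
            m->lock_status = DEMO_LOCK_EXCLUSIVE;   /* grant immediately */
            demo_send_lock_ack(origin_rank);
        } else {
            entry->origin_rank = origin_rank;       /* queue until unlock */
            entry->next = m->pending_head;
            m->pending_head = entry;
        }
    }

    /* target side: handle an unlock; hand the lock to the next waiter, if any */
    static void demo_passive_unlock(demo_module_t *m)
    {
        m->lock_status = DEMO_LOCK_NONE;
        if (m->pending_head != NULL) {
            demo_pending_lock_t *next = m->pending_head;
            m->pending_head = next->next;
            m->lock_status = DEMO_LOCK_EXCLUSIVE;
            demo_send_lock_ack(next->origin_rank);
        }
    }

    int main(void)
    {
        demo_module_t m = { DEMO_LOCK_NONE, NULL };
        demo_pending_lock_t a, b;
        demo_passive_lock(&m, 1, &a);   /* granted immediately */
        demo_passive_lock(&m, 2, &b);   /* queued */
        demo_passive_unlock(&m);        /* rank 2 is acked now */
        return 0;
    }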