1
1
add object size to opal class
no longer need the size when allocating a new object as this is stored in
the class structure

--- dr changes 
Previous rev. maintained state on the communicator used for acking duplicate
fragments, but the communicator may be destroyed prior to successfull
delivery of an ack to the peer. We must therefore maintain this state
globally on a per peer, not a per peer, per communicator basis. 
This requires that we use a global rank on the wire and translate this as
appropriate to a local rank within the communicator. 

This commit was SVN r9454.
Этот коммит содержится в:
Galen Shipman 2006-03-29 16:19:17 +00:00
родитель 2d9757e81b
Коммит 5271948ec0
24 изменённых файлов: 332 добавлений и 140 удалений

Просмотреть файл

@ -49,7 +49,9 @@ OBJ_CLASS_DECLARATION(mca_bml_base_selected_module_t);
*/
OMPI_DECLSPEC int mca_bml_base_open(void);
OMPI_DECLSPEC int mca_bml_base_init(bool enable_progress_threads, bool enable_mpi_threads);
OMPI_DECLSPEC int mca_bml_base_init(bool enable_progress_threads,
bool enable_mpi_threads,
opal_class_t* endpoint_class);
OMPI_DECLSPEC int mca_bml_base_close(void);

Просмотреть файл

@ -64,7 +64,8 @@ int mca_bml_base_btl_array_reserve(mca_bml_base_btl_array_t* array, size_t size)
#if OMPI_ENABLE_DEBUG_RELIABILITY
extern double mca_bml_base_error_rate;
extern double mca_bml_base_error_rate_floor;
extern double mca_bml_base_error_rate_ceiling;
extern int mca_bml_base_error_count;
struct mca_bml_base_context_t {
@ -97,8 +98,11 @@ int mca_bml_base_send(
{
static int count;
des->des_context = bml_btl;
if(mca_bml_base_error_count <= 0 && mca_bml_base_error_rate > 0) {
mca_bml_base_error_count = (int) ((mca_bml_base_error_rate * rand())/(RAND_MAX+1.0));
if(mca_bml_base_error_count <= 0 && mca_bml_base_error_rate_ceiling > 0) {
mca_bml_base_error_count = (int) ((mca_bml_base_error_rate_ceiling * rand())/(RAND_MAX+1.0));
if(mca_bml_base_error_count < mca_bml_base_error_rate_floor) {
mca_bml_base_error_count = mca_bml_base_error_rate_floor;
}
if(mca_bml_base_error_count % 2) {
/* local completion - network "drops" packet */
opal_output(0, "%s:%d: dropping data\n", __FILE__, __LINE__);

Просмотреть файл

@ -39,7 +39,8 @@ mca_bml_base_component_t mca_bml_component;
int mca_bml_base_init( bool enable_progress_threads,
bool enable_mpi_threads ) {
bool enable_mpi_threads,
opal_class_t* endpoint_class) {
opal_list_item_t *item = NULL;
mca_bml_base_component_t *component = NULL, *best_component = NULL;
mca_bml_base_module_t *module = NULL, *best_module = NULL;
@ -59,7 +60,8 @@ int mca_bml_base_init( bool enable_progress_threads,
}
module = component->bml_init(&priority,
enable_progress_threads,
enable_mpi_threads );
enable_mpi_threads,
endpoint_class);
if(NULL == module) {
continue;

Просмотреть файл

@ -29,7 +29,8 @@
opal_list_t mca_bml_base_components_available;
#if OMPI_ENABLE_DEBUG_RELIABILITY
double mca_bml_base_error_rate;
double mca_bml_base_error_rate_floor;
double mca_bml_base_error_rate_ceiling;
int mca_bml_base_error_count;
#endif
@ -46,10 +47,16 @@ int mca_bml_base_open( void ) {
do {
int param, value;
mca_base_param_register_int("bml", NULL, "error_rate", "error_rate", 0);
param = mca_base_param_find("bml", NULL, "error_rate");
mca_base_param_register_int("bml", NULL, "error_rate_floor", "error_rate_floor", 0);
param = mca_base_param_find("bml", NULL, "error_rate_floor");
mca_base_param_lookup_int(param, &value);
mca_bml_base_error_rate = value;
mca_bml_base_error_rate_floor = value;
mca_base_param_register_int("bml", NULL, "error_rate_ceiling", "error_rate_ceiling", 0);
param = mca_base_param_find("bml", NULL, "error_rate_ceiling");
mca_base_param_lookup_int(param, &value);
mca_bml_base_error_rate_ceiling = value;
mca_base_param_register_int("bml", NULL, "srand", "srand", 1);
param = mca_base_param_find("bml", NULL, "srand");
@ -63,8 +70,9 @@ int mca_bml_base_open( void ) {
}
/* initialize count */
if(mca_bml_base_error_rate > 0) {
mca_bml_base_error_count = (int) ((mca_bml_base_error_rate * rand())/(RAND_MAX+1.0));
if(mca_bml_base_error_rate_ceiling > 0
&& mca_bml_base_error_rate_floor <= mca_bml_base_error_rate_ceiling) {
mca_bml_base_error_count = (int) ((mca_bml_base_error_rate_ceiling * rand())/(RAND_MAX+1.0));
}
} while (0);
#endif

Просмотреть файл

@ -207,7 +207,7 @@ static inline mca_bml_base_btl_t* mca_bml_base_btl_array_find(
/**
* Structure associated w/ ompi_proc_t that contains the set
* of BTLs used to reach a destinationation
* of BTLs used to reach a destination
*/
struct mca_bml_base_endpoint_t {
mca_pml_proc_t super;
@ -398,7 +398,8 @@ static inline void mca_bml_base_prepare_dst(mca_bml_base_btl_t* bml_btl,
typedef struct mca_bml_base_module_t* (*mca_bml_base_component_init_fn_t)(
int* priority,
bool enable_progress_threads,
bool enable_mpi_threads
bool enable_mpi_threads,
opal_class_t* endpoint_class
);
/**

Просмотреть файл

@ -248,7 +248,8 @@ int mca_bml_r2_add_procs(
/* allocate bml specific proc data */
bml_endpoint = OBJ_NEW(mca_bml_base_endpoint_t);
bml_endpoint = (mca_bml_base_endpoint_t*)
opal_obj_new(mca_bml_r2.endpoint_class);
if (NULL == bml_endpoint) {
opal_output(0, "mca_bml_r2_add_procs: unable to allocate resources");
free(btl_endpoints);

Просмотреть файл

@ -53,6 +53,7 @@ struct mca_bml_r2_module_t {
mca_btl_base_component_progress_fn_t * btl_progress;
mca_bml_r2_recv_reg_t r2_reg[256];
bool btls_added;
opal_class_t * endpoint_class;
};
typedef struct mca_bml_r2_module_t mca_bml_r2_module_t;
@ -65,7 +66,8 @@ extern int mca_bml_r2_component_close(void);
extern mca_bml_base_module_t* mca_bml_r2_component_init(
int* priority,
bool enable_progress_threads,
bool enable_mpi_threads
bool enable_mpi_threads,
opal_class_t* endpoint_class
);
extern int mca_bml_r2_progress(void);

Просмотреть файл

@ -83,7 +83,8 @@ int mca_bml_r2_component_close(void)
mca_bml_base_module_t* mca_bml_r2_component_init(
int* priority,
bool enable_progress_threads,
bool enable_mpi_threads
bool enable_mpi_threads,
opal_class_t* endpoint_class
)
{
/* initialize BTLs */
@ -93,5 +94,6 @@ mca_bml_base_module_t* mca_bml_r2_component_init(
*priority = 100;
mca_bml_r2.btls_added = false;
mca_bml_r2.endpoint_class = endpoint_class;
return &mca_bml_r2.super;
}

Просмотреть файл

@ -31,8 +31,6 @@ OBJ_CLASS_INSTANCE(mca_mpool_base_selected_module_t, opal_list_item_t, NULL, NUL
static bool mca_mpool_enable_progress_threads = true;
static bool mca_mpool_enable_mpi_threads = true;
OBJ_CLASS_INSTANCE(mca_mpool_base_chunk_t, opal_list_item_t, NULL, NULL);
/**
* Function for weeding out mpool modules that don't want to run.
*

Просмотреть файл

@ -24,6 +24,8 @@ dr_sources = \
pml_dr_comm.h \
pml_dr_component.c \
pml_dr_component.h \
pml_dr_endpoint.c \
pml_dr_endpoint.h \
pml_dr_hdr.h \
pml_dr_iprobe.c \
pml_dr_irecv.c \

Просмотреть файл

@ -1,3 +1,4 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
@ -34,6 +35,7 @@
#include "pml_dr_sendreq.h"
#include "pml_dr_recvreq.h"
#include "ompi/mca/bml/base/base.h"
#include "orte/mca/ns/ns.h"
mca_pml_dr_t mca_pml_dr = {
{
@ -111,8 +113,9 @@ int mca_pml_dr_del_comm(ompi_communicator_t* comm)
int mca_pml_dr_add_procs(ompi_proc_t** procs, size_t nprocs)
{
ompi_bitmap_t reachable;
struct mca_bml_base_endpoint_t ** bml_endpoints = NULL;
struct mca_pml_dr_endpoint_t ** endpoints = NULL;
int rc;
size_t i;
if(nprocs == 0)
return OMPI_SUCCESS;
@ -122,16 +125,16 @@ int mca_pml_dr_add_procs(ompi_proc_t** procs, size_t nprocs)
if(OMPI_SUCCESS != rc)
return rc;
bml_endpoints = (struct mca_bml_base_endpoint_t **) malloc ( nprocs *
sizeof(struct mca_bml_base_endpoint_t*));
if ( NULL == bml_endpoints ) {
endpoints = (struct mca_pml_dr_endpoint_t **) malloc ( nprocs *
sizeof(struct mca_pml_dr_endpoint_t*));
if ( NULL == endpoints ) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
rc = mca_bml.bml_add_procs(
nprocs,
procs,
bml_endpoints,
(mca_bml_base_endpoint_t**) endpoints,
&reachable
);
if(OMPI_SUCCESS != rc)
@ -151,9 +154,27 @@ int mca_pml_dr_add_procs(ompi_proc_t** procs, size_t nprocs)
mca_pml_dr.free_list_max,
mca_pml_dr.free_list_inc,
NULL);
for(i = 0; i < nprocs; i++) {
int idx;
/* this won't work for comm spawn and other dynamic
processes, but will work for initial job start */
idx = ompi_pointer_array_add(&mca_pml_dr.procs,
(void*) endpoints[i]);
if(orte_ns.compare(ORTE_NS_CMP_ALL,
orte_process_info.my_name,
&endpoints[i]->base.super.proc_ompi->proc_name) == 0) {
mca_pml_dr.my_rank = idx;
}
endpoints[i]->local = endpoints[i]->dst = idx;
}
if ( NULL != bml_endpoints ) {
free ( bml_endpoints) ;
for(i = 0; i < nprocs; i++) {
endpoints[i]->src = mca_pml_dr.my_rank;
}
/* no longer need this */
if ( NULL != endpoints ) {
free ( endpoints) ;
}
return rc;
}

Просмотреть файл

@ -33,6 +33,7 @@
#include "ompi/mca/pml/base/pml_base_request.h"
#include "ompi/mca/pml/base/pml_base_bsend.h"
#include "ompi/mca/pml/base/pml_base_sendreq.h"
#include "ompi/class/ompi_pointer_array.h"
#include "ompi/mca/btl/btl.h"
#include "ompi/datatype/datatype.h"
@ -72,6 +73,12 @@ struct mca_pml_dr_t {
ompi_free_list_t vfrags;
ompi_free_list_t buffers;
/* proc pointer array */
ompi_pointer_array_t procs;
/* my 'global' rank */
int32_t my_rank;
int timer_wdog_sec;
int timer_wdog_usec;
int timer_wdog_multiplier;

Просмотреть файл

@ -21,20 +21,17 @@
#include "pml_dr.h"
#include "pml_dr_comm.h"
#include "pml_dr_endpoint.h"
static void mca_pml_dr_comm_proc_construct(mca_pml_dr_comm_proc_t* proc)
{
proc->expected_sequence = 1;
proc->vfrag_id = 1;
proc->send_sequence = 0;
OBJ_CONSTRUCT(&proc->frags_cant_match, opal_list_t);
OBJ_CONSTRUCT(&proc->specific_receives, opal_list_t);
OBJ_CONSTRUCT(&proc->matched_receives, opal_list_t);
OBJ_CONSTRUCT(&proc->unexpected_frags, opal_list_t);
OBJ_CONSTRUCT(&proc->seq_sends, ompi_seq_tracker_t);
OBJ_CONSTRUCT(&proc->seq_recvs, ompi_seq_tracker_t);
OBJ_CONSTRUCT(&proc->seq_recvs_matched, ompi_seq_tracker_t);
}
@ -44,9 +41,6 @@ static void mca_pml_dr_comm_proc_destruct(mca_pml_dr_comm_proc_t* proc)
OBJ_DESTRUCT(&proc->matched_receives);
OBJ_DESTRUCT(&proc->specific_receives);
OBJ_DESTRUCT(&proc->unexpected_frags);
OBJ_DESTRUCT(&proc->seq_sends);
OBJ_DESTRUCT(&proc->seq_recvs);
OBJ_DESTRUCT(&proc->seq_recvs_matched);
}
@ -60,6 +54,7 @@ static void mca_pml_dr_comm_construct(mca_pml_dr_comm_t* comm)
{
OBJ_CONSTRUCT(&comm->wild_receives, opal_list_t);
OBJ_CONSTRUCT(&comm->matching_lock, opal_mutex_t);
OBJ_CONSTRUCT(&comm->sparse_procs, ompi_pointer_array_t);
comm->recv_sequence = 0;
comm->procs = NULL;
comm->num_procs = 0;
@ -69,12 +64,16 @@ static void mca_pml_dr_comm_construct(mca_pml_dr_comm_t* comm)
static void mca_pml_dr_comm_destruct(mca_pml_dr_comm_t* comm)
{
size_t i;
for(i=0; i<comm->num_procs; i++)
for(i=0; i<comm->num_procs; i++) {
OBJ_DESTRUCT((&comm->procs[i]));
if(NULL != comm->procs)
}
if(NULL != comm->procs) {
free(comm->procs);
}
OBJ_DESTRUCT(&comm->wild_receives);
OBJ_DESTRUCT(&comm->matching_lock);
OBJ_DESTRUCT(&comm->sparse_procs);
}
@ -96,11 +95,23 @@ int mca_pml_dr_comm_init(mca_pml_dr_comm_t* dr_comm, ompi_communicator_t* ompi_c
return OMPI_ERR_OUT_OF_RESOURCE;
}
for(i=0; i<size; i++) {
OBJ_CONSTRUCT(dr_comm->procs+i, mca_pml_dr_comm_proc_t);
dr_comm->procs[i].ompi_proc = ompi_comm->c_remote_group->grp_proc_pointers[i];
mca_pml_dr_comm_proc_t* proc;
mca_pml_dr_endpoint_t* ep;
ompi_proc_t* ompi_proc;
proc = dr_comm->procs+i;
OBJ_CONSTRUCT(proc, mca_pml_dr_comm_proc_t);
proc->comm_rank = i;
ompi_proc = ompi_comm->c_remote_group->grp_proc_pointers[i];
proc->ompi_proc = ompi_proc;
ep = (mca_pml_dr_endpoint_t*) ompi_proc->proc_pml;
ompi_pointer_array_set_item(&dr_comm->sparse_procs,
ep->dst, /* from our view this is the
peers source 'global rank' */
proc);
proc->endpoint = ep;
}
dr_comm->num_procs = size;
return OMPI_SUCCESS;
}

Просмотреть файл

@ -27,6 +27,7 @@
#include "ompi/class/ompi_seq_tracker.h"
#include "ompi/communicator/communicator.h"
#include "ompi/proc/proc.h"
#include "pml_dr_endpoint.h"
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
@ -35,7 +36,6 @@ extern "C" {
struct mca_pml_dr_comm_proc_t {
opal_object_t super;
uint16_t expected_sequence; /**< send message sequence number - receiver side */
uint32_t vfrag_id; /**< virtual fragment identifier */
#if OMPI_HAVE_THREAD_SUPPORT
volatile int32_t send_sequence; /**< send side sequence number */
#else
@ -46,9 +46,8 @@ struct mca_pml_dr_comm_proc_t {
opal_list_t unexpected_frags; /**< unexpected fragment queues */
opal_list_t matched_receives; /**< list of in-progress matched receives */
ompi_proc_t* ompi_proc; /**< back pointer to ompi_proc_t */
ompi_seq_tracker_t seq_sends; /**< Tracks the send vfrags that have been acked */
ompi_seq_tracker_t seq_recvs; /**< Tracks the receive vfrags that have been acked */
ompi_seq_tracker_t seq_recvs_matched; /**< Tracks the received vfrags that have been matched */
mca_pml_dr_endpoint_t* endpoint; /**< back pointer to the endpoint */
int32_t comm_rank; /**< rank in the communicator */
};
typedef struct mca_pml_dr_comm_proc_t mca_pml_dr_comm_proc_t;
@ -65,6 +64,7 @@ struct mca_pml_comm_t {
#endif
opal_mutex_t matching_lock; /**< matching lock */
opal_list_t wild_receives; /**< queue of unmatched wild (source process not specified) receives */
ompi_pointer_array_t sparse_procs; /**< sparse array, allows lookup of comm_proc using a global rank */
mca_pml_dr_comm_proc_t* procs;
size_t num_procs;
};

Просмотреть файл

@ -31,6 +31,7 @@
#include "pml_dr_sendreq.h"
#include "pml_dr_recvreq.h"
#include "pml_dr_recvfrag.h"
#include "pml_dr_endpoint.h"
#include "ompi/mca/bml/base/base.h"
@ -154,7 +155,7 @@ int mca_pml_dr_component_open(void)
OBJ_CONSTRUCT(&mca_pml_dr.send_pending, opal_list_t);
OBJ_CONSTRUCT(&mca_pml_dr.acks_pending, opal_list_t);
OBJ_CONSTRUCT(&mca_pml_dr.buffers, ompi_free_list_t);
OBJ_CONSTRUCT(&mca_pml_dr.procs, ompi_pointer_array_t);
OBJ_CONSTRUCT(&mca_pml_dr.lock, opal_mutex_t);
mca_pml_dr.enabled = false;
@ -195,9 +196,15 @@ mca_pml_base_module_t* mca_pml_dr_component_init(int* priority,
}
if(OMPI_SUCCESS != mca_bml_base_init( enable_progress_threads, enable_mpi_threads))
if(OMPI_SUCCESS != mca_bml_base_init( enable_progress_threads,
enable_mpi_threads,
OBJ_CLASS(mca_pml_dr_endpoint_t)
)) {
return NULL;
}
mca_pml_dr.super.pml_progress = mca_bml.bml_progress;
return &mca_pml_dr.super;
}

43
ompi/mca/pml/dr/pml_dr_endpoint.c Обычный файл
Просмотреть файл

@ -0,0 +1,43 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include "pml_dr.h"
#include "pml_dr_endpoint.h"
static void mca_pml_dr_endpoint_construct(mca_pml_dr_endpoint_t* ep)
{
OBJ_CONSTRUCT(&ep->seq_sends, ompi_seq_tracker_t);
OBJ_CONSTRUCT(&ep->seq_recvs, ompi_seq_tracker_t);
OBJ_CONSTRUCT(&ep->seq_recvs_matched, ompi_seq_tracker_t);
}
static void mca_pml_dr_endpoint_destruct(mca_pml_dr_endpoint_t* ep)
{
OBJ_DESTRUCT(&ep->seq_sends);
OBJ_DESTRUCT(&ep->seq_recvs);
OBJ_DESTRUCT(&ep->seq_recvs_matched);
}
OBJ_CLASS_INSTANCE(
mca_pml_dr_endpoint_t,
mca_bml_base_endpoint_t,
mca_pml_dr_endpoint_construct,
mca_pml_dr_endpoint_destruct);

53
ompi/mca/pml/dr/pml_dr_endpoint.h Обычный файл
Просмотреть файл

@ -0,0 +1,53 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* @file
*/
#ifndef MCA_PML_ENDPOINT_H
#define MCA_PML_ENDPOINT_H
#include "ompi/mca/bml/bml.h"
#include "ompi/class/ompi_seq_tracker.h"
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
/**
* This is the pml level endpoint
* simply inherity the bml_base_endpoint and
* add whatever else is needed
*/
struct mca_pml_dr_endpoint_t {
mca_bml_base_endpoint_t base;
int32_t local; /* local view of the rank */
int32_t src; /* peers view of the src rank */
int32_t dst; /* peers destination rank */
ompi_seq_tracker_t seq_sends; /**< Tracks the send vfrags that have been acked */
ompi_seq_tracker_t seq_recvs; /**< Tracks the receive vfrags that have been acked */
ompi_seq_tracker_t seq_recvs_matched; /**< Tracks the received vfrags that have been matched */
uint32_t vfrag_seq; /**< current virtual fragment identifier sequence */
};
typedef struct mca_pml_dr_endpoint_t mca_pml_dr_endpoint_t;
OBJ_CLASS_DECLARATION(mca_pml_dr_endpoint_t);
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
#endif

Просмотреть файл

@ -40,13 +40,29 @@
#define MCA_PML_DR_HDR_VALIDATE(hdr, type) \
do { \
ompi_communicator_t* comm = NULL; \
uint16_t csum = opal_csum(hdr, sizeof(type)); \
if(hdr->hdr_common.hdr_csum != csum) { \
OPAL_OUTPUT((0, "%s:%d: invalid header checksum: 0x%04x != 0x%04x\n", \
__FILE__, __LINE__, hdr->hdr_common.hdr_csum, csum)); \
return; \
} \
if(hdr->hdr_common.hdr_dst != (ompi_comm_lookup(hdr->hdr_common.hdr_ctx))->c_my_rank ) { \
comm = ompi_comm_lookup(hdr->hdr_common.hdr_ctx); \
if(NULL == comm ) { \
mca_pml_dr_endpoint_t* ep; \
OPAL_OUTPUT((0, "%s:%d: communicator not found\n", \
__FILE__, __LINE__)); \
ep = ompi_pointer_array_get_item(&mca_pml_dr.procs, \
hdr->hdr_common.hdr_src); \
if(ompi_seq_tracker_check_duplicate(&ep->seq_recvs_matched, hdr->hdr_common.hdr_vid)) {\
mca_pml_dr_recv_frag_ack(&ep->base, \
&hdr->hdr_common, \
hdr->hdr_match.hdr_src_ptr.pval, \
1, 0); \
} \
return; \
} \
if(hdr->hdr_common.hdr_dst != mca_pml_dr.my_rank ) { \
OPAL_OUTPUT((0, "%s:%d: misdelivered packet [rank %d -> rank %d]\n", \
__FILE__, __LINE__, hdr->hdr_common.hdr_src, hdr->hdr_common.hdr_dst)); \
return; \
@ -54,7 +70,7 @@ do {
} while (0)
#define MCA_PML_DR_COMM_PROC_LOOKUP(hdr, comm, proc) \
#define MCA_PML_DR_COMM_PROC_LOOKUP(hdr, comm, proc, ep) \
do { \
ompi_communicator_t* comm_ptr=ompi_comm_lookup(hdr->hdr_common.hdr_ctx); \
if(NULL == comm_ptr) { \
@ -63,7 +79,8 @@ do {
return; \
} \
comm = (mca_pml_dr_comm_t*)comm_ptr->c_pml_comm; \
proc = comm->procs + hdr->hdr_common.hdr_src; \
proc = ompi_pointer_array_get_item(&comm->sparse_procs, hdr->hdr_common.hdr_src); \
ep = proc->endpoint; \
} while (0)
@ -110,6 +127,7 @@ void mca_pml_dr_recv_frag_callback(
mca_pml_dr_hdr_t* hdr = (mca_pml_dr_hdr_t*)segments->seg_addr.pval;
mca_pml_dr_comm_t *comm;
mca_pml_dr_comm_proc_t *proc;
mca_pml_dr_endpoint_t *ep;
if(segments->seg_len < sizeof(mca_pml_dr_common_hdr_t)) {
return;
}
@ -118,14 +136,14 @@ void mca_pml_dr_recv_frag_callback(
case MCA_PML_DR_HDR_TYPE_MATCH:
{
MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_match_hdr_t);
MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc);
MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc,ep);
/* seq_recvs protected by matching lock */
OPAL_THREAD_LOCK(&comm->matching_lock);
if(ompi_seq_tracker_check_duplicate(&proc->seq_recvs, hdr->hdr_common.hdr_vid)) {
if(ompi_seq_tracker_check_duplicate(&ep->seq_recvs, hdr->hdr_common.hdr_vid)) {
OPAL_THREAD_UNLOCK(&comm->matching_lock);
OPAL_OUTPUT((0, "%s:%d: acking duplicate match\n", __FILE__, __LINE__));
mca_pml_dr_recv_frag_ack((mca_bml_base_endpoint_t*)proc->ompi_proc->proc_pml,
mca_pml_dr_recv_frag_ack(&ep->base,
&hdr->hdr_common,
hdr->hdr_match.hdr_src_ptr.pval,
1, 0);
@ -138,11 +156,11 @@ void mca_pml_dr_recv_frag_callback(
case MCA_PML_DR_HDR_TYPE_MATCH_ACK:
{
MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_ack_hdr_t);
MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc);
MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc,ep);
/* seq_sends protected by ompi_request lock*/
OPAL_THREAD_LOCK(&ompi_request_lock);
if(!ompi_seq_tracker_check_duplicate(&proc->seq_sends, hdr->hdr_common.hdr_vid)) {
if(!ompi_seq_tracker_check_duplicate(&ep->seq_sends, hdr->hdr_common.hdr_vid)) {
OPAL_THREAD_UNLOCK(&ompi_request_lock);
mca_pml_dr_send_request_match_ack(btl, &hdr->hdr_ack);
} else {
@ -153,11 +171,11 @@ void mca_pml_dr_recv_frag_callback(
case MCA_PML_DR_HDR_TYPE_RNDV:
{
MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_rendezvous_hdr_t);
MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc);
MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc,ep);
/* seq_recvs protected by matching lock */
OPAL_THREAD_LOCK(&comm->matching_lock);
if(ompi_seq_tracker_check_duplicate(&proc->seq_recvs, hdr->hdr_common.hdr_vid)){
if(ompi_seq_tracker_check_duplicate(&ep->seq_recvs, hdr->hdr_common.hdr_vid)){
/* ack only if the vfrag has been matched */
mca_pml_dr_recv_request_t* recvreq =
mca_pml_dr_comm_proc_check_matched(proc, hdr->hdr_common.hdr_vid);
@ -167,9 +185,9 @@ void mca_pml_dr_recv_frag_callback(
mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_common,
hdr->hdr_match.hdr_src_ptr, recvreq->req_bytes_received, 1);
} else {
if(ompi_seq_tracker_check_duplicate(&proc->seq_recvs_matched, hdr->hdr_common.hdr_vid)) {
if(ompi_seq_tracker_check_duplicate(&ep->seq_recvs_matched, hdr->hdr_common.hdr_vid)) {
OPAL_OUTPUT((0, "%s:%d: acking duplicate matched rendezvous from sequence tracker\n", __FILE__, __LINE__));
mca_pml_dr_recv_frag_ack((mca_bml_base_endpoint_t*) proc->ompi_proc->proc_pml,
mca_pml_dr_recv_frag_ack(&ep->base,
&hdr->hdr_common,
hdr->hdr_match.hdr_src_ptr.pval,
~(uint64_t) 0, hdr->hdr_rndv.hdr_msg_length);
@ -186,11 +204,11 @@ void mca_pml_dr_recv_frag_callback(
case MCA_PML_DR_HDR_TYPE_RNDV_ACK:
{
MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_ack_hdr_t);
MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc);
MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc,ep);
/* seq_sends protected by ompi_request lock*/
OPAL_THREAD_LOCK(&ompi_request_lock);
if(!ompi_seq_tracker_check_duplicate(&proc->seq_sends, hdr->hdr_common.hdr_vid)) {
if(!ompi_seq_tracker_check_duplicate(&ep->seq_sends, hdr->hdr_common.hdr_vid)) {
OPAL_THREAD_UNLOCK(&ompi_request_lock);
mca_pml_dr_send_request_rndv_ack(btl, &hdr->hdr_ack);
} else {
@ -202,14 +220,14 @@ void mca_pml_dr_recv_frag_callback(
{
mca_pml_dr_recv_request_t* recvreq;
MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_frag_hdr_t);
MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc);
MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc,ep);
/* seq_recvs protected by matching lock */
OPAL_THREAD_LOCK(&comm->matching_lock);
if(ompi_seq_tracker_check_duplicate(&proc->seq_recvs, hdr->hdr_common.hdr_vid)) {
if(ompi_seq_tracker_check_duplicate(&ep->seq_recvs, hdr->hdr_common.hdr_vid)) {
OPAL_THREAD_UNLOCK(&comm->matching_lock);
OPAL_OUTPUT((0, "%s:%d: acking duplicate fragment\n", __FILE__, __LINE__));
mca_pml_dr_recv_frag_ack((mca_bml_base_endpoint_t*)proc->ompi_proc->proc_pml,
mca_pml_dr_recv_frag_ack(&ep->base,
&hdr->hdr_common,
hdr->hdr_frag.hdr_src_ptr.pval,
~(uint64_t) 0, 0);
@ -224,11 +242,11 @@ void mca_pml_dr_recv_frag_callback(
case MCA_PML_DR_HDR_TYPE_FRAG_ACK:
{
MCA_PML_DR_HDR_VALIDATE(hdr, mca_pml_dr_ack_hdr_t);
MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc);
MCA_PML_DR_COMM_PROC_LOOKUP(hdr,comm,proc,ep);
/* seq_sends protected by ompi_request lock*/
OPAL_THREAD_LOCK(&ompi_request_lock);
if(!ompi_seq_tracker_check_duplicate(&proc->seq_sends, hdr->hdr_common.hdr_vid)) {
if(!ompi_seq_tracker_check_duplicate(&ep->seq_sends, hdr->hdr_common.hdr_vid)) {
OPAL_THREAD_UNLOCK(&ompi_request_lock);
mca_pml_dr_send_request_frag_ack(btl, &hdr->hdr_ack);
} else {
@ -545,6 +563,7 @@ bool mca_pml_dr_recv_frag_match(
ompi_proc_t* ompi_proc = proc->ompi_proc;
int rc;
uint32_t csum = OPAL_CSUM_ZERO;
mca_pml_dr_endpoint_t* ep = (mca_pml_dr_endpoint_t*) proc->endpoint;
/* source sequence number */
frag_msg_seq = hdr->hdr_seq;
@ -668,7 +687,7 @@ rematch:
opal_list_append(&proc->frags_cant_match, (opal_list_item_t *)frag);
}
ompi_seq_tracker_insert(&proc->seq_recvs, hdr->hdr_common.hdr_vid);
ompi_seq_tracker_insert(&ep->seq_recvs, hdr->hdr_common.hdr_vid);
OPAL_THREAD_UNLOCK(&comm->matching_lock);
/* release matching lock before processing fragment */
@ -720,7 +739,7 @@ void mca_pml_dr_recv_frag_ack(
ack = (mca_pml_dr_ack_hdr_t*)des->des_src->seg_addr.pval;
ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK | hdr->hdr_type;
ack->hdr_common.hdr_flags = 0;
ack->hdr_common.hdr_src = comm->c_my_rank;
ack->hdr_common.hdr_src = hdr->hdr_dst;
ack->hdr_common.hdr_dst = hdr->hdr_src;
ack->hdr_common.hdr_vid = hdr->hdr_vid;
ack->hdr_common.hdr_ctx = hdr->hdr_ctx;
@ -828,7 +847,9 @@ rematch:
* look only at "specific" receives, or "wild" receives,
* or if we need to traverse both sets at the same time.
*/
proc = comm->procs + hdr->hdr_common.hdr_src;
proc = ompi_pointer_array_get_item(&comm->sparse_procs,
hdr->hdr_common.hdr_src);
if (opal_list_get_size(&proc->specific_receives) == 0 ) {
/*
* There are only wild irecvs, so specialize the algorithm.
@ -868,7 +889,7 @@ rematch:
* descriptor */
frag->request=match;
match->req_proc = proc;
match->req_endpoint = (mca_bml_base_endpoint_t*)proc->ompi_proc->proc_pml;
match->req_endpoint = (mca_pml_dr_endpoint_t*)proc->ompi_proc->proc_pml;
/* add this fragment descriptor to the list of
* descriptors to be processed later

Просмотреть файл

@ -41,7 +41,7 @@ if(csum != hdr->hdr_match.hdr_csum) { \
} else { \
mca_pml_dr_recv_request_match_specific(recvreq); \
} \
mca_pml_dr_recv_frag_ack(recvreq->req_endpoint, \
mca_pml_dr_recv_frag_ack(&recvreq->req_endpoint->base, \
&hdr->hdr_common, \
hdr->hdr_match.hdr_src_ptr.pval, \
0, 0); \
@ -53,7 +53,7 @@ if(csum != hdr->hdr_match.hdr_csum) { \
bytes_received = bytes_delivered = 0; \
} else if (recvreq->req_acked == false) { \
mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_common, \
hdr->hdr_match.hdr_src_ptr, bytes_received, 1); \
hdr->hdr_match.hdr_src_ptr, bytes_received, 1); \
}
@ -92,8 +92,8 @@ static int mca_pml_dr_recv_request_cancel(struct ompi_request_t* ompi_request, i
if( request->req_recv.req_base.req_peer == OMPI_ANY_SOURCE ) {
opal_list_remove_item( &comm->wild_receives, (opal_list_item_t*)request );
} else {
mca_pml_dr_comm_proc_t* proc = comm->procs + request->req_recv.req_base.req_peer;
opal_list_remove_item(&proc->specific_receives, (opal_list_item_t*)request);
mca_pml_dr_comm_proc_t* proc = comm->procs + request->req_recv.req_base.req_peer;
opal_list_remove_item(&proc->specific_receives, (opal_list_item_t*)request);
}
}
OPAL_THREAD_UNLOCK(&comm->matching_lock);
@ -167,7 +167,7 @@ void mca_pml_dr_recv_request_ack(
int rc;
/* allocate descriptor */
bml_btl = mca_bml_base_btl_array_get_next(&recvreq->req_endpoint->btl_eager);
bml_btl = mca_bml_base_btl_array_get_next(&recvreq->req_endpoint->base.btl_eager);
MCA_PML_DR_DES_ALLOC(bml_btl, des, sizeof(mca_pml_dr_ack_hdr_t));
if(NULL == des) {
return;
@ -177,8 +177,8 @@ void mca_pml_dr_recv_request_ack(
ack = (mca_pml_dr_ack_hdr_t*)des->des_src->seg_addr.pval;
ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK | hdr->hdr_type;
ack->hdr_common.hdr_flags = 0;
ack->hdr_common.hdr_dst = recvreq->req_recv.req_base.req_ompi.req_status.MPI_SOURCE;;
ack->hdr_common.hdr_src = recvreq->req_recv.req_base.req_comm->c_my_rank;
ack->hdr_common.hdr_dst = recvreq->req_endpoint->dst;
ack->hdr_common.hdr_src = recvreq->req_endpoint->src;
ack->hdr_common.hdr_ctx = recvreq->req_recv.req_base.req_comm->c_contextid;
ack->hdr_common.hdr_vid = hdr->hdr_vid;
ack->hdr_vlen = vlen;
@ -255,7 +255,7 @@ void mca_pml_dr_recv_request_progress(
bytes_received,
bytes_delivered,
csum);
MCA_PML_DR_RECV_REQUEST_ACK(recvreq,hdr,csum,bytes_received);
MCA_PML_DR_RECV_REQUEST_ACK(recvreq,hdr,csum,bytes_received);
break;
case MCA_PML_DR_HDR_TYPE_FRAG:
@ -290,7 +290,7 @@ void mca_pml_dr_recv_request_progress(
if((vfrag->vf_pending & vfrag->vf_mask) == vfrag->vf_mask) {
/* we have received all the pieces of the vfrag, ack
everything that passed the checksum */
ompi_seq_tracker_insert(&recvreq->req_proc->seq_recvs, vfrag->vf_id);
ompi_seq_tracker_insert(&recvreq->req_endpoint->seq_recvs, vfrag->vf_id);
mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_common,
hdr->hdr_frag.hdr_src_ptr, vfrag->vf_size, vfrag->vf_mask);
}
@ -347,7 +347,7 @@ void mca_pml_dr_recv_request_matched_probe(
/* check completion status */
OPAL_THREAD_LOCK(&ompi_request_lock);
recvreq->req_recv.req_base.req_ompi.req_status.MPI_TAG = hdr->hdr_match.hdr_tag;
recvreq->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = hdr->hdr_common.hdr_src;
recvreq->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = recvreq->req_proc->comm_rank;
recvreq->req_recv.req_base.req_ompi.req_status._count = bytes_packed;
recvreq->req_recv.req_base.req_pml_complete = true;
recvreq->req_recv.req_base.req_ompi.req_complete = true;

Просмотреть файл

@ -44,7 +44,7 @@ struct mca_pml_dr_recv_request_t {
/* filled in after match */
struct mca_pml_dr_comm_proc_t* req_proc;
struct mca_bml_base_endpoint_t* req_endpoint;
struct mca_pml_dr_endpoint_t* req_endpoint;
opal_mutex_t* req_mutex;
/* vfrag state */
@ -220,13 +220,13 @@ do {
do { \
(request)->req_mutex = &comm->matching_lock; \
(request)->req_proc = proc; \
(request)->req_endpoint = (mca_bml_base_endpoint_t*)proc->ompi_proc->proc_pml; \
(request)->req_endpoint = (mca_pml_dr_endpoint_t*)proc->ompi_proc->proc_pml; \
(request)->req_recv.req_base.req_ompi.req_status.MPI_TAG = (hdr)->hdr_tag; \
(request)->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = \
(hdr)->hdr_common.hdr_src; \
(request)->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = proc->comm_rank; \
(request)->req_vfrag0.vf_id = (hdr)->hdr_common.hdr_vid; \
opal_list_append(&proc->matched_receives, (opal_list_item_t*)request); \
ompi_seq_tracker_insert(&proc->seq_recvs_matched, (hdr)->hdr_common.hdr_vid); \
ompi_seq_tracker_insert(&request->req_endpoint->seq_recvs_matched, \
(hdr)->hdr_common.hdr_vid); \
} while(0)

Просмотреть файл

@ -132,7 +132,7 @@ static void mca_pml_dr_match_completion(
/* update statistics and complete */
sendreq->req_bytes_delivered = sendreq->req_send.req_bytes_packed;
ompi_seq_tracker_insert(&sendreq->req_proc->seq_sends, vfrag->vf_id);
ompi_seq_tracker_insert(&sendreq->req_endpoint->seq_sends, vfrag->vf_id);
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
/* on negative ack need to retransmit */
@ -187,7 +187,7 @@ static void mca_pml_dr_rndv_completion(
}
/* update statistics and complete */
ompi_seq_tracker_insert(&sendreq->req_proc->seq_sends, vfrag->vf_id);
ompi_seq_tracker_insert(&sendreq->req_endpoint->seq_sends, vfrag->vf_id);
if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed) {
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
} else {
@ -250,7 +250,7 @@ static void mca_pml_dr_frag_completion(
}
/* record vfrag id to drop duplicate acks */
ompi_seq_tracker_insert(&sendreq->req_proc->seq_sends, vfrag->vf_id);
ompi_seq_tracker_insert(&sendreq->req_endpoint->seq_sends, vfrag->vf_id);
/* return this vfrag */
MCA_PML_DR_VFRAG_RETURN(vfrag);
@ -370,8 +370,8 @@ int mca_pml_dr_send_request_start_buffered(
hdr->hdr_common.hdr_flags = 0;
hdr->hdr_common.hdr_csum = 0;
hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_RNDV;
hdr->hdr_common.hdr_dst = sendreq->req_send.req_base.req_peer;
hdr->hdr_common.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
hdr->hdr_common.hdr_dst = sendreq->req_endpoint->dst;
hdr->hdr_common.hdr_src = sendreq->req_endpoint->src;
hdr->hdr_common.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid;
hdr->hdr_common.hdr_vid = sendreq->req_vfrag0.vf_id;
hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag;
@ -452,9 +452,9 @@ int mca_pml_dr_send_request_start_copy(
hdr->hdr_common.hdr_flags = 0;
hdr->hdr_common.hdr_csum = 0;
hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_MATCH;
hdr->hdr_common.hdr_dst = sendreq->req_send.req_base.req_peer;
hdr->hdr_common.hdr_dst = sendreq->req_endpoint->dst;
hdr->hdr_common.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid;
hdr->hdr_common.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
hdr->hdr_common.hdr_src = sendreq->req_endpoint->src;
hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag;
hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence;
hdr->hdr_match.hdr_csum = size > 0 ? sendreq->req_send.req_convertor.checksum : OPAL_CSUM_ZERO;
@ -521,9 +521,9 @@ int mca_pml_dr_send_request_start_prepare(
hdr->hdr_common.hdr_flags = 0;
hdr->hdr_common.hdr_csum = 0;
hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_MATCH;
hdr->hdr_common.hdr_dst = sendreq->req_send.req_base.req_peer;
hdr->hdr_common.hdr_dst = sendreq->req_endpoint->dst;
hdr->hdr_common.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid;
hdr->hdr_common.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
hdr->hdr_common.hdr_src = sendreq->req_endpoint->src;
hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag;
hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence;
hdr->hdr_match.hdr_csum = size > 0 ? sendreq->req_send.req_convertor.checksum : OPAL_CSUM_ZERO;
@ -595,8 +595,8 @@ int mca_pml_dr_send_request_start_rndv(
hdr = (mca_pml_dr_hdr_t*)segment->seg_addr.pval;
hdr->hdr_common.hdr_flags = flags;
hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_RNDV;
hdr->hdr_common.hdr_dst = sendreq->req_send.req_base.req_peer;
hdr->hdr_common.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
hdr->hdr_common.hdr_dst = sendreq->req_endpoint->dst;
hdr->hdr_common.hdr_src = sendreq->req_endpoint->src;
hdr->hdr_common.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid;
hdr->hdr_common.hdr_vid = sendreq->req_vfrag0.vf_id;
hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag;
@ -640,7 +640,7 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
* the scheduling logic once for every call.
*/
mca_bml_base_endpoint_t* bml_endpoint = sendreq->req_endpoint;
mca_pml_dr_endpoint_t* endpoint = sendreq->req_endpoint;
assert(sendreq->req_vfrag0.vf_recv.pval != NULL);
if(OPAL_THREAD_ADD32(&sendreq->req_lock,1) == 1) {
do {
@ -662,7 +662,7 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
/* do we need to allocate a new vfrag
(we scheduled all the vfrag already) */
if(vfrag->vf_size == bytes_sent) {
bml_btl = mca_bml_base_btl_array_get_next(&bml_endpoint->btl_send);
bml_btl = mca_bml_base_btl_array_get_next(&endpoint->base.btl_send);
MCA_PML_DR_VFRAG_ALLOC(vfrag,rc);
if(NULL == vfrag) {
OPAL_THREAD_LOCK(&mca_pml_dr.lock);
@ -670,7 +670,7 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
OPAL_THREAD_UNLOCK(&mca_pml_dr.lock);
break;
}
MCA_PML_DR_SEND_REQUEST_VFRAG_INIT(sendreq,bml_endpoint,bytes_remaining,vfrag);
MCA_PML_DR_SEND_REQUEST_VFRAG_INIT(sendreq,endpoint,bytes_remaining,vfrag);
MCA_PML_DR_VFRAG_WDOG_START(vfrag);
vfrag->bml_btl = bml_btl;
bytes_sent = 0;
@ -711,9 +711,9 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
hdr->hdr_common.hdr_flags = 0;
hdr->hdr_common.hdr_csum = 0;
hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_FRAG;
hdr->hdr_common.hdr_dst = sendreq->req_send.req_base.req_peer;
hdr->hdr_common.hdr_dst = sendreq->req_endpoint->dst;
hdr->hdr_common.hdr_vid = vfrag->vf_id;
hdr->hdr_common.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
hdr->hdr_common.hdr_src = sendreq->req_endpoint->src;
hdr->hdr_common.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid;
hdr->hdr_vlen = vfrag->vf_len;
hdr->hdr_frag_idx = vfrag->vf_idx;
@ -800,9 +800,9 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
hdr->hdr_common.hdr_flags = 0;
hdr->hdr_common.hdr_csum = 0;
hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_FRAG;
hdr->hdr_common.hdr_dst = sendreq->req_send.req_base.req_peer;
hdr->hdr_common.hdr_dst = sendreq->req_endpoint->dst;
hdr->hdr_common.hdr_vid = vfrag->vf_id;
hdr->hdr_common.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
hdr->hdr_common.hdr_src = sendreq->req_endpoint->src;
hdr->hdr_common.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid;
hdr->hdr_vlen = vfrag->vf_len;
hdr->hdr_frag_idx = vfrag->vf_idx;
@ -875,7 +875,7 @@ void mca_pml_dr_send_request_match_ack(
/* update statistics */
sendreq->req_bytes_delivered = vfrag->vf_size;
ompi_seq_tracker_insert(&sendreq->req_proc->seq_sends, vfrag->vf_id);
ompi_seq_tracker_insert(&sendreq->req_endpoint->seq_sends, vfrag->vf_id);
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
}
@ -941,7 +941,7 @@ void mca_pml_dr_send_request_rndv_ack(
}
/* stash the vfrag id for duplicate acks.. */
ompi_seq_tracker_insert(&sendreq->req_proc->seq_sends, vfrag->vf_id);
ompi_seq_tracker_insert(&sendreq->req_endpoint->seq_sends, vfrag->vf_id);
OPAL_THREAD_UNLOCK(&ompi_request_lock);
if(schedule) {
@ -998,7 +998,7 @@ void mca_pml_dr_send_request_frag_ack(
assert(sendreq->req_bytes_delivered <= sendreq->req_send.req_bytes_packed);
/* stash the vfid for duplicate acks.. */
ompi_seq_tracker_insert(&sendreq->req_proc->seq_sends, vfrag->vf_id);
ompi_seq_tracker_insert(&sendreq->req_endpoint->seq_sends, vfrag->vf_id);
/* return vfrag */
MCA_PML_DR_VFRAG_RETURN(vfrag);

Просмотреть файл

@ -33,6 +33,7 @@
#include "pml_dr_comm.h"
#include "pml_dr_hdr.h"
#include "pml_dr_vfrag.h"
#include "pml_dr_endpoint.h"
#include "opal/event/event.h"
#if defined(c_plusplus) || defined(__cplusplus)
@ -42,7 +43,7 @@ extern "C" {
struct mca_pml_dr_send_request_t {
mca_pml_base_send_request_t req_send;
mca_pml_dr_comm_proc_t* req_proc;
mca_bml_base_endpoint_t* req_endpoint;
mca_pml_dr_endpoint_t* req_endpoint;
#if OMPI_HAVE_THREAD_SUPPORT
volatile int32_t req_state;
volatile int32_t req_lock;
@ -143,9 +144,10 @@ do {
#define MCA_PML_DR_SEND_REQUEST_START(sendreq, rc) \
do { \
mca_pml_dr_comm_t* comm = sendreq->req_send.req_base.req_comm->c_pml_comm; \
mca_pml_dr_comm_proc_t* proc = comm->procs + sendreq->req_send.req_base.req_peer; \
mca_bml_base_endpoint_t* endpoint = \
(mca_bml_base_endpoint_t*)sendreq->req_send.req_base.req_proc->proc_pml; \
mca_pml_dr_endpoint_t* endpoint = \
(mca_pml_dr_endpoint_t*)sendreq->req_send.req_base.req_proc->proc_pml; \
mca_pml_dr_comm_proc_t* proc = \
comm->procs + sendreq->req_send.req_base.req_peer; \
mca_bml_base_btl_t* bml_btl; \
size_t size = sendreq->req_send.req_bytes_packed; \
size_t eager_limit; \
@ -154,9 +156,9 @@ do {
break; \
} \
\
bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager); \
bml_btl = mca_bml_base_btl_array_get_next(&endpoint->base.btl_eager); \
MCA_PML_DR_VFRAG_INIT(&sendreq->req_vfrag0); \
sendreq->req_vfrag0.vf_id = OPAL_THREAD_ADD32(&proc->vfrag_id,1); \
sendreq->req_vfrag0.vf_id = OPAL_THREAD_ADD32(&endpoint->vfrag_seq,1); \
sendreq->req_vfrag = &sendreq->req_vfrag0; \
sendreq->req_endpoint = endpoint; \
sendreq->req_proc = proc; \
@ -274,8 +276,8 @@ do {
#define MCA_PML_DR_SEND_REQUEST_VFRAG_INIT(sendreq, endpoint, size, vfrag) \
do { \
mca_pml_dr_comm_t* comm = sendreq->req_send.req_base.req_comm->c_pml_comm; \
mca_pml_dr_comm_proc_t* proc = comm->procs + sendreq->req_send.req_base.req_peer; \
size_t max_send_size = endpoint->btl_max_send_size - sizeof(mca_pml_dr_frag_hdr_t); \
size_t max_send_size = endpoint->base.btl_max_send_size - \
sizeof(mca_pml_dr_frag_hdr_t); \
size_t div = size / max_send_size; \
\
MCA_PML_DR_VFRAG_INIT(vfrag); \
@ -301,7 +303,7 @@ do {
else \
vfrag->vf_mask = (((uint64_t)1 << vfrag->vf_len) - (uint64_t)1); \
} \
vfrag->vf_id = OPAL_THREAD_ADD32(&proc->vfrag_id,1); \
vfrag->vf_id = OPAL_THREAD_ADD32(&endpoint->vfrag_seq,1); \
vfrag->vf_offset = sendreq->req_send_offset; \
vfrag->vf_max_send_size = max_send_size; \
vfrag->vf_send.pval = sendreq; \
@ -354,8 +356,9 @@ do { \
#define MCA_PML_DR_SEND_REQUEST_EAGER_RETRY(sendreq, vfrag) \
do { \
mca_bml_base_endpoint_t* endpoint = sendreq->req_endpoint; \
mca_bml_base_btl_t* bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager); \
mca_pml_dr_endpoint_t* endpoint = sendreq->req_endpoint; \
mca_bml_base_btl_t* bml_btl = \
mca_bml_base_btl_array_get_next(&endpoint->base.btl_eager); \
mca_btl_base_descriptor_t *des_old, *des_new; \
\
if(++vfrag->vf_retry_cnt > mca_pml_dr.timer_ack_max_count) { \
@ -384,8 +387,9 @@ do { \
#define MCA_PML_DR_SEND_REQUEST_RNDV_PROBE(sendreq, vfrag) \
do { \
mca_bml_base_endpoint_t* endpoint = sendreq->req_endpoint; \
mca_bml_base_btl_t* bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager); \
mca_pml_dr_endpoint_t* endpoint = sendreq->req_endpoint; \
mca_bml_base_btl_t* bml_btl = \
mca_bml_base_btl_array_get_next(&endpoint->base.btl_eager); \
mca_btl_base_descriptor_t *des_old, *des_new; \
mca_pml_dr_hdr_t *hdr; \
\
@ -405,9 +409,9 @@ do { \
hdr = (mca_pml_dr_hdr_t*)des_new->des_src->seg_addr.pval; \
hdr->hdr_common.hdr_flags = 0; \
hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_RNDV; \
hdr->hdr_common.hdr_dst = sendreq->req_send.req_base.req_peer; \
hdr->hdr_common.hdr_dst = endpoint->dst; \
hdr->hdr_common.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid; \
hdr->hdr_common.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; \
hdr->hdr_common.hdr_src = endpoint->src; \
hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; \
hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence; \
hdr->hdr_match.hdr_src_ptr.pval = &sendreq->req_vfrag0; \

Просмотреть файл

@ -222,9 +222,11 @@ mca_pml_base_module_t* mca_pml_ob1_component_init(int* priority,
}
if(OMPI_SUCCESS != mca_bml_base_init( enable_progress_threads, enable_mpi_threads))
if(OMPI_SUCCESS != mca_bml_base_init( enable_progress_threads,
enable_mpi_threads,
OBJ_CLASS(mca_bml_base_endpoint_t))) {
return NULL;
}
/* As our own progress function does nothing except calling the BML
* progress, let's modify the progress function pointer in our structure
* to avoid useless functions calls. The event library will instead call

Просмотреть файл

@ -163,6 +163,7 @@ struct opal_class_t {
/**< array of parent class constructors */
opal_destruct_t *cls_destruct_array;
/**< array of parent class destructors */
size_t cls_sizeof; /**< size of an object instance */
};
/**
@ -207,7 +208,8 @@ struct opal_object_t {
OBJ_CLASS(PARENT), \
(opal_construct_t) CONSTRUCTOR, \
(opal_destruct_t) DESTRUCTOR, \
0, 0, NULL, NULL \
0, 0, NULL, NULL, \
sizeof(NAME) \
}
@ -229,20 +231,20 @@ struct opal_object_t {
* @param type Type (class) of the object
* @return Pointer to the object
*/
static inline opal_object_t *opal_obj_new(size_t size, opal_class_t * cls);
static inline opal_object_t *opal_obj_new(opal_class_t * cls);
#if OMPI_ENABLE_DEBUG
static inline opal_object_t *opal_obj_new_debug(size_t obj_size, opal_class_t* type, const char* file, int line)
static inline opal_object_t *opal_obj_new_debug(opal_class_t* type, const char* file, int line)
{
opal_object_t* object = opal_obj_new(obj_size, type);
opal_object_t* object = opal_obj_new(type);
object->cls_init_file_name = file;
object->cls_init_lineno = line;
return object;
}
#define OBJ_NEW(type) \
((type *)opal_obj_new_debug(sizeof(type), OBJ_CLASS(type), __FILE__, __LINE__))
((type *)opal_obj_new_debug(OBJ_CLASS(type), __FILE__, __LINE__))
#else
#define OBJ_NEW(type) \
((type *) opal_obj_new(sizeof(type), OBJ_CLASS(type)))
((type *) opal_obj_new(OBJ_CLASS(type)))
#endif /* OMPI_ENABLE_DEBUG */
/**
@ -416,13 +418,12 @@ static inline void opal_obj_run_destructors(opal_object_t * object)
* @param cls Pointer to the class descriptor of this object
* @return Pointer to the object
*/
static inline opal_object_t *opal_obj_new(size_t size, opal_class_t * cls)
static inline opal_object_t *opal_obj_new(opal_class_t * cls)
{
opal_object_t *object;
assert(cls->cls_sizeof >= sizeof(opal_object_t));
assert(size >= sizeof(opal_object_t));
object = (opal_object_t *) malloc(size);
object = (opal_object_t *) malloc(cls->cls_sizeof);
if (0 == cls->cls_initialized) {
opal_class_initialize(cls);
}