Seperate out sequence tracking list as stand alone class.
This commit was SVN r9391.
Этот коммит содержится в:
родитель
c1bec478c4
Коммит
e01cf0a166
@ -25,11 +25,13 @@ headers += \
|
||||
class/ompi_free_list.h \
|
||||
class/ompi_bitmap.h \
|
||||
class/ompi_pointer_array.h \
|
||||
class/ompi_rb_tree.h
|
||||
class/ompi_rb_tree.h \
|
||||
class/ompi_seq_tracker.h
|
||||
|
||||
libmpi_la_SOURCES += \
|
||||
class/ompi_bitmap.c \
|
||||
class/ompi_free_list.c \
|
||||
class/ompi_pointer_array.c \
|
||||
class/ompi_rb_tree.c
|
||||
class/ompi_rb_tree.c \
|
||||
class/ompi_seq_tracker.c
|
||||
|
||||
|
182
ompi/class/ompi_seq_tracker.c
Обычный файл
182
ompi/class/ompi_seq_tracker.c
Обычный файл
@ -0,0 +1,182 @@
|
||||
/*
|
||||
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
|
||||
* University Research and Technology
|
||||
* Corporation. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
|
||||
* University of Stuttgart. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#include "ompi_config.h"
|
||||
|
||||
#include "ompi/class/ompi_seq_tracker.h"
|
||||
#include "opal/sys/cache.h"
|
||||
#include "opal/util/output.h"
|
||||
|
||||
|
||||
|
||||
OBJ_CLASS_INSTANCE(ompi_seq_tracker_range_t,
|
||||
opal_list_item_t,
|
||||
NULL,
|
||||
NULL);
|
||||
|
||||
|
||||
static void ompi_seq_tracker_construct(ompi_seq_tracker_t* seq_tracker) {
|
||||
OBJ_CONSTRUCT(&seq_tracker->seq_ids, opal_list_t);
|
||||
seq_tracker->seq_ids_current = NULL;
|
||||
}
|
||||
|
||||
|
||||
static void ompi_seq_tracker_destruct(ompi_seq_tracker_t* seq_tracker) {
|
||||
OBJ_DESTRUCT(&seq_tracker->seq_ids);
|
||||
}
|
||||
|
||||
OBJ_CLASS_INSTANCE(
|
||||
ompi_seq_tracker_t,
|
||||
opal_object_t,
|
||||
ompi_seq_tracker_construct,
|
||||
ompi_seq_tracker_destruct);
|
||||
|
||||
|
||||
/**
|
||||
* Look for duplicate sequence number in current range.
|
||||
* Must be called w/ matching lock held.
|
||||
*/
|
||||
|
||||
bool ompi_seq_tracker_check_duplicate(
|
||||
ompi_seq_tracker_t* seq_tracker,
|
||||
uint32_t seq_id)
|
||||
{
|
||||
ompi_seq_tracker_range_t* item;
|
||||
int8_t direction = 0; /* 1 is next, -1 is previous */
|
||||
|
||||
item = seq_tracker->seq_ids_current;
|
||||
while(true) {
|
||||
if(NULL == item) {
|
||||
return false;
|
||||
} else if(item->seq_id_high >= seq_id && item->seq_id_low <= seq_id) {
|
||||
seq_tracker->seq_ids_current = (ompi_seq_tracker_range_t*) item;
|
||||
return true;
|
||||
} else if(seq_id > item->seq_id_high && direction != -1) {
|
||||
direction = 1;
|
||||
item = (ompi_seq_tracker_range_t*) opal_list_get_next(item);
|
||||
} else if(seq_id < item->seq_id_low && direction != 1) {
|
||||
direction = -1;
|
||||
item = (ompi_seq_tracker_range_t*) opal_list_get_prev(item);
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* insert item into sequence tracking list,
|
||||
* compacts continuous regions into a single entry
|
||||
*/
|
||||
void ompi_seq_tracker_insert(ompi_seq_tracker_t* seq_tracker,
|
||||
uint32_t seq_id)
|
||||
{
|
||||
opal_list_t* seq_ids = &seq_tracker->seq_ids;
|
||||
ompi_seq_tracker_range_t* item = seq_tracker->seq_ids_current;
|
||||
int8_t direction = 0; /* 1 is next, -1 is previous */
|
||||
ompi_seq_tracker_range_t *new_item, *next_item, *prev_item;
|
||||
while(true) {
|
||||
if( item == NULL || item == (ompi_seq_tracker_range_t*) &seq_ids->opal_list_tail ) {
|
||||
new_item = OBJ_NEW(ompi_seq_tracker_range_t);
|
||||
new_item->seq_id_low = new_item->seq_id_high = seq_id;
|
||||
opal_list_append(seq_ids, (opal_list_item_t*) new_item);
|
||||
seq_tracker->seq_ids_current = (ompi_seq_tracker_range_t*) new_item;
|
||||
return;
|
||||
} else if( item == (ompi_seq_tracker_range_t*) &seq_ids->opal_list_head ) {
|
||||
new_item = OBJ_NEW(ompi_seq_tracker_range_t);
|
||||
new_item->seq_id_low = new_item->seq_id_high = seq_id;
|
||||
opal_list_prepend(seq_ids, (opal_list_item_t*) new_item);
|
||||
seq_tracker->seq_ids_current = (ompi_seq_tracker_range_t*) new_item;
|
||||
return;
|
||||
|
||||
} else if(item->seq_id_high >= seq_id && item->seq_id_low <= seq_id ) {
|
||||
|
||||
seq_tracker->seq_ids_current = (ompi_seq_tracker_range_t*) item;
|
||||
return;
|
||||
|
||||
} else if((item->seq_id_high + 1) == seq_id) {
|
||||
|
||||
next_item = (ompi_seq_tracker_range_t*) opal_list_get_next(item);
|
||||
/* try to consolidate */
|
||||
if(next_item && next_item->seq_id_low == (seq_id+1)) {
|
||||
item->seq_id_high = next_item->seq_id_high;
|
||||
opal_list_remove_item(seq_ids, (opal_list_item_t*) next_item);
|
||||
OBJ_RELEASE(next_item);
|
||||
} else {
|
||||
item->seq_id_high = seq_id;
|
||||
}
|
||||
seq_tracker->seq_ids_current = (ompi_seq_tracker_range_t*) item;
|
||||
return;
|
||||
|
||||
} else if((item->seq_id_low - 1) == seq_id) {
|
||||
|
||||
prev_item = (ompi_seq_tracker_range_t*) opal_list_get_prev(item);
|
||||
/* try to consolidate */
|
||||
if(prev_item && prev_item->seq_id_high == (seq_id-1)) {
|
||||
item->seq_id_low = prev_item->seq_id_low;
|
||||
opal_list_remove_item(seq_ids, (opal_list_item_t*) prev_item);
|
||||
OBJ_RELEASE(prev_item);
|
||||
} else {
|
||||
item->seq_id_low = seq_id;
|
||||
}
|
||||
seq_tracker->seq_ids_current = (ompi_seq_tracker_range_t*) item;
|
||||
return;
|
||||
|
||||
} else if(seq_id > item->seq_id_high ) {
|
||||
if(direction == -1) {
|
||||
/* we have gone back in the list, and we went one item too far */
|
||||
new_item = OBJ_NEW(ompi_seq_tracker_range_t);
|
||||
new_item->seq_id_low = new_item->seq_id_high = seq_id;
|
||||
/* insert new_item directly before item */
|
||||
opal_list_insert_pos(seq_ids,
|
||||
(opal_list_item_t*) item,
|
||||
(opal_list_item_t*) new_item);
|
||||
seq_tracker->seq_ids_current = (ompi_seq_tracker_range_t*) new_item;
|
||||
return;
|
||||
} else {
|
||||
direction = 1;
|
||||
item = (ompi_seq_tracker_range_t*) opal_list_get_next(item);
|
||||
}
|
||||
} else if(seq_id < item->seq_id_low) {
|
||||
if(direction == 1) {
|
||||
/* we have gone forward in the list, and we went one item too far */
|
||||
new_item = OBJ_NEW(ompi_seq_tracker_range_t);
|
||||
next_item = (ompi_seq_tracker_range_t*) opal_list_get_next(item);
|
||||
if(NULL == next_item) {
|
||||
opal_list_append(seq_ids, (opal_list_item_t*) new_item);
|
||||
} else {
|
||||
opal_list_insert_pos(seq_ids,
|
||||
(opal_list_item_t*) next_item,
|
||||
(opal_list_item_t*) new_item);
|
||||
}
|
||||
seq_tracker->seq_ids_current = (ompi_seq_tracker_range_t*) new_item;
|
||||
return;
|
||||
} else {
|
||||
direction = -1;
|
||||
item = (ompi_seq_tracker_range_t*) opal_list_get_prev(item);
|
||||
}
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
73
ompi/class/ompi_seq_tracker.h
Обычный файл
73
ompi/class/ompi_seq_tracker.h
Обычный файл
@ -0,0 +1,73 @@
|
||||
/*
|
||||
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
|
||||
* University Research and Technology
|
||||
* Corporation. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
|
||||
* University of Stuttgart. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#ifndef OMPI_SEQ_TRACKER_H
|
||||
#define OMPI_SEQ_TRACKER_H
|
||||
|
||||
#include "ompi_config.h"
|
||||
#include "opal/class/opal_list.h"
|
||||
#include "opal/threads/threads.h"
|
||||
#include "opal/threads/condition.h"
|
||||
#include "ompi/constants.h"
|
||||
#include "ompi/mca/mpool/mpool.h"
|
||||
|
||||
#if defined(c_plusplus) || defined(__cplusplus)
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
struct ompi_seq_tracker_range_t{
|
||||
opal_list_item_t super;
|
||||
uint32_t seq_id_high;
|
||||
uint32_t seq_id_low;
|
||||
};
|
||||
typedef struct ompi_seq_tracker_range_t ompi_seq_tracker_range_t;
|
||||
|
||||
OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_seq_tracker_range_t);
|
||||
|
||||
struct ompi_seq_tracker_t{
|
||||
opal_list_t seq_ids; /**< list of seqs id's that have been seen */
|
||||
ompi_seq_tracker_range_t* seq_ids_current; /**< a pointer to the last place we were in the list */
|
||||
|
||||
};
|
||||
typedef struct ompi_seq_tracker_t ompi_seq_tracker_t;
|
||||
|
||||
OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_seq_tracker_t);
|
||||
|
||||
|
||||
/**
|
||||
* Look for duplicate sequence number in current range.
|
||||
* Must be called w/ matching lock held.
|
||||
*/
|
||||
|
||||
bool ompi_seq_tracker_check_duplicate(
|
||||
ompi_seq_tracker_t* seq_tracker,
|
||||
uint32_t seq_id);
|
||||
|
||||
|
||||
/*
|
||||
* insert item into sequence tracking list,
|
||||
* compacts continuous regions into a single entry
|
||||
*/
|
||||
void ompi_seq_tracker_insert(ompi_seq_tracker_t* seq_tracker,
|
||||
uint32_t seq_i);
|
||||
|
||||
#if defined(c_plusplus) || defined(__cplusplus)
|
||||
}
|
||||
#endif
|
||||
#endif
|
||||
|
@ -22,27 +22,6 @@
|
||||
#include "pml_dr.h"
|
||||
#include "pml_dr_comm.h"
|
||||
|
||||
OBJ_CLASS_INSTANCE(mca_pml_dr_range_t,
|
||||
opal_list_item_t,
|
||||
NULL,
|
||||
NULL);
|
||||
|
||||
|
||||
static void mca_pml_dr_seq_tracker_construct(mca_pml_dr_seq_tracker_t* seq_tracker) {
|
||||
OBJ_CONSTRUCT(&seq_tracker->vfrag_ids, opal_list_t);
|
||||
seq_tracker->vfrag_ids_current = NULL;
|
||||
}
|
||||
|
||||
|
||||
static void mca_pml_dr_seq_tracker_destruct(mca_pml_dr_seq_tracker_t* seq_tracker) {
|
||||
OBJ_DESTRUCT(&seq_tracker->vfrag_ids);
|
||||
}
|
||||
|
||||
OBJ_CLASS_INSTANCE(
|
||||
mca_pml_dr_seq_tracker_t,
|
||||
opal_object_t,
|
||||
mca_pml_dr_seq_tracker_construct,
|
||||
mca_pml_dr_seq_tracker_destruct);
|
||||
|
||||
static void mca_pml_dr_comm_proc_construct(mca_pml_dr_comm_proc_t* proc)
|
||||
{
|
||||
@ -53,8 +32,8 @@ static void mca_pml_dr_comm_proc_construct(mca_pml_dr_comm_proc_t* proc)
|
||||
OBJ_CONSTRUCT(&proc->specific_receives, opal_list_t);
|
||||
OBJ_CONSTRUCT(&proc->matched_receives, opal_list_t);
|
||||
OBJ_CONSTRUCT(&proc->unexpected_frags, opal_list_t);
|
||||
OBJ_CONSTRUCT(&proc->seq_sends, mca_pml_dr_seq_tracker_t);
|
||||
OBJ_CONSTRUCT(&proc->seq_recvs, mca_pml_dr_seq_tracker_t);
|
||||
OBJ_CONSTRUCT(&proc->seq_sends, ompi_seq_tracker_t);
|
||||
OBJ_CONSTRUCT(&proc->seq_recvs, ompi_seq_tracker_t);
|
||||
}
|
||||
|
||||
|
||||
|
@ -24,6 +24,7 @@
|
||||
#include "opal/threads/mutex.h"
|
||||
#include "opal/threads/condition.h"
|
||||
#include "opal/class/opal_list.h"
|
||||
#include "ompi/class/ompi_seq_tracker.h"
|
||||
#include "ompi/communicator/communicator.h"
|
||||
#include "ompi/proc/proc.h"
|
||||
#if defined(c_plusplus) || defined(__cplusplus)
|
||||
@ -31,24 +32,6 @@ extern "C" {
|
||||
#endif
|
||||
|
||||
|
||||
struct mca_pml_dr_range_t{
|
||||
opal_list_item_t super;
|
||||
uint32_t vfrag_id_high;
|
||||
uint32_t vfrag_id_low;
|
||||
};
|
||||
typedef struct mca_pml_dr_range_t mca_pml_dr_range_t;
|
||||
|
||||
OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_pml_dr_range_t);
|
||||
|
||||
struct mca_pml_dr_seq_tracker_t{
|
||||
opal_list_t vfrag_ids; /**< list of vfrags id's that have been seen */
|
||||
mca_pml_dr_range_t* vfrag_ids_current; /**< a pointer to the last place we were in the list */
|
||||
|
||||
};
|
||||
typedef struct mca_pml_dr_seq_tracker_t mca_pml_dr_seq_tracker_t;
|
||||
|
||||
OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_pml_dr_seq_tracker_t);
|
||||
|
||||
struct mca_pml_dr_comm_proc_t {
|
||||
opal_object_t super;
|
||||
uint16_t expected_sequence; /**< send message sequence number - receiver side */
|
||||
@ -63,8 +46,8 @@ struct mca_pml_dr_comm_proc_t {
|
||||
opal_list_t unexpected_frags; /**< unexpected fragment queues */
|
||||
opal_list_t matched_receives; /**< list of in-progress matched receives */
|
||||
ompi_proc_t* ompi_proc; /**< back pointer to ompi_proc_t */
|
||||
mca_pml_dr_seq_tracker_t seq_sends; /**< Tracks the send vfrags that have been acked */
|
||||
mca_pml_dr_seq_tracker_t seq_recvs; /**< Tracks the receive vfrags that have been acked */
|
||||
ompi_seq_tracker_t seq_sends; /**< Tracks the send vfrags that have been acked */
|
||||
ompi_seq_tracker_t seq_recvs; /**< Tracks the receive vfrags that have been acked */
|
||||
};
|
||||
typedef struct mca_pml_dr_comm_proc_t mca_pml_dr_comm_proc_t;
|
||||
|
||||
@ -99,134 +82,6 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_pml_dr_comm_t);
|
||||
|
||||
OMPI_DECLSPEC extern int mca_pml_dr_comm_init(mca_pml_dr_comm_t* dr_comm, ompi_communicator_t* ompi_comm);
|
||||
|
||||
/**
|
||||
* Look for duplicate sequence number in current range.
|
||||
* Must be called w/ matching lock held.
|
||||
*/
|
||||
|
||||
static inline bool mca_pml_dr_comm_proc_check_duplicate(
|
||||
mca_pml_dr_seq_tracker_t* seq_tracker,
|
||||
uint32_t vfrag_id)
|
||||
{
|
||||
mca_pml_dr_range_t* item;
|
||||
int8_t direction = 0; /* 1 is next, -1 is previous */
|
||||
|
||||
item = seq_tracker->vfrag_ids_current;
|
||||
while(true) {
|
||||
if(NULL == item) {
|
||||
return false;
|
||||
} else if(item->vfrag_id_high >= vfrag_id && item->vfrag_id_low <= vfrag_id) {
|
||||
seq_tracker->vfrag_ids_current = (mca_pml_dr_range_t*) item;
|
||||
return true;
|
||||
} else if(vfrag_id > item->vfrag_id_high && direction != -1) {
|
||||
direction = 1;
|
||||
item = (mca_pml_dr_range_t*) opal_list_get_next(item);
|
||||
} else if(vfrag_id < item->vfrag_id_low && direction != 1) {
|
||||
direction = -1;
|
||||
item = (mca_pml_dr_range_t*) opal_list_get_prev(item);
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Must be called w/ matching lock held
|
||||
*/
|
||||
static inline void mca_pml_dr_comm_proc_set_vid(mca_pml_dr_seq_tracker_t* seq_tracker,
|
||||
uint32_t vfrag_id)
|
||||
{
|
||||
opal_list_t* vfrag_ids = &seq_tracker->vfrag_ids;
|
||||
mca_pml_dr_range_t* item = seq_tracker->vfrag_ids_current;
|
||||
int8_t direction = 0; /* 1 is next, -1 is previous */
|
||||
mca_pml_dr_range_t *new_item, *next_item, *prev_item;
|
||||
while(true) {
|
||||
if( item == NULL || item == (mca_pml_dr_range_t*) &vfrag_ids->opal_list_tail ) {
|
||||
new_item = OBJ_NEW(mca_pml_dr_range_t);
|
||||
new_item->vfrag_id_low = new_item->vfrag_id_high = vfrag_id;
|
||||
opal_list_append(vfrag_ids, (opal_list_item_t*) new_item);
|
||||
seq_tracker->vfrag_ids_current = (mca_pml_dr_range_t*) new_item;
|
||||
return;
|
||||
} else if( item == (mca_pml_dr_range_t*) &vfrag_ids->opal_list_head ) {
|
||||
new_item = OBJ_NEW(mca_pml_dr_range_t);
|
||||
new_item->vfrag_id_low = new_item->vfrag_id_high = vfrag_id;
|
||||
opal_list_prepend(vfrag_ids, (opal_list_item_t*) new_item);
|
||||
seq_tracker->vfrag_ids_current = (mca_pml_dr_range_t*) new_item;
|
||||
return;
|
||||
|
||||
} else if(item->vfrag_id_high >= vfrag_id && item->vfrag_id_low <= vfrag_id ) {
|
||||
|
||||
seq_tracker->vfrag_ids_current = (mca_pml_dr_range_t*) item;
|
||||
return;
|
||||
|
||||
} else if((item->vfrag_id_high + 1) == vfrag_id) {
|
||||
|
||||
next_item = (mca_pml_dr_range_t*) opal_list_get_next(item);
|
||||
/* try to consolidate */
|
||||
if(next_item && next_item->vfrag_id_low == (vfrag_id+1)) {
|
||||
item->vfrag_id_high = next_item->vfrag_id_high;
|
||||
opal_list_remove_item(vfrag_ids, (opal_list_item_t*) next_item);
|
||||
OBJ_RELEASE(next_item);
|
||||
} else {
|
||||
item->vfrag_id_high = vfrag_id;
|
||||
}
|
||||
seq_tracker->vfrag_ids_current = (mca_pml_dr_range_t*) item;
|
||||
return;
|
||||
|
||||
} else if((item->vfrag_id_low - 1) == vfrag_id) {
|
||||
|
||||
prev_item = (mca_pml_dr_range_t*) opal_list_get_prev(item);
|
||||
/* try to consolidate */
|
||||
if(prev_item && prev_item->vfrag_id_high == (vfrag_id-1)) {
|
||||
item->vfrag_id_low = prev_item->vfrag_id_low;
|
||||
opal_list_remove_item(vfrag_ids, (opal_list_item_t*) prev_item);
|
||||
OBJ_RELEASE(prev_item);
|
||||
} else {
|
||||
item->vfrag_id_low = vfrag_id;
|
||||
}
|
||||
seq_tracker->vfrag_ids_current = (mca_pml_dr_range_t*) item;
|
||||
return;
|
||||
|
||||
} else if(vfrag_id > item->vfrag_id_high ) {
|
||||
if(direction == -1) {
|
||||
/* we have gone back in the list, and we went one item too far */
|
||||
new_item = OBJ_NEW(mca_pml_dr_range_t);
|
||||
new_item->vfrag_id_low = new_item->vfrag_id_high = vfrag_id;
|
||||
/* insert new_item directly before item */
|
||||
opal_list_insert_pos(vfrag_ids,
|
||||
(opal_list_item_t*) item,
|
||||
(opal_list_item_t*) new_item);
|
||||
seq_tracker->vfrag_ids_current = (mca_pml_dr_range_t*) new_item;
|
||||
return;
|
||||
} else {
|
||||
direction = 1;
|
||||
item = (mca_pml_dr_range_t*) opal_list_get_next(item);
|
||||
}
|
||||
} else if(vfrag_id < item->vfrag_id_low) {
|
||||
if(direction == 1) {
|
||||
/* we have gone forward in the list, and we went one item too far */
|
||||
new_item = OBJ_NEW(mca_pml_dr_range_t);
|
||||
next_item = (mca_pml_dr_range_t*) opal_list_get_next(item);
|
||||
if(NULL == next_item) {
|
||||
opal_list_append(vfrag_ids, (opal_list_item_t*) new_item);
|
||||
} else {
|
||||
opal_list_insert_pos(vfrag_ids,
|
||||
(opal_list_item_t*) next_item,
|
||||
(opal_list_item_t*) new_item);
|
||||
}
|
||||
seq_tracker->vfrag_ids_current = (mca_pml_dr_range_t*) new_item;
|
||||
return;
|
||||
} else {
|
||||
direction = -1;
|
||||
item = (mca_pml_dr_range_t*) opal_list_get_prev(item);
|
||||
}
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#if defined(c_plusplus) || defined(__cplusplus)
|
||||
}
|
||||
|
@ -29,6 +29,7 @@
|
||||
#include "ompi/communicator/communicator.h"
|
||||
#include "ompi/datatype/datatype.h"
|
||||
#include "ompi/mca/pml/pml.h"
|
||||
#include "ompi/class/ompi_seq_tracker.h"
|
||||
#include "pml_dr.h"
|
||||
#include "pml_dr_comm.h"
|
||||
#include "pml_dr_recvfrag.h"
|
||||
@ -121,7 +122,7 @@ void mca_pml_dr_recv_frag_callback(
|
||||
|
||||
/* seq_recvs protected by matching lock */
|
||||
OPAL_THREAD_LOCK(&comm->matching_lock);
|
||||
if(mca_pml_dr_comm_proc_check_duplicate(&proc->seq_recvs, hdr->hdr_common.hdr_vid)) {
|
||||
if(ompi_seq_tracker_check_duplicate(&proc->seq_recvs, hdr->hdr_common.hdr_vid)) {
|
||||
OPAL_THREAD_UNLOCK(&comm->matching_lock);
|
||||
OPAL_OUTPUT((0, "%s:%d: acking duplicate match\n", __FILE__, __LINE__));
|
||||
mca_pml_dr_recv_frag_ack((mca_bml_base_endpoint_t*)proc->ompi_proc->proc_pml,
|
||||
@ -141,7 +142,7 @@ void mca_pml_dr_recv_frag_callback(
|
||||
|
||||
/* seq_sends protected by ompi_request lock*/
|
||||
OPAL_THREAD_LOCK(&ompi_request_lock);
|
||||
if(!mca_pml_dr_comm_proc_check_duplicate(&proc->seq_sends, hdr->hdr_common.hdr_vid)) {
|
||||
if(!ompi_seq_tracker_check_duplicate(&proc->seq_sends, hdr->hdr_common.hdr_vid)) {
|
||||
OPAL_THREAD_UNLOCK(&ompi_request_lock);
|
||||
mca_pml_dr_send_request_match_ack(btl, &hdr->hdr_ack);
|
||||
} else {
|
||||
@ -156,7 +157,7 @@ void mca_pml_dr_recv_frag_callback(
|
||||
|
||||
/* seq_recvs protected by matching lock */
|
||||
OPAL_THREAD_LOCK(&comm->matching_lock);
|
||||
if(mca_pml_dr_comm_proc_check_duplicate(&proc->seq_recvs, hdr->hdr_common.hdr_vid)) {
|
||||
if(ompi_seq_tracker_check_duplicate(&proc->seq_recvs, hdr->hdr_common.hdr_vid)) {
|
||||
/* ack only if the vfrag has been matched */
|
||||
mca_pml_dr_recv_request_t* recvreq =
|
||||
mca_pml_dr_comm_proc_check_matched(proc, hdr->hdr_common.hdr_vid);
|
||||
@ -181,7 +182,7 @@ void mca_pml_dr_recv_frag_callback(
|
||||
|
||||
/* seq_sends protected by ompi_request lock*/
|
||||
OPAL_THREAD_LOCK(&ompi_request_lock);
|
||||
if(!mca_pml_dr_comm_proc_check_duplicate(&proc->seq_sends, hdr->hdr_common.hdr_vid)) {
|
||||
if(!ompi_seq_tracker_check_duplicate(&proc->seq_sends, hdr->hdr_common.hdr_vid)) {
|
||||
OPAL_THREAD_UNLOCK(&ompi_request_lock);
|
||||
mca_pml_dr_send_request_rndv_ack(btl, &hdr->hdr_ack);
|
||||
} else {
|
||||
@ -197,7 +198,7 @@ void mca_pml_dr_recv_frag_callback(
|
||||
|
||||
/* seq_recvs protected by matching lock */
|
||||
OPAL_THREAD_LOCK(&comm->matching_lock);
|
||||
if(mca_pml_dr_comm_proc_check_duplicate(&proc->seq_recvs, hdr->hdr_common.hdr_vid)) {
|
||||
if(ompi_seq_tracker_check_duplicate(&proc->seq_recvs, hdr->hdr_common.hdr_vid)) {
|
||||
OPAL_THREAD_UNLOCK(&comm->matching_lock);
|
||||
OPAL_OUTPUT((0, "%s:%d: acking duplicate fragment\n", __FILE__, __LINE__));
|
||||
mca_pml_dr_recv_frag_ack((mca_bml_base_endpoint_t*)proc->ompi_proc->proc_pml,
|
||||
@ -219,7 +220,7 @@ void mca_pml_dr_recv_frag_callback(
|
||||
|
||||
/* seq_sends protected by ompi_request lock*/
|
||||
OPAL_THREAD_LOCK(&ompi_request_lock);
|
||||
if(!mca_pml_dr_comm_proc_check_duplicate(&proc->seq_sends, hdr->hdr_common.hdr_vid)) {
|
||||
if(!ompi_seq_tracker_check_duplicate(&proc->seq_sends, hdr->hdr_common.hdr_vid)) {
|
||||
OPAL_THREAD_UNLOCK(&ompi_request_lock);
|
||||
mca_pml_dr_send_request_frag_ack(btl, &hdr->hdr_ack);
|
||||
} else {
|
||||
@ -659,7 +660,7 @@ rematch:
|
||||
opal_list_append(&proc->frags_cant_match, (opal_list_item_t *)frag);
|
||||
}
|
||||
|
||||
mca_pml_dr_comm_proc_set_vid(&proc->seq_recvs, hdr->hdr_common.hdr_vid);
|
||||
ompi_seq_tracker_insert(&proc->seq_recvs, hdr->hdr_common.hdr_vid);
|
||||
OPAL_THREAD_UNLOCK(&comm->matching_lock);
|
||||
|
||||
/* release matching lock before processing fragment */
|
||||
|
@ -287,7 +287,7 @@ void mca_pml_dr_recv_request_progress(
|
||||
if((vfrag->vf_mask_pending & vfrag->vf_mask) == vfrag->vf_mask) {
|
||||
/* we have received all the pieces of the vfrag, ack
|
||||
everything that passed the checksum */
|
||||
mca_pml_dr_comm_proc_set_vid(&recvreq->req_proc->seq_recvs, vfrag->vf_id);
|
||||
ompi_seq_tracker_insert(&recvreq->req_proc->seq_recvs, vfrag->vf_id);
|
||||
mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_common,
|
||||
hdr->hdr_frag.hdr_src_ptr, vfrag->vf_size, vfrag->vf_mask);
|
||||
}
|
||||
|
@ -24,6 +24,7 @@
|
||||
#include "ompi/constants.h"
|
||||
#include "ompi/mca/pml/pml.h"
|
||||
#include "ompi/mca/btl/btl.h"
|
||||
#include "ompi/class/ompi_seq_tracker.h"
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "ompi/mca/mpool/mpool.h"
|
||||
#include "pml_dr.h"
|
||||
@ -135,7 +136,7 @@ static void mca_pml_dr_match_completion(
|
||||
|
||||
/* update statistics and complete */
|
||||
sendreq->req_bytes_delivered = sendreq->req_send.req_bytes_packed;
|
||||
mca_pml_dr_comm_proc_set_vid(&sendreq->req_proc->seq_sends, vfrag->vf_id);
|
||||
ompi_seq_tracker_insert(&sendreq->req_proc->seq_sends, vfrag->vf_id);
|
||||
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
|
||||
|
||||
/* on negative ack need to retransmit */
|
||||
@ -189,7 +190,7 @@ static void mca_pml_dr_rndv_completion(
|
||||
}
|
||||
|
||||
/* update statistics and complete */
|
||||
mca_pml_dr_comm_proc_set_vid(&sendreq->req_proc->seq_sends, vfrag->vf_id);
|
||||
ompi_seq_tracker_insert(&sendreq->req_proc->seq_sends, vfrag->vf_id);
|
||||
if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed) {
|
||||
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
|
||||
} else {
|
||||
@ -257,7 +258,7 @@ static void mca_pml_dr_frag_completion(
|
||||
}
|
||||
|
||||
/* record vfrag id to drop duplicate acks */
|
||||
mca_pml_dr_comm_proc_set_vid(&sendreq->req_proc->seq_sends, vfrag->vf_id);
|
||||
ompi_seq_tracker_insert(&sendreq->req_proc->seq_sends, vfrag->vf_id);
|
||||
|
||||
/* return this vfrag */
|
||||
MCA_PML_DR_VFRAG_RETURN(vfrag);
|
||||
@ -893,7 +894,7 @@ void mca_pml_dr_send_request_match_ack(
|
||||
|
||||
/* update statistics */
|
||||
sendreq->req_bytes_delivered = vfrag->vf_size;
|
||||
mca_pml_dr_comm_proc_set_vid(&sendreq->req_proc->seq_sends, vfrag->vf_id);
|
||||
ompi_seq_tracker_insert(&sendreq->req_proc->seq_sends, vfrag->vf_id);
|
||||
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
|
||||
}
|
||||
|
||||
@ -952,7 +953,7 @@ void mca_pml_dr_send_request_rndv_ack(
|
||||
schedule = true;
|
||||
}
|
||||
/* stash the vfrag id for duplicate acks.. */
|
||||
mca_pml_dr_comm_proc_set_vid(&sendreq->req_proc->seq_sends, vfrag->vf_id);
|
||||
ompi_seq_tracker_insert(&sendreq->req_proc->seq_sends, vfrag->vf_id);
|
||||
OPAL_THREAD_UNLOCK(&ompi_request_lock);
|
||||
|
||||
if(schedule) {
|
||||
@ -994,7 +995,6 @@ void mca_pml_dr_send_request_frag_ack(
|
||||
|
||||
/* need to retransmit? */
|
||||
if(vfrag->vf_ack != vfrag->vf_mask) {
|
||||
|
||||
vfrag->vf_idx = 0;
|
||||
vfrag->vf_mask_pending = 0;
|
||||
opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag);
|
||||
@ -1008,7 +1008,7 @@ void mca_pml_dr_send_request_frag_ack(
|
||||
assert(sendreq->req_bytes_delivered <= sendreq->req_send.req_bytes_packed);
|
||||
|
||||
/* stash the vfid for duplicate acks.. */
|
||||
mca_pml_dr_comm_proc_set_vid(&sendreq->req_proc->seq_sends, vfrag->vf_id);
|
||||
ompi_seq_tracker_insert(&sendreq->req_proc->seq_sends, vfrag->vf_id);
|
||||
/* return vfrag */
|
||||
MCA_PML_DR_VFRAG_RETURN(vfrag);
|
||||
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user