diff --git a/ompi/class/Makefile.am b/ompi/class/Makefile.am index 40e1e28b72..96cdca075d 100644 --- a/ompi/class/Makefile.am +++ b/ompi/class/Makefile.am @@ -25,11 +25,13 @@ headers += \ class/ompi_free_list.h \ class/ompi_bitmap.h \ class/ompi_pointer_array.h \ - class/ompi_rb_tree.h + class/ompi_rb_tree.h \ + class/ompi_seq_tracker.h libmpi_la_SOURCES += \ class/ompi_bitmap.c \ class/ompi_free_list.c \ class/ompi_pointer_array.c \ - class/ompi_rb_tree.c + class/ompi_rb_tree.c \ + class/ompi_seq_tracker.c diff --git a/ompi/class/ompi_seq_tracker.c b/ompi/class/ompi_seq_tracker.c new file mode 100644 index 0000000000..a0d88e6623 --- /dev/null +++ b/ompi/class/ompi_seq_tracker.c @@ -0,0 +1,182 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "ompi/class/ompi_seq_tracker.h" +#include "opal/sys/cache.h" +#include "opal/util/output.h" + + + +OBJ_CLASS_INSTANCE(ompi_seq_tracker_range_t, + opal_list_item_t, + NULL, + NULL); + + +static void ompi_seq_tracker_construct(ompi_seq_tracker_t* seq_tracker) { + OBJ_CONSTRUCT(&seq_tracker->seq_ids, opal_list_t); + seq_tracker->seq_ids_current = NULL; +} + + +static void ompi_seq_tracker_destruct(ompi_seq_tracker_t* seq_tracker) { + OBJ_DESTRUCT(&seq_tracker->seq_ids); +} + +OBJ_CLASS_INSTANCE( + ompi_seq_tracker_t, + opal_object_t, + ompi_seq_tracker_construct, + ompi_seq_tracker_destruct); + + +/** + * Look for duplicate sequence number in current range. + * Must be called w/ matching lock held. + */ + +bool ompi_seq_tracker_check_duplicate( + ompi_seq_tracker_t* seq_tracker, + uint32_t seq_id) +{ + ompi_seq_tracker_range_t* item; + int8_t direction = 0; /* 1 is next, -1 is previous */ + + item = seq_tracker->seq_ids_current; + while(true) { + if(NULL == item) { + return false; + } else if(item->seq_id_high >= seq_id && item->seq_id_low <= seq_id) { + seq_tracker->seq_ids_current = (ompi_seq_tracker_range_t*) item; + return true; + } else if(seq_id > item->seq_id_high && direction != -1) { + direction = 1; + item = (ompi_seq_tracker_range_t*) opal_list_get_next(item); + } else if(seq_id < item->seq_id_low && direction != 1) { + direction = -1; + item = (ompi_seq_tracker_range_t*) opal_list_get_prev(item); + } else { + return false; + } + } +} + + +/* + * insert item into sequence tracking list, + * compacts continuous regions into a single entry + */ +void ompi_seq_tracker_insert(ompi_seq_tracker_t* seq_tracker, + uint32_t seq_id) +{ + opal_list_t* seq_ids = &seq_tracker->seq_ids; + ompi_seq_tracker_range_t* item = seq_tracker->seq_ids_current; + int8_t direction = 0; /* 1 is next, -1 is previous */ + ompi_seq_tracker_range_t *new_item, *next_item, *prev_item; + while(true) { + if( item == NULL || item == (ompi_seq_tracker_range_t*) &seq_ids->opal_list_tail ) { + new_item = OBJ_NEW(ompi_seq_tracker_range_t); + new_item->seq_id_low = new_item->seq_id_high = seq_id; + opal_list_append(seq_ids, (opal_list_item_t*) new_item); + seq_tracker->seq_ids_current = (ompi_seq_tracker_range_t*) new_item; + return; + } else if( item == (ompi_seq_tracker_range_t*) &seq_ids->opal_list_head ) { + new_item = OBJ_NEW(ompi_seq_tracker_range_t); + new_item->seq_id_low = new_item->seq_id_high = seq_id; + opal_list_prepend(seq_ids, (opal_list_item_t*) new_item); + seq_tracker->seq_ids_current = (ompi_seq_tracker_range_t*) new_item; + return; + + } else if(item->seq_id_high >= seq_id && item->seq_id_low <= seq_id ) { + + seq_tracker->seq_ids_current = (ompi_seq_tracker_range_t*) item; + return; + + } else if((item->seq_id_high + 1) == seq_id) { + + next_item = (ompi_seq_tracker_range_t*) opal_list_get_next(item); + /* try to consolidate */ + if(next_item && next_item->seq_id_low == (seq_id+1)) { + item->seq_id_high = next_item->seq_id_high; + opal_list_remove_item(seq_ids, (opal_list_item_t*) next_item); + OBJ_RELEASE(next_item); + } else { + item->seq_id_high = seq_id; + } + seq_tracker->seq_ids_current = (ompi_seq_tracker_range_t*) item; + return; + + } else if((item->seq_id_low - 1) == seq_id) { + + prev_item = (ompi_seq_tracker_range_t*) opal_list_get_prev(item); + /* try to consolidate */ + if(prev_item && prev_item->seq_id_high == (seq_id-1)) { + item->seq_id_low = prev_item->seq_id_low; + opal_list_remove_item(seq_ids, (opal_list_item_t*) prev_item); + OBJ_RELEASE(prev_item); + } else { + item->seq_id_low = seq_id; + } + seq_tracker->seq_ids_current = (ompi_seq_tracker_range_t*) item; + return; + + } else if(seq_id > item->seq_id_high ) { + if(direction == -1) { + /* we have gone back in the list, and we went one item too far */ + new_item = OBJ_NEW(ompi_seq_tracker_range_t); + new_item->seq_id_low = new_item->seq_id_high = seq_id; + /* insert new_item directly before item */ + opal_list_insert_pos(seq_ids, + (opal_list_item_t*) item, + (opal_list_item_t*) new_item); + seq_tracker->seq_ids_current = (ompi_seq_tracker_range_t*) new_item; + return; + } else { + direction = 1; + item = (ompi_seq_tracker_range_t*) opal_list_get_next(item); + } + } else if(seq_id < item->seq_id_low) { + if(direction == 1) { + /* we have gone forward in the list, and we went one item too far */ + new_item = OBJ_NEW(ompi_seq_tracker_range_t); + next_item = (ompi_seq_tracker_range_t*) opal_list_get_next(item); + if(NULL == next_item) { + opal_list_append(seq_ids, (opal_list_item_t*) new_item); + } else { + opal_list_insert_pos(seq_ids, + (opal_list_item_t*) next_item, + (opal_list_item_t*) new_item); + } + seq_tracker->seq_ids_current = (ompi_seq_tracker_range_t*) new_item; + return; + } else { + direction = -1; + item = (ompi_seq_tracker_range_t*) opal_list_get_prev(item); + } + } else { + return; + } + } +} + + + + + diff --git a/ompi/class/ompi_seq_tracker.h b/ompi/class/ompi_seq_tracker.h new file mode 100644 index 0000000000..3117e50a81 --- /dev/null +++ b/ompi/class/ompi_seq_tracker.h @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef OMPI_SEQ_TRACKER_H +#define OMPI_SEQ_TRACKER_H + +#include "ompi_config.h" +#include "opal/class/opal_list.h" +#include "opal/threads/threads.h" +#include "opal/threads/condition.h" +#include "ompi/constants.h" +#include "ompi/mca/mpool/mpool.h" + +#if defined(c_plusplus) || defined(__cplusplus) +extern "C" { +#endif + +struct ompi_seq_tracker_range_t{ + opal_list_item_t super; + uint32_t seq_id_high; + uint32_t seq_id_low; +}; +typedef struct ompi_seq_tracker_range_t ompi_seq_tracker_range_t; + +OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_seq_tracker_range_t); + +struct ompi_seq_tracker_t{ + opal_list_t seq_ids; /**< list of seqs id's that have been seen */ + ompi_seq_tracker_range_t* seq_ids_current; /**< a pointer to the last place we were in the list */ + +}; +typedef struct ompi_seq_tracker_t ompi_seq_tracker_t; + +OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_seq_tracker_t); + + +/** + * Look for duplicate sequence number in current range. + * Must be called w/ matching lock held. + */ + +bool ompi_seq_tracker_check_duplicate( + ompi_seq_tracker_t* seq_tracker, + uint32_t seq_id); + + +/* + * insert item into sequence tracking list, + * compacts continuous regions into a single entry + */ +void ompi_seq_tracker_insert(ompi_seq_tracker_t* seq_tracker, + uint32_t seq_i); + +#if defined(c_plusplus) || defined(__cplusplus) +} +#endif +#endif + diff --git a/ompi/mca/pml/dr/pml_dr_comm.c b/ompi/mca/pml/dr/pml_dr_comm.c index 29168319af..4d6218af8e 100644 --- a/ompi/mca/pml/dr/pml_dr_comm.c +++ b/ompi/mca/pml/dr/pml_dr_comm.c @@ -22,27 +22,6 @@ #include "pml_dr.h" #include "pml_dr_comm.h" -OBJ_CLASS_INSTANCE(mca_pml_dr_range_t, - opal_list_item_t, - NULL, - NULL); - - -static void mca_pml_dr_seq_tracker_construct(mca_pml_dr_seq_tracker_t* seq_tracker) { - OBJ_CONSTRUCT(&seq_tracker->vfrag_ids, opal_list_t); - seq_tracker->vfrag_ids_current = NULL; -} - - -static void mca_pml_dr_seq_tracker_destruct(mca_pml_dr_seq_tracker_t* seq_tracker) { - OBJ_DESTRUCT(&seq_tracker->vfrag_ids); -} - -OBJ_CLASS_INSTANCE( - mca_pml_dr_seq_tracker_t, - opal_object_t, - mca_pml_dr_seq_tracker_construct, - mca_pml_dr_seq_tracker_destruct); static void mca_pml_dr_comm_proc_construct(mca_pml_dr_comm_proc_t* proc) { @@ -53,8 +32,8 @@ static void mca_pml_dr_comm_proc_construct(mca_pml_dr_comm_proc_t* proc) OBJ_CONSTRUCT(&proc->specific_receives, opal_list_t); OBJ_CONSTRUCT(&proc->matched_receives, opal_list_t); OBJ_CONSTRUCT(&proc->unexpected_frags, opal_list_t); - OBJ_CONSTRUCT(&proc->seq_sends, mca_pml_dr_seq_tracker_t); - OBJ_CONSTRUCT(&proc->seq_recvs, mca_pml_dr_seq_tracker_t); + OBJ_CONSTRUCT(&proc->seq_sends, ompi_seq_tracker_t); + OBJ_CONSTRUCT(&proc->seq_recvs, ompi_seq_tracker_t); } diff --git a/ompi/mca/pml/dr/pml_dr_comm.h b/ompi/mca/pml/dr/pml_dr_comm.h index 5a311df779..5740bfb643 100644 --- a/ompi/mca/pml/dr/pml_dr_comm.h +++ b/ompi/mca/pml/dr/pml_dr_comm.h @@ -24,6 +24,7 @@ #include "opal/threads/mutex.h" #include "opal/threads/condition.h" #include "opal/class/opal_list.h" +#include "ompi/class/ompi_seq_tracker.h" #include "ompi/communicator/communicator.h" #include "ompi/proc/proc.h" #if defined(c_plusplus) || defined(__cplusplus) @@ -31,24 +32,6 @@ extern "C" { #endif -struct mca_pml_dr_range_t{ - opal_list_item_t super; - uint32_t vfrag_id_high; - uint32_t vfrag_id_low; -}; -typedef struct mca_pml_dr_range_t mca_pml_dr_range_t; - -OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_pml_dr_range_t); - -struct mca_pml_dr_seq_tracker_t{ - opal_list_t vfrag_ids; /**< list of vfrags id's that have been seen */ - mca_pml_dr_range_t* vfrag_ids_current; /**< a pointer to the last place we were in the list */ - -}; -typedef struct mca_pml_dr_seq_tracker_t mca_pml_dr_seq_tracker_t; - -OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_pml_dr_seq_tracker_t); - struct mca_pml_dr_comm_proc_t { opal_object_t super; uint16_t expected_sequence; /**< send message sequence number - receiver side */ @@ -63,8 +46,8 @@ struct mca_pml_dr_comm_proc_t { opal_list_t unexpected_frags; /**< unexpected fragment queues */ opal_list_t matched_receives; /**< list of in-progress matched receives */ ompi_proc_t* ompi_proc; /**< back pointer to ompi_proc_t */ - mca_pml_dr_seq_tracker_t seq_sends; /**< Tracks the send vfrags that have been acked */ - mca_pml_dr_seq_tracker_t seq_recvs; /**< Tracks the receive vfrags that have been acked */ + ompi_seq_tracker_t seq_sends; /**< Tracks the send vfrags that have been acked */ + ompi_seq_tracker_t seq_recvs; /**< Tracks the receive vfrags that have been acked */ }; typedef struct mca_pml_dr_comm_proc_t mca_pml_dr_comm_proc_t; @@ -99,134 +82,6 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_pml_dr_comm_t); OMPI_DECLSPEC extern int mca_pml_dr_comm_init(mca_pml_dr_comm_t* dr_comm, ompi_communicator_t* ompi_comm); -/** - * Look for duplicate sequence number in current range. - * Must be called w/ matching lock held. - */ - -static inline bool mca_pml_dr_comm_proc_check_duplicate( - mca_pml_dr_seq_tracker_t* seq_tracker, - uint32_t vfrag_id) -{ - mca_pml_dr_range_t* item; - int8_t direction = 0; /* 1 is next, -1 is previous */ - - item = seq_tracker->vfrag_ids_current; - while(true) { - if(NULL == item) { - return false; - } else if(item->vfrag_id_high >= vfrag_id && item->vfrag_id_low <= vfrag_id) { - seq_tracker->vfrag_ids_current = (mca_pml_dr_range_t*) item; - return true; - } else if(vfrag_id > item->vfrag_id_high && direction != -1) { - direction = 1; - item = (mca_pml_dr_range_t*) opal_list_get_next(item); - } else if(vfrag_id < item->vfrag_id_low && direction != 1) { - direction = -1; - item = (mca_pml_dr_range_t*) opal_list_get_prev(item); - } else { - return false; - } - } -} - - -/* - * Must be called w/ matching lock held - */ -static inline void mca_pml_dr_comm_proc_set_vid(mca_pml_dr_seq_tracker_t* seq_tracker, - uint32_t vfrag_id) -{ - opal_list_t* vfrag_ids = &seq_tracker->vfrag_ids; - mca_pml_dr_range_t* item = seq_tracker->vfrag_ids_current; - int8_t direction = 0; /* 1 is next, -1 is previous */ - mca_pml_dr_range_t *new_item, *next_item, *prev_item; - while(true) { - if( item == NULL || item == (mca_pml_dr_range_t*) &vfrag_ids->opal_list_tail ) { - new_item = OBJ_NEW(mca_pml_dr_range_t); - new_item->vfrag_id_low = new_item->vfrag_id_high = vfrag_id; - opal_list_append(vfrag_ids, (opal_list_item_t*) new_item); - seq_tracker->vfrag_ids_current = (mca_pml_dr_range_t*) new_item; - return; - } else if( item == (mca_pml_dr_range_t*) &vfrag_ids->opal_list_head ) { - new_item = OBJ_NEW(mca_pml_dr_range_t); - new_item->vfrag_id_low = new_item->vfrag_id_high = vfrag_id; - opal_list_prepend(vfrag_ids, (opal_list_item_t*) new_item); - seq_tracker->vfrag_ids_current = (mca_pml_dr_range_t*) new_item; - return; - - } else if(item->vfrag_id_high >= vfrag_id && item->vfrag_id_low <= vfrag_id ) { - - seq_tracker->vfrag_ids_current = (mca_pml_dr_range_t*) item; - return; - - } else if((item->vfrag_id_high + 1) == vfrag_id) { - - next_item = (mca_pml_dr_range_t*) opal_list_get_next(item); - /* try to consolidate */ - if(next_item && next_item->vfrag_id_low == (vfrag_id+1)) { - item->vfrag_id_high = next_item->vfrag_id_high; - opal_list_remove_item(vfrag_ids, (opal_list_item_t*) next_item); - OBJ_RELEASE(next_item); - } else { - item->vfrag_id_high = vfrag_id; - } - seq_tracker->vfrag_ids_current = (mca_pml_dr_range_t*) item; - return; - - } else if((item->vfrag_id_low - 1) == vfrag_id) { - - prev_item = (mca_pml_dr_range_t*) opal_list_get_prev(item); - /* try to consolidate */ - if(prev_item && prev_item->vfrag_id_high == (vfrag_id-1)) { - item->vfrag_id_low = prev_item->vfrag_id_low; - opal_list_remove_item(vfrag_ids, (opal_list_item_t*) prev_item); - OBJ_RELEASE(prev_item); - } else { - item->vfrag_id_low = vfrag_id; - } - seq_tracker->vfrag_ids_current = (mca_pml_dr_range_t*) item; - return; - - } else if(vfrag_id > item->vfrag_id_high ) { - if(direction == -1) { - /* we have gone back in the list, and we went one item too far */ - new_item = OBJ_NEW(mca_pml_dr_range_t); - new_item->vfrag_id_low = new_item->vfrag_id_high = vfrag_id; - /* insert new_item directly before item */ - opal_list_insert_pos(vfrag_ids, - (opal_list_item_t*) item, - (opal_list_item_t*) new_item); - seq_tracker->vfrag_ids_current = (mca_pml_dr_range_t*) new_item; - return; - } else { - direction = 1; - item = (mca_pml_dr_range_t*) opal_list_get_next(item); - } - } else if(vfrag_id < item->vfrag_id_low) { - if(direction == 1) { - /* we have gone forward in the list, and we went one item too far */ - new_item = OBJ_NEW(mca_pml_dr_range_t); - next_item = (mca_pml_dr_range_t*) opal_list_get_next(item); - if(NULL == next_item) { - opal_list_append(vfrag_ids, (opal_list_item_t*) new_item); - } else { - opal_list_insert_pos(vfrag_ids, - (opal_list_item_t*) next_item, - (opal_list_item_t*) new_item); - } - seq_tracker->vfrag_ids_current = (mca_pml_dr_range_t*) new_item; - return; - } else { - direction = -1; - item = (mca_pml_dr_range_t*) opal_list_get_prev(item); - } - } else { - return; - } - } -} - #if defined(c_plusplus) || defined(__cplusplus) } diff --git a/ompi/mca/pml/dr/pml_dr_recvfrag.c b/ompi/mca/pml/dr/pml_dr_recvfrag.c index 09ef30ba94..70bbcc7018 100644 --- a/ompi/mca/pml/dr/pml_dr_recvfrag.c +++ b/ompi/mca/pml/dr/pml_dr_recvfrag.c @@ -29,6 +29,7 @@ #include "ompi/communicator/communicator.h" #include "ompi/datatype/datatype.h" #include "ompi/mca/pml/pml.h" +#include "ompi/class/ompi_seq_tracker.h" #include "pml_dr.h" #include "pml_dr_comm.h" #include "pml_dr_recvfrag.h" @@ -121,7 +122,7 @@ void mca_pml_dr_recv_frag_callback( /* seq_recvs protected by matching lock */ OPAL_THREAD_LOCK(&comm->matching_lock); - if(mca_pml_dr_comm_proc_check_duplicate(&proc->seq_recvs, hdr->hdr_common.hdr_vid)) { + if(ompi_seq_tracker_check_duplicate(&proc->seq_recvs, hdr->hdr_common.hdr_vid)) { OPAL_THREAD_UNLOCK(&comm->matching_lock); OPAL_OUTPUT((0, "%s:%d: acking duplicate match\n", __FILE__, __LINE__)); mca_pml_dr_recv_frag_ack((mca_bml_base_endpoint_t*)proc->ompi_proc->proc_pml, @@ -141,7 +142,7 @@ void mca_pml_dr_recv_frag_callback( /* seq_sends protected by ompi_request lock*/ OPAL_THREAD_LOCK(&ompi_request_lock); - if(!mca_pml_dr_comm_proc_check_duplicate(&proc->seq_sends, hdr->hdr_common.hdr_vid)) { + if(!ompi_seq_tracker_check_duplicate(&proc->seq_sends, hdr->hdr_common.hdr_vid)) { OPAL_THREAD_UNLOCK(&ompi_request_lock); mca_pml_dr_send_request_match_ack(btl, &hdr->hdr_ack); } else { @@ -156,7 +157,7 @@ void mca_pml_dr_recv_frag_callback( /* seq_recvs protected by matching lock */ OPAL_THREAD_LOCK(&comm->matching_lock); - if(mca_pml_dr_comm_proc_check_duplicate(&proc->seq_recvs, hdr->hdr_common.hdr_vid)) { + if(ompi_seq_tracker_check_duplicate(&proc->seq_recvs, hdr->hdr_common.hdr_vid)) { /* ack only if the vfrag has been matched */ mca_pml_dr_recv_request_t* recvreq = mca_pml_dr_comm_proc_check_matched(proc, hdr->hdr_common.hdr_vid); @@ -181,7 +182,7 @@ void mca_pml_dr_recv_frag_callback( /* seq_sends protected by ompi_request lock*/ OPAL_THREAD_LOCK(&ompi_request_lock); - if(!mca_pml_dr_comm_proc_check_duplicate(&proc->seq_sends, hdr->hdr_common.hdr_vid)) { + if(!ompi_seq_tracker_check_duplicate(&proc->seq_sends, hdr->hdr_common.hdr_vid)) { OPAL_THREAD_UNLOCK(&ompi_request_lock); mca_pml_dr_send_request_rndv_ack(btl, &hdr->hdr_ack); } else { @@ -197,7 +198,7 @@ void mca_pml_dr_recv_frag_callback( /* seq_recvs protected by matching lock */ OPAL_THREAD_LOCK(&comm->matching_lock); - if(mca_pml_dr_comm_proc_check_duplicate(&proc->seq_recvs, hdr->hdr_common.hdr_vid)) { + if(ompi_seq_tracker_check_duplicate(&proc->seq_recvs, hdr->hdr_common.hdr_vid)) { OPAL_THREAD_UNLOCK(&comm->matching_lock); OPAL_OUTPUT((0, "%s:%d: acking duplicate fragment\n", __FILE__, __LINE__)); mca_pml_dr_recv_frag_ack((mca_bml_base_endpoint_t*)proc->ompi_proc->proc_pml, @@ -219,7 +220,7 @@ void mca_pml_dr_recv_frag_callback( /* seq_sends protected by ompi_request lock*/ OPAL_THREAD_LOCK(&ompi_request_lock); - if(!mca_pml_dr_comm_proc_check_duplicate(&proc->seq_sends, hdr->hdr_common.hdr_vid)) { + if(!ompi_seq_tracker_check_duplicate(&proc->seq_sends, hdr->hdr_common.hdr_vid)) { OPAL_THREAD_UNLOCK(&ompi_request_lock); mca_pml_dr_send_request_frag_ack(btl, &hdr->hdr_ack); } else { @@ -659,7 +660,7 @@ rematch: opal_list_append(&proc->frags_cant_match, (opal_list_item_t *)frag); } - mca_pml_dr_comm_proc_set_vid(&proc->seq_recvs, hdr->hdr_common.hdr_vid); + ompi_seq_tracker_insert(&proc->seq_recvs, hdr->hdr_common.hdr_vid); OPAL_THREAD_UNLOCK(&comm->matching_lock); /* release matching lock before processing fragment */ diff --git a/ompi/mca/pml/dr/pml_dr_recvreq.c b/ompi/mca/pml/dr/pml_dr_recvreq.c index 4eb818f404..0ced5ac929 100644 --- a/ompi/mca/pml/dr/pml_dr_recvreq.c +++ b/ompi/mca/pml/dr/pml_dr_recvreq.c @@ -287,7 +287,7 @@ void mca_pml_dr_recv_request_progress( if((vfrag->vf_mask_pending & vfrag->vf_mask) == vfrag->vf_mask) { /* we have received all the pieces of the vfrag, ack everything that passed the checksum */ - mca_pml_dr_comm_proc_set_vid(&recvreq->req_proc->seq_recvs, vfrag->vf_id); + ompi_seq_tracker_insert(&recvreq->req_proc->seq_recvs, vfrag->vf_id); mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_common, hdr->hdr_frag.hdr_src_ptr, vfrag->vf_size, vfrag->vf_mask); } diff --git a/ompi/mca/pml/dr/pml_dr_sendreq.c b/ompi/mca/pml/dr/pml_dr_sendreq.c index ec9aeca097..0d29775d78 100644 --- a/ompi/mca/pml/dr/pml_dr_sendreq.c +++ b/ompi/mca/pml/dr/pml_dr_sendreq.c @@ -24,6 +24,7 @@ #include "ompi/constants.h" #include "ompi/mca/pml/pml.h" #include "ompi/mca/btl/btl.h" +#include "ompi/class/ompi_seq_tracker.h" #include "orte/mca/errmgr/errmgr.h" #include "ompi/mca/mpool/mpool.h" #include "pml_dr.h" @@ -135,7 +136,7 @@ static void mca_pml_dr_match_completion( /* update statistics and complete */ sendreq->req_bytes_delivered = sendreq->req_send.req_bytes_packed; - mca_pml_dr_comm_proc_set_vid(&sendreq->req_proc->seq_sends, vfrag->vf_id); + ompi_seq_tracker_insert(&sendreq->req_proc->seq_sends, vfrag->vf_id); MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); /* on negative ack need to retransmit */ @@ -189,7 +190,7 @@ static void mca_pml_dr_rndv_completion( } /* update statistics and complete */ - mca_pml_dr_comm_proc_set_vid(&sendreq->req_proc->seq_sends, vfrag->vf_id); + ompi_seq_tracker_insert(&sendreq->req_proc->seq_sends, vfrag->vf_id); if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed) { MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); } else { @@ -257,7 +258,7 @@ static void mca_pml_dr_frag_completion( } /* record vfrag id to drop duplicate acks */ - mca_pml_dr_comm_proc_set_vid(&sendreq->req_proc->seq_sends, vfrag->vf_id); + ompi_seq_tracker_insert(&sendreq->req_proc->seq_sends, vfrag->vf_id); /* return this vfrag */ MCA_PML_DR_VFRAG_RETURN(vfrag); @@ -893,7 +894,7 @@ void mca_pml_dr_send_request_match_ack( /* update statistics */ sendreq->req_bytes_delivered = vfrag->vf_size; - mca_pml_dr_comm_proc_set_vid(&sendreq->req_proc->seq_sends, vfrag->vf_id); + ompi_seq_tracker_insert(&sendreq->req_proc->seq_sends, vfrag->vf_id); MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); } @@ -952,7 +953,7 @@ void mca_pml_dr_send_request_rndv_ack( schedule = true; } /* stash the vfrag id for duplicate acks.. */ - mca_pml_dr_comm_proc_set_vid(&sendreq->req_proc->seq_sends, vfrag->vf_id); + ompi_seq_tracker_insert(&sendreq->req_proc->seq_sends, vfrag->vf_id); OPAL_THREAD_UNLOCK(&ompi_request_lock); if(schedule) { @@ -994,7 +995,6 @@ void mca_pml_dr_send_request_frag_ack( /* need to retransmit? */ if(vfrag->vf_ack != vfrag->vf_mask) { - vfrag->vf_idx = 0; vfrag->vf_mask_pending = 0; opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag); @@ -1008,7 +1008,7 @@ void mca_pml_dr_send_request_frag_ack( assert(sendreq->req_bytes_delivered <= sendreq->req_send.req_bytes_packed); /* stash the vfid for duplicate acks.. */ - mca_pml_dr_comm_proc_set_vid(&sendreq->req_proc->seq_sends, vfrag->vf_id); + ompi_seq_tracker_insert(&sendreq->req_proc->seq_sends, vfrag->vf_id); /* return vfrag */ MCA_PML_DR_VFRAG_RETURN(vfrag);