only create fragments for unmatched receives
This commit was SVN r5967.
Этот коммит содержится в:
родитель
f9788aa64f
Коммит
a475c189bc
@ -109,12 +109,14 @@ int mca_pml_ob1_add_bmis()
|
||||
ompi_list_t* bmis = &mca_bmi_base_modules_initialized;
|
||||
mca_bmi_base_selected_module_t* selected_bmi;
|
||||
size_t num_bmis = ompi_list_get_size(bmis);
|
||||
|
||||
mca_pml_ob1.num_bmi_modules = 0;
|
||||
mca_pml_ob1.num_bmi_progress = 0;
|
||||
mca_pml_ob1.num_bmi_components = 0;
|
||||
mca_pml_ob1.bmi_modules = (mca_bmi_base_module_t **)malloc(sizeof(mca_bmi_base_module_t*) * num_bmis);
|
||||
mca_pml_ob1.bmi_progress = (mca_bmi_base_component_progress_fn_t*)malloc(sizeof(mca_bmi_base_component_progress_fn_t) * num_bmis);
|
||||
mca_pml_ob1.bmi_components = (mca_bmi_base_component_t **)malloc(sizeof(mca_bmi_base_component_t*) * num_bmis);
|
||||
|
||||
if (NULL == mca_pml_ob1.bmi_modules ||
|
||||
NULL == mca_pml_ob1.bmi_progress ||
|
||||
NULL == mca_pml_ob1.bmi_components) {
|
||||
@ -135,6 +137,11 @@ int mca_pml_ob1_add_bmis()
|
||||
}
|
||||
}
|
||||
|
||||
/* override eager limit larger than our max */
|
||||
if(bmi->bmi_eager_limit > mca_pml_ob1.eager_limit) {
|
||||
bmi->bmi_eager_limit = mca_pml_ob1.eager_limit;
|
||||
}
|
||||
|
||||
/* setup callback for receive */
|
||||
rc = bmi->bmi_register(bmi, MCA_BMI_TAG_PML, mca_pml_ob1_recv_frag_callback, NULL);
|
||||
if(OMPI_SUCCESS != rc)
|
||||
@ -145,6 +152,16 @@ int mca_pml_ob1_add_bmis()
|
||||
}
|
||||
}
|
||||
|
||||
/* initialize free list of receive buffers */
|
||||
ompi_free_list_init(
|
||||
&mca_pml_ob1.buffers,
|
||||
sizeof(mca_pml_ob1_buffer_t) + mca_pml_ob1.eager_limit,
|
||||
OBJ_CLASS(mca_pml_ob1_buffer_t),
|
||||
mca_pml_ob1.free_list_num,
|
||||
mca_pml_ob1.free_list_max,
|
||||
mca_pml_ob1.free_list_inc,
|
||||
NULL);
|
||||
|
||||
/* sort ob1 list by exclusivity */
|
||||
qsort(mca_pml_ob1.bmi_modules,
|
||||
mca_pml_ob1.num_bmi_modules,
|
||||
|
@ -55,15 +55,18 @@ struct mca_pml_ob1_t {
|
||||
int free_list_num; /* initial size of free list */
|
||||
int free_list_max; /* maximum size of free list */
|
||||
int free_list_inc; /* number of elements to grow free list */
|
||||
size_t eager_limit;
|
||||
size_t send_pipeline_depth;
|
||||
size_t recv_pipeline_depth;
|
||||
|
||||
/* lock queue access */
|
||||
ompi_mutex_t lock;
|
||||
|
||||
/* free list of requests */
|
||||
/* free lists */
|
||||
ompi_free_list_t send_requests;
|
||||
ompi_free_list_t recv_requests;
|
||||
ompi_free_list_t buffers;
|
||||
ompi_free_list_t fragments;
|
||||
|
||||
/* list of pending send requests */
|
||||
ompi_list_t send_pending;
|
||||
|
@ -28,6 +28,7 @@
|
||||
#include "pml_ob1_hdr.h"
|
||||
#include "pml_ob1_sendreq.h"
|
||||
#include "pml_ob1_recvreq.h"
|
||||
#include "pml_ob1_recvfrag.h"
|
||||
|
||||
|
||||
mca_pml_base_component_1_0_0_t mca_pml_ob1_component = {
|
||||
@ -76,9 +77,18 @@ static inline int mca_pml_ob1_param_register_int(
|
||||
int mca_pml_ob1_component_open(void)
|
||||
{
|
||||
OBJ_CONSTRUCT(&mca_pml_ob1.lock, ompi_mutex_t);
|
||||
|
||||
/* requests */
|
||||
OBJ_CONSTRUCT(&mca_pml_ob1.send_requests, ompi_free_list_t);
|
||||
OBJ_CONSTRUCT(&mca_pml_ob1.recv_requests, ompi_free_list_t);
|
||||
|
||||
/* fragments */
|
||||
OBJ_CONSTRUCT(&mca_pml_ob1.fragments, ompi_free_list_t);
|
||||
OBJ_CONSTRUCT(&mca_pml_ob1.buffers, ompi_free_list_t);
|
||||
|
||||
/* pending operations */
|
||||
OBJ_CONSTRUCT(&mca_pml_ob1.send_pending, ompi_list_t);
|
||||
OBJ_CONSTRUCT(&mca_pml_ob1.acks_pending, ompi_list_t);
|
||||
|
||||
mca_pml_ob1.bmi_components = NULL;
|
||||
mca_pml_ob1.num_bmi_components = 0;
|
||||
@ -95,6 +105,8 @@ int mca_pml_ob1_component_open(void)
|
||||
mca_pml_ob1_param_register_int("free_list_inc", 256);
|
||||
mca_pml_ob1.priority =
|
||||
mca_pml_ob1_param_register_int("priority", 0);
|
||||
mca_pml_ob1.eager_limit =
|
||||
mca_pml_ob1_param_register_int("eager_limit", 256 * 1024);
|
||||
mca_pml_ob1.send_pipeline_depth =
|
||||
mca_pml_ob1_param_register_int("send_pipeline_depth", 3);
|
||||
mca_pml_ob1.recv_pipeline_depth =
|
||||
@ -134,6 +146,7 @@ int mca_pml_ob1_component_close(void)
|
||||
if(NULL != mca_pml_ob1.bmi_progress) {
|
||||
free(mca_pml_ob1.bmi_progress);
|
||||
}
|
||||
OBJ_DESTRUCT(&mca_pml_ob1.acks_pending);
|
||||
OBJ_DESTRUCT(&mca_pml_ob1.send_pending);
|
||||
OBJ_DESTRUCT(&mca_pml_ob1.send_requests);
|
||||
OBJ_DESTRUCT(&mca_pml_ob1.recv_requests);
|
||||
@ -169,6 +182,16 @@ mca_pml_base_module_t* mca_pml_ob1_component_init(int* priority,
|
||||
mca_pml_ob1.free_list_inc,
|
||||
NULL);
|
||||
|
||||
/* recv fragments */
|
||||
ompi_free_list_init(
|
||||
&mca_pml_ob1.fragments,
|
||||
sizeof(mca_pml_ob1_fragment_t),
|
||||
OBJ_CLASS(mca_pml_ob1_fragment_t),
|
||||
mca_pml_ob1.free_list_num,
|
||||
mca_pml_ob1.free_list_max,
|
||||
mca_pml_ob1.free_list_inc,
|
||||
NULL);
|
||||
|
||||
/* buffered send */
|
||||
if(OMPI_SUCCESS != mca_pml_base_bsend_init(enable_mpi_threads)) {
|
||||
ompi_output(0, "mca_pml_ob1_component_init: mca_pml_bsend_init failed\n");
|
||||
|
@ -153,6 +153,61 @@ static inline mca_pml_ob1_endpoint_t* mca_pml_ob1_ep_array_get_next(mca_pml_ob1_
|
||||
return endpoint;
|
||||
}
|
||||
|
||||
/**
 * Allocate a descriptor sized for a PML header.
 *
 * Takes the descriptor cached on the endpoint when one is present;
 * otherwise requests a new one through the endpoint's bmi_alloc hook.
 * The requested size is always sizeof(mca_pml_ob1_hdr_t) - the macro has
 * no size parameter.
 *
 * @param endpoint    (IN)  Endpoint providing bmi_cache, bmi and bmi_alloc.
 *                          Evaluated more than once.
 * @param descriptor  (OUT) lvalue that receives the descriptor. It holds
 *                          whatever bmi_alloc returned when the cache was
 *                          empty, so callers are expected to test it
 *                          against NULL.
 */

#if OMPI_HAVE_THREAD_SUPPORT
#define MCA_PML_OB1_ENDPOINT_DES_ALLOC(endpoint, descriptor) \
do { \
    (descriptor) = (endpoint)->bmi_cache; \
    if(NULL != (descriptor)) { \
        /* atomically acquire the cached descriptor; a result of 0 is */ \
        /* treated as "swap did not happen" (same reading as in       */ \
        /* MCA_PML_OB1_ENDPOINT_DES_RETURN), i.e. another thread took */ \
        /* the cached descriptor first, so fall back to the allocator */ \
        if(ompi_atomic_cmpset_ptr(&(endpoint)->bmi_cache, (descriptor), NULL) == 0) { \
            (descriptor) = (endpoint)->bmi_alloc((endpoint)->bmi, sizeof(mca_pml_ob1_hdr_t)); \
        } \
    } else { \
        (descriptor) = (endpoint)->bmi_alloc((endpoint)->bmi, sizeof(mca_pml_ob1_hdr_t)); \
    } \
} while(0)
#else
#define MCA_PML_OB1_ENDPOINT_DES_ALLOC(endpoint, descriptor) \
do { \
    (descriptor) = (endpoint)->bmi_cache; \
    if(NULL != (descriptor)) { \
        /* hand out the cached descriptor and empty the cache slot */ \
        (endpoint)->bmi_cache = NULL; \
    } else { \
        (descriptor) = (endpoint)->bmi_alloc((endpoint)->bmi, sizeof(mca_pml_ob1_hdr_t)); \
    } \
} while(0)
#endif
|
||||
|
||||
/**
 * Return a descriptor.
 *
 * Stores the descriptor in the endpoint's single-entry cache when that
 * slot is empty; otherwise releases it through the module's bmi_free hook.
 *
 * @param endpoint    (IN) Endpoint whose bmi_cache slot and bmi module are
 *                         used. Evaluated more than once.
 * @param descriptor  (IN) Descriptor being given back.
 *
 * NOTE(review): the descriptor is freed through (endpoint)->bmi, which is
 * assumed to be the module it was allocated from - confirm for callers
 * that hold a separate module pointer.
 */

#if OMPI_HAVE_THREAD_SUPPORT
#define MCA_PML_OB1_ENDPOINT_DES_RETURN(endpoint, descriptor) \
do { \
    if(NULL == (endpoint)->bmi_cache) { \
        /* try to park the descriptor in the empty slot; a result of 0 */ \
        /* means the swap did not happen, so release it instead        */ \
        if(ompi_atomic_cmpset_ptr(&(endpoint)->bmi_cache, NULL, (descriptor)) == 0) { \
            (endpoint)->bmi->bmi_free((endpoint)->bmi, (descriptor)); \
        } \
    } else { \
        (endpoint)->bmi->bmi_free((endpoint)->bmi, (descriptor)); \
    } \
} while(0)
#else
#define MCA_PML_OB1_ENDPOINT_DES_RETURN(endpoint, descriptor) \
do { \
    if(NULL == (endpoint)->bmi_cache) { \
        (endpoint)->bmi_cache = (descriptor); \
    } else { \
        (endpoint)->bmi->bmi_free((endpoint)->bmi, (descriptor)); \
    } \
} while(0)
#endif
|
||||
|
||||
#if defined(c_plusplus) || defined(__cplusplus)
|
||||
}
|
||||
#endif
|
||||
|
@ -35,7 +35,14 @@
|
||||
|
||||
|
||||
OBJ_CLASS_INSTANCE(
|
||||
mca_pml_ob1_recv_frag_t,
|
||||
mca_pml_ob1_buffer_t,
|
||||
ompi_list_item_t,
|
||||
NULL,
|
||||
NULL
|
||||
);
|
||||
|
||||
OBJ_CLASS_INSTANCE(
|
||||
mca_pml_ob1_fragment_t,
|
||||
ompi_list_item_t,
|
||||
NULL,
|
||||
NULL
|
||||
@ -348,15 +355,15 @@ static bool mca_pml_ob1_check_cantmatch_for_match(
|
||||
* RCS/CTS receive side matching
|
||||
*
|
||||
* @param hdr list of parameters needed for matching
|
||||
* This list is also embedded in frag_desc,
|
||||
* This list is also embedded in frag,
|
||||
* but this allows to save a memory copy when
|
||||
* a match is made in this routine. (IN)
|
||||
* @param frag_desc pointer to receive fragment which we want
|
||||
* @param frag pointer to receive fragment which we want
|
||||
* to match (IN/OUT). If a match is not made,
|
||||
* hdr is copied to frag_desc.
|
||||
* @param match_made parameter indicating if we matched frag_desc/
|
||||
* hdr is copied to frag.
|
||||
* @param match_made parameter indicating if we matched frag/
|
||||
* hdr (OUT)
|
||||
* @param additional_matches if a match is made with frag_desc, we
|
||||
* @param additional_matches if a match is made with frag, we
|
||||
* may be able to match fragments that previously
|
||||
* have arrived out-of-order. If this is the
|
||||
* case, the associated fragment descriptors are
|
||||
@ -459,13 +466,13 @@ int mca_pml_ob1_recv_frag_match(
|
||||
} else {
|
||||
|
||||
/* if no match found, place on unexpected queue */
|
||||
mca_pml_ob1_recv_frag_t* frag;
|
||||
MCA_PML_OB1_RECV_FRAG_ALLOC(frag, rc);
|
||||
mca_pml_ob1_fragment_t* frag;
|
||||
MCA_PML_OB1_FRAG_ALLOC(frag, rc);
|
||||
if(OMPI_SUCCESS != rc) {
|
||||
OMPI_THREAD_UNLOCK(&pml_comm->matching_lock);
|
||||
return rc;
|
||||
}
|
||||
MCA_PML_OB1_RECV_FRAG_INIT(frag,bmi,hdr,segments,num_segments);
|
||||
MCA_PML_OB1_FRAG_INIT(frag,bmi,hdr,segments,num_segments);
|
||||
ompi_list_append( &proc->unexpected_frags, (ompi_list_item_t *)frag );
|
||||
}
|
||||
|
||||
@ -484,13 +491,13 @@ int mca_pml_ob1_recv_frag_match(
|
||||
* This message comes after the next expected, so it
|
||||
* is ahead of sequence. Save it for later.
|
||||
*/
|
||||
mca_pml_ob1_recv_frag_t* frag;
|
||||
MCA_PML_OB1_RECV_FRAG_ALLOC(frag, rc);
|
||||
mca_pml_ob1_fragment_t* frag;
|
||||
MCA_PML_OB1_FRAG_ALLOC(frag, rc);
|
||||
if(OMPI_SUCCESS != rc) {
|
||||
OMPI_THREAD_UNLOCK(&pml_comm->matching_lock);
|
||||
return rc;
|
||||
}
|
||||
MCA_PML_OB1_RECV_FRAG_INIT(frag,bmi,hdr,segments,num_segments);
|
||||
MCA_PML_OB1_FRAG_INIT(frag,bmi,hdr,segments,num_segments);
|
||||
ompi_list_append(&proc->frags_cant_match, (ompi_list_item_t *)frag);
|
||||
|
||||
}
|
||||
@ -507,10 +514,10 @@ int mca_pml_ob1_recv_frag_match(
|
||||
if(additional_match) {
|
||||
ompi_list_item_t* item;
|
||||
while(NULL != (item = ompi_list_remove_first(&additional_matches))) {
|
||||
mca_pml_ob1_recv_frag_t* frag = (mca_pml_ob1_recv_frag_t*)item;
|
||||
mca_pml_ob1_fragment_t* frag = (mca_pml_ob1_fragment_t*)item;
|
||||
MCA_PML_OB1_RECV_REQUEST_MATCHED(frag->request, hdr);
|
||||
mca_pml_ob1_recv_request_progress(frag->request,frag->bmi,frag->segments,frag->num_segments);
|
||||
MCA_PML_OB1_RECV_FRAG_RETURN(frag);
|
||||
MCA_PML_OB1_FRAG_RETURN(frag);
|
||||
}
|
||||
}
|
||||
return OMPI_SUCCESS;
|
||||
@ -540,7 +547,7 @@ static bool mca_pml_ob1_check_cantmatch_for_match(
|
||||
/* local parameters */
|
||||
int match_found;
|
||||
uint16_t next_msg_seq_expected, frag_seq;
|
||||
mca_pml_ob1_recv_frag_t *frag_desc;
|
||||
mca_pml_ob1_fragment_t *frag;
|
||||
mca_pml_ob1_recv_request_t *match = NULL;
|
||||
bool match_made = false;
|
||||
|
||||
@ -561,19 +568,19 @@ static bool mca_pml_ob1_check_cantmatch_for_match(
|
||||
/* search the list for a fragment from the send with sequence
|
||||
* number next_msg_seq_expected
|
||||
*/
|
||||
for(frag_desc = (mca_pml_ob1_recv_frag_t *)
|
||||
for(frag = (mca_pml_ob1_fragment_t *)
|
||||
ompi_list_get_first(&proc->frags_cant_match);
|
||||
frag_desc != (mca_pml_ob1_recv_frag_t *)
|
||||
frag != (mca_pml_ob1_fragment_t *)
|
||||
ompi_list_get_end(&proc->frags_cant_match);
|
||||
frag_desc = (mca_pml_ob1_recv_frag_t *)
|
||||
ompi_list_get_next(frag_desc))
|
||||
frag = (mca_pml_ob1_fragment_t *)
|
||||
ompi_list_get_next(frag))
|
||||
{
|
||||
/*
|
||||
* If the message has the next expected seq from that proc...
|
||||
*/
|
||||
frag_seq=frag_desc->hdr.hdr_match.hdr_msg_seq;
|
||||
frag_seq=frag->hdr.hdr_match.hdr_msg_seq;
|
||||
if (frag_seq == next_msg_seq_expected) {
|
||||
mca_pml_ob1_match_hdr_t* hdr = &frag_desc->hdr.hdr_match;
|
||||
mca_pml_ob1_match_hdr_t* hdr = &frag->hdr.hdr_match;
|
||||
|
||||
/* We're now expecting the next sequence number. */
|
||||
(proc->expected_sequence)++;
|
||||
@ -582,10 +589,10 @@ static bool mca_pml_ob1_check_cantmatch_for_match(
|
||||
match_found = 1;
|
||||
|
||||
/*
|
||||
* remove frag_desc from list
|
||||
* remove frag from list
|
||||
*/
|
||||
ompi_list_remove_item(&proc->frags_cant_match,
|
||||
(ompi_list_item_t *)frag_desc);
|
||||
(ompi_list_item_t *)frag);
|
||||
|
||||
/*
|
||||
* figure out what sort of matching logic to use, if need to
|
||||
@ -616,7 +623,7 @@ static bool mca_pml_ob1_check_cantmatch_for_match(
|
||||
|
||||
/* associate the receive descriptor with the fragment
|
||||
* descriptor */
|
||||
frag_desc->request=match;
|
||||
frag->request=match;
|
||||
|
||||
/* add this fragment descriptor to the list of
|
||||
* descriptors to be processed later
|
||||
@ -625,12 +632,12 @@ static bool mca_pml_ob1_check_cantmatch_for_match(
|
||||
match_made = true;
|
||||
OBJ_CONSTRUCT(additional_matches, ompi_list_t);
|
||||
}
|
||||
ompi_list_append(additional_matches, (ompi_list_item_t *)frag_desc);
|
||||
ompi_list_append(additional_matches, (ompi_list_item_t *)frag);
|
||||
|
||||
} else {
|
||||
|
||||
/* if no match found, place on unexpected queue */
|
||||
ompi_list_append( &proc->unexpected_frags, (ompi_list_item_t *)frag_desc);
|
||||
ompi_list_append( &proc->unexpected_frags, (ompi_list_item_t *)frag);
|
||||
|
||||
}
|
||||
|
||||
@ -640,7 +647,7 @@ static bool mca_pml_ob1_check_cantmatch_for_match(
|
||||
|
||||
} /* end if (frag_seq == next_msg_seq_expected) */
|
||||
|
||||
} /* end for (frag_desc) loop */
|
||||
} /* end for (frag) loop */
|
||||
|
||||
} /* end while loop */
|
||||
|
||||
|
@ -23,31 +23,78 @@
|
||||
#include "mca/bmi/bmi.h"
|
||||
#include "pml_ob1_hdr.h"
|
||||
|
||||
struct mca_pml_ob1_recv_frag_t {
|
||||
struct mca_pml_ob1_buffer_t {
|
||||
ompi_list_item_t super;
|
||||
unsigned char addr[1];
|
||||
};
|
||||
typedef struct mca_pml_ob1_buffer_t mca_pml_ob1_buffer_t;
|
||||
|
||||
OBJ_CLASS_DECLARATION(mca_pml_ob1_buffer_t);
|
||||
|
||||
|
||||
struct mca_pml_ob1_fragment_t {
|
||||
ompi_list_item_t super;
|
||||
mca_bmi_base_module_t* bmi;
|
||||
mca_pml_ob1_hdr_t hdr;
|
||||
mca_bmi_base_segment_t* segments;
|
||||
size_t num_segments;
|
||||
struct mca_pml_ob1_recv_request_t* request;
|
||||
size_t num_segments;
|
||||
mca_bmi_base_segment_t segments[MCA_BMI_DES_MAX_SEGMENTS];
|
||||
mca_pml_ob1_buffer_t* buffers[MCA_BMI_DES_MAX_SEGMENTS];
|
||||
};
|
||||
typedef struct mca_pml_ob1_recv_frag_t mca_pml_ob1_recv_frag_t;
|
||||
typedef struct mca_pml_ob1_fragment_t mca_pml_ob1_fragment_t;
|
||||
|
||||
OBJ_CLASS_DECLARATION(mca_pml_ob1_fragment_t);
|
||||
|
||||
|
||||
#define MCA_PML_OB1_RECV_FRAG_ALLOC(frag,rc) \
|
||||
{ \
|
||||
\
|
||||
}
|
||||
/*
 * Obtain a fragment from the mca_pml_ob1.fragments free list.
 *
 * frag receives the list item cast to mca_pml_ob1_fragment_t*;
 * rc is handed to OMPI_FREE_LIST_WAIT as its status argument.
 */
#define MCA_PML_OB1_FRAG_ALLOC(frag,rc) \
do { \
    ompi_list_item_t* frag_alloc_item_; \
    OMPI_FREE_LIST_WAIT(&mca_pml_ob1.fragments, frag_alloc_item_, rc); \
    frag = (mca_pml_ob1_fragment_t*)frag_alloc_item_; \
} while(0)
|
||||
|
||||
#define MCA_PML_OB1_RECV_FRAG_INIT(frag,bmi,hdr,segs,cnt) \
|
||||
{ \
|
||||
\
|
||||
}
|
||||
|
||||
#define MCA_PML_OB1_RECV_FRAG_RETURN(frag) \
|
||||
{ \
|
||||
\
|
||||
}
|
||||
/*
 * Initialize a fragment from a received header and its segments.
 *
 * Records the module and a copy of the header, then for each of the cnt
 * incoming segments takes a buffer from mca_pml_ob1.buffers, points the
 * fragment's segment at it and copies the segment's bytes into it.
 *
 * frag      fragment to fill in (pointer; evaluated more than once)
 * frag_bmi  module stored in frag->bmi
 * frag_hdr  pointer to the header copied into frag->hdr
 * segs      array of incoming segments to copy from
 * cnt       number of entries in segs (evaluated more than once)
 *
 * Parameter and local names are chosen so that they cannot collide with
 * the fragment's member names or with typical caller variables such as
 * "segments".
 *
 * The macro hands a variable named rc from the caller's scope to
 * OMPI_FREE_LIST_WAIT as its status argument.
 *
 * NOTE(review): rc is not inspected after OMPI_FREE_LIST_WAIT, and seg_len
 * is not checked against the buffer size - confirm both are guaranteed by
 * the callers.
 */
#define MCA_PML_OB1_FRAG_INIT(frag,frag_bmi,frag_hdr,segs,cnt) \
do { \
    size_t frag_init_i_; \
    mca_bmi_base_segment_t* frag_init_segs_ = (frag)->segments; \
    mca_pml_ob1_buffer_t** frag_init_bufs_ = (frag)->buffers; \
 \
    /* init fragment */ \
    (frag)->bmi = (frag_bmi); \
    (frag)->hdr = *(mca_pml_ob1_hdr_t*)(frag_hdr); \
    (frag)->num_segments = (cnt); \
 \
    /* copy over data */ \
    for(frag_init_i_=0; frag_init_i_<(cnt); frag_init_i_++) { \
        ompi_list_item_t* frag_init_item_; \
        mca_pml_ob1_buffer_t* frag_init_buff_; \
        OMPI_FREE_LIST_WAIT(&mca_pml_ob1.buffers, frag_init_item_, rc); \
        frag_init_buff_ = (mca_pml_ob1_buffer_t*)frag_init_item_; \
        frag_init_bufs_[frag_init_i_] = frag_init_buff_; \
        frag_init_segs_[frag_init_i_].seg_addr.pval = frag_init_buff_->addr; \
        frag_init_segs_[frag_init_i_].seg_len = (segs)[frag_init_i_].seg_len; \
        memcpy(frag_init_segs_[frag_init_i_].seg_addr.pval, \
               (segs)[frag_init_i_].seg_addr.pval, \
               (segs)[frag_init_i_].seg_len); \
    } \
} while(0)
|
||||
|
||||
/*
 * Give a fragment back: every buffer referenced by the fragment goes to
 * mca_pml_ob1.buffers, the segment count is reset to zero, and the
 * fragment itself goes to mca_pml_ob1.fragments.
 */
#define MCA_PML_OB1_FRAG_RETURN(frag) \
do { \
    size_t frag_ret_idx_ = 0; \
 \
    /* hand back each per-segment buffer in order */ \
    while(frag_ret_idx_ < frag->num_segments) { \
        OMPI_FREE_LIST_RETURN(&mca_pml_ob1.buffers, \
            (ompi_list_item_t*)frag->buffers[frag_ret_idx_]); \
        frag_ret_idx_++; \
    } \
    frag->num_segments = 0; \
 \
    /* finally hand back the fragment itself */ \
    OMPI_FREE_LIST_RETURN(&mca_pml_ob1.fragments, \
        (ompi_list_item_t*)frag); \
} while(0)
|
||||
|
||||
|
||||
/**
|
||||
|
@ -24,7 +24,7 @@
|
||||
#include "pml_ob1_sendreq.h"
|
||||
|
||||
|
||||
static mca_pml_ob1_recv_frag_t* mca_pml_ob1_recv_request_match_specific_proc(
|
||||
static mca_pml_ob1_fragment_t* mca_pml_ob1_recv_request_match_specific_proc(
|
||||
mca_pml_ob1_recv_request_t* request, mca_pml_ob1_comm_proc_t* proc);
|
||||
|
||||
|
||||
@ -123,7 +123,7 @@ static void mca_pml_ob1_recv_request_ack(
|
||||
hdr->hdr_match.hdr_src);
|
||||
mca_pml_ob1_endpoint_t* ep = mca_pml_ob1_ep_array_get_next(&proc->bmi_first);
|
||||
mca_bmi_base_descriptor_t* des;
|
||||
mca_pml_ob1_recv_frag_t* frag;
|
||||
mca_pml_ob1_fragment_t* frag;
|
||||
mca_pml_ob1_ack_hdr_t* ack;
|
||||
int rc;
|
||||
|
||||
@ -153,10 +153,9 @@ static void mca_pml_ob1_recv_request_ack(
|
||||
|
||||
/* queue request to retry later */
|
||||
retry:
|
||||
MCA_PML_OB1_RECV_FRAG_ALLOC(frag,rc);
|
||||
MCA_PML_OB1_FRAG_ALLOC(frag,rc);
|
||||
frag->bmi = NULL;
|
||||
frag->hdr.hdr_rndv = *hdr;
|
||||
frag->segments = NULL;
|
||||
frag->num_segments = 0;
|
||||
frag->request = recvreq;
|
||||
ompi_list_append(&mca_pml_ob1.acks_pending, (ompi_list_item_t*)frag);
|
||||
@ -249,7 +248,7 @@ void mca_pml_ob1_recv_request_match_specific(mca_pml_ob1_recv_request_t* request
|
||||
{
|
||||
mca_pml_ob1_comm_t* comm = request->req_recv.req_base.req_comm->c_pml_comm;
|
||||
mca_pml_ob1_comm_proc_t* proc = comm->procs + request->req_recv.req_base.req_peer;
|
||||
mca_pml_ob1_recv_frag_t* frag;
|
||||
mca_pml_ob1_fragment_t* frag;
|
||||
|
||||
/* check for a specific match */
|
||||
OMPI_THREAD_LOCK(&comm->matching_lock);
|
||||
@ -264,7 +263,7 @@ void mca_pml_ob1_recv_request_match_specific(mca_pml_ob1_recv_request_t* request
|
||||
mca_pml_ob1_recv_request_progress(request,frag->bmi,frag->segments,frag->num_segments);
|
||||
if( !((MCA_PML_REQUEST_IPROBE == request->req_recv.req_base.req_type) ||
|
||||
(MCA_PML_REQUEST_PROBE == request->req_recv.req_base.req_type)) ) {
|
||||
MCA_PML_OB1_RECV_FRAG_RETURN(frag);
|
||||
MCA_PML_OB1_FRAG_RETURN(frag);
|
||||
}
|
||||
return; /* match found */
|
||||
}
|
||||
@ -303,7 +302,7 @@ void mca_pml_ob1_recv_request_match_wild(mca_pml_ob1_recv_request_t* request)
|
||||
request->req_recv.req_base.req_sequence = comm->recv_sequence++;
|
||||
|
||||
for (i = 0; i < proc_count; i++) {
|
||||
mca_pml_ob1_recv_frag_t* frag;
|
||||
mca_pml_ob1_fragment_t* frag;
|
||||
|
||||
/* continue if no frags to match */
|
||||
if (ompi_list_get_size(&proc->unexpected_frags) == 0) {
|
||||
@ -318,7 +317,7 @@ void mca_pml_ob1_recv_request_match_wild(mca_pml_ob1_recv_request_t* request)
|
||||
mca_pml_ob1_recv_request_progress(request,frag->bmi,frag->segments,frag->num_segments);
|
||||
if( !((MCA_PML_REQUEST_IPROBE == request->req_recv.req_base.req_type) ||
|
||||
(MCA_PML_REQUEST_PROBE == request->req_recv.req_base.req_type)) ) {
|
||||
MCA_PML_OB1_RECV_FRAG_RETURN(frag);
|
||||
MCA_PML_OB1_FRAG_RETURN(frag);
|
||||
}
|
||||
return; /* match found */
|
||||
}
|
||||
@ -340,19 +339,19 @@ void mca_pml_ob1_recv_request_match_wild(mca_pml_ob1_recv_request_t* request)
|
||||
* it places the request in the appropriate matched receive list.
|
||||
*/
|
||||
|
||||
static mca_pml_ob1_recv_frag_t* mca_pml_ob1_recv_request_match_specific_proc(
|
||||
static mca_pml_ob1_fragment_t* mca_pml_ob1_recv_request_match_specific_proc(
|
||||
mca_pml_ob1_recv_request_t* request,
|
||||
mca_pml_ob1_comm_proc_t* proc)
|
||||
{
|
||||
ompi_list_t* unexpected_frags = &proc->unexpected_frags;
|
||||
mca_pml_ob1_recv_frag_t* frag;
|
||||
mca_pml_ob1_fragment_t* frag;
|
||||
mca_pml_ob1_match_hdr_t* hdr;
|
||||
int tag = request->req_recv.req_base.req_tag;
|
||||
|
||||
if( OMPI_ANY_TAG == tag ) {
|
||||
for (frag = (mca_pml_ob1_recv_frag_t*)ompi_list_get_first(unexpected_frags);
|
||||
frag != (mca_pml_ob1_recv_frag_t*)ompi_list_get_end(unexpected_frags);
|
||||
frag = (mca_pml_ob1_recv_frag_t*)ompi_list_get_next(frag)) {
|
||||
for (frag = (mca_pml_ob1_fragment_t*)ompi_list_get_first(unexpected_frags);
|
||||
frag != (mca_pml_ob1_fragment_t*)ompi_list_get_end(unexpected_frags);
|
||||
frag = (mca_pml_ob1_fragment_t*)ompi_list_get_next(frag)) {
|
||||
hdr = &(frag->hdr.hdr_match);
|
||||
|
||||
/* check first frag - we assume that process matching has been done already */
|
||||
@ -361,9 +360,9 @@ static mca_pml_ob1_recv_frag_t* mca_pml_ob1_recv_request_match_specific_proc(
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (frag = (mca_pml_ob1_recv_frag_t*)ompi_list_get_first(unexpected_frags);
|
||||
frag != (mca_pml_ob1_recv_frag_t*)ompi_list_get_end(unexpected_frags);
|
||||
frag = (mca_pml_ob1_recv_frag_t*)ompi_list_get_next(frag)) {
|
||||
for (frag = (mca_pml_ob1_fragment_t*)ompi_list_get_first(unexpected_frags);
|
||||
frag != (mca_pml_ob1_fragment_t*)ompi_list_get_end(unexpected_frags);
|
||||
frag = (mca_pml_ob1_fragment_t*)ompi_list_get_next(frag)) {
|
||||
hdr = &(frag->hdr.hdr_match);
|
||||
|
||||
/* check first frag - we assume that process matching has been done already */
|
||||
|
@ -52,13 +52,13 @@ OBJ_CLASS_DECLARATION(mca_pml_ob1_recv_request_t);
|
||||
* @param rc (OUT) OMPI_SUCCESS or error status on failure.
|
||||
* @return Receive request.
|
||||
*/
|
||||
#define MCA_PML_OB1_RECV_REQUEST_ALLOC(recvreq, rc) \
|
||||
do { \
|
||||
ompi_list_item_t* item; \
|
||||
rc = OMPI_SUCCESS; \
|
||||
OMPI_FREE_LIST_GET(&mca_pml_ob1.recv_requests, item, rc); \
|
||||
recvreq = (mca_pml_ob1_recv_request_t*)item; \
|
||||
} while(0)
|
||||
#define MCA_PML_OB1_RECV_REQUEST_ALLOC(recvreq, rc) \
|
||||
do { \
|
||||
ompi_list_item_t* item; \
|
||||
rc = OMPI_SUCCESS; \
|
||||
OMPI_FREE_LIST_GET(&mca_pml_ob1.recv_requests, item, rc); \
|
||||
recvreq = (mca_pml_ob1_recv_request_t*)item; \
|
||||
} while(0)
|
||||
|
||||
|
||||
/**
|
||||
@ -82,7 +82,7 @@ OBJ_CLASS_DECLARATION(mca_pml_ob1_recv_request_t);
|
||||
tag, \
|
||||
comm, \
|
||||
persistent) \
|
||||
{ \
|
||||
do { \
|
||||
MCA_PML_BASE_RECV_REQUEST_INIT( \
|
||||
&(request)->req_recv, \
|
||||
addr, \
|
||||
@ -92,18 +92,18 @@ OBJ_CLASS_DECLARATION(mca_pml_ob1_recv_request_t);
|
||||
tag, \
|
||||
comm, \
|
||||
persistent); \
|
||||
}
|
||||
} while(0)
|
||||
|
||||
/**
|
||||
* Return a recv request to the modules free list.
|
||||
*
|
||||
* @param request (IN) Receive request.
|
||||
*/
|
||||
#define MCA_PML_OB1_RECV_REQUEST_RETURN(request) \
|
||||
do { \
|
||||
MCA_PML_BASE_RECV_REQUEST_FINI(&request->req_recv); \
|
||||
OMPI_FREE_LIST_RETURN(&mca_pml_ob1.recv_requests, (ompi_list_item_t*)request); \
|
||||
} while(0)
|
||||
#define MCA_PML_OB1_RECV_REQUEST_RETURN(request) \
|
||||
do { \
|
||||
MCA_PML_BASE_RECV_REQUEST_FINI(&request->req_recv); \
|
||||
OMPI_FREE_LIST_RETURN(&mca_pml_ob1.recv_requests, (ompi_list_item_t*)request); \
|
||||
} while(0)
|
||||
|
||||
/**
|
||||
* Attempt to match the request against the unexpected fragment list
|
||||
@ -160,7 +160,7 @@ void mca_pml_ob1_recv_request_match_specific(mca_pml_ob1_recv_request_t* request
|
||||
#define MCA_PML_OB1_RECV_REQUEST_MATCHED( \
|
||||
request, \
|
||||
hdr) \
|
||||
{ \
|
||||
do { \
|
||||
(request)->req_recv.req_bytes_packed = (hdr)->hdr_msg_length; \
|
||||
(request)->req_recv.req_base.req_ompi.req_status.MPI_TAG = (hdr)->hdr_tag; \
|
||||
(request)->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = (hdr)->hdr_src; \
|
||||
@ -181,7 +181,7 @@ void mca_pml_ob1_recv_request_match_specific(mca_pml_ob1_recv_request_t* request
|
||||
0, /* offset in bytes into packed buffer */ \
|
||||
NULL ); /* not allocating memory */ \
|
||||
} \
|
||||
}
|
||||
} while (0)
|
||||
|
||||
|
||||
/**
|
||||
@ -196,17 +196,22 @@ void mca_pml_ob1_recv_request_match_specific(mca_pml_ob1_recv_request_t* request
|
||||
data_offset, \
|
||||
bytes_received, \
|
||||
bytes_delivered) \
|
||||
{ \
|
||||
do { \
|
||||
if(request->req_recv.req_base.req_count > 0) { \
|
||||
struct iovec iov[MCA_BMI_DES_MAX_SEGMENTS]; \
|
||||
uint32_t iov_count = num_segments; \
|
||||
uint32_t iov_count = 0; \
|
||||
uint32_t max_data = bytes_received; \
|
||||
int32_t free_after = 0; \
|
||||
size_t i; \
|
||||
for(i=0; i<num_segments; i++) { \
|
||||
if(i == 0) { \
|
||||
iov[i].iov_base = (unsigned char*)segments[i].seg_addr.pval + seg_offset; \
|
||||
iov[i].iov_len = segments[i].seg_len - seg_offset; \
|
||||
size_t n, offset = seg_offset; \
|
||||
\
|
||||
for(n=0; n<num_segments; n++) { \
|
||||
mca_bmi_base_segment_t* segment = segments+n; \
|
||||
if(offset >= segment->seg_len) { \
|
||||
offset -= segment->seg_len; \
|
||||
} else { \
|
||||
iov[iov_count].iov_len = segment->seg_len - seg_offset; \
|
||||
iov[iov_count].iov_base = (unsigned char*)segment->seg_addr.pval + seg_offset; \
|
||||
iov_count++; \
|
||||
} \
|
||||
} \
|
||||
ompi_convertor_unpack( \
|
||||
@ -217,7 +222,7 @@ void mca_pml_ob1_recv_request_match_specific(mca_pml_ob1_recv_request_t* request
|
||||
&free_after); \
|
||||
bytes_delivered = max_data; \
|
||||
} \
|
||||
}
|
||||
} while (0)
|
||||
|
||||
|
||||
/**
|
||||
|
@ -81,11 +81,7 @@ static void mca_pml_ob1_send_completion(
|
||||
mca_pml_ob1_endpoint_t* bmi_ep = sendreq->req_endpoint;
|
||||
|
||||
/* for now - return the descriptor - may cache these at some point */
|
||||
if(NULL == bmi_ep->bmi_cache) {
|
||||
bmi_ep->bmi_cache = descriptor;
|
||||
} else {
|
||||
bmi->bmi_free(bmi,descriptor);
|
||||
}
|
||||
MCA_PML_OB1_ENDPOINT_DES_RETURN(bmi_ep,descriptor);
|
||||
|
||||
/* check for request completion */
|
||||
OMPI_THREAD_LOCK(&ompi_request_lock);
|
||||
@ -158,13 +154,9 @@ int mca_pml_ob1_send_request_start_copy(
|
||||
if(size == 0) {
|
||||
|
||||
/* allocate a descriptor */
|
||||
if(NULL != (descriptor = endpoint->bmi_cache)) {
|
||||
endpoint->bmi_cache = NULL;
|
||||
} else {
|
||||
descriptor = endpoint->bmi_alloc(endpoint->bmi, sizeof(mca_pml_ob1_hdr_t));
|
||||
if(NULL == descriptor) {
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
MCA_PML_OB1_ENDPOINT_DES_ALLOC(endpoint, descriptor);
|
||||
if(NULL == descriptor) {
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
descriptor->des_cbfunc = mca_pml_ob1_send_completion;
|
||||
segment = descriptor->des_src;
|
||||
@ -208,7 +200,7 @@ int mca_pml_ob1_send_request_start_copy(
|
||||
}
|
||||
|
||||
/* allocate space for hdr + first fragment */
|
||||
descriptor = endpoint->bmi_alloc(endpoint->bmi, size + sizeof(mca_pml_ob1_hdr_t));
|
||||
MCA_PML_OB1_ENDPOINT_DES_ALLOC(endpoint, descriptor);
|
||||
if(NULL == descriptor) {
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user