cleanup - support for sender side scheduling (non-rdma case)
This commit was SVN r5915.
This commit is contained in:
parent 8b411a10be
commit e7332d0521
@@ -456,7 +456,7 @@ struct mca_bmi_base_module_t {
    /* BMI common attributes */
    mca_bmi_base_component_t* bmi_component; /**< pointer back to the BMI component structure */
    size_t bmi_first_frag_size; /**< maximum size of first fragment -- eager send */
    size_t bmi_eager_limit; /**< maximum size of first fragment -- eager send */
    size_t bmi_min_frag_size; /**< threshold below which the BMI will not fragment */
    size_t bmi_max_frag_size; /**< maximum fragment size supported by the BMI */
    uint32_t bmi_exclusivity; /**< indicates this BMI should be used exclusively */

@@ -47,7 +47,7 @@ mca_bmi_sm_t mca_bmi_sm[2] = {
    {
        {
            &mca_bmi_sm_component.super,
            0, /* bmi_first_frag_size */
            0, /* bmi_eager_limit */
            0, /* bmi_min_frag_size */
            0, /* bmi_max_frag_size */
            0, /* bmi_exclusivity */

@@ -70,7 +70,7 @@ mca_bmi_sm_t mca_bmi_sm[2] = {
    {
        {
            &mca_bmi_sm_component.super,
            0, /* bmi_first_frag_size */
            0, /* bmi_eager_limit */
            0, /* bmi_min_frag_size */
            0, /* bmi_max_frag_size */
            0, /* bmi_exclusivity */

@@ -489,17 +489,10 @@ int mca_bmi_sm_add_procs_same_base_addr(
    /* some initialization happens only the first time this routine
     * is called, i.e. when bmi_inited is false */

    /* initialize fragment descriptor free list */
    /* initialize fragment descriptor free lists */

    /*
     * first fragment
     */

    /* allocation will be for the fragment descriptor, payload buffer,
     * and padding to ensure proper alignment can be achieved */
    length=sizeof(mca_bmi_sm_frag_t)+
        mca_bmi_sm_component.fragment_alignment+
        mca_bmi_sm_component.first_fragment_size;
    /* allocation will be for the fragment descriptor and payload buffer */
    length=sizeof(mca_bmi_sm_frag_t) + mca_bmi_sm_component.eager_limit;
    ompi_free_list_init(&mca_bmi_sm_component.sm_frags1, length,
        OBJ_CLASS(mca_bmi_sm_frag1_t),
        mca_bmi_sm_component.sm_free_list_num,

@@ -507,9 +500,7 @@ int mca_bmi_sm_add_procs_same_base_addr(
        mca_bmi_sm_component.sm_free_list_inc,
        mca_bmi_sm_component.sm_mpool); /* use shared-memory pool */

    length=sizeof(mca_bmi_sm_frag_t)+
        mca_bmi_sm_component.fragment_alignment+
        mca_bmi_sm_component.max_fragment_size;
    length=sizeof(mca_bmi_sm_frag_t) + mca_bmi_sm_component.max_frag_size;
    ompi_free_list_init(&mca_bmi_sm_component.sm_frags2, length,
        OBJ_CLASS(mca_bmi_sm_frag2_t),
        mca_bmi_sm_component.sm_free_list_num,

@@ -745,7 +736,7 @@ extern mca_bmi_base_descriptor_t* mca_bmi_sm_alloc(
{
    mca_bmi_sm_frag_t* frag;
    int rc;
    if(size <= mca_bmi_sm_component.first_fragment_size) {
    if(size <= mca_bmi_sm_component.eager_limit) {
        MCA_BMI_SM_FRAG_ALLOC1(frag,rc);
    } else {
        MCA_BMI_SM_FRAG_ALLOC2(frag,rc);

@@ -764,7 +755,7 @@ extern int mca_bmi_sm_free(
    mca_bmi_base_descriptor_t* des)
{
    mca_bmi_sm_frag_t* frag = (mca_bmi_sm_frag_t*)des;
    if(frag->size <= mca_bmi_sm_component.first_fragment_size) {
    if(frag->size <= mca_bmi_sm_component.eager_limit) {
        MCA_BMI_SM_FRAG_RETURN1(frag);
    } else {
        MCA_BMI_SM_FRAG_RETURN2(frag);

@@ -790,6 +781,7 @@ struct mca_bmi_base_descriptor_t* mca_bmi_sm_prepare_src(
    struct iovec iov;
    uint32_t iov_count = 1;
    uint32_t max_data = *size;
    int32_t free_after;
    int rc;

    MCA_BMI_SM_FRAG_ALLOC2(frag, rc);

@@ -798,12 +790,12 @@ struct mca_bmi_base_descriptor_t* mca_bmi_sm_prepare_src(
    }

    if(max_data + reserve > frag->size) {
        max_data = *size - reserve;
        max_data = frag->size - reserve;
    }
    iov.iov_len = max_data;
    iov.iov_base = (unsigned char*)(frag+1) + reserve;

    rc = ompi_convertor_pack(convertor, &iov, &iov_count, &max_data, NULL);
    rc = ompi_convertor_pack(convertor, &iov, &iov_count, &max_data, &free_after);
    if(rc < 0) {
        MCA_BMI_SM_FRAG_RETURN2(frag);
        return NULL;
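
The alloc/free pair above buckets descriptors by size: requests at or under the eager limit come from the first free list, larger ones from the second, and the stored fragment size routes the return. A minimal sketch of that selection follows; the names are invented stand-ins for the component's fields and macros, not the actual tree.

/* Sketch: size-bucketed descriptor allocation with two pre-sized
 * free lists, as in the sm BMI above. Illustrative names only. */
#include <stddef.h>

typedef struct frag { size_t size; struct frag* next; } frag_t;

typedef struct {
    size_t  eager_limit;      /* max payload served from the eager list */
    frag_t* frag_list_eager;  /* descriptors sized for first fragments */
    frag_t* frag_list_max;    /* descriptors sized for later fragments */
} frag_pools_t;

static frag_t* pool_pop(frag_t** head) {
    frag_t* f = *head;
    if (f) *head = f->next;
    return f;
}

/* Mirror of the alloc path: pick the list by requested size. */
static frag_t* frag_alloc(frag_pools_t* p, size_t size) {
    return (size <= p->eager_limit)
        ? pool_pop(&p->frag_list_eager)
        : pool_pop(&p->frag_list_max);
}

/* Mirror of the free path: the stored frag->size routes the return. */
static void frag_free(frag_pools_t* p, frag_t* f) {
    frag_t** head = (f->size <= p->eager_limit)
        ? &p->frag_list_eager : &p->frag_list_max;
    f->next = *head;
    *head = f;
}
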
@@ -80,10 +80,8 @@ struct mca_bmi_sm_component_t {
    char* sm_mpool_name; /**< name of shared memory pool module */
    mca_mpool_base_module_t* sm_mpool; /**< shared memory pool */
    void* sm_mpool_base; /**< base address of shared memory pool */
    size_t first_fragment_size; /**< first fragment size */
    size_t max_fragment_size; /**< maximum (second and
                                   beyond) fragment size */
    size_t fragment_alignment; /**< fragment alignment */
    size_t eager_limit; /**< first fragment size */
    size_t max_frag_size; /**< maximum (second and beyond) fragment size */
    ompi_mutex_t sm_lock;
    char* sm_resouce_ctl_file; /**< name of shared memory file used
                                    to coordinate resource usage */

@@ -123,13 +123,10 @@ int mca_bmi_sm_component_open(void)
    mca_bmi_sm_param_register_int("sm_extra_procs", -1);
    mca_bmi_sm_component.sm_mpool_name =
        mca_bmi_sm_param_register_string("mpool", "sm");
    mca_bmi_sm_component.first_fragment_size =
        mca_bmi_sm_param_register_int("first_fragment_size", 1024);
    mca_bmi_sm_component.max_fragment_size =
        mca_bmi_sm_param_register_int("max_fragment_size", 8*1024);
    mca_bmi_sm_component.fragment_alignment =
        mca_bmi_sm_param_register_int("fragment_alignment",
            CACHE_LINE_SIZE);
    mca_bmi_sm_component.eager_limit =
        mca_bmi_sm_param_register_int("eager_limit", 1024);
    mca_bmi_sm_component.max_frag_size =
        mca_bmi_sm_param_register_int("max_frag_size", 8*1024);
    mca_bmi_sm_component.size_of_cb_queue =
        mca_bmi_sm_param_register_int("size_of_cb_queue", 128);
    mca_bmi_sm_component.cb_lazy_free_freq =

@@ -271,9 +268,9 @@ mca_bmi_base_module_t** mca_bmi_sm_component_init(

    /* set scheduling parameters */
    for( i=0 ; i < 2 ; i++ ) {
        mca_bmi_sm[i].super.bmi_first_frag_size=mca_bmi_sm_component.first_fragment_size;
        mca_bmi_sm[i].super.bmi_min_frag_size=mca_bmi_sm_component.max_fragment_size;
        mca_bmi_sm[i].super.bmi_max_frag_size=mca_bmi_sm_component.max_fragment_size;
        mca_bmi_sm[i].super.bmi_eager_limit=mca_bmi_sm_component.eager_limit;
        mca_bmi_sm[i].super.bmi_min_frag_size=mca_bmi_sm_component.max_frag_size;
        mca_bmi_sm[i].super.bmi_max_frag_size=mca_bmi_sm_component.max_frag_size;
        mca_bmi_sm[i].super.bmi_exclusivity=100; /* always use this ptl */
        mca_bmi_sm[i].super.bmi_latency=100; /* lowest latency */
        mca_bmi_sm[i].super.bmi_bandwidth=900; /* not really used now since exclusivity is set to 100 */
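
These per-module limits are what the PML's sender-side scheduler consumes: the first fragment of a message is capped at bmi_eager_limit, and the remainder travels in pieces of at most bmi_max_frag_size. A sketch of that split using the default values registered above (1024 and 8*1024); the function is illustrative, not part of the tree.

/* Sketch: how eager/max limits could carve up a message of msg_len
 * bytes. Purely illustrative arithmetic. */
#include <stdio.h>

static void plan_fragments(size_t msg_len, size_t eager_limit,
                           size_t max_frag_size) {
    size_t first = msg_len < eager_limit ? msg_len : eager_limit;
    size_t rest  = msg_len - first;
    printf("first fragment: %zu bytes\n", first);
    while (rest > 0) {
        size_t chunk = rest < max_frag_size ? rest : max_frag_size;
        printf("next fragment: %zu bytes\n", chunk);
        rest -= chunk;
    }
}

int main(void) {
    /* defaults registered above: eager_limit=1024, max_frag_size=8*1024 */
    plan_fragments(20000, 1024, 8 * 1024);
    return 0;
}
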
@@ -14,13 +14,13 @@ static inline void mca_bmi_sm_frag_constructor(mca_bmi_sm_frag_t* frag)

static void mca_bmi_sm_frag1_constructor(mca_bmi_sm_frag_t* frag)
{
    frag->size = mca_bmi_sm_component.first_fragment_size;
    frag->size = mca_bmi_sm_component.eager_limit;
    mca_bmi_sm_frag_constructor(frag);
}

static void mca_bmi_sm_frag2_constructor(mca_bmi_sm_frag_t* frag)
{
    frag->size = mca_bmi_sm_component.max_fragment_size;
    frag->size = mca_bmi_sm_component.max_frag_size;
    mca_bmi_sm_frag_constructor(frag);
}

@@ -108,7 +108,7 @@ typedef struct mca_pml_base_send_request_t mca_pml_base_send_request_t;
    (request)->req_base.req_datatype, \
    (request)->req_base.req_count, \
    (request)->req_base.req_addr, \
    -1, NULL ); \
    0, NULL ); \
    ompi_convertor_get_packed_size( &(request)->req_convertor, \
        (uint32_t*)&((request)->req_bytes_packed) );\
} else { \

@@ -28,7 +28,6 @@ libmca_pml_ob1_la_SOURCES = \
    pml_ob1_iprobe.c \
    pml_ob1_irecv.c \
    pml_ob1_isend.c \
    pml_ob1_match.c \
    pml_ob1_proc.c \
    pml_ob1_proc.h \
    pml_ob1_progress.c \
@@ -28,6 +28,7 @@
#include "pml_ob1_comm.h"
#include "pml_ob1_proc.h"
#include "pml_ob1_hdr.h"
#include "pml_ob1_recvfrag.h"

mca_pml_ob1_t mca_pml_ob1 = {

@@ -135,7 +136,7 @@ int mca_pml_ob1_add_bmis()
    }

    /* setup callback for receive */
    rc = bmi->bmi_register(bmi, MCA_BMI_TAG_PML, mca_pml_ob1_recv_callback, NULL);
    rc = bmi->bmi_register(bmi, MCA_BMI_TAG_PML, mca_pml_ob1_recv_frag_callback, NULL);
    if(OMPI_SUCCESS != rc)
        return rc;

@@ -250,14 +251,19 @@ int mca_pml_ob1_add_procs(ompi_proc_t** procs, size_t nprocs)
        }
    }

    /* cache the ob1 on the proc */
    /* cache the endpoint on the proc */
    endpoint = mca_pml_ob1_ep_array_insert(&proc_pml->bmi_next);
    endpoint->bmi = bmi;
    endpoint->bmi_eager_limit = bmi->bmi_eager_limit;
    endpoint->bmi_min_frag_size = bmi->bmi_min_frag_size;
    endpoint->bmi_max_frag_size = bmi->bmi_max_frag_size;
    endpoint->bmi_cache = NULL;
    endpoint->bmi_endpoint = bmi_endpoints[p];
    endpoint->bmi_weight = 0;
    endpoint->bmi_alloc = bmi->bmi_alloc;
    endpoint->bmi_free = bmi->bmi_free;
    endpoint->bmi_prepare_src = bmi->bmi_prepare_src;
    endpoint->bmi_prepare_dst = bmi->bmi_prepare_dst;
    endpoint->bmi_send = bmi->bmi_send;
    endpoint->bmi_put = bmi->bmi_put;
    endpoint->bmi_get = bmi->bmi_get;

@@ -199,13 +199,6 @@ extern int mca_pml_ob1_recv(
    ompi_status_public_t* status
);

extern void mca_pml_ob1_recv_callback(
    mca_bmi_base_module_t* bmi,
    mca_bmi_base_tag_t tag,
    mca_bmi_base_descriptor_t* descriptor,
    void* cbdata
);

extern int mca_pml_ob1_progress(void);

extern int mca_pml_ob1_start(

@@ -95,6 +95,10 @@ int mca_pml_ob1_component_open(void)
    mca_pml_ob1_param_register_int("free_list_inc", 256);
    mca_pml_ob1.priority =
        mca_pml_ob1_param_register_int("priority", 0);
    mca_pml_ob1.send_pipeline_depth =
        mca_pml_ob1_param_register_int("send_pipeline_depth", 2);
    mca_pml_ob1.recv_pipeline_depth =
        mca_pml_ob1_param_register_int("recv_pipeline_depth", 2);

    return mca_bmi_base_open();
}

@@ -35,8 +35,8 @@ struct mca_pml_ob1_endpoint_t {
    int bmi_weight; /**< BMI weight for scheduling */
    int bmi_flags; /**< support for put/get? */
    size_t bmi_eager_limit; /**< BMI eager limit */
    size_t bmi_min_seg_size; /**< BMI min segment size */
    size_t bmi_max_seg_size; /**< BMI max segment size */
    size_t bmi_min_frag_size; /**< BMI min fragment size */
    size_t bmi_max_frag_size; /**< BMI max fragment size */
    struct mca_bmi_base_module_t *bmi; /**< BMI module */
    struct mca_bmi_base_endpoint_t* bmi_endpoint; /**< BMI addressing info */
    struct mca_bmi_base_descriptor_t* bmi_cache;
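
The endpoint structure copies the BMI's limits and entry points out of the module so the send path touches one per-peer struct instead of chasing the module pointer on every call. A simplified sketch of that caching pattern; the types here are invented stand-ins for the mca_bmi/mca_pml structures.

/* Sketch: caching per-peer dispatch state, as the endpoint struct
 * above does with bmi_send/bmi_alloc/etc. Simplified types. */
#include <stddef.h>

typedef struct module module_t;
typedef int (*send_fn_t)(module_t* m, const void* buf, size_t len);

struct module {
    size_t    eager_limit;
    send_fn_t send;
};

typedef struct {
    module_t* module;       /* pointer back to the module */
    size_t    eager_limit;  /* copied so scheduling never dereferences it */
    send_fn_t send;         /* cached entry point */
} endpoint_t;

static void endpoint_init(endpoint_t* ep, module_t* m) {
    ep->module      = m;
    ep->eager_limit = m->eager_limit; /* mirror of bmi_eager_limit */
    ep->send        = m->send;        /* mirror of bmi_send */
}

static int endpoint_send(endpoint_t* ep, const void* buf, size_t len) {
    return ep->send(ep->module, buf, len); /* one indirect call */
}
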
@@ -1,592 +0,0 @@
/** @file */

/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University.
 * All rights reserved.
 * Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
 * All rights reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 * University of Stuttgart. All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 * All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"
#include <stdio.h>

#include "class/ompi_list.h"
#include "threads/mutex.h"
#include "include/constants.h"
#include "communicator/communicator.h"
#include "pml_ob1.h"
#include "pml_ob1_comm.h"
#include "pml_ob1_recvfrag.h"
#include "pml_ob1_recvreq.h"
#include "pml_ob1_hdr.h"
#include "pml_ob1_match.h"


/**
 * Try and match the incoming message fragment to the list of
 * "wild" receives
 *
 * @param hdr Matching data from received fragment (IN)
 *
 * @param pml_comm Pointer to the communicator structure used for
 * matching purposes. (IN)
 *
 * @return Matched receive
 *
 * This routine assumes that the appropriate matching locks are
 * set by the upper level routine.
 */

#define MCA_PML_OB1_CHECK_WILD_RECEIVES_FOR_MATCH(hdr,comm,proc,return_match) \
do { \
    /* local parameters */ \
    ompi_list_t* wild_receives = &comm->wild_receives; \
    mca_pml_ob1_recv_request_t *wild_recv; \
    int frag_tag,recv_tag; \
 \
    /* initialization */ \
    frag_tag=hdr->hdr_tag; \
 \
    /* \
     * Loop over the wild irecvs - no need to lock, the upper level \
     * locking is protecting from having other threads trying to \
     * change this list. \
     */ \
    for(wild_recv = (mca_pml_ob1_recv_request_t *) \
            ompi_list_get_first(wild_receives); \
        wild_recv != (mca_pml_ob1_recv_request_t *) \
            ompi_list_get_end(wild_receives); \
        wild_recv = (mca_pml_ob1_recv_request_t *) \
            ((ompi_list_item_t *)wild_recv)->ompi_list_next) { \
 \
        recv_tag = wild_recv->req_recv.req_base.req_tag; \
        if ( \
            /* exact tag match */ \
            (frag_tag == recv_tag) || \
            /* wild tag match - negative tags (except for \
             * OMPI_ANY_TAG) are reserved for internal use, and will \
             * not be matched with OMPI_ANY_TAG */ \
            ( (recv_tag == OMPI_ANY_TAG) && (0 <= frag_tag) ) ) \
 \
        { \
            /* \
             * Mark that this is the matching irecv, and go to process it. \
             */ \
            return_match = wild_recv; \
 \
            /* remove this irecv from the posted wild ireceive list */ \
            ompi_list_remove_item(wild_receives, \
                (ompi_list_item_t *)wild_recv); \
 \
            /* found match - no need to continue */ \
            break; \
        } \
    } \
} while(0)


/**
 * Try and match the incoming message fragment to the list of
 * "specific" receives
 *
 * @param hdr Matching data from received fragment (IN)
 *
 * @param comm Pointer to the communicator structure used for
 * matching purposes. (IN)
 *
 * @return Matched receive
 *
 * This routine assumes that the appropriate matching locks are
 * set by the upper level routine.
 */
#define MCA_PML_OB1_CHECK_SPECIFIC_RECEIVES_FOR_MATCH(hdr,comm,proc,return_match) \
do { \
    /* local variables */ \
    ompi_list_t* specific_receives = &proc->specific_receives; \
    mca_pml_ob1_recv_request_t *specific_recv; \
    int recv_tag,frag_tag; \
 \
    /* initialization */ \
    frag_tag=hdr->hdr_tag; \
 \
    /* \
     * Loop over the specific irecvs. \
     */ \
    for(specific_recv = (mca_pml_ob1_recv_request_t *) \
            ompi_list_get_first(specific_receives); \
        specific_recv != (mca_pml_ob1_recv_request_t *) \
            ompi_list_get_end(specific_receives); \
        specific_recv = (mca_pml_ob1_recv_request_t *) \
            ((ompi_list_item_t *)specific_recv)->ompi_list_next) { \
        /* \
         * Check for a match \
         */ \
        recv_tag = specific_recv->req_recv.req_base.req_tag; \
        if ( (frag_tag == recv_tag) || \
             ( (recv_tag == OMPI_ANY_TAG) && (0 <= frag_tag) ) ) { \
 \
            /* \
             * Match made \
             */ \
            return_match = specific_recv; \
 \
            /* remove descriptor from posted specific ireceive list */ \
            ompi_list_remove_item(specific_receives, \
                (ompi_list_item_t *)specific_recv); \
 \
            break; \
        } \
    } \
} while(0)

/**
 * Try and match the incoming message fragment to the list of
 * "wild" receives and "specific" receives. Used when both types
 * of receives have been posted, i.e. when we need to coordinate
 * between multiple lists to make sure ordered delivery occurs.
 *
 * @param hdr Matching data from received fragment (IN)
 *
 * @param comm Pointer to the communicator structure used for
 * matching purposes. (IN)
 *
 * @return Matched receive
 *
 * This routine assumes that the appropriate matching locks are
 * set by the upper level routine.
 */

#define MCA_PML_OB1_CHECK_SPECIFIC_AND_WILD_RECEIVES_FOR_MATCH( \
    hdr,comm,proc,return_match) \
do { \
    /* local variables */ \
    mca_pml_ob1_recv_request_t *specific_recv, *wild_recv; \
    mca_ptl_sequence_t wild_recv_seq, specific_recv_seq; \
    int frag_tag, wild_recv_tag, specific_recv_tag; \
 \
    /* initialization */ \
    frag_tag=hdr->hdr_tag; \
 \
    /* \
     * We know that when this is called, both specific and wild irecvs \
     * have been posted. \
     */ \
    specific_recv = (mca_pml_ob1_recv_request_t *) \
        ompi_list_get_first(&(proc)->specific_receives); \
    wild_recv = (mca_pml_ob1_recv_request_t *) \
        ompi_list_get_first(&comm->wild_receives); \
 \
    specific_recv_seq = specific_recv->req_recv.req_base.req_sequence; \
    wild_recv_seq = wild_recv->req_recv.req_base.req_sequence; \
 \
    while (true) { \
        if (wild_recv_seq < specific_recv_seq) { \
            /* \
             * wild recv is earlier than the specific one. \
             */ \
            /* \
             * try and match \
             */ \
            wild_recv_tag = wild_recv->req_recv.req_base.req_tag; \
            if ( (frag_tag == wild_recv_tag) || \
                 ( (wild_recv_tag == OMPI_ANY_TAG) && (0 <= frag_tag) ) ) { \
                /* \
                 * Match made \
                 */ \
                return_match=wild_recv; \
 \
                /* remove this recv from the wild receive queue */ \
                ompi_list_remove_item(&comm->wild_receives, \
                    (ompi_list_item_t *)wild_recv); \
                break; \
            } \
 \
            /* \
             * No match, go to the next. \
             */ \
            wild_recv=(mca_pml_ob1_recv_request_t *) \
                ((ompi_list_item_t *)wild_recv)->ompi_list_next; \
 \
            /* \
             * If that was the last wild one, just look at the \
             * rest of the specific ones. \
             */ \
            if (wild_recv == (mca_pml_ob1_recv_request_t *) \
                    ompi_list_get_end(&comm->wild_receives) ) \
            { \
                MCA_PML_OB1_CHECK_SPECIFIC_RECEIVES_FOR_MATCH(hdr, comm, proc, return_match); \
                break; \
            } \
 \
            /* \
             * Get the sequence number for this recv, and go \
             * back to the top of the loop. \
             */ \
            wild_recv_seq = wild_recv->req_recv.req_base.req_sequence; \
 \
        } else { \
            /* \
             * specific recv is earlier than the wild one. \
             */ \
            specific_recv_tag=specific_recv->req_recv.req_base.req_tag; \
            if ( (frag_tag == specific_recv_tag) || \
                 ( (specific_recv_tag == OMPI_ANY_TAG) && (0<=frag_tag)) ) \
            { \
                /* \
                 * Match made \
                 */ \
                return_match = specific_recv; \
                /* remove descriptor from specific receive list */ \
                ompi_list_remove_item(&(proc)->specific_receives, \
                    (ompi_list_item_t *)specific_recv); \
                break; \
            } \
 \
            /* \
             * No match, go on to the next specific irecv. \
             */ \
            specific_recv = (mca_pml_ob1_recv_request_t *) \
                ((ompi_list_item_t *)specific_recv)->ompi_list_next; \
 \
            /* \
             * If that was the last specific irecv, process the \
             * rest of the wild ones. \
             */ \
            if (specific_recv == (mca_pml_ob1_recv_request_t *) \
                    ompi_list_get_end(&(proc)->specific_receives)) \
            { \
                MCA_PML_OB1_CHECK_WILD_RECEIVES_FOR_MATCH(hdr, comm, proc, return_match); \
                break; \
            } \
            /* \
             * Get the sequence number for this recv, and go \
             * back to the top of the loop. \
             */ \
            specific_recv_seq = specific_recv->req_recv.req_base.req_sequence; \
        } \
    } \
} while(0)


/*
 * Specialized matching routines for internal use only.
 */

static bool mca_pml_ob1_check_cantmatch_for_match(
    ompi_list_t *additional_matches,
    mca_pml_ob1_comm_t* comm,
    mca_pml_ob1_comm_proc_t *proc);


/**
 * RTS/CTS receive side matching
 *
 * @param hdr list of parameters needed for matching
 *            This list is also embedded in frag_desc,
 *            but this allows us to save a memory copy when
 *            a match is made in this routine. (IN)
 * @param frag_desc pointer to receive fragment which we want
 *                  to match (IN/OUT). If a match is not made,
 *                  hdr is copied to frag_desc.
 * @param match_made parameter indicating if we matched frag_desc/
 *                   hdr (OUT)
 * @param additional_matches if a match is made with frag_desc, we
 *                           may be able to match fragments that previously
 *                           have arrived out-of-order. If this is the
 *                           case, the associated fragment descriptors are
 *                           put on this list for further processing. (OUT)
 *
 * @return OMPI error code
 *
 * This routine is used to try and match a newly arrived message fragment
 * to pre-posted receives. The following assumptions are made
 * - fragments are received out of order
 * - for long messages, e.g. more than one fragment, a RTS/CTS algorithm
 *   is used.
 * - 2nd and greater fragments include a receive descriptor pointer
 * - fragments may be dropped
 * - fragments may be corrupt
 * - this routine may be called simultaneously by more than one thread
 */
int mca_pml_ob1_match(
    mca_bmi_base_module_t* bmi,
    mca_pml_ob1_match_hdr_t *hdr,
    mca_bmi_base_segment_t* segments,
    size_t num_segments)
{
    /* local variables */
    uint16_t next_msg_seq_expected, frag_msg_seq;
    ompi_communicator_t *comm_ptr;
    mca_pml_ob1_recv_request_t *matched_receive = NULL;
    mca_pml_ob1_comm_t *comm;
    mca_pml_ob1_comm_proc_t *proc;
    bool additional_match=false;
    ompi_list_t additional_matches;
    int rc;

    /* communicator pointer */
    comm_ptr=ompi_comm_lookup(hdr->hdr_contextid);
    comm=(mca_pml_ob1_comm_t *)comm_ptr->c_pml_comm;

    /* source sequence number */
    frag_msg_seq = hdr->hdr_msg_seq;
    proc = comm->procs + hdr->hdr_src;

    /* get next expected message sequence number - if threaded
     * run, lock to make sure that if another thread is processing
     * a frag from the same message a match is made only once.
     * Also, this prevents other posted receives (for a pair of
     * end points) from being processed, and potentially "losing"
     * the fragment.
     */
    OMPI_THREAD_LOCK(&comm->matching_lock);

    /* get sequence number of next message that can be processed */
    next_msg_seq_expected = (uint16_t)proc->expected_sequence;
    if (frag_msg_seq == next_msg_seq_expected) {

        /*
         * This is the sequence number we were expecting,
         * so we can try matching it to already posted
         * receives.
         */

        /* We're now expecting the next sequence number. */
        (proc->expected_sequence)++;

        /*
         * figure out what sort of matching logic to use, if need to
         * look only at "specific" receives, or "wild" receives,
         * or if we need to traverse both sets at the same time.
         */
        if (ompi_list_get_size(&proc->specific_receives) == 0 ){
            /*
             * There are only wild irecvs, so specialize the algorithm.
             */
            MCA_PML_OB1_CHECK_WILD_RECEIVES_FOR_MATCH(hdr, comm, proc, matched_receive);

        } else if (ompi_list_get_size(&comm->wild_receives) == 0 ) {
            /*
             * There are only specific irecvs, so specialize the algorithm.
             */
            MCA_PML_OB1_CHECK_SPECIFIC_RECEIVES_FOR_MATCH(hdr, comm, proc, matched_receive);
        } else {
            /*
             * There are some of each.
             */
            MCA_PML_OB1_CHECK_SPECIFIC_AND_WILD_RECEIVES_FOR_MATCH(hdr, comm, proc, matched_receive);
        }

        /* if match found, process data */
        if (matched_receive) {

            /* set length of incoming message */
            matched_receive->req_recv.req_bytes_packed = hdr->hdr_msg_length;

            /*
             * update delivered sequence number information, if needed.
             */
            if( (matched_receive->req_recv.req_base.req_type == MCA_PML_REQUEST_PROBE) ) {
                /* Matched a probe, rollback the next expected sequence number */
                (proc->expected_sequence)--;
            }
        } else {

            /* if no match found, place on unexpected queue */
            mca_pml_ob1_recv_frag_t* frag;
            MCA_PML_OB1_RECV_FRAG_ALLOC(frag, rc);
            if(OMPI_SUCCESS != rc) {
                OMPI_THREAD_UNLOCK(&comm->matching_lock);
                return rc;
            }
            MCA_PML_OB1_RECV_FRAG_INIT(frag,bmi,hdr,segments,num_segments);
            ompi_list_append( &proc->unexpected_frags, (ompi_list_item_t *)frag );
        }

        /*
         * Now that new message has arrived, check to see if
         * any fragments on the c_c_frags_cant_match list
         * may now be used to form new matches
         */
        if (0 < ompi_list_get_size(&proc->frags_cant_match)) {
            additional_match = mca_pml_ob1_check_cantmatch_for_match(&additional_matches,comm,proc);
        }

    } else {

        /*
         * This message comes after the next expected, so it
         * is ahead of sequence. Save it for later.
         */
        mca_pml_ob1_recv_frag_t* frag;
        MCA_PML_OB1_RECV_FRAG_ALLOC(frag, rc);
        if(OMPI_SUCCESS != rc) {
            OMPI_THREAD_UNLOCK(&comm->matching_lock);
            return rc;
        }
        MCA_PML_OB1_RECV_FRAG_INIT(frag,bmi,hdr,segments,num_segments);
        ompi_list_append(&proc->frags_cant_match, (ompi_list_item_t *)frag);

    }
    OMPI_THREAD_UNLOCK(&comm->matching_lock);

    /* release matching lock before processing fragment */
    if(matched_receive != NULL) {
        mca_pml_ob1_recv_request_progress(matched_receive,bmi,segments,num_segments);
    } else {
        ompi_output(0, "match not found\n");
    }
    if(additional_match) {
        ompi_list_item_t* item;
        while(NULL != (item = ompi_list_remove_first(&additional_matches))) {
#if 0
            mca_pml_ob1_recv_frag_t* frag = (mca_pml_ob1_recv_frag_t*)item;
            mca_pml_ob1_recv_request_progress(frag->request,frag->bmi,frag->segments,frag->num_segments);
            MCA_PML_OB1_RECV_FRAG_RETURN(frag);
#endif
        }
    }
    return OMPI_SUCCESS;
}


/**
 * Scan the list of frags that came in ahead of time to see if any
 * can be processed at this time. If they can, try and match the
 * frags.
 *
 * @param additional_matches List to hold new matches with fragments
 * from the c_frags_cant_match list. (IN/OUT)
 *
 * @param pml_comm Pointer to the communicator structure used for
 * matching purposes. (IN)
 *
 * This routine assumes that the appropriate matching locks are
 * set by the upper level routine.
 */

static bool mca_pml_ob1_check_cantmatch_for_match(
    ompi_list_t *additional_matches,
    mca_pml_ob1_comm_t* comm,
    mca_pml_ob1_comm_proc_t *proc)
{
    /* local parameters */
    int match_found;
    uint16_t next_msg_seq_expected, frag_seq;
    mca_pml_ob1_recv_frag_t *frag_desc;
    mca_pml_ob1_recv_request_t *matched_receive = NULL;
    bool match_made = false;

    /*
     * Loop over all the out of sequence messages. No ordering is assumed
     * in the c_frags_cant_match list.
     */

    match_found = 1;
    while ((0 < ompi_list_get_size(&proc->frags_cant_match)) && match_found) {

        /* initialize match flag for this search */
        match_found = 0;

        /* get sequence number of next message that can be processed */
        next_msg_seq_expected = proc->expected_sequence;

        /* search the list for a fragment from the send with sequence
         * number next_msg_seq_expected
         */
        for(frag_desc = (mca_pml_ob1_recv_frag_t *)
                ompi_list_get_first(&proc->frags_cant_match);
            frag_desc != (mca_pml_ob1_recv_frag_t *)
                ompi_list_get_end(&proc->frags_cant_match);
            frag_desc = (mca_pml_ob1_recv_frag_t *)
                ompi_list_get_next(frag_desc))
        {
            /*
             * If the message has the next expected seq from that proc...
             */
            frag_seq=frag_desc->hdr.hdr_match.hdr_msg_seq;
            if (frag_seq == next_msg_seq_expected) {
                mca_pml_ob1_match_hdr_t* hdr = &frag_desc->hdr.hdr_match;

                /* We're now expecting the next sequence number. */
                (proc->expected_sequence)++;

                /* signal that match was made */
                match_found = 1;

                /*
                 * remove frag_desc from list
                 */
                ompi_list_remove_item(&proc->frags_cant_match,
                    (ompi_list_item_t *)frag_desc);

                /*
                 * figure out what sort of matching logic to use, if need to
                 * look only at "specific" receives, or "wild" receives,
                 * or if we need to traverse both sets at the same time.
                 */
                proc = comm->procs + hdr->hdr_src;
                if (ompi_list_get_size(&proc->specific_receives) == 0 ) {
                    /*
                     * There are only wild irecvs, so specialize the algorithm.
                     */
                    MCA_PML_OB1_CHECK_WILD_RECEIVES_FOR_MATCH(hdr, comm, proc, matched_receive);
                } else if (ompi_list_get_size(&comm->wild_receives) == 0 ) {
                    /*
                     * There are only specific irecvs, so specialize the algorithm.
                     */
                    MCA_PML_OB1_CHECK_SPECIFIC_RECEIVES_FOR_MATCH(hdr, comm, proc, matched_receive);
                } else {
                    /*
                     * There are some of each.
                     */
                    MCA_PML_OB1_CHECK_SPECIFIC_AND_WILD_RECEIVES_FOR_MATCH(hdr, comm, proc, matched_receive);

                }

                /* if match found, process data */
                if (matched_receive) {

                    /* associate the receive descriptor with the fragment
                     * descriptor */
                    frag_desc->request=matched_receive;

                    /* add this fragment descriptor to the list of
                     * descriptors to be processed later
                     */
                    if(match_made == false) {
                        match_made = true;
                        OBJ_CONSTRUCT(additional_matches, ompi_list_t);
                    }
                    ompi_list_append(additional_matches, (ompi_list_item_t *)frag_desc);

                } else {

                    /* if no match found, place on unexpected queue */
                    ompi_list_append( &proc->unexpected_frags, (ompi_list_item_t *)frag_desc);

                }

                /* c_frags_cant_match is not an ordered list, so exit loop
                 * and re-start search for next sequence number */
                break;

            } /* end if (frag_seq == next_msg_seq_expected) */

        } /* end for (frag_desc) loop */

    } /* end while loop */

    return match_made;
}
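
The function above only matches a fragment whose sequence number equals the peer's expected_sequence; anything else is parked on frags_cant_match. A compact sketch of that gate, with invented types standing in for the real structures.

/* Sketch: per-peer sequence gating, as in mca_pml_ob1_match above.
 * Fragments with the expected sequence number are matched now;
 * anything ahead of sequence is parked until its turn. */
#include <stdint.h>
#include <stdbool.h>

typedef struct {
    uint16_t expected_sequence; /* next seq we may match for this peer */
} peer_t;

/* Returns true if this fragment may be matched immediately. */
static bool seq_gate(peer_t* peer, uint16_t frag_seq) {
    if (frag_seq != peer->expected_sequence)
        return false;            /* park on the can't-match list */
    peer->expected_sequence++;   /* unblock the next fragment */
    return true;
}
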
@@ -1,49 +0,0 @@
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University.
 * All rights reserved.
 * Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
 * All rights reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 * University of Stuttgart. All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 * All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */
/**
 * @file
 */
#ifndef MCA_PML_OB1_MATCH_H
#define MCA_PML_OB1_MATCH_H
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif

struct mca_pml_ob1_recv_frag_t;


/**
 * RTS/CTS receive side matching
 * Match incoming fragments against posted receives. Out of order
 * delivery.
 *
 * @param frag_header (IN) Header of received fragment.
 * @param frag_desc (IN) Received fragment descriptor.
 * @param match_made (OUT) Flag indicating whether a match was made.
 * @param additional_matches (OUT) List of additional matches
 * @return OMPI_SUCCESS or error status on failure.
 */
OMPI_DECLSPEC int mca_pml_ob1_match(
    mca_bmi_base_module_t* bmi,
    mca_pml_ob1_match_hdr_t *hdr,
    mca_bmi_base_segment_t* segments,
    size_t num_segments);

#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
#endif /* MCA_PML_OB1_MATCH_H */
@@ -20,12 +20,18 @@

#include "ompi_config.h"

#include "class/ompi_list.h"
#include "threads/mutex.h"
#include "include/constants.h"
#include "communicator/communicator.h"
#include "mca/pml/pml.h"
#include "pml_ob1.h"
#include "pml_ob1_comm.h"
#include "pml_ob1_recvfrag.h"
#include "pml_ob1_recvreq.h"
#include "pml_ob1_sendreq.h"
#include "pml_ob1_proc.h"
#include "pml_ob1_match.h"
#include "pml_ob1_hdr.h"


OBJ_CLASS_INSTANCE(

@@ -37,7 +43,11 @@ OBJ_CLASS_INSTANCE(


void mca_pml_ob1_recv_callback(
/**
 * Callback from BMI on receive.
 */

void mca_pml_ob1_recv_frag_callback(
    mca_bmi_base_module_t* bmi,
    mca_bmi_base_tag_t tag,
    mca_bmi_base_descriptor_t* des,

@@ -48,20 +58,596 @@ void mca_pml_ob1_recv_callback(
    if(segments->seg_len < sizeof(mca_pml_ob1_common_hdr_t)) {
        return;
    }

    switch(hdr->hdr_common.hdr_type) {
        case MCA_PML_OB1_HDR_TYPE_MATCH:
            mca_pml_ob1_match(bmi,&hdr->hdr_match,segments,des->des_src_cnt);
            break;
        case MCA_PML_OB1_HDR_TYPE_RNDV:
            mca_pml_ob1_match(bmi,&hdr->hdr_match,segments,des->des_src_cnt);
        {
            mca_pml_ob1_recv_frag_match(bmi,&hdr->hdr_match,segments,des->des_src_cnt);
            break;
        }
        case MCA_PML_OB1_HDR_TYPE_ACK:
            mca_pml_ob1_send_request_acked(bmi,&hdr->hdr_ack);
        {
            mca_pml_ob1_send_request_t* sendreq = (mca_pml_ob1_send_request_t*)
                hdr->hdr_ack.hdr_src_req.pval;
            sendreq->req_state = MCA_PML_OB1_SR_SEND;
            sendreq->req_recv = hdr->hdr_ack.hdr_dst_req;
            mca_pml_ob1_send_request_schedule(sendreq);
            break;
        }
        case MCA_PML_OB1_HDR_TYPE_FRAG:
        {
            mca_pml_ob1_recv_request_t* recvreq = (mca_pml_ob1_recv_request_t*)
                hdr->hdr_frag.hdr_dst_req.pval;
            mca_pml_ob1_recv_request_progress(recvreq,bmi,segments,des->des_src_cnt);
            break;
        }
        default:
            break;
    }
}
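
The ACK arm of the callback above is where sender-side scheduling for the non-RDMA path begins: the acknowledged request flips to MCA_PML_OB1_SR_SEND and mca_pml_ob1_send_request_schedule streams the rest of the message. The real scheduler lives elsewhere in this change; the following is only a hedged sketch of the shape such a loop could take, with invented fields and a stubbed transport hook. The pipeline bound mirrors the send_pipeline_depth parameter registered earlier.

/* Sketch: sender-side scheduling after an ACK, assuming a request
 * that tracks how much of req_bytes_packed has been sent. */
#include <stddef.h>

typedef struct {
    size_t bytes_packed;   /* total message size */
    size_t offset;         /* bytes handed to the BMI so far */
    int    pending;        /* fragments in flight */
    int    pipeline_depth; /* e.g. the registered default of 2 */
    size_t max_frag_size;  /* per-endpoint fragment limit */
} send_request_t;

/* Stub standing in for the BMI send entry point; returns 0 on success. */
static int bmi_send_fragment(send_request_t* req, size_t off, size_t len) {
    (void)req; (void)off; (void)len;
    return 0;
}

static void send_request_schedule(send_request_t* req) {
    while (req->offset < req->bytes_packed &&
           req->pending < req->pipeline_depth) {
        size_t left  = req->bytes_packed - req->offset;
        size_t chunk = left < req->max_frag_size ? left : req->max_frag_size;
        if (bmi_send_fragment(req, req->offset, chunk) != 0)
            break;              /* retry when resources free up */
        req->offset  += chunk;
        req->pending += 1;      /* decremented by the completion callback */
    }
}
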
/**
 * Try and match the incoming message fragment to the list of
 * "wild" receives
 *
 * @param hdr Matching data from received fragment (IN)
 *
 * @param pml_comm Pointer to the communicator structure used for
 * matching purposes. (IN)
 *
 * @return Matched receive
 *
 * This routine assumes that the appropriate matching locks are
 * set by the upper level routine.
 */

#define MCA_PML_OB1_CHECK_WILD_RECEIVES_FOR_MATCH(hdr,comm,proc,return_match) \
do { \
    /* local parameters */ \
    ompi_list_t* wild_receives = &comm->wild_receives; \
    mca_pml_ob1_recv_request_t *wild_recv; \
    int frag_tag,recv_tag; \
 \
    /* initialization */ \
    frag_tag=hdr->hdr_tag; \
 \
    /* \
     * Loop over the wild irecvs - no need to lock, the upper level \
     * locking is protecting from having other threads trying to \
     * change this list. \
     */ \
    for(wild_recv = (mca_pml_ob1_recv_request_t *) \
            ompi_list_get_first(wild_receives); \
        wild_recv != (mca_pml_ob1_recv_request_t *) \
            ompi_list_get_end(wild_receives); \
        wild_recv = (mca_pml_ob1_recv_request_t *) \
            ((ompi_list_item_t *)wild_recv)->ompi_list_next) { \
 \
        recv_tag = wild_recv->req_recv.req_base.req_tag; \
        if ( \
            /* exact tag match */ \
            (frag_tag == recv_tag) || \
            /* wild tag match - negative tags (except for \
             * OMPI_ANY_TAG) are reserved for internal use, and will \
             * not be matched with OMPI_ANY_TAG */ \
            ( (recv_tag == OMPI_ANY_TAG) && (0 <= frag_tag) ) ) \
 \
        { \
            /* \
             * Mark that this is the matching irecv, and go to process it. \
             */ \
            return_match = wild_recv; \
 \
            /* remove this irecv from the posted wild ireceive list */ \
            ompi_list_remove_item(wild_receives, \
                (ompi_list_item_t *)wild_recv); \
 \
            /* found match - no need to continue */ \
            break; \
        } \
    } \
} while(0)


/**
 * Try and match the incoming message fragment to the list of
 * "specific" receives
 *
 * @param hdr Matching data from received fragment (IN)
 *
 * @param comm Pointer to the communicator structure used for
 * matching purposes. (IN)
 *
 * @return Matched receive
 *
 * This routine assumes that the appropriate matching locks are
 * set by the upper level routine.
 */
#define MCA_PML_OB1_CHECK_SPECIFIC_RECEIVES_FOR_MATCH(hdr,comm,proc,return_match) \
do { \
    /* local variables */ \
    ompi_list_t* specific_receives = &proc->specific_receives; \
    mca_pml_ob1_recv_request_t *specific_recv; \
    int recv_tag,frag_tag; \
 \
    /* initialization */ \
    frag_tag=hdr->hdr_tag; \
 \
    /* \
     * Loop over the specific irecvs. \
     */ \
    for(specific_recv = (mca_pml_ob1_recv_request_t *) \
            ompi_list_get_first(specific_receives); \
        specific_recv != (mca_pml_ob1_recv_request_t *) \
            ompi_list_get_end(specific_receives); \
        specific_recv = (mca_pml_ob1_recv_request_t *) \
            ((ompi_list_item_t *)specific_recv)->ompi_list_next) { \
        /* \
         * Check for a match \
         */ \
        recv_tag = specific_recv->req_recv.req_base.req_tag; \
        if ( (frag_tag == recv_tag) || \
             ( (recv_tag == OMPI_ANY_TAG) && (0 <= frag_tag) ) ) { \
 \
            /* \
             * Match made \
             */ \
            return_match = specific_recv; \
 \
            /* remove descriptor from posted specific ireceive list */ \
            ompi_list_remove_item(specific_receives, \
                (ompi_list_item_t *)specific_recv); \
 \
            break; \
        } \
    } \
} while(0)

/**
 * Try and match the incoming message fragment to the list of
 * "wild" receives and "specific" receives. Used when both types
 * of receives have been posted, i.e. when we need to coordinate
 * between multiple lists to make sure ordered delivery occurs.
 *
 * @param hdr Matching data from received fragment (IN)
 *
 * @param comm Pointer to the communicator structure used for
 * matching purposes. (IN)
 *
 * @return Matched receive
 *
 * This routine assumes that the appropriate matching locks are
 * set by the upper level routine.
 */

#define MCA_PML_OB1_CHECK_SPECIFIC_AND_WILD_RECEIVES_FOR_MATCH( \
    hdr,comm,proc,return_match) \
do { \
    /* local variables */ \
    mca_pml_ob1_recv_request_t *specific_recv, *wild_recv; \
    mca_ptl_sequence_t wild_recv_seq, specific_recv_seq; \
    int frag_tag, wild_recv_tag, specific_recv_tag; \
 \
    /* initialization */ \
    frag_tag=hdr->hdr_tag; \
 \
    /* \
     * We know that when this is called, both specific and wild irecvs \
     * have been posted. \
     */ \
    specific_recv = (mca_pml_ob1_recv_request_t *) \
        ompi_list_get_first(&(proc)->specific_receives); \
    wild_recv = (mca_pml_ob1_recv_request_t *) \
        ompi_list_get_first(&comm->wild_receives); \
 \
    specific_recv_seq = specific_recv->req_recv.req_base.req_sequence; \
    wild_recv_seq = wild_recv->req_recv.req_base.req_sequence; \
 \
    while (true) { \
        if (wild_recv_seq < specific_recv_seq) { \
            /* \
             * wild recv is earlier than the specific one. \
             */ \
            /* \
             * try and match \
             */ \
            wild_recv_tag = wild_recv->req_recv.req_base.req_tag; \
            if ( (frag_tag == wild_recv_tag) || \
                 ( (wild_recv_tag == OMPI_ANY_TAG) && (0 <= frag_tag) ) ) { \
                /* \
                 * Match made \
                 */ \
                return_match=wild_recv; \
 \
                /* remove this recv from the wild receive queue */ \
                ompi_list_remove_item(&comm->wild_receives, \
                    (ompi_list_item_t *)wild_recv); \
                break; \
            } \
 \
            /* \
             * No match, go to the next. \
             */ \
            wild_recv=(mca_pml_ob1_recv_request_t *) \
                ((ompi_list_item_t *)wild_recv)->ompi_list_next; \
 \
            /* \
             * If that was the last wild one, just look at the \
             * rest of the specific ones. \
             */ \
            if (wild_recv == (mca_pml_ob1_recv_request_t *) \
                    ompi_list_get_end(&comm->wild_receives) ) \
            { \
                MCA_PML_OB1_CHECK_SPECIFIC_RECEIVES_FOR_MATCH(hdr, comm, proc, return_match); \
                break; \
            } \
 \
            /* \
             * Get the sequence number for this recv, and go \
             * back to the top of the loop. \
             */ \
            wild_recv_seq = wild_recv->req_recv.req_base.req_sequence; \
 \
        } else { \
            /* \
             * specific recv is earlier than the wild one. \
             */ \
            specific_recv_tag=specific_recv->req_recv.req_base.req_tag; \
            if ( (frag_tag == specific_recv_tag) || \
                 ( (specific_recv_tag == OMPI_ANY_TAG) && (0<=frag_tag)) ) \
            { \
                /* \
                 * Match made \
                 */ \
                return_match = specific_recv; \
                /* remove descriptor from specific receive list */ \
                ompi_list_remove_item(&(proc)->specific_receives, \
                    (ompi_list_item_t *)specific_recv); \
                break; \
            } \
 \
            /* \
             * No match, go on to the next specific irecv. \
             */ \
            specific_recv = (mca_pml_ob1_recv_request_t *) \
                ((ompi_list_item_t *)specific_recv)->ompi_list_next; \
 \
            /* \
             * If that was the last specific irecv, process the \
             * rest of the wild ones. \
             */ \
            if (specific_recv == (mca_pml_ob1_recv_request_t *) \
                    ompi_list_get_end(&(proc)->specific_receives)) \
            { \
                MCA_PML_OB1_CHECK_WILD_RECEIVES_FOR_MATCH(hdr, comm, proc, return_match); \
                break; \
            } \
            /* \
             * Get the sequence number for this recv, and go \
             * back to the top of the loop. \
             */ \
            specific_recv_seq = specific_recv->req_recv.req_base.req_sequence; \
        } \
    } \
} while(0)


/*
 * Specialized matching routines for internal use only.
 */

static bool mca_pml_ob1_check_cantmatch_for_match(
    ompi_list_t *additional_matches,
    mca_pml_ob1_comm_t* comm,
    mca_pml_ob1_comm_proc_t *proc);


/**
 * RTS/CTS receive side matching
 *
 * @param hdr list of parameters needed for matching
 *            This list is also embedded in frag_desc,
 *            but this allows us to save a memory copy when
 *            a match is made in this routine. (IN)
 * @param frag_desc pointer to receive fragment which we want
 *                  to match (IN/OUT). If a match is not made,
 *                  hdr is copied to frag_desc.
 * @param match_made parameter indicating if we matched frag_desc/
 *                   hdr (OUT)
 * @param additional_matches if a match is made with frag_desc, we
 *                           may be able to match fragments that previously
 *                           have arrived out-of-order. If this is the
 *                           case, the associated fragment descriptors are
 *                           put on this list for further processing. (OUT)
 *
 * @return OMPI error code
 *
 * This routine is used to try and match a newly arrived message fragment
 * to pre-posted receives. The following assumptions are made
 * - fragments are received out of order
 * - for long messages, e.g. more than one fragment, a RTS/CTS algorithm
 *   is used.
 * - 2nd and greater fragments include a receive descriptor pointer
 * - fragments may be dropped
 * - fragments may be corrupt
 * - this routine may be called simultaneously by more than one thread
 */
int mca_pml_ob1_recv_frag_match(
    mca_bmi_base_module_t* bmi,
    mca_pml_ob1_match_hdr_t *hdr,
    mca_bmi_base_segment_t* segments,
    size_t num_segments)
{
    /* local variables */
    uint16_t next_msg_seq_expected, frag_msg_seq;
    ompi_communicator_t *comm_ptr;
    mca_pml_ob1_recv_request_t *match = NULL;
    mca_pml_ob1_comm_t *comm;
    mca_pml_ob1_comm_proc_t *proc;
    bool additional_match=false;
    ompi_list_t additional_matches;
    int rc;

    /* communicator pointer */
    comm_ptr=ompi_comm_lookup(hdr->hdr_contextid);
    comm=(mca_pml_ob1_comm_t *)comm_ptr->c_pml_comm;

    /* source sequence number */
    frag_msg_seq = hdr->hdr_msg_seq;
    proc = comm->procs + hdr->hdr_src;

    /* get next expected message sequence number - if threaded
     * run, lock to make sure that if another thread is processing
     * a frag from the same message a match is made only once.
     * Also, this prevents other posted receives (for a pair of
     * end points) from being processed, and potentially "losing"
     * the fragment.
     */
    OMPI_THREAD_LOCK(&comm->matching_lock);

    /* get sequence number of next message that can be processed */
    next_msg_seq_expected = (uint16_t)proc->expected_sequence;
    if (frag_msg_seq == next_msg_seq_expected) {

        /*
         * This is the sequence number we were expecting,
         * so we can try matching it to already posted
         * receives.
         */

        /* We're now expecting the next sequence number. */
        (proc->expected_sequence)++;

        /*
         * figure out what sort of matching logic to use, if need to
         * look only at "specific" receives, or "wild" receives,
         * or if we need to traverse both sets at the same time.
         */
        if (ompi_list_get_size(&proc->specific_receives) == 0 ){
            /*
             * There are only wild irecvs, so specialize the algorithm.
             */
            MCA_PML_OB1_CHECK_WILD_RECEIVES_FOR_MATCH(hdr, comm, proc, match);

        } else if (ompi_list_get_size(&comm->wild_receives) == 0 ) {
            /*
             * There are only specific irecvs, so specialize the algorithm.
             */
            MCA_PML_OB1_CHECK_SPECIFIC_RECEIVES_FOR_MATCH(hdr, comm, proc, match);
        } else {
            /*
             * There are some of each.
             */
            MCA_PML_OB1_CHECK_SPECIFIC_AND_WILD_RECEIVES_FOR_MATCH(hdr, comm, proc, match);
        }

        /* if match found, process data */
        if (match) {

            /* set length of incoming message */
            match->req_recv.req_bytes_packed = hdr->hdr_msg_length;

            /*
             * update delivered sequence number information, if needed.
             */
            if( (match->req_recv.req_base.req_type == MCA_PML_REQUEST_PROBE) ) {
                /* Matched a probe, rollback the next expected sequence number */
                (proc->expected_sequence)--;
            }
        } else {

            /* if no match found, place on unexpected queue */
            mca_pml_ob1_recv_frag_t* frag;
            MCA_PML_OB1_RECV_FRAG_ALLOC(frag, rc);
            if(OMPI_SUCCESS != rc) {
                OMPI_THREAD_UNLOCK(&comm->matching_lock);
                return rc;
            }
            MCA_PML_OB1_RECV_FRAG_INIT(frag,bmi,hdr,segments,num_segments);
            ompi_list_append( &proc->unexpected_frags, (ompi_list_item_t *)frag );
        }

        /*
         * Now that new message has arrived, check to see if
         * any fragments on the c_c_frags_cant_match list
         * may now be used to form new matches
         */
        if (0 < ompi_list_get_size(&proc->frags_cant_match)) {
            additional_match = mca_pml_ob1_check_cantmatch_for_match(&additional_matches,comm,proc);
        }

    } else {

        /*
         * This message comes after the next expected, so it
         * is ahead of sequence. Save it for later.
         */
        mca_pml_ob1_recv_frag_t* frag;
        MCA_PML_OB1_RECV_FRAG_ALLOC(frag, rc);
        if(OMPI_SUCCESS != rc) {
            OMPI_THREAD_UNLOCK(&comm->matching_lock);
            return rc;
        }
        MCA_PML_OB1_RECV_FRAG_INIT(frag,bmi,hdr,segments,num_segments);
        ompi_list_append(&proc->frags_cant_match, (ompi_list_item_t *)frag);

    }
    OMPI_THREAD_UNLOCK(&comm->matching_lock);

    /* release matching lock before processing fragment */
    if(match != NULL) {
        match->req_recv.req_bytes_packed = hdr->hdr_msg_length;
        match->req_recv.req_base.req_ompi.req_status.MPI_TAG = hdr->hdr_tag;
        match->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = hdr->hdr_src;
        mca_pml_ob1_recv_request_progress(match,bmi,segments,num_segments);
    } else {
        ompi_output(0, "match not found\n");
    }
    if(additional_match) {
        ompi_list_item_t* item;
        while(NULL != (item = ompi_list_remove_first(&additional_matches))) {
            mca_pml_ob1_recv_frag_t* frag = (mca_pml_ob1_recv_frag_t*)item;
            frag->request->req_recv.req_bytes_packed = hdr->hdr_msg_length;
            frag->request->req_recv.req_base.req_ompi.req_status.MPI_TAG = hdr->hdr_tag;
            frag->request->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = hdr->hdr_src;
            mca_pml_ob1_recv_request_progress(frag->request,frag->bmi,frag->segments,frag->num_segments);
            MCA_PML_OB1_RECV_FRAG_RETURN(frag);
        }
    }
    return OMPI_SUCCESS;
}


/**
 * Scan the list of frags that came in ahead of time to see if any
 * can be processed at this time. If they can, try and match the
 * frags.
 *
 * @param additional_matches List to hold new matches with fragments
 * from the c_frags_cant_match list. (IN/OUT)
 *
 * @param pml_comm Pointer to the communicator structure used for
 * matching purposes. (IN)
 *
 * This routine assumes that the appropriate matching locks are
 * set by the upper level routine.
 */

static bool mca_pml_ob1_check_cantmatch_for_match(
    ompi_list_t *additional_matches,
    mca_pml_ob1_comm_t* comm,
    mca_pml_ob1_comm_proc_t *proc)
{
    /* local parameters */
    int match_found;
    uint16_t next_msg_seq_expected, frag_seq;
    mca_pml_ob1_recv_frag_t *frag_desc;
    mca_pml_ob1_recv_request_t *match = NULL;
    bool match_made = false;

    /*
     * Loop over all the out of sequence messages. No ordering is assumed
     * in the c_frags_cant_match list.
     */

    match_found = 1;
    while ((0 < ompi_list_get_size(&proc->frags_cant_match)) && match_found) {

        /* initialize match flag for this search */
        match_found = 0;

        /* get sequence number of next message that can be processed */
        next_msg_seq_expected = proc->expected_sequence;

        /* search the list for a fragment from the send with sequence
         * number next_msg_seq_expected
         */
        for(frag_desc = (mca_pml_ob1_recv_frag_t *)
                ompi_list_get_first(&proc->frags_cant_match);
            frag_desc != (mca_pml_ob1_recv_frag_t *)
                ompi_list_get_end(&proc->frags_cant_match);
            frag_desc = (mca_pml_ob1_recv_frag_t *)
                ompi_list_get_next(frag_desc))
        {
            /*
             * If the message has the next expected seq from that proc...
             */
            frag_seq=frag_desc->hdr.hdr_match.hdr_msg_seq;
            if (frag_seq == next_msg_seq_expected) {
                mca_pml_ob1_match_hdr_t* hdr = &frag_desc->hdr.hdr_match;

                /* We're now expecting the next sequence number. */
                (proc->expected_sequence)++;

                /* signal that match was made */
                match_found = 1;

                /*
                 * remove frag_desc from list
                 */
                ompi_list_remove_item(&proc->frags_cant_match,
                    (ompi_list_item_t *)frag_desc);

                /*
                 * figure out what sort of matching logic to use, if need to
                 * look only at "specific" receives, or "wild" receives,
                 * or if we need to traverse both sets at the same time.
                 */
                proc = comm->procs + hdr->hdr_src;
                if (ompi_list_get_size(&proc->specific_receives) == 0 ) {
                    /*
                     * There are only wild irecvs, so specialize the algorithm.
                     */
                    MCA_PML_OB1_CHECK_WILD_RECEIVES_FOR_MATCH(hdr, comm, proc, match);
                } else if (ompi_list_get_size(&comm->wild_receives) == 0 ) {
                    /*
                     * There are only specific irecvs, so specialize the algorithm.
                     */
                    MCA_PML_OB1_CHECK_SPECIFIC_RECEIVES_FOR_MATCH(hdr, comm, proc, match);
                } else {
                    /*
                     * There are some of each.
                     */
                    MCA_PML_OB1_CHECK_SPECIFIC_AND_WILD_RECEIVES_FOR_MATCH(hdr, comm, proc, match);

                }

                /* if match found, process data */
                if (match) {

                    /* associate the receive descriptor with the fragment
                     * descriptor */
                    frag_desc->request=match;

                    /* add this fragment descriptor to the list of
                     * descriptors to be processed later
                     */
                    if(match_made == false) {
                        match_made = true;
                        OBJ_CONSTRUCT(additional_matches, ompi_list_t);
                    }
                    ompi_list_append(additional_matches, (ompi_list_item_t *)frag_desc);

                } else {

                    /* if no match found, place on unexpected queue */
                    ompi_list_append( &proc->unexpected_frags, (ompi_list_item_t *)frag_desc);

                }

                /* c_frags_cant_match is not an ordered list, so exit loop
                 * and re-start search for next sequence number */
                break;

            } /* end if (frag_seq == next_msg_seq_expected) */

        } /* end for (frag_desc) loop */

    } /* end while loop */

    return match_made;
}
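
check_cantmatch_for_match drains the parked fragments each time expected_sequence advances; since the list is unordered, every hit restarts the scan. A small stand-alone sketch of that drain, with invented types.

/* Sketch: draining fragments that arrived ahead of sequence, in the
 * spirit of mca_pml_ob1_check_cantmatch_for_match above. */
#include <stdint.h>
#include <stddef.h>

typedef struct frag {
    uint16_t     seq;
    struct frag* next;
} frag_t;

/* Pops the fragment whose seq matches *expected, if present. */
static frag_t* take_expected(frag_t** list, uint16_t* expected) {
    for (frag_t** p = list; *p != NULL; p = &(*p)->next) {
        if ((*p)->seq == *expected) {
            frag_t* hit = *p;
            *p = hit->next;
            (*expected)++;   /* the next seq may now be parked here too */
            return hit;
        }
    }
    return NULL;
}

static void drain_cant_match(frag_t** list, uint16_t* expected) {
    frag_t* f;
    while ((f = take_expected(list, expected)) != NULL) {
        /* hand f to the matching logic, then release it */
    }
}
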
@ -51,26 +51,35 @@ typedef struct mca_pml_ob1_recv_frag_t mca_pml_ob1_recv_frag_t;


/**
 * Called to attempt a match for new fragments.
 *
 * @param bmi (IN) The PTL pointer
 * @param frag (IN) Receive fragment descriptor.
 * @param hdr (IN) Header corresponding to the receive fragment.
 * @return OMPI_SUCCESS or error status on failure.
 * Callback from BMI on receipt of a fragment.
 */
bool mca_pml_ob1_recv_frag_match(
    struct mca_pml_ob1_recv_frag_t* frag
);

int mca_pml_ob1_recv_frag_matched(
    struct mca_pml_ob1_recv_frag_t* frag
OMPI_DECLSPEC void mca_pml_ob1_recv_frag_callback(
    mca_bmi_base_module_t* bmi,
    mca_bmi_base_tag_t tag,
    mca_bmi_base_descriptor_t* descriptor,
    void* cbdata
);

/**
 * Match incoming fragments against posted receives.
 * Supports out-of-order delivery.
 *
 * @param frag_header (IN) Header of received fragment.
 * @param frag_desc (IN) Received fragment descriptor.
 * @param match_made (OUT) Flag indicating whether a match was made.
 * @param additional_matches (OUT) List of additional matches.
 * @return OMPI_SUCCESS or error status on failure.
 */
OMPI_DECLSPEC int mca_pml_ob1_recv_frag_match(
    mca_bmi_base_module_t* bmi,
    mca_pml_ob1_match_hdr_t *hdr,
    mca_bmi_base_segment_t* segments,
    size_t num_segments);

int mca_pml_ob1_recv_frag_complete(
    struct mca_bmi_base_module_t* bmi,
    struct mca_pml_ob1_recv_request_t* req,
    struct mca_pml_ob1_recv_frag_t* frag
);

#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
#endif
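
The prototypes above replace the old frag-based entry points: the BMI now delivers raw descriptors to mca_pml_ob1_recv_frag_callback, which hands the header and segment list to mca_pml_ob1_recv_frag_match. A sketch of how the two plausibly fit together, assuming the surrounding OMPI declarations; the descriptor field names des_dst and des_dst_cnt are assumptions for illustration -- only the two prototypes come from this header.

void mca_pml_ob1_recv_frag_callback(
    mca_bmi_base_module_t* bmi,
    mca_bmi_base_tag_t tag,
    mca_bmi_base_descriptor_t* descriptor,
    void* cbdata)
{
    /* des_dst/des_dst_cnt are assumed names for the received segment list */
    mca_bmi_base_segment_t* segments = descriptor->des_dst;

    /* the PML match header rides at the front of the first segment */
    mca_pml_ob1_match_hdr_t* hdr =
        (mca_pml_ob1_match_hdr_t*)segments->seg_addr.pval;

    /* run the matching engine against the posted receives */
    mca_pml_ob1_recv_frag_match(bmi, hdr, segments, descriptor->des_dst_cnt);
}
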
@ -165,6 +165,8 @@ void mca_pml_ob1_recv_request_match_specific(mca_pml_ob1_recv_request_t* request
    bytes_received, \
    bytes_delivered) \
{ \
    /* FIX */ \
    bytes_delivered = bytes_received; \
}

@ -81,7 +81,8 @@ static void mca_pml_ob1_send_completion(

    /* check for request completion */
    OMPI_THREAD_LOCK(&ompi_request_lock);
    if (sendreq->req_offset == sendreq->req_send.req_bytes_packed) {
    if (OMPI_THREAD_ADD32(&sendreq->req_pending,-1) == 0 &&
        sendreq->req_offset == sendreq->req_send.req_bytes_packed) {
        sendreq->req_send.req_base.req_pml_complete = true;
        if (sendreq->req_send.req_base.req_ompi.req_complete == false) {
            sendreq->req_send.req_base.req_ompi.req_status.MPI_SOURCE = sendreq->req_send.req_base.req_comm->c_my_rank;
@ -89,6 +90,7 @@ static void mca_pml_ob1_send_completion(
            sendreq->req_send.req_base.req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
            sendreq->req_send.req_base.req_ompi.req_status._count = sendreq->req_send.req_bytes_packed;
            sendreq->req_send.req_base.req_ompi.req_complete = true;
            sendreq->req_state = MCA_PML_OB1_SR_COMPLETE;
            if(ompi_request_waiting) {
                ompi_condition_broadcast(&ompi_request_cond);
            }
@ -96,16 +98,20 @@ static void mca_pml_ob1_send_completion(
            MCA_PML_OB1_FREE((ompi_request_t**)&sendreq);
        } else if (sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED) {
            mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq);
            sendreq->req_state = MCA_PML_OB1_SR_COMPLETE;
        }
        sendreq->req_state = MCA_PML_OB1_SR_COMPLETE;
    }
    OMPI_THREAD_UNLOCK(&ompi_request_lock);

    /* advance based on request state */
    MCA_PML_OB1_SEND_REQUEST_ADVANCE(sendreq);

    /* check for pending requests that need to be progressed */
    while(ompi_list_get_size(&mca_pml_ob1.send_pending) != 0) {
    /* advance pending requests */
    while(NULL != sendreq) {
        switch(sendreq->req_state) {
            case MCA_PML_OB1_SR_SEND:
                mca_pml_ob1_send_request_schedule(sendreq);
                break;
            default:
                break;
        }
        OMPI_THREAD_LOCK(&mca_pml_ob1.ob1_lock);
        sendreq = (mca_pml_ob1_send_request_t*)ompi_list_remove_first(&mca_pml_ob1.send_pending);
        OMPI_THREAD_UNLOCK(&mca_pml_ob1.ob1_lock);
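
The key change in this hunk is the completion test: a request now finishes only when the callback that is running is the last one outstanding (req_pending drops to zero) and scheduling has covered the whole message. A self-contained sketch of that accounting with stand-in types; atomic_add32 stands in for OMPI_THREAD_ADD32 and is assumed to return the new value.

#include <stdbool.h>
#include <stdint.h>
#include <stddef.h>

typedef struct {
    volatile int32_t req_pending; /* fragments handed to a BMI, callback not yet seen */
    size_t req_offset;            /* bytes scheduled onto the wire so far */
    size_t bytes_packed;          /* total message size */
} send_state;

/* stand-in for OMPI_THREAD_ADD32: add and return the new value */
static int32_t atomic_add32(volatile int32_t *p, int32_t v)
{
    *p += v;   /* single-threaded sketch; the real code runs under ompi_request_lock */
    return *p;
}

/* one call per fragment-completion callback */
static bool fragment_done(send_state *s)
{
    /* both legs must hold: this is the last outstanding fragment AND
     * nothing remains to schedule -- otherwise a fast callback racing the
     * scheduler could declare the request complete too early */
    return atomic_add32(&s->req_pending, -1) == 0 &&
           s->req_offset == s->bytes_packed;
}
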
@ -214,6 +220,7 @@ int mca_pml_ob1_send_request_start_copy(

    /* if an acknowledgment is not required - can get by w/ shorter hdr */
    if (flags == 0) {
        int32_t free_after;
        hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_MATCH;

        /* pack the data into the supplied buffer */
@ -226,13 +233,14 @@ int mca_pml_ob1_send_request_start_copy(
            &iov,
            &iov_count,
            &max_data,
            NULL)) < 0) {
            &free_after)) < 0) {
            endpoint->bmi_free(endpoint->bmi, descriptor);
            return rc;
        }

        /* update length w/ number of bytes actually packed */
        segment->seg_len = sizeof(mca_pml_ob1_match_hdr_t) + max_data;

        sendreq->req_offset = max_data;
        if(sendreq->req_offset == sendreq->req_send.req_bytes_packed) {
            ompi_request_complete((ompi_request_t*)sendreq);
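
Both pack sites in this function now pass a real int32_t free_after out-parameter instead of NULL. A sketch of the eager-path bookkeeping around that call, with stand-in types; pack() abstracts the convertor call, and all names here are illustrative.

#include <stdint.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct { size_t bytes_packed; size_t offset; bool complete; } req_t;

/* stand-in for the pack call: produce up to *max_data bytes and report via
 * *free_after whether the caller now owns a temporary buffer */
static int pack(req_t *r, void *buf, size_t *max_data, int32_t *free_after)
{
    (void)r; (void)buf;
    *free_after = 0;                 /* nothing to free in this sketch */
    /* ... copy payload, shrink *max_data to the bytes actually packed ... */
    return 0;
}

static int eager_start(req_t *r, void *payload, size_t eager_room)
{
    int32_t free_after;
    size_t max_data = eager_room;
    int rc = pack(r, payload, &max_data, &free_after);
    if (rc < 0)
        return rc;                   /* the real code also frees the descriptor */
    r->offset = max_data;            /* bytes already accounted for */
    if (r->offset == r->bytes_packed)
        r->complete = true;          /* whole message fit: complete at once */
    return 0;
}
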
@ -240,6 +248,7 @@ int mca_pml_ob1_send_request_start_copy(

    /* rendezvous header is required */
    } else {
        int32_t free_after;
        hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_RNDV;
        hdr->hdr_rndv.hdr_src_req.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
        hdr->hdr_rndv.hdr_src_req.pval = sendreq;
@ -254,7 +263,7 @@ int mca_pml_ob1_send_request_start_copy(
            &iov,
            &iov_count,
            &max_data,
            NULL)) < 0) {
            &free_after)) < 0) {
            endpoint->bmi_free(endpoint->bmi, descriptor);
            return rc;
        }
@ -265,6 +274,7 @@ int mca_pml_ob1_send_request_start_copy(
        }
    }
    descriptor->des_cbdata = sendreq;
    OMPI_THREAD_ADD32(&sendreq->req_pending,1);

    /* send */
    rc = endpoint->bmi_send(
@ -283,7 +293,7 @@ int mca_pml_ob1_send_request_start_copy(
 *
 */

int mca_pml_ob1_send_request_send(mca_pml_ob1_send_request_t* sendreq)
int mca_pml_ob1_send_request_schedule(mca_pml_ob1_send_request_t* sendreq)
{
    /*
     * Only allow one thread in this routine for a given request.
@ -293,6 +303,7 @@ int mca_pml_ob1_send_request_send(mca_pml_ob1_send_request_t* sendreq)
     */
    if(OMPI_THREAD_ADD32(&sendreq->req_lock,1) == 1) {
        mca_pml_ob1_proc_t* proc = sendreq->req_proc;
        size_t num_bmi_avail = mca_pml_ob1_ep_array_get_size(&proc->bmi_next);
        do {
            /* allocate remaining bytes to PTLs */
            size_t bytes_remaining = sendreq->req_send.req_bytes_packed - sendreq->req_offset;
@ -307,7 +318,7 @@ int mca_pml_ob1_send_request_send(mca_pml_ob1_send_request_t* sendreq)
             * size, then go ahead and give the rest of the message to this PTL.
             */
            size_t size;
            if(bytes_remaining < ep->bmi_min_seg_size) {
            if(bytes_remaining < ep->bmi_min_frag_size) {
                size = bytes_remaining;

            /* otherwise attempt to give the PTL a percentage of the message
@ -315,13 +326,15 @@ int mca_pml_ob1_send_request_send(mca_pml_ob1_send_request_t* sendreq)
             * a percentage of the overall message length (regardless of amount
             * previously assigned)
             */
            } else {
            } else if (num_bmi_avail > 1) {
                size = (ep->bmi_weight * bytes_remaining) / 100;
            } else {
                size = ep->bmi_min_frag_size;
            }

            /* make sure that we don't exceed bmi_max_frag_size */
            if(ep->bmi_max_seg_size != 0 && size > ep->bmi_max_seg_size)
                size = ep->bmi_max_seg_size;
            if (ep->bmi_max_frag_size != 0 && size > ep->bmi_max_frag_size)
                size = ep->bmi_max_frag_size - sizeof(mca_pml_ob1_frag_hdr_t);

            /* pack into a descriptor */
            des = ep->bmi_prepare_src(
@ -331,19 +344,24 @@ int mca_pml_ob1_send_request_send(mca_pml_ob1_send_request_t* sendreq)
                sizeof(mca_pml_ob1_frag_hdr_t),
                &size);
            if(des == NULL) {
                /* queue for retry? */
                break;
            }
            des->des_cbfunc = mca_pml_ob1_send_completion;
            des->des_cbdata = sendreq;

            /* prepare header */
            /* setup header */
            hdr = (mca_pml_ob1_frag_hdr_t*)des->des_src->seg_addr.pval;
            hdr->hdr_common.hdr_flags = 0;
            hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_FRAG;
            hdr->hdr_frag_length = size;
            hdr->hdr_frag_offset = sendreq->req_offset;
            hdr->hdr_src_req.pval = sendreq;
            hdr->hdr_dst_req.pval = sendreq->req_peer.pval;
            hdr->hdr_dst_req = sendreq->req_recv;

            /* update state */
            sendreq->req_offset += size;
            sendreq->req_pending++;
            OMPI_THREAD_ADD32(&sendreq->req_pending,1);

            /* initiate send - note that this may complete before the call returns */
            rc = ep->bmi_send(ep->bmi, ep->bmi_endpoint, des, MCA_BMI_TAG_PML);
@ -352,6 +370,7 @@ int mca_pml_ob1_send_request_send(mca_pml_ob1_send_request_t* sendreq)
            } else {
                sendreq->req_offset -= size;
                sendreq->req_pending--;
                /* queue for retry? */
                break;
            }
        }
@ -361,39 +380,3 @@ int mca_pml_ob1_send_request_send(mca_pml_ob1_send_request_t* sendreq)
}
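
This is the heart of the commit: mca_pml_ob1_send_request_schedule stripes the unsent remainder of a message across the available interconnects. A self-contained sketch of the fragment-sizing rule above, with simplified stand-in structures; note the real code also reserves sizeof(mca_pml_ob1_frag_hdr_t) when clamping to the maximum fragment size.

#include <stddef.h>
#include <stdio.h>

typedef struct {
    size_t min_frag_size;
    size_t max_frag_size;   /* 0 means unlimited */
    unsigned weight;        /* percentage of traffic this BMI should carry */
} endpoint;

static size_t next_fragment(const endpoint *ep, size_t bytes_remaining,
                            size_t num_bmi_avail)
{
    size_t size;
    if (bytes_remaining < ep->min_frag_size)
        size = bytes_remaining;                       /* tail: send it all */
    else if (num_bmi_avail > 1)
        size = (ep->weight * bytes_remaining) / 100;  /* weighted share */
    else
        size = ep->min_frag_size;                     /* single BMI: pipeline
                                                         in min-size chunks */
    if (ep->max_frag_size != 0 && size > ep->max_frag_size)
        size = ep->max_frag_size;                     /* clamp (header space
                                                         handled in real code) */
    return size;
}

int main(void)
{
    endpoint eps[2] = { { 4096, 65536, 70 }, { 4096, 32768, 30 } };
    size_t remaining = 1 << 20, i = 0;
    while (remaining > 0) {                 /* round-robin the two BMIs */
        size_t sz = next_fragment(&eps[i % 2], remaining, 2);
        if (sz == 0)
            sz = remaining;                 /* guard if weights round to zero */
        remaining -= (sz > remaining ? remaining : sz);
        i++;
    }
    printf("scheduled in %zu fragments\n", i);
    return 0;
}
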


/**
 *
 */

int mca_pml_ob1_send_request_acked(
    mca_bmi_base_module_t* bmi,
    mca_pml_ob1_ack_hdr_t* hdr)
{
    mca_pml_ob1_send_request_t* sendreq = (mca_pml_ob1_send_request_t*)
        hdr->hdr_src_req.pval;
    if(hdr->hdr_common.hdr_flags & MCA_PML_OB1_HDR_FLAGS_PUT) {
        sendreq->req_state = MCA_PML_OB1_SR_PUT;
        mca_pml_ob1_send_request_put(sendreq);
    } else {
        sendreq->req_state = MCA_PML_OB1_SR_SEND;
        mca_pml_ob1_send_request_send(sendreq);
    }
    return OMPI_SUCCESS;
}
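
The removal above takes the ACK dispatch out of this file, but its logic is what the commit title refers to: on a rendezvous ACK the sender either RDMA-puts the payload or, in the non-RDMA case, falls back to scheduled sends. A stand-in sketch of that dispatch (all types illustrative; peer_supports_put mirrors the MCA_PML_OB1_HDR_FLAGS_PUT flag test):

#include <stdbool.h>

typedef enum { SR_SEND, SR_PUT } sr_state;
typedef struct { sr_state state; } sendreq_t;

static void schedule_sends(sendreq_t *r) { (void)r; /* weighted striping, as above */ }
static void rdma_put(sendreq_t *r)       { (void)r; /* stubbed, like the real put */ }

static void on_ack(sendreq_t *r, bool peer_supports_put)
{
    if (peer_supports_put) {
        r->state = SR_PUT;
        rdma_put(r);          /* RDMA path */
    } else {
        r->state = SR_SEND;
        schedule_sends(r);    /* non-RDMA path: sender-side scheduling */
    }
}
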


/*
 *
 */

int mca_pml_ob1_send_request_put(
    mca_pml_ob1_send_request_t* sendreq)
{
    return OMPI_SUCCESS;
}

@ -31,7 +31,7 @@ extern "C" {

typedef enum {
    MCA_PML_OB1_SR_INIT,
    MCA_PML_OB1_SR_WAIT,
    MCA_PML_OB1_SR_START,
    MCA_PML_OB1_SR_SEND,
    MCA_PML_OB1_SR_PUT,
    MCA_PML_OB1_SR_COMPLETE
@ -43,7 +43,7 @@ struct mca_pml_ob1_send_request_t {
    mca_pml_ob1_proc_t* req_proc;
    mca_pml_ob1_endpoint_t* req_endpoint;
    mca_pml_ob1_send_request_state_t req_state;
    ompi_ptr_t req_peer;
    ompi_ptr_t req_recv;
    int32_t req_lock;
    size_t req_pending;
    size_t req_offset;
@ -85,6 +85,7 @@ OBJ_CLASS_DECLARATION(mca_pml_ob1_send_request_t);
    sendmode, \
    persistent) \
{ \
    sendreq->req_state = MCA_PML_OB1_SR_INIT; \
    MCA_PML_BASE_SEND_REQUEST_INIT(&sendreq->req_send, \
        buf, \
        count, \
@ -108,6 +109,8 @@ OBJ_CLASS_DECLARATION(mca_pml_ob1_send_request_t);
    /* select next endpoint */ \
    endpoint = mca_pml_ob1_ep_array_get_next(&proc->bmi_first); \
    sendreq->req_offset = 0; \
    sendreq->req_pending = 0; \
    sendreq->req_state = MCA_PML_OB1_SR_START; \
    sendreq->req_send.req_base.req_ompi.req_complete = false; \
    sendreq->req_send.req_base.req_ompi.req_state = OMPI_REQUEST_ACTIVE; \
    sendreq->req_send.req_base.req_sequence = OMPI_THREAD_ADD32(&proc->proc_sequence,1); \
@ -129,24 +132,6 @@ OBJ_CLASS_DECLARATION(mca_pml_ob1_send_request_t);
 * Advance a request
 */

#define MCA_PML_OB1_SEND_REQUEST_ADVANCE(sendreq)

#if 0
    switch(sendreq->req_state) {
        case MCA_PML_OB1_SR_WAIT:
            mca_pml_ob1_send_request_wait(sendreq);
            break;
        case MCA_PML_OB1_SR_SEND:
            mca_pml_ob1_send_request_send(sendreq);
            break;
        case MCA_PML_OB1_SR_PUT:
            mca_pml_ob1_send_request_put(sendreq);
            break;
        default:
            break;
    }
#endif
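
Taken together with the hunks above, the revised enum implies this request lifecycle: WAIT is dropped and START introduced. An annotated stand-in, not the real declaration:

typedef enum {
    SR_INIT,     /* constructed; set by the INIT macro */
    SR_START,    /* first fragment / rendezvous header on the wire (START macro) */
    SR_SEND,     /* ACK chose the non-RDMA path: schedule remaining fragments */
    SR_PUT,      /* ACK chose the RDMA put path */
    SR_COMPLETE  /* all bytes scheduled and the last completion counted */
} sr_state;
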


#define MCA_PML_OB1_SEND_REQUEST_RETURN(sendreq) \
{ \
@ -176,26 +161,10 @@ int mca_pml_ob1_send_request_start_copy(
    mca_pml_ob1_send_request_t* sendreq,
    mca_pml_ob1_endpoint_t* endpoint);


/**
 *
 */

int mca_pml_ob1_send_request_acked(
    mca_bmi_base_module_t* bmi,
    struct mca_pml_ob1_ack_hdr_t* hdr);


/**
 *
 */
int mca_pml_ob1_send_request_send(
    mca_pml_ob1_send_request_t* sendreq);

/**
 *
 */
int mca_pml_ob1_send_request_put(
int mca_pml_ob1_send_request_schedule(
    mca_pml_ob1_send_request_t* sendreq);

#if defined(c_plusplus) || defined(__cplusplus)