Updates to the BMI and PML interfaces.
This commit was SVN r5910.
Parent: 19b4479a0a
Commit: 4ce8f91b6a
@@ -154,7 +154,7 @@ typedef struct mca_bmi_base_segment_t mca_bmi_base_segment_t;
 
 /**
  * A descriptor that holds the parameters to a send/put/get
- * operation along w/ a callback function that is called on
+ * operation along w/ a callback routine that is called on
  * completion of the request.
  */
 
@@ -373,7 +373,7 @@ typedef int (*mca_bmi_base_module_free_fn_t)(
  * @param bmi (IN) BMI module
  * @param peer (IN) BMI peer addressing
  */
-typedef struct mca_bmi_base_descriptor_t* (*mca_bmi_base_module_pack_fn_t)(
+typedef struct mca_bmi_base_descriptor_t* (*mca_bmi_base_module_prepare_fn_t)(
     struct mca_bmi_base_module_t* bmi,
     struct mca_bmi_base_endpoint_t* peer,
     struct ompi_convertor_t* convertor,
@@ -381,6 +381,31 @@ typedef struct mca_bmi_base_descriptor_t* (*mca_bmi_base_module_pack_fn_t)(
     size_t* size
 );
 
+/**
+ * Pack data and return a descriptor that can be
+ * used for send/put.
+ *
+ * @param bmi (IN) BMI module
+ * @param endpoint (IN) BMI peer addressing
+ */
+
+typedef struct mca_bmi_base_descriptor_t* (*mca_bmi_base_module_prepare_dst_fn_t)(
+    struct mca_bmi_base_module_t* bmi,
+    struct mca_bmi_base_endpoint_t* endpoint,
+    struct ompi_convertor_t* convertor,
+    size_t reserve,
+    size_t* size
+);
+
+
+typedef int (*mca_bmi_base_module_unpack_fn_t)(
+    struct mca_bmi_base_module_t* bmi,
+    struct mca_bmi_base_endpoint_t* peer,
+    struct ompi_convertor_t* convertor,
+    struct mca_bmi_base_descriptor_t* descriptor
+);
+
+
 
 /**
  * Initiate a send to the peer.
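The prepare/unpack typedefs above only give signatures, so a compact, self-contained model of the intended bmi_prepare_src contract may help: the caller asks for a descriptor whose buffer begins with `reserve` bytes of header space, the module packs as much user data as fits behind that reservation, and the number of bytes actually packed is reported back through `size`. This is a standalone sketch under those assumptions; the toy_* names and the memcpy stand-in for the convertor are illustrative only, not the Open MPI implementation.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Stand-in for a BMI descriptor: one contiguous segment. */
typedef struct {
    unsigned char *base;   /* start of the buffer (header region first) */
    size_t reserve;        /* bytes reserved for the caller's header */
    size_t payload;        /* bytes of user data packed after the header */
} toy_descriptor;

/* Model of a prepare_src-style call: reserve header space, pack what fits,
 * and report the packed length back through *size. */
static toy_descriptor *toy_prepare_src(const unsigned char *user, size_t reserve,
                                       size_t *size, size_t max_frag)
{
    toy_descriptor *des = malloc(sizeof(*des));
    size_t payload = *size;
    if (des == NULL)
        return NULL;
    if (payload + reserve > max_frag)
        payload = max_frag - reserve;           /* truncate to what fits */
    des->base = malloc(reserve + payload);
    des->reserve = reserve;
    des->payload = payload;
    memcpy(des->base + reserve, user, payload); /* stand-in for convertor pack */
    *size = payload;                            /* tell the caller what was packed */
    return des;
}

int main(void)
{
    unsigned char msg[256];
    size_t size = sizeof(msg);
    memset(msg, 0x5a, sizeof(msg));
    toy_descriptor *des = toy_prepare_src(msg, 32 /* header reserve */, &size, 128);
    printf("requested 256 bytes, packed %zu after a 32-byte header\n", size);
    free(des->base);
    free(des);
    return 0;
}

The shared-memory implementation later in this commit (mca_bmi_sm_prepare_src) follows the same pattern, using a free-list fragment and ompi_convertor_pack() instead of the toy buffer and memcpy.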
@@ -405,7 +430,7 @@ typedef int (*mca_bmi_base_module_send_fn_t)(
 
 typedef int (*mca_bmi_base_module_put_fn_t)(
     struct mca_bmi_base_module_t* bmi,
-    struct mca_bmi_base_endpoint_t* peer,
+    struct mca_bmi_base_endpoint_t* endpoint,
     struct mca_bmi_base_descriptor_t* descriptor
 );
 
@@ -447,7 +472,8 @@ struct mca_bmi_base_module_t {
 
     mca_bmi_base_module_alloc_fn_t bmi_alloc;
     mca_bmi_base_module_free_fn_t bmi_free;
-    mca_bmi_base_module_pack_fn_t bmi_pack;
+    mca_bmi_base_module_prepare_fn_t bmi_prepare_src;
+    mca_bmi_base_module_prepare_fn_t bmi_prepare_dst;
     mca_bmi_base_module_send_fn_t bmi_send;
     mca_bmi_base_module_put_fn_t bmi_put;
     mca_bmi_base_module_get_fn_t bmi_get;
@@ -44,7 +44,7 @@ libmca_bmi_ib_la_SOURCES = \
 # mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la
 # (for static builds).
 
-if OMPI_BUILD_bmi_sm_DSO
+if OMPI_BUILD_bmi_ib_DSO
 component_noinst =
 component_install = mca_bmi_ib.la
 else
@@ -60,7 +60,8 @@ mca_bmi_sm_t mca_bmi_sm[2] = {
         mca_bmi_sm_finalize,
         mca_bmi_sm_alloc,
         mca_bmi_sm_free,
-        mca_bmi_sm_pack,
+        mca_bmi_sm_prepare_src,
+        NULL,
         mca_bmi_sm_send,
         NULL, /* put */
         NULL  /* get */
@@ -82,7 +83,8 @@ mca_bmi_sm_t mca_bmi_sm[2] = {
         mca_bmi_sm_finalize,
         mca_bmi_sm_alloc,
         mca_bmi_sm_free,
-        mca_bmi_sm_pack,
+        mca_bmi_sm_prepare_src,
+        NULL,
         mca_bmi_sm_send,
         NULL, /* get function */
         NULL  /* put function */
@@ -763,9 +765,9 @@ extern int mca_bmi_sm_free(
 {
     mca_bmi_sm_frag_t* frag = (mca_bmi_sm_frag_t*)des;
     if(frag->size <= mca_bmi_sm_component.first_fragment_size) {
-        MCA_BMI_SM_FRAG_RETURN1(des);
+        MCA_BMI_SM_FRAG_RETURN1(frag);
     } else {
-        MCA_BMI_SM_FRAG_RETURN2(des);
+        MCA_BMI_SM_FRAG_RETURN2(frag);
     }
     return OMPI_SUCCESS;
 }
@@ -777,14 +779,37 @@ extern int mca_bmi_sm_free(
  * @param bmi (IN) BMI module
  * @param peer (IN) BMI peer addressing
  */
-struct mca_bmi_base_descriptor_t* mca_bmi_sm_pack(
+struct mca_bmi_base_descriptor_t* mca_bmi_sm_prepare_src(
     struct mca_bmi_base_module_t* bmi,
     struct mca_bmi_base_endpoint_t* peer,
     struct ompi_convertor_t* convertor,
     size_t reserve,
     size_t* size)
 {
-    return NULL;
+    mca_bmi_sm_frag_t* frag;
+    struct iovec iov;
+    uint32_t iov_count = 1;
+    uint32_t max_data = *size;
+    int rc;
+
+    MCA_BMI_SM_FRAG_ALLOC2(frag, rc);
+    if(NULL == frag) {
+        return NULL;
+    }
+
+    if(max_data + reserve > frag->size) {
+        max_data = *size - reserve;
+    }
+    iov.iov_len = max_data;
+    iov.iov_base = (unsigned char*)(frag+1) + reserve;
+
+    rc = ompi_convertor_pack(convertor, &iov, &iov_count, &max_data, NULL);
+    if(rc < 0) {
+        MCA_BMI_SM_FRAG_RETURN2(frag);
+        return NULL;
+    }
+    *size = max_data;
+    return &frag->base;
 }
 
 
@@ -300,7 +300,7 @@ extern int mca_bmi_sm_free(
  * @param bmi (IN) BMI module
  * @param peer (IN) BMI peer addressing
  */
-struct mca_bmi_base_descriptor_t* mca_bmi_sm_pack(
+struct mca_bmi_base_descriptor_t* mca_bmi_sm_prepare_src(
     struct mca_bmi_base_module_t* bmi,
     struct mca_bmi_base_endpoint_t* peer,
     struct ompi_convertor_t* convertor,
@@ -70,12 +70,12 @@ OBJ_CLASS_DECLARATION(mca_bmi_sm_frag2_t);
 
 #define MCA_BMI_SM_FRAG_RETURN1(frag) \
 { \
-    OMPI_FREE_LIST_RETURN(&mca_bmi_sm_component.sm_frags1, &frag->super); \
+    OMPI_FREE_LIST_RETURN(&mca_bmi_sm_component.sm_frags1, (ompi_list_item_t*)(frag)); \
 }
 
 #define MCA_BMI_SM_FRAG_RETURN2(frag) \
 { \
-    OMPI_FREE_LIST_RETURN(&mca_bmi_sm_component.sm_frags2, &frag->super); \
+    OMPI_FREE_LIST_RETURN(&mca_bmi_sm_component.sm_frags2, (ompi_list_item_t*)(frag)); \
 }
 
 #endif
@@ -55,7 +55,8 @@ struct mca_pml_ob1_t {
     int free_list_num;   /* initial size of free list */
     int free_list_max;   /* maximum size of free list */
     int free_list_inc;   /* number of elements to grow free list */
-    int poll_iterations; /* number of iterations to poll for completion */
+    size_t send_pipeline_depth;
+    size_t recv_pipeline_depth;
 
     /* lock queue access */
     ompi_mutex_t lock;
@@ -66,6 +67,7 @@ struct mca_pml_ob1_t {
 
     /* list of pending send requests */
     ompi_list_t send_pending;
+    ompi_list_t acks_pending;
 };
 typedef struct mca_pml_ob1_t mca_pml_ob1_t;
 
@@ -203,7 +205,6 @@ extern void mca_pml_ob1_recv_callback(
     mca_bmi_base_descriptor_t* descriptor,
     void* cbdata
 );
 
-
 extern int mca_pml_ob1_progress(void);
 
@@ -93,8 +93,6 @@ int mca_pml_ob1_component_open(void)
         mca_pml_ob1_param_register_int("free_list_max", -1);
     mca_pml_ob1.free_list_inc =
         mca_pml_ob1_param_register_int("free_list_inc", 256);
-    mca_pml_ob1.poll_iterations =
-        mca_pml_ob1_param_register_int("poll_iterations", 100000);
     mca_pml_ob1.priority =
         mca_pml_ob1_param_register_int("priority", 0);
 
@@ -27,23 +27,28 @@ extern "C" {
 
 /**
  * A data structure associated with a ompi_proc_t that caches
- * addressing/scheduling attributes for a specific NTL instance
+ * addressing/scheduling attributes for a specific BMI instance
  * that can be used to reach the process.
 */
 
 struct mca_pml_ob1_endpoint_t {
-    int bmi_weight; /**< NTL weight for scheduling */
-    size_t bmi_eager_limit; /**< NTL eager limit */
-    struct mca_bmi_base_module_t *bmi; /**< NTL module */
-    struct mca_bmi_base_endpoint_t* bmi_endpoint; /**< NTL addressing info */
+    int bmi_weight; /**< BMI weight for scheduling */
+    int bmi_flags; /**< support for put/get? */
+    size_t bmi_eager_limit; /**< BMI eager limit */
+    size_t bmi_min_seg_size; /**< BMI min segment size */
+    size_t bmi_max_seg_size; /**< BMI max segment size */
+    struct mca_bmi_base_module_t *bmi; /**< BMI module */
+    struct mca_bmi_base_endpoint_t* bmi_endpoint; /**< BMI addressing info */
     struct mca_bmi_base_descriptor_t* bmi_cache;
 
-    /* NTL function table */
+    /* BMI function table */
     mca_bmi_base_module_alloc_fn_t bmi_alloc;
     mca_bmi_base_module_free_fn_t bmi_free;
     mca_bmi_base_module_send_fn_t bmi_send;
-    mca_bmi_base_module_put_fn_t bmi_put;
-    mca_bmi_base_module_get_fn_t bmi_get;
+    mca_bmi_base_module_prepare_fn_t bmi_prepare_src;
+    mca_bmi_base_module_prepare_fn_t bmi_prepare_dst;
+    mca_bmi_base_module_put_fn_t bmi_put;
+    mca_bmi_base_module_get_fn_t bmi_get;
 };
 typedef struct mca_pml_ob1_endpoint_t mca_pml_ob1_endpoint_t;
 
@@ -30,15 +30,17 @@
 
 #define MCA_PML_OB1_HDR_TYPE_MATCH 1
 #define MCA_PML_OB1_HDR_TYPE_RNDV 2
-#define MCA_PML_OB1_HDR_TYPE_FRAG 3
-#define MCA_PML_OB1_HDR_TYPE_ACK 4
-#define MCA_PML_OB1_HDR_TYPE_NACK 5
-#define MCA_PML_OB1_HDR_TYPE_GET 6
-#define MCA_PML_OB1_HDR_TYPE_FIN 7
-#define MCA_PML_OB1_HDR_TYPE_MAX 8
+#define MCA_PML_OB1_HDR_TYPE_ACK 3
+#define MCA_PML_OB1_HDR_TYPE_NACK 4
+#define MCA_PML_OB1_HDR_TYPE_FRAG 5
+#define MCA_PML_OB1_HDR_TYPE_SEG 6
+#define MCA_PML_OB1_HDR_TYPE_GET 7
+#define MCA_PML_OB1_HDR_TYPE_FIN 8
+#define MCA_PML_OB1_HDR_TYPE_MAX 9
 
 #define MCA_PML_OB1_HDR_FLAGS_ACK 1 /* is an ack required */
 #define MCA_PML_OB1_HDR_FLAGS_NBO 2 /* is the hdr in network byte order */
+#define MCA_PML_OB1_HDR_FLAGS_PUT 3
 
 
 /*
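The renumbered type codes select which member of the PML header union is meaningful; every specific header begins with the common header, so receivers can dispatch on the first bytes alone (as the recv callback and recv_request_progress changes below do). A minimal standalone model of that layout and dispatch follows; the simplified structs are stand-ins, not the real mca_pml_ob1_hdr_t definitions.

#include <stdio.h>
#include <stdint.h>

enum { HDR_TYPE_MATCH = 1, HDR_TYPE_RNDV = 2, HDR_TYPE_ACK = 3, HDR_TYPE_FRAG = 5 };

typedef struct { uint8_t hdr_type; uint8_t hdr_flags; } common_hdr;
typedef struct { common_hdr hdr_common; uint64_t msg_length; } match_hdr;
typedef struct { match_hdr hdr_match; uint64_t frag_length; } rndv_hdr;
typedef struct { common_hdr hdr_common; uint64_t frag_length; uint64_t frag_offset; } frag_hdr;

/* Every specific header starts with the common header, so the first byte
 * always identifies which view of the union is valid. */
typedef union {
    common_hdr hdr_common;
    match_hdr  hdr_match;
    rndv_hdr   hdr_rndv;
    frag_hdr   hdr_frag;
} pml_hdr;

static void dispatch(const pml_hdr *hdr)
{
    switch (hdr->hdr_common.hdr_type) {
    case HDR_TYPE_MATCH:
        printf("match: %llu bytes\n", (unsigned long long)hdr->hdr_match.msg_length);
        break;
    case HDR_TYPE_RNDV:
        printf("rendezvous first frag: %llu bytes\n", (unsigned long long)hdr->hdr_rndv.frag_length);
        break;
    case HDR_TYPE_FRAG:
        printf("frag at offset %llu\n", (unsigned long long)hdr->hdr_frag.frag_offset);
        break;
    default:
        printf("other header type %u\n", (unsigned)hdr->hdr_common.hdr_type);
        break;
    }
}

int main(void)
{
    pml_hdr h = { 0 };
    h.hdr_rndv.hdr_match.hdr_common.hdr_type = HDR_TYPE_RNDV;
    h.hdr_rndv.frag_length = 4096;
    dispatch(&h);
    return 0;
}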
@@ -138,8 +140,8 @@ typedef struct mca_pml_ob1_match_hdr_t mca_pml_ob1_match_hdr_t;
 */
 struct mca_pml_ob1_rendezvous_hdr_t {
     mca_pml_ob1_match_hdr_t hdr_match;
     uint64_t hdr_frag_length; /**< fragment length */
-    ompi_ptr_t hdr_src_ptr; /**< pointer to source fragment - returned in ack */
+    ompi_ptr_t hdr_src_req; /**< pointer to source request - returned in ack */
 };
 typedef struct mca_pml_ob1_rendezvous_hdr_t mca_pml_ob1_rendezvous_hdr_t;
 
@@ -162,8 +164,8 @@ struct mca_pml_ob1_frag_hdr_t {
     mca_pml_ob1_common_hdr_t hdr_common; /**< common attributes */
     uint64_t hdr_frag_length; /**< fragment length */
     uint64_t hdr_frag_offset; /**< offset into message */
-    ompi_ptr_t hdr_src_ptr; /**< pointer to source fragment */
-    ompi_ptr_t hdr_dst_ptr; /**< pointer to matched receive */
+    ompi_ptr_t hdr_src_req; /**< pointer to source request */
+    ompi_ptr_t hdr_dst_req; /**< pointer to matched receive */
 };
 typedef struct mca_pml_ob1_frag_hdr_t mca_pml_ob1_frag_hdr_t;
 
@@ -185,13 +187,12 @@ typedef struct mca_pml_ob1_frag_hdr_t mca_pml_ob1_frag_hdr_t;
 /**
  * Header used to acknowledgment outstanding fragment(s).
 */
+
 struct mca_pml_ob1_ack_hdr_t {
     mca_pml_ob1_common_hdr_t hdr_common; /**< common attributes */
-    ompi_ptr_t hdr_src_ptr; /**< source fragment */
-    ompi_ptr_t hdr_dst_match; /**< matched receive request */
-    ompi_ptr_t hdr_dst_addr; /**< posted receive buffer */
-    uint64_t hdr_dst_size; /**< size of posted buffer */
-    /* sequence range? */
+    ompi_ptr_t hdr_src_req; /**< source request */
+    ompi_ptr_t hdr_dst_req; /**< matched receive request */
+    mca_bmi_base_segment_t hdr_seg; /**< segment */
 };
 typedef struct mca_pml_ob1_ack_hdr_t mca_pml_ob1_ack_hdr_t;
 
@@ -34,9 +34,10 @@ extern "C" {
 */
 struct mca_pml_proc_t {
     ompi_object_t super;
     ompi_proc_t *proc_ompi; /**< back-pointer to ompi_proc_t */
     ompi_mutex_t proc_lock; /**< lock to protect against concurrent access */
-    volatile uint32_t proc_sequence; /**< sequence number for send */
+    int proc_flags; /**< prefered method of accessing this peer */
+    volatile uint32_t proc_sequence; /**< sequence number for send */
     mca_pml_ob1_ep_array_t bmi_first; /**< array of endpoints to use for first fragments */
     mca_pml_ob1_ep_array_t bmi_next; /**< array of endpoints to use for remaining fragments */
 };
@@ -23,6 +23,7 @@
 #include "mca/pml/pml.h"
 #include "pml_ob1_recvfrag.h"
 #include "pml_ob1_recvreq.h"
+#include "pml_ob1_sendreq.h"
 #include "pml_ob1_proc.h"
 #include "pml_ob1_match.h"
 
@@ -36,7 +37,6 @@ OBJ_CLASS_INSTANCE(
 
 
 
-
 void mca_pml_ob1_recv_callback(
     mca_bmi_base_module_t* bmi,
     mca_bmi_base_tag_t tag,
@@ -56,6 +56,9 @@ void mca_pml_ob1_recv_callback(
         case MCA_PML_OB1_HDR_TYPE_RNDV:
             mca_pml_ob1_match(bmi,&hdr->hdr_match,segments,des->des_src_cnt);
             break;
+        case MCA_PML_OB1_HDR_TYPE_ACK:
+            mca_pml_ob1_send_request_acked(bmi,&hdr->hdr_ack);
+            break;
         default:
             break;
     }
@@ -95,41 +95,143 @@ OBJ_CLASS_INSTANCE(
     mca_pml_ob1_recv_request_construct,
     mca_pml_ob1_recv_request_destruct);
 
 
+/*
+ * Release resources.
+ */
+
+static void mca_pml_ob1_recv_request_acked(
+    mca_bmi_base_module_t* bmi,
+    struct mca_bmi_base_endpoint_t* ep,
+    struct mca_bmi_base_descriptor_t* des,
+    int status)
+{
+    bmi->bmi_free(bmi,des);
+}
+
+
+/*
+ *
+ */
+
+static void mca_pml_ob1_recv_request_ack(
+    mca_pml_ob1_recv_request_t* recvreq,
+    mca_pml_ob1_rendezvous_hdr_t* hdr)
+{
+    mca_pml_ob1_proc_t* proc = mca_pml_ob1_proc_lookup_remote(
+        recvreq->req_recv.req_base.req_comm,
+        hdr->hdr_match.hdr_src);
+    mca_pml_ob1_endpoint_t* ep = mca_pml_ob1_ep_array_get_next(&proc->bmi_first);
+    mca_bmi_base_descriptor_t* des;
+    mca_pml_ob1_recv_frag_t* frag;
+    mca_pml_ob1_ack_hdr_t* ack;
+    int rc;
+
+    /* allocate descriptor */
+    des = ep->bmi_alloc(ep->bmi, sizeof(mca_pml_ob1_ack_hdr_t));
+    if(NULL == des) {
+        goto retry;
+    }
+
+    /* fill out header */
+    ack = (mca_pml_ob1_ack_hdr_t*)des->des_src->seg_addr.pval;
+    ack->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_ACK;
+    ack->hdr_common.hdr_flags = 0;
+    ack->hdr_src_req = hdr->hdr_src_req;
+    ack->hdr_dst_req.pval = recvreq;
+
+    /* initialize descriptor */
+    des->des_cbfunc = mca_pml_ob1_recv_request_acked;
+    des->des_cbdata = recvreq;
+
+    rc = ep->bmi_send(ep->bmi, ep->bmi_endpoint, des, MCA_BMI_TAG_PML);
+    if(rc != OMPI_SUCCESS) {
+        ep->bmi_free(ep->bmi,des);
+        goto retry;
+    }
+    return;
+
+    /* queue request to retry later */
+retry:
+    MCA_PML_OB1_RECV_FRAG_ALLOC(frag,rc);
+    frag->bmi = NULL;
+    frag->hdr.hdr_rndv = *hdr;
+    frag->segments = NULL;
+    frag->num_segments = 0;
+    frag->request = recvreq;
+    ompi_list_append(&mca_pml_ob1.acks_pending, (ompi_list_item_t*)frag);
+}
+
+
 /*
  * Update the recv request status to reflect the number of bytes
  * received and actually delivered to the application.
  */
 
 void mca_pml_ob1_recv_request_progress(
-    mca_pml_ob1_recv_request_t* req,
+    mca_pml_ob1_recv_request_t* recvreq,
     mca_bmi_base_module_t* bmi,
     mca_bmi_base_segment_t* segments,
     size_t num_segments)
 {
-    size_t bytes_received = 0;
-    size_t bytes_delivered = 0;
+    uint64_t bytes_received = 0;
+    uint64_t bytes_delivered = 0;
     mca_pml_ob1_hdr_t* hdr = (mca_pml_ob1_hdr_t*)segments->seg_addr.pval;
 
     switch(hdr->hdr_common.hdr_type) {
         case MCA_PML_OB1_HDR_TYPE_MATCH:
+
             bytes_received = hdr->hdr_match.hdr_msg_length;
+            MCA_PML_OB1_RECV_REQUEST_UNPACK(
+                recvreq,
+                segments,
+                num_segments,
+                sizeof(mca_pml_ob1_match_hdr_t),
+                0,
+                bytes_received,
+                bytes_delivered);
             break;
 
         case MCA_PML_OB1_HDR_TYPE_RNDV:
-            bytes_received = hdr->hdr_frag.hdr_frag_length;
+            mca_pml_ob1_recv_request_ack(recvreq, &hdr->hdr_rndv);
+            bytes_received = hdr->hdr_rndv.hdr_frag_length;
+            MCA_PML_OB1_RECV_REQUEST_UNPACK(
+                recvreq,
+                segments,
+                num_segments,
+                sizeof(mca_pml_ob1_rendezvous_hdr_t),
+                0,
+                bytes_received,
+                bytes_delivered);
             break;
+
+        case MCA_PML_OB1_HDR_TYPE_FRAG:
+
+            bytes_received = hdr->hdr_frag.hdr_frag_length;
+            MCA_PML_OB1_RECV_REQUEST_UNPACK(
+                recvreq,
+                segments,
+                num_segments,
+                sizeof(mca_pml_ob1_rendezvous_hdr_t),
+                hdr->hdr_frag.frag_offset,
+                bytes_received,
+                bytes_delivered);
+            break;
+
         default:
             break;
     }
-    bytes_delivered = bytes_received;
 
+    /* check completion status */
     OMPI_THREAD_LOCK(&ompi_request_lock);
-    req->req_bytes_received += bytes_received;
-    req->req_bytes_delivered += bytes_delivered;
-    if (req->req_bytes_received >= req->req_recv.req_bytes_packed) {
+    recvreq->req_bytes_received += bytes_received;
+    recvreq->req_bytes_delivered += bytes_delivered;
+    if (recvreq->req_bytes_received >= recvreq->req_recv.req_bytes_packed) {
         /* initialize request status */
-        req->req_recv.req_base.req_ompi.req_status._count = req->req_bytes_delivered;
-        req->req_recv.req_base.req_pml_complete = true;
-        req->req_recv.req_base.req_ompi.req_complete = true;
+        recvreq->req_recv.req_base.req_ompi.req_status._count = recvreq->req_bytes_delivered;
+        recvreq->req_recv.req_base.req_pml_complete = true;
+        recvreq->req_recv.req_base.req_ompi.req_complete = true;
         if(ompi_request_waiting) {
             ompi_condition_broadcast(&ompi_request_cond);
         }
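The new ACK path above completes the rendezvous handshake: the receiver echoes the sender's request pointer (hdr_src_req) back in the ACK together with its own request pointer, and the sender then either keeps streaming fragments or switches to a put when the ACK carries the PUT flag (see mca_pml_ob1_send_request_acked below). A tiny standalone model of that sender-side decision, with made-up names and flag values, is:

#include <stdio.h>
#include <stdint.h>

#define FLAG_PUT 0x4   /* stand-in for MCA_PML_OB1_HDR_FLAGS_PUT */

typedef enum { SR_SEND, SR_PUT } send_state;

typedef struct {
    void *src_req;      /* sender's request, echoed back by the receiver */
    uint8_t flags;      /* receiver's preference (e.g. FLAG_PUT) */
} ack;

/* Sender-side reaction to an ACK: pick the protocol for the rest of the message. */
static send_state on_ack(const ack *a)
{
    return (a->flags & FLAG_PUT) ? SR_PUT : SR_SEND;
}

int main(void)
{
    ack pipelined = { .src_req = (void *)0x1, .flags = 0 };
    ack rdma      = { .src_req = (void *)0x2, .flags = FLAG_PUT };
    printf("ack w/o PUT flag  -> %s\n", on_ack(&pipelined) == SR_PUT ? "put" : "pipelined send");
    printf("ack with PUT flag -> %s\n", on_ack(&rdma) == SR_PUT ? "put" : "pipelined send");
    return 0;
}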
@@ -31,6 +31,13 @@ struct mca_pml_ob1_recv_request_t {
     mca_pml_base_recv_request_t req_recv;
     size_t req_bytes_received;
     size_t req_bytes_delivered;
+
+    /* note that we allocate additional space for the recv
+     * request to increase the array size based on run-time
+     * parameters for the pipeline depth. So... this MUST be
+     * the last element of this struct.
+     */
+    mca_bmi_base_descriptor_t *req_pipeline[1];
 };
 typedef struct mca_pml_ob1_recv_request_t mca_pml_ob1_recv_request_t;
 
@@ -119,7 +126,7 @@ void mca_pml_ob1_recv_request_match_specific(mca_pml_ob1_recv_request_t* request
  * @param request Receive request.
  * @return OMPI_SUCESS or error status on failure.
 */
 #define MCA_PML_OB1_RECV_REQUEST_START(request) \
 { \
     /* init/re-init the request */ \
     (request)->req_bytes_received = 0; \
@@ -138,13 +145,29 @@ void mca_pml_ob1_recv_request_match_specific(mca_pml_ob1_recv_request_t* request
     \
     /* attempt to match posted recv */ \
     if((request)->req_recv.req_base.req_peer == OMPI_ANY_SOURCE) { \
         mca_pml_ob1_recv_request_match_wild(request); \
     } else { \
         mca_pml_ob1_recv_request_match_specific(request); \
     } \
 }
 
+
+/**
+ *
+ */
+
+#define MCA_PML_OB1_RECV_REQUEST_UNPACK( \
+    request, \
+    segments, \
+    num_segments, \
+    seg_offset, \
+    data_offset, \
+    bytes_received, \
+    bytes_delivered) \
+{ \
+}
+
+
 /**
  *
 */
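MCA_PML_OB1_RECV_REQUEST_UNPACK is left as an empty stub in this commit. For orientation, the standalone sketch below shows one plausible shape for such an unpack: walk the received segments, skip the PML header bytes (seg_offset) in the first segment, and copy payload into the receive buffer at the fragment's offset within the message (data_offset). The function and type names here are hypothetical, and a memcpy stands in for the datatype convertor; this is not what ob1 necessarily implements later.

#include <stdio.h>
#include <string.h>

/* Stand-in for mca_bmi_base_segment_t: address + length. */
typedef struct { unsigned char *addr; size_t len; } segment;

/* Copy message payload out of a list of segments into the receive buffer.
 * seg_offset skips the PML header in the first segment; data_offset is the
 * position of this fragment within the whole message. Returns bytes delivered. */
static size_t unpack_segments(unsigned char *recv_buf, size_t recv_len,
                              const segment *segs, size_t nsegs,
                              size_t seg_offset, size_t data_offset)
{
    size_t delivered = 0;
    for (size_t i = 0; i < nsegs; i++) {
        size_t skip = (i == 0) ? seg_offset : 0;   /* header only in first segment */
        if (segs[i].len <= skip)
            continue;
        size_t avail = segs[i].len - skip;
        size_t dst = data_offset + delivered;
        if (dst >= recv_len)
            break;
        size_t n = avail < recv_len - dst ? avail : recv_len - dst;
        memcpy(recv_buf + dst, segs[i].addr + skip, n);
        delivered += n;
    }
    return delivered;
}

int main(void)
{
    unsigned char wire[64], recv_buf[128] = { 0 };
    memset(wire, 0xab, sizeof(wire));
    segment segs[1] = { { wire, sizeof(wire) } };
    size_t n = unpack_segments(recv_buf, sizeof(recv_buf), segs, 1,
                               16 /* skip 16-byte header */, 0);
    printf("delivered %zu payload bytes\n", n);  /* 48 */
    return 0;
}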
@@ -78,8 +78,8 @@ static void mca_pml_ob1_send_completion(
     int status)
 {
     mca_pml_ob1_send_request_t* sendreq = (mca_pml_ob1_send_request_t*)descriptor->des_cbdata;
-    mca_pml_ob1_endpoint_t* ob1_ep = sendreq->req_endpoint;
 
+    /* check for request completion */
     OMPI_THREAD_LOCK(&ompi_request_lock);
     if (sendreq->req_offset == sendreq->req_send.req_bytes_packed) {
         sendreq->req_send.req_base.req_pml_complete = true;
@@ -97,30 +97,27 @@ static void mca_pml_ob1_send_completion(
         } else if (sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED) {
             mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq);
         }
+        sendreq->req_state = MCA_PML_OB1_SR_COMPLETE;
     }
     OMPI_THREAD_UNLOCK(&ompi_request_lock);
 
+    /* advance based on request state */
+    MCA_PML_OB1_SEND_REQUEST_ADVANCE(sendreq);
+
     /* check for pending requests that need to be progressed */
     while(ompi_list_get_size(&mca_pml_ob1.send_pending) != 0) {
         OMPI_THREAD_LOCK(&mca_pml_ob1.ob1_lock);
         sendreq = (mca_pml_ob1_send_request_t*)ompi_list_remove_first(&mca_pml_ob1.send_pending);
         OMPI_THREAD_UNLOCK(&mca_pml_ob1.ob1_lock);
     }
 
-    /* release NTL resources */
-    if(ob1_ep->bmi_cache == NULL) {
-        ob1_ep->bmi_cache = descriptor;
-    } else {
-        ob1_ep->bmi_free(bmi,descriptor);
-    }
 }
 
 
 /**
- * NTL can send directly from user allocated memory.
+ * BMI can send directly from user allocated memory.
 */
 
-int mca_pml_ob1_send_user(
+int mca_pml_ob1_send_request_start_user(
     mca_pml_ob1_send_request_t* sendreq,
     mca_pml_ob1_endpoint_t* endpoint)
 {
@@ -129,11 +126,11 @@ int mca_pml_ob1_send_user(
 
 
 /**
- * NTL requires "specially" allocated memory. Request a segment that
+ * BMI requires "specially" allocated memory. Request a segment that
  * is used for initial hdr and any eager data.
 */
 
-int mca_pml_ob1_send_copy(
+int mca_pml_ob1_send_request_start_copy(
     mca_pml_ob1_send_request_t* sendreq,
     mca_pml_ob1_endpoint_t* endpoint)
 {
@@ -152,7 +149,6 @@ int mca_pml_ob1_send_copy(
     } else {
         descriptor = endpoint->bmi_alloc(endpoint->bmi, sizeof(mca_pml_ob1_hdr_t));
         if(NULL == descriptor) {
-            OBJ_RELEASE(sendreq);
             return OMPI_ERR_OUT_OF_RESOURCE;
         }
         descriptor->des_cbfunc = mca_pml_ob1_send_completion;
@@ -173,31 +169,34 @@ int mca_pml_ob1_send_copy(
             hdr->hdr_common.hdr_flags = 0;
             hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_MATCH;
             segment->seg_len = sizeof(mca_pml_ob1_match_hdr_t);
+            ompi_request_complete((ompi_request_t*)sendreq);
         } else {
             hdr->hdr_common.hdr_flags = MCA_PML_OB1_HDR_FLAGS_ACK;
             hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_RNDV;
             hdr->hdr_rndv.hdr_frag_length = 0;
-            hdr->hdr_rndv.hdr_src_ptr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
-            hdr->hdr_rndv.hdr_src_ptr.pval = sendreq;
+            hdr->hdr_rndv.hdr_src_req.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
+            hdr->hdr_rndv.hdr_src_req.pval = sendreq;
             segment->seg_len = sizeof(mca_pml_ob1_rendezvous_hdr_t);
         }
-        ompi_request_complete((ompi_request_t*)sendreq);
 
     } else {
 
         struct iovec iov;
         unsigned int iov_count;
         unsigned int max_data;
+        int flags = 0;
 
         /* determine first fragment size */
         if(size > endpoint->bmi_eager_limit - sizeof(mca_pml_ob1_hdr_t)) {
            size = endpoint->bmi_eager_limit - sizeof(mca_pml_ob1_hdr_t);
-        }
+            flags = MCA_PML_OB1_HDR_FLAGS_ACK;
+        } else if (sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_SYNCHRONOUS) {
+            flags = MCA_PML_OB1_HDR_FLAGS_ACK;
+        }
 
         /* allocate space for hdr + first fragment */
         descriptor = endpoint->bmi_alloc(endpoint->bmi, size + sizeof(mca_pml_ob1_hdr_t));
         if(NULL == descriptor) {
-            OBJ_RELEASE(sendreq);
            return OMPI_ERR_OUT_OF_RESOURCE;
         }
         descriptor->des_cbfunc = mca_pml_ob1_send_completion;
@@ -205,6 +204,7 @@ int mca_pml_ob1_send_copy(
 
         /* build hdr */
         hdr = (mca_pml_ob1_hdr_t*)segment->seg_addr.pval;
+        hdr->hdr_common.hdr_flags = flags;
         hdr->hdr_match.hdr_contextid = sendreq->req_send.req_base.req_comm->c_contextid;
         hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
         hdr->hdr_match.hdr_dst = sendreq->req_send.req_base.req_peer;
@@ -213,8 +213,7 @@ int mca_pml_ob1_send_copy(
         hdr->hdr_match.hdr_msg_seq = sendreq->req_send.req_base.req_sequence;
 
         /* if an acknowledgment is not required - can get by w/ shorter hdr */
-        if (sendreq->req_send.req_send_mode != MCA_PML_BASE_SEND_SYNCHRONOUS) {
-            hdr->hdr_common.hdr_flags = MCA_PML_OB1_HDR_FLAGS_ACK;
+        if (flags == 0) {
             hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_MATCH;
 
             /* pack the data into the supplied buffer */
@@ -229,19 +228,21 @@ int mca_pml_ob1_send_copy(
                 &max_data,
                 NULL)) < 0) {
                 endpoint->bmi_free(endpoint->bmi, descriptor);
-                OBJ_RELEASE(sendreq);
                 return rc;
             }
 
             /* update length w/ number of bytes actually packed */
             segment->seg_len = sizeof(mca_pml_ob1_match_hdr_t) + max_data;
+            sendreq->req_offset = max_data;
+            if(sendreq->req_offset == sendreq->req_send.req_bytes_packed) {
+                ompi_request_complete((ompi_request_t*)sendreq);
+            }
 
         /* rendezvous header is required */
         } else {
-            hdr->hdr_common.hdr_flags = MCA_PML_OB1_HDR_FLAGS_ACK;
             hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_RNDV;
-            hdr->hdr_rndv.hdr_src_ptr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
-            hdr->hdr_rndv.hdr_src_ptr.pval = sendreq;
+            hdr->hdr_rndv.hdr_src_req.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
+            hdr->hdr_rndv.hdr_src_req.pval = sendreq;
 
             /* pack the data into the supplied buffer */
             iov.iov_base = (unsigned char*)segment->seg_addr.pval + sizeof(mca_pml_ob1_rendezvous_hdr_t);
@@ -255,16 +256,12 @@ int mca_pml_ob1_send_copy(
                 &max_data,
                 NULL)) < 0) {
                 endpoint->bmi_free(endpoint->bmi, descriptor);
-                OBJ_RELEASE(sendreq);
                 return rc;
             }
 
             hdr->hdr_rndv.hdr_frag_length = max_data;
             segment->seg_len = sizeof(mca_pml_ob1_rendezvous_hdr_t) + max_data;
-        }
-        sendreq->req_offset = max_data;
-        if(sendreq->req_offset == sendreq->req_send.req_bytes_packed) {
-            ompi_request_complete((ompi_request_t*)sendreq);
+            sendreq->req_offset = max_data;
         }
     }
     descriptor->des_cbdata = sendreq;
@@ -277,9 +274,126 @@ int mca_pml_ob1_send_copy(
         MCA_BMI_TAG_PML);
     if(OMPI_SUCCESS != rc) {
         endpoint->bmi_free(endpoint->bmi,descriptor);
-        OBJ_RELEASE(sendreq);
     }
     return rc;
 }
 
 
+/**
+ *
+ */
+
+int mca_pml_ob1_send_request_send(mca_pml_ob1_send_request_t* sendreq)
+{
+    /*
+     * Only allow one thread in this routine for a given request.
+     * However, we cannot block callers on a mutex, so simply keep track
+     * of the number of times the routine has been called and run through
+     * the scheduling logic once for every call.
+     */
+    if(OMPI_THREAD_ADD32(&sendreq->req_lock,1) == 1) {
+        mca_pml_ob1_proc_t* proc = sendreq->req_proc;
+        do {
+            /* allocate remaining bytes to PTLs */
+            size_t bytes_remaining = sendreq->req_send.req_bytes_packed - sendreq->req_offset;
+            while(bytes_remaining > 0 && sendreq->req_pending < mca_pml_ob1.send_pipeline_depth) {
+                mca_pml_ob1_endpoint_t* ep = mca_pml_ob1_ep_array_get_next(&proc->bmi_next);
+                mca_pml_ob1_frag_hdr_t* hdr;
+                mca_bmi_base_descriptor_t* des;
+                int rc;
+
+                /* if this is the last PTL that is available to use, or the number of
+                 * bytes remaining in the message is less than the PTLs minimum fragment
+                 * size, then go ahead and give the rest of the message to this PTL.
+                 */
+                size_t size;
+                if(bytes_remaining < ep->bmi_min_seg_size) {
+                    size = bytes_remaining;
+
+                /* otherwise attempt to give the PTL a percentage of the message
+                 * based on a weighting factor. for simplicity calculate this as
+                 * a percentage of the overall message length (regardless of amount
+                 * previously assigned)
+                 */
+                } else {
+                    size = (ep->bmi_weight * bytes_remaining) / 100;
+                }
+
+                /* makes sure that we don't exceed ptl_max_frag_size */
+                if(ep->bmi_max_seg_size != 0 && size > ep->bmi_max_seg_size)
+                    size = ep->bmi_max_seg_size;
+
+                /* pack into a descriptor */
+                des = ep->bmi_prepare_src(
+                    ep->bmi,
+                    ep->bmi_endpoint,
+                    &sendreq->req_send.req_convertor,
+                    sizeof(mca_pml_ob1_frag_hdr_t),
+                    &size);
+                if(des == NULL) {
+                    break;
+                }
+
+                /* prepare header */
+                hdr = (mca_pml_ob1_frag_hdr_t*)des->des_src->seg_addr.pval;
+                hdr->hdr_frag_length = size;
+                hdr->hdr_frag_offset = sendreq->req_offset;
+                hdr->hdr_src_req.pval = sendreq;
+                hdr->hdr_dst_req.pval = sendreq->req_peer.pval;
+
+                /* update state */
+                sendreq->req_offset += size;
+                sendreq->req_pending++;
+
+                /* initiate send - note that this may complete before the call returns */
+                rc = ep->bmi_send(ep->bmi, ep->bmi_endpoint, des, MCA_BMI_TAG_PML);
+                if(rc == OMPI_SUCCESS) {
+                    bytes_remaining = sendreq->req_send.req_bytes_packed - sendreq->req_offset;
+                } else {
+                    sendreq->req_offset -= size;
+                    sendreq->req_pending--;
+                    break;
+                }
+            }
+        } while (OMPI_THREAD_ADD32(&sendreq->req_lock,-1) > 0);
+    }
+    return OMPI_SUCCESS;
+}
+
+
+/**
+ *
+ */
+
+int mca_pml_ob1_send_request_acked(
+    mca_bmi_base_module_t* bmi,
+    mca_pml_ob1_ack_hdr_t* hdr)
+{
+    mca_pml_ob1_send_request_t* sendreq = (mca_pml_ob1_send_request_t*)
+        hdr->hdr_src_req.pval;
+    if(hdr->hdr_common.hdr_flags & MCA_PML_OB1_HDR_FLAGS_PUT) {
+        sendreq->req_state = MCA_PML_OB1_SR_PUT;
+        mca_pml_ob1_send_request_put(sendreq);
+    } else {
+        sendreq->req_state = MCA_PML_OB1_SR_SEND;
+        mca_pml_ob1_send_request_send(sendreq);
+    }
+    return OMPI_SUCCESS;
+}
+
+
+/*
+ *
+ */
+
+int mca_pml_ob1_send_request_put(
+    mca_pml_ob1_send_request_t* sendreq)
+{
+    return OMPI_SUCCESS;
+}
+
+
+
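The scheduling loop in mca_pml_ob1_send_request_send splits the remaining bytes across endpoints: a tail smaller than the endpoint's minimum segment size is handed over whole, otherwise the endpoint gets a weighted share, clamped to its maximum segment size, while the pipeline depth bounds how many fragments are outstanding. The standalone sketch below reproduces just that sizing arithmetic so it can be compiled and run in isolation; the endpoint values and names are made up for illustration.

#include <stdio.h>

typedef struct {
    int weight;          /* percentage share this endpoint should carry */
    size_t min_seg;      /* below this, just give it everything that is left */
    size_t max_seg;      /* 0 means "no upper bound" */
} endpoint;

/* Mirror of the fragment-size decision in the scheduling loop above. */
static size_t next_frag_size(const endpoint *ep, size_t bytes_remaining)
{
    size_t size;
    if (bytes_remaining < ep->min_seg) {
        size = bytes_remaining;                       /* tail: hand over the rest */
    } else {
        size = (ep->weight * bytes_remaining) / 100;  /* weighted share of the rest */
    }
    if (ep->max_seg != 0 && size > ep->max_seg)
        size = ep->max_seg;                           /* clamp to the endpoint's max */
    return size;
}

int main(void)
{
    endpoint fast = { 70, 4096, 65536 };
    endpoint slow = { 30, 4096, 16384 };
    size_t remaining = 200000, pending = 0, pipeline_depth = 4;

    /* Schedule fragments until the pipeline is full or the message is consumed. */
    while (remaining > 0 && pending < pipeline_depth) {
        const endpoint *ep = (pending % 2 == 0) ? &fast : &slow;  /* round robin */
        size_t frag = next_frag_size(ep, remaining);
        remaining -= frag;
        printf("frag %zu: %zu bytes (remaining %zu)\n", pending, frag, remaining);
        pending++;
    }
    return 0;
}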
@@ -23,16 +23,29 @@
 #include "mca/pml/base/pml_base_sendreq.h"
 #include "pml_ob1_proc.h"
 #include "pml_ob1_comm.h"
+#include "pml_ob1_hdr.h"
 
 #if defined(c_plusplus) || defined(__cplusplus)
 extern "C" {
 #endif
 
+typedef enum {
+    MCA_PML_OB1_SR_INIT,
+    MCA_PML_OB1_SR_WAIT,
+    MCA_PML_OB1_SR_SEND,
+    MCA_PML_OB1_SR_PUT,
+    MCA_PML_OB1_SR_COMPLETE
+} mca_pml_ob1_send_request_state_t;
+
+
 struct mca_pml_ob1_send_request_t {
     mca_pml_base_send_request_t req_send;
     mca_pml_ob1_proc_t* req_proc;
     mca_pml_ob1_endpoint_t* req_endpoint;
+    mca_pml_ob1_send_request_state_t req_state;
+    ompi_ptr_t req_peer;
+    int32_t req_lock;
+    size_t req_pending;
     size_t req_offset;
 };
 typedef struct mca_pml_ob1_send_request_t mca_pml_ob1_send_request_t;
@@ -41,27 +54,27 @@ typedef struct mca_pml_ob1_send_request_t mca_pml_ob1_send_request_t;
 OBJ_CLASS_DECLARATION(mca_pml_ob1_send_request_t);
 
 
 #define MCA_PML_OB1_SEND_REQUEST_ALLOC( \
     comm, \
     dst, \
     sendreq, \
     rc) \
 { \
     mca_pml_ob1_proc_t *proc = comm->c_pml_procs[dst]; \
     ompi_list_item_t* item; \
     \
     if(NULL == proc) { \
         rc = OMPI_ERR_OUT_OF_RESOURCE; \
     } else { \
         rc = OMPI_SUCCESS; \
         OMPI_FREE_LIST_WAIT(&mca_pml_ob1.send_requests, item, rc); \
         sendreq = (mca_pml_ob1_send_request_t*)item; \
         sendreq->req_proc = proc; \
     } \
 }
 
 
 #define MCA_PML_OB1_SEND_REQUEST_INIT( \
     sendreq, \
     buf, \
     count, \
@@ -83,38 +96,17 @@ OBJ_CLASS_DECLARATION(mca_pml_ob1_send_request_t);
         persistent); \
 }
 
 
-/**
- * NTL doesn't require pre-pinned or "specially" allocated memory.
- * Can try to directly send from the users buffer if contigous.
- */
-
-int mca_pml_ob1_send_user(
-    mca_pml_ob1_send_request_t* sendreq,
-    mca_pml_ob1_endpoint_t* endpoint);
-
-
-/**
- * NTL requires "specially" allocated memory. Request a segment that
- * is used for initial hdr and any eager data.
- */
-
-int mca_pml_ob1_send_copy(
-    mca_pml_ob1_send_request_t* sendreq,
-    mca_pml_ob1_endpoint_t* endpoint);
-
-
 /**
  * Start a send request.
 */
 
 #define MCA_PML_OB1_SEND_REQUEST_START(sendreq, rc) \
 { \
     mca_pml_ob1_endpoint_t* endpoint; \
     mca_pml_ob1_proc_t* proc = sendreq->req_proc; \
     \
     /* select next endpoint */ \
     endpoint = mca_pml_ob1_ep_array_get_next(&proc->bmi_first); \
     sendreq->req_offset = 0; \
     sendreq->req_send.req_base.req_ompi.req_complete = false; \
     sendreq->req_send.req_base.req_ompi.req_state = OMPI_REQUEST_ACTIVE; \
@@ -127,22 +119,84 @@ int mca_pml_ob1_send_copy(
     } \
     \
     if(NULL != endpoint->bmi_alloc) { \
-        rc = mca_pml_ob1_send_copy(sendreq, endpoint); \
+        rc = mca_pml_ob1_send_request_start_copy(sendreq, endpoint); \
     } else { \
-        rc = mca_pml_ob1_send_user(sendreq, endpoint); \
+        rc = mca_pml_ob1_send_request_start_user(sendreq, endpoint); \
     } \
 }
 
-#define MCA_PML_OB1_SEND_REQUEST_RETURN(sendreq) \
+/*
+ * Advance a request
+ */
+
+#define MCA_PML_OB1_SEND_REQUEST_ADVANCE(sendreq)
+
+#if 0
+    switch(sendreq->req_state) {
+        case MCA_PML_OB1_SR_WAIT:
+            mca_pml_ob1_send_request_wait(sendreq);
+            break;
+        case MCA_PML_OB1_SR_SEND:
+            mca_pml_ob1_send_request_send(sendreq);
+            break;
+        case MCA_PML_OB1_SR_PUT:
+            mca_pml_ob1_send_request_put(sendreq);
+            break;
+        default:
+            break;
+    }
+#endif
+
+
+#define MCA_PML_OB1_SEND_REQUEST_RETURN(sendreq) \
 { \
     /* Let the base handle the reference counts */ \
     MCA_PML_BASE_SEND_REQUEST_FINI((&sendreq->req_send)); \
     OMPI_FREE_LIST_RETURN( \
         &mca_pml_ob1.send_requests, (ompi_list_item_t*)sendreq); \
 }
 
+
+/**
+ * BMI doesn't require pre-pinned or "specially" allocated memory.
+ * Can try to directly send from the users buffer if contigous.
+ */
+
+int mca_pml_ob1_send_request_start_user(
+    mca_pml_ob1_send_request_t* sendreq,
+    mca_pml_ob1_endpoint_t* endpoint);
+
+
+/**
+ * BMI requires "specially" allocated memory. Request a segment that
+ * is used for initial hdr and any eager data.
+ */
+
+int mca_pml_ob1_send_request_start_copy(
+    mca_pml_ob1_send_request_t* sendreq,
+    mca_pml_ob1_endpoint_t* endpoint);
+
+
+/**
+ *
+ */
+int mca_pml_ob1_send_request_acked(
+    mca_bmi_base_module_t* bmi,
+    struct mca_pml_ob1_ack_hdr_t* hdr);
+
+
+/**
+ *
+ */
+int mca_pml_ob1_send_request_send(
+    mca_pml_ob1_send_request_t* sendreq);
+
+/**
+ *
+ */
+int mca_pml_ob1_send_request_put(
+    mca_pml_ob1_send_request_t* sendreq);
+
 #if defined(c_plusplus) || defined(__cplusplus)
 }