Change the way we handle unexpected messages,
if less than or equal pml_ob1_unexpected_limit just buffer in the PML level recv fragment else allocate a buffer via the bucket allocator This commit was SVN r14117.
Этот коммит содержится в:
родитель
43ef61b808
Коммит
ace68b1883
@ -112,7 +112,7 @@ typedef void* (*mca_allocator_base_component_segment_alloc_fn_t)(
|
||||
* back to the system. This function is to be provided by the module to the
|
||||
* allocator frmaework.
|
||||
*/
|
||||
typedef void* (*mca_allocator_base_component_segment_free_fn_t)(
|
||||
typedef void (*mca_allocator_base_component_segment_free_fn_t)(
|
||||
struct mca_mpool_base_module_t* module,
|
||||
void* segment);
|
||||
|
||||
|
@ -159,15 +159,6 @@ int mca_pml_ob1_add_procs(ompi_proc_t** procs, size_t nprocs)
|
||||
/* register error handlers */
|
||||
rc = mca_bml.bml_register_error(mca_pml_ob1_error_handler);
|
||||
|
||||
/* initialize free list of receive buffers */
|
||||
ompi_free_list_init(
|
||||
&mca_pml_ob1.buffers,
|
||||
sizeof(mca_pml_ob1_buffer_t) + mca_pml_ob1.eager_limit,
|
||||
OBJ_CLASS(mca_pml_ob1_buffer_t),
|
||||
0,
|
||||
mca_pml_ob1.free_list_max,
|
||||
mca_pml_ob1.free_list_inc,
|
||||
NULL);
|
||||
|
||||
/* we don't have any endpoint data we need to cache on the
|
||||
ompi_proc_t, so set proc_pml to NULL */
|
||||
|
@ -35,6 +35,7 @@
|
||||
#include "pml_ob1_hdr.h"
|
||||
#include "ompi/mca/bml/base/base.h"
|
||||
#include "ompi/proc/proc.h"
|
||||
#include "ompi/mca/allocator/base/base.h"
|
||||
|
||||
#if defined(c_plusplus) || defined(__cplusplus)
|
||||
extern "C" {
|
||||
@ -73,6 +74,9 @@ struct mca_pml_ob1_t {
|
||||
opal_list_t recv_pending;
|
||||
opal_list_t rdma_pending;
|
||||
bool enabled;
|
||||
char* allocator_name;
|
||||
mca_allocator_base_module_t* allocator;
|
||||
uint32_t unexpected_limit;
|
||||
};
|
||||
typedef struct mca_pml_ob1_t mca_pml_ob1_t;
|
||||
|
||||
|
@ -35,6 +35,7 @@
|
||||
#include "pml_ob1_recvfrag.h"
|
||||
#include "ompi/mca/bml/base/base.h"
|
||||
#include "pml_ob1_component.h"
|
||||
#include "ompi/mca/allocator/base/base.h"
|
||||
|
||||
OBJ_CLASS_INSTANCE(
|
||||
mca_pml_ob1_pckt_pending_t,
|
||||
@ -71,9 +72,15 @@ mca_pml_base_component_1_0_0_t mca_pml_ob1_component = {
|
||||
|
||||
mca_pml_ob1_component_init, /* component init */
|
||||
mca_pml_ob1_component_fini /* component finalize */
|
||||
|
||||
};
|
||||
|
||||
|
||||
void *mca_pml_ob1_seg_alloc( struct mca_mpool_base_module_t* mpool,
|
||||
size_t* size,
|
||||
mca_mpool_base_registration_t** registration);
|
||||
|
||||
void mca_pml_ob1_seg_free( struct mca_mpool_base_module_t* mpool,
|
||||
void* segment );
|
||||
|
||||
static inline int mca_pml_ob1_param_register_int(
|
||||
const char* param_name,
|
||||
@ -84,10 +91,12 @@ static inline int mca_pml_ob1_param_register_int(
|
||||
mca_base_param_lookup_int(id,¶m_value);
|
||||
return param_value;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
int mca_pml_ob1_component_open(void)
|
||||
{
|
||||
mca_allocator_base_component_t* allocator_component;
|
||||
mca_pml_ob1.free_list_num =
|
||||
mca_pml_ob1_param_register_int("free_list_num", 4);
|
||||
mca_pml_ob1.free_list_max =
|
||||
@ -102,8 +111,33 @@ int mca_pml_ob1_component_open(void)
|
||||
mca_pml_ob1_param_register_int("send_pipeline_depth", 3);
|
||||
mca_pml_ob1.recv_pipeline_depth =
|
||||
mca_pml_ob1_param_register_int("recv_pipeline_depth", 4);
|
||||
|
||||
|
||||
mca_pml_ob1.unexpected_limit =
|
||||
mca_pml_ob1_param_register_int("unexpected_limit", 128);
|
||||
|
||||
mca_base_param_reg_string(&mca_pml_ob1_component.pmlm_version,
|
||||
"allocator",
|
||||
"Name of allocator component for unexpected messages",
|
||||
false, false,
|
||||
"bucket",
|
||||
&mca_pml_ob1.allocator_name);
|
||||
|
||||
|
||||
|
||||
allocator_component = mca_allocator_component_lookup( mca_pml_ob1.allocator_name );
|
||||
if(NULL == allocator_component) {
|
||||
opal_output(0, "mca_pml_ob1_component_open: can't find allocator: %s\n", mca_pml_ob1.allocator_name);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
mca_pml_ob1.allocator = allocator_component->allocator_init(true,
|
||||
mca_pml_ob1_seg_alloc, mca_pml_ob1_seg_free, NULL);
|
||||
|
||||
|
||||
if(NULL == mca_pml_ob1.allocator) {
|
||||
opal_output(0, "mca_pml_ob1_component_open: unable to initialize allocator\n");
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
OBJ_CONSTRUCT(&mca_pml_ob1.lock, opal_mutex_t);
|
||||
|
||||
/* requests */
|
||||
@ -142,13 +176,15 @@ int mca_pml_ob1_component_open(void)
|
||||
|
||||
ompi_free_list_init(
|
||||
&mca_pml_ob1.recv_frags,
|
||||
sizeof(mca_pml_ob1_recv_frag_t),
|
||||
sizeof(mca_pml_ob1_recv_frag_t) + mca_pml_ob1.unexpected_limit,
|
||||
OBJ_CLASS(mca_pml_ob1_recv_frag_t),
|
||||
mca_pml_ob1.free_list_num,
|
||||
mca_pml_ob1.free_list_max,
|
||||
mca_pml_ob1.free_list_inc,
|
||||
NULL);
|
||||
|
||||
|
||||
|
||||
OBJ_CONSTRUCT(&mca_pml_ob1.pending_pckts, ompi_free_list_t);
|
||||
ompi_free_list_init(
|
||||
&mca_pml_ob1.pending_pckts,
|
||||
@ -159,6 +195,7 @@ int mca_pml_ob1_component_open(void)
|
||||
mca_pml_ob1.free_list_inc,
|
||||
NULL);
|
||||
|
||||
|
||||
OBJ_CONSTRUCT(&mca_pml_ob1.buffers, ompi_free_list_t);
|
||||
|
||||
/* pending operations */
|
||||
@ -249,3 +286,14 @@ mca_pml_base_module_t* mca_pml_ob1_component_init(int* priority,
|
||||
return &mca_pml_ob1.super;
|
||||
}
|
||||
|
||||
|
||||
void *mca_pml_ob1_seg_alloc( struct mca_mpool_base_module_t* mpool,
|
||||
size_t* size,
|
||||
mca_mpool_base_registration_t** registration) {
|
||||
return malloc(*size);
|
||||
}
|
||||
|
||||
void mca_pml_ob1_seg_free( struct mca_mpool_base_module_t* mpool,
|
||||
void* segment ) {
|
||||
free(segment);
|
||||
}
|
||||
|
@ -31,14 +31,11 @@ extern "C" {
|
||||
#endif
|
||||
|
||||
struct mca_pml_ob1_buffer_t {
|
||||
ompi_free_list_item_t super;
|
||||
size_t len;
|
||||
unsigned char addr[1];
|
||||
void * addr;
|
||||
};
|
||||
typedef struct mca_pml_ob1_buffer_t mca_pml_ob1_buffer_t;
|
||||
|
||||
OBJ_CLASS_DECLARATION(mca_pml_ob1_buffer_t);
|
||||
|
||||
|
||||
struct mca_pml_ob1_recv_frag_t {
|
||||
ompi_free_list_item_t super;
|
||||
@ -47,7 +44,8 @@ struct mca_pml_ob1_recv_frag_t {
|
||||
size_t num_segments;
|
||||
mca_btl_base_module_t* btl;
|
||||
mca_btl_base_segment_t segments[MCA_BTL_DES_MAX_SEGMENTS];
|
||||
mca_pml_ob1_buffer_t* buffers[MCA_BTL_DES_MAX_SEGMENTS];
|
||||
mca_pml_ob1_buffer_t buffers[MCA_BTL_DES_MAX_SEGMENTS];
|
||||
unsigned char addr[1];
|
||||
};
|
||||
typedef struct mca_pml_ob1_recv_frag_t mca_pml_ob1_recv_frag_t;
|
||||
|
||||
@ -66,44 +64,56 @@ do { \
|
||||
do { \
|
||||
size_t i; \
|
||||
mca_btl_base_segment_t* macro_segments = frag->segments; \
|
||||
mca_pml_ob1_buffer_t** buffers = frag->buffers; \
|
||||
mca_pml_ob1_buffer_t* buffers = frag->buffers; \
|
||||
\
|
||||
/* init recv_frag */ \
|
||||
frag->btl = btl; \
|
||||
frag->hdr = *(mca_pml_ob1_hdr_t*)hdr; \
|
||||
frag->num_segments = cnt; \
|
||||
/* copy over data */ \
|
||||
for(i=0; i<cnt; i++) { \
|
||||
ompi_free_list_item_t* item; \
|
||||
mca_pml_ob1_buffer_t* buff; \
|
||||
OMPI_FREE_LIST_WAIT(&mca_pml_ob1.buffers, item, rc); \
|
||||
buff = (mca_pml_ob1_buffer_t*)item; \
|
||||
buffers[i] = buff; \
|
||||
macro_segments[i].seg_addr.pval = buff->addr; \
|
||||
macro_segments[i].seg_len = segs[i].seg_len; \
|
||||
memcpy(buff->addr, \
|
||||
segs[i].seg_addr.pval, \
|
||||
segs[i].seg_len); \
|
||||
} \
|
||||
\
|
||||
} while(0)
|
||||
if(cnt == 1 && segs[0].seg_len <= mca_pml_ob1.unexpected_limit ) { \
|
||||
macro_segments[0].seg_addr.pval = frag->addr; \
|
||||
macro_segments[0].seg_len = segs[0].seg_len; \
|
||||
memcpy(frag->addr, \
|
||||
segs[0].seg_addr.pval, \
|
||||
segs[0].seg_len); \
|
||||
} else { \
|
||||
for(i=0; i<cnt; i++) { \
|
||||
buffers[i].len = segs[i].seg_len; \
|
||||
buffers[i].addr = (char*) mca_pml_ob1.allocator->alc_alloc( \
|
||||
mca_pml_ob1.allocator, \
|
||||
segs[i].seg_len, \
|
||||
0, \
|
||||
NULL); \
|
||||
macro_segments[i].seg_addr.pval = buffers[i].addr; \
|
||||
macro_segments[i].seg_len = segs[i].seg_len; \
|
||||
memcpy(buffers[i].addr, \
|
||||
segs[i].seg_addr.pval, \
|
||||
segs[i].seg_len); \
|
||||
} \
|
||||
} \
|
||||
} while(0)
|
||||
|
||||
|
||||
#define MCA_PML_OB1_RECV_FRAG_RETURN(frag) \
|
||||
do { \
|
||||
size_t i; \
|
||||
\
|
||||
/* return buffers */ \
|
||||
for(i=0; i<frag->num_segments; i++) { \
|
||||
OMPI_FREE_LIST_RETURN(&mca_pml_ob1.buffers, \
|
||||
(ompi_free_list_item_t*)frag->buffers[i]); \
|
||||
} \
|
||||
frag->num_segments = 0; \
|
||||
\
|
||||
/* return recv_frag */ \
|
||||
OMPI_FREE_LIST_RETURN(&mca_pml_ob1.recv_frags, \
|
||||
(ompi_free_list_item_t*)frag); \
|
||||
} while(0)
|
||||
#define MCA_PML_OB1_RECV_FRAG_RETURN(frag) \
|
||||
do { \
|
||||
size_t i; \
|
||||
if(!(frag->num_segments == 1 && frag->segments[0].seg_len <= \
|
||||
mca_pml_ob1.unexpected_limit)) { \
|
||||
\
|
||||
/* return buffers */ \
|
||||
for(i=0; i<frag->num_segments; i++) { \
|
||||
mca_pml_ob1.allocator->alc_free( \
|
||||
mca_pml_ob1.allocator, \
|
||||
frag->buffers[i].addr ); \
|
||||
} \
|
||||
} \
|
||||
frag->num_segments = 0; \
|
||||
\
|
||||
/* return recv_frag */ \
|
||||
OMPI_FREE_LIST_RETURN(&mca_pml_ob1.recv_frags, \
|
||||
(ompi_free_list_item_t*)frag); \
|
||||
} while(0)
|
||||
|
||||
|
||||
/**
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user