
* Set max tag for BTLs to 255, not 256

* Major rework of Portals to better match Red Storm and hopefully get
  better performance:
  - Always assume there is only one module (since there are no machines
    on the planet with more than one Portals interface)
  - Make progress a single function that dispatches on the event type
    rather than the communication type, instead of handing events off to
    separate per-queue functions (see the sketch after this list)
  - Remove polling of unneeded events
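
A minimal sketch of that single progress function (an illustration only, not
the code in this commit; it assumes the Portals 3 API header portals3.h and
takes the event-queue handles as parameters rather than reading them from the
module struct):

    #include <portals3.h>

    /* Poll every event queue in one place and dispatch on ev.type instead
       of on which queue (send/recv/rdma) the event arrived on. */
    static int example_progress(ptl_handle_eq_t *eqs, int num_eqs)
    {
        int num_progressed = 0;

        while (1) {
            ptl_event_t ev;
            int which;
            int ret = PtlEQPoll(eqs, num_eqs, 0 /* don't block */, &ev, &which);

            if (PTL_EQ_EMPTY == ret) {
                break;                  /* nothing pending - the common case */
            } else if (PTL_OK != ret && PTL_EQ_DROPPED != ret) {
                return -1;              /* unexpected library error */
            }

            switch (ev.type) {          /* event type, not comm type */
            case PTL_EVENT_PUT_END:     /* data landed in a receive block */
                /* hand the fragment to the registered callback here */
                break;
            case PTL_EVENT_ACK:         /* a send or RDMA put was delivered */
                /* complete the fragment back to the upper layer here */
                break;
            default:                    /* events we no longer ask for */
                break;
            }
            ++num_progressed;
        }

        return num_progressed;
    }

The real loop in the diff below also handles the SEND_END case, requeues sends
whose ACK reports zero bytes delivered, and polls both event queues (one
dedicated to sends, one for everything else) with the same PtlEQPoll call.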

This commit was SVN r6769.
This commit is contained in:
Brian Barrett 2005-08-08 20:56:26 +00:00
parent 7f726081ba
commit 694bbc158f
15 changed files with 609 additions and 823 deletions

View File

@ -129,7 +129,7 @@ typedef uint8_t mca_btl_base_tag_t;
#define MCA_BTL_TAG_BTL 0
#define MCA_BTL_TAG_PML 1
#define MCA_BTL_TAG_USR 2
#define MCA_BTL_TAG_MAX 256 /* 1 + highest allowed tag num */
#define MCA_BTL_TAG_MAX 255 /* 1 + highest allowed tag num */
/* prefered protocol */
#define MCA_BTL_FLAGS_SEND 1

View File

@ -73,44 +73,42 @@ mca_btl_portals_add_procs(struct mca_btl_base_module_t* btl_base,
struct mca_btl_base_endpoint_t** peers,
ompi_bitmap_t* reachable)
{
struct mca_btl_portals_module_t *btl =
(struct mca_btl_portals_module_t*) btl_base;
int ret;
struct ompi_proc_t *curr_proc = NULL;
ptl_process_id_t *portals_procs = NULL;
size_t i;
unsigned long distance;
bool need_recv_setup = false;
bool need_activate = false;
assert(&mca_btl_portals_module == (mca_btl_portals_module_t*) btl_base);
opal_output_verbose(50, mca_btl_portals_component.portals_output,
"Adding %d procs (%d)", nprocs,
mca_btl_portals_module.portals_num_procs);
/* make sure our environment is fully initialized. At end of this
call, we have a working network handle on our module and
portals_procs will have the portals process identifier for each
proc (ordered, in theory) */
ret = mca_btl_portals_add_procs_compat(btl, nprocs, procs,
ret = mca_btl_portals_add_procs_compat(&mca_btl_portals_module,
nprocs, procs,
&portals_procs);
if (OMPI_SUCCESS != ret) return ret;
OPAL_THREAD_LOCK(&btl->portals_lock);
if (0 == opal_list_get_size(&btl->portals_endpoint_list)) {
need_recv_setup = true;
if (0 == mca_btl_portals_module.portals_num_procs) {
need_activate = true;
}
/* loop through all procs, setting our reachable flag */
for (i= 0; i < nprocs ; ++i) {
curr_proc = procs[i];
peers[i] = OBJ_NEW(mca_btl_portals_endpoint_t);
peers[i]->endpoint_btl = btl;
peers[i]->endpoint_proc = curr_proc;
peers[i]->endpoint_ptl_id = portals_procs[i];
opal_list_append(&btl->portals_endpoint_list,
(opal_list_item_t*) peers[i]);
peers[i] = malloc(sizeof(mca_btl_base_endpoint_t));
if (NULL == peers[i]) return OMPI_ERROR;
*((mca_btl_base_endpoint_t*) peers[i]) = portals_procs[i];
/* make sure we can reach the process - this is supposed to be
a cheap-ish operation */
ret = PtlNIDist(btl->portals_ni_h,
ret = PtlNIDist(mca_btl_portals_module.portals_ni_h,
portals_procs[i],
&distance);
if (ret != PTL_OK) {
@ -119,35 +117,50 @@ mca_btl_portals_add_procs(struct mca_btl_base_module_t* btl_base,
continue;
}
OPAL_THREAD_ADD32(&mca_btl_portals_module.portals_num_procs, 1);
/* and here we can reach */
ompi_bitmap_set_bit(reachable, i);
}
if (NULL != portals_procs) free(portals_procs);
if (need_recv_setup) {
if (need_activate && mca_btl_portals_module.portals_num_procs > 0) {
/* create eqs */
int i;
opal_output_verbose(50, mca_btl_portals_component.portals_output,
"Enabling progress");
for (i = 0 ; i < OMPI_BTL_PORTALS_EQ_SIZE ; ++i) {
int ptl_ret = PtlEQAlloc(btl->portals_ni_h,
btl->portals_eq_sizes[i],
int ptl_ret = PtlEQAlloc(mca_btl_portals_module.portals_ni_h,
mca_btl_portals_module.portals_eq_sizes[i],
PTL_EQ_HANDLER_NONE,
&(btl->portals_eq_handles[i]));
&(mca_btl_portals_module.portals_eq_handles[i]));
if (PTL_OK != ptl_ret) {
opal_output(mca_btl_portals_component.portals_output,
"Error creating EQ %d: %d", i, ptl_ret);
OPAL_THREAD_UNLOCK(&btl->portals_lock);
/* BWB - better error code? */
return OMPI_ERROR;
}
}
ret = mca_btl_portals_recv_enable(btl);
ret = mca_btl_portals_recv_enable(&mca_btl_portals_module);
/* fill in send memory descriptor */
mca_btl_portals_module.md_send.start = NULL;
mca_btl_portals_module.md_send.length = 0;
mca_btl_portals_module.md_send.threshold = 2; /* send and ack */
mca_btl_portals_module.md_send.max_size = 0;
mca_btl_portals_module.md_send.options = PTL_MD_EVENT_START_DISABLE;
mca_btl_portals_module.md_send.user_ptr = NULL;
mca_btl_portals_module.md_send.eq_handle =
mca_btl_portals_module.portals_eq_handles[OMPI_BTL_PORTALS_EQ_SEND];
} else {
ret = OMPI_SUCCESS;
}
OPAL_THREAD_UNLOCK(&btl->portals_lock);
opal_output_verbose(50, mca_btl_portals_component.portals_output,
"count: %d", mca_btl_portals_module.portals_num_procs);
return ret;
}
@ -159,35 +172,30 @@ mca_btl_portals_del_procs(struct mca_btl_base_module_t *btl_base,
struct ompi_proc_t **procs,
struct mca_btl_base_endpoint_t **peers)
{
mca_btl_portals_module_t *btl =
(mca_btl_portals_module_t*) btl_base;
size_t i = 0;
int ret = OMPI_SUCCESS;
bool need_recv_shutdown = false;
opal_output_verbose(100, mca_btl_portals_component.portals_output,
"del_procs called for %ld procs", (long) nprocs);
OPAL_THREAD_LOCK(&btl->portals_lock);
assert(&mca_btl_portals_module == (mca_btl_portals_module_t*) btl_base);
opal_output_verbose(50, mca_btl_portals_component.portals_output,
"Removing %d procs (%d)", nprocs,
mca_btl_portals_module.portals_num_procs);
for (i = 0 ; i < nprocs ; ++i) {
opal_list_remove_item(&btl->portals_endpoint_list,
(opal_list_item_t*) peers[i]);
OBJ_RELEASE(peers[i]);
free(peers[i]);
OPAL_THREAD_ADD32(&mca_btl_portals_module.portals_num_procs, -1);
}
if (0 == opal_list_get_size(&btl->portals_endpoint_list)) {
need_recv_shutdown = true;
}
if (need_recv_shutdown) {
if (0 == mca_btl_portals_module.portals_num_procs) {
int i;
ret = mca_btl_portals_recv_disable(btl);
opal_output_verbose(50, mca_btl_portals_component.portals_output,
"Disabling progress");
ret = mca_btl_portals_recv_disable(&mca_btl_portals_module);
/* destroy eqs */
for (i = 0 ; i < OMPI_BTL_PORTALS_EQ_SIZE ; ++i) {
int ptl_ret = PtlEQFree(btl->portals_eq_handles[i]);
int ptl_ret = PtlEQFree(mca_btl_portals_module.portals_eq_handles[i]);
if (PTL_OK != ptl_ret) {
opal_output(mca_btl_portals_component.portals_output,
"Error freeing EQ %d: %d", i, ptl_ret);
@ -198,21 +206,20 @@ mca_btl_portals_del_procs(struct mca_btl_base_module_t *btl_base,
ret = OMPI_SUCCESS;
}
OPAL_THREAD_UNLOCK(&btl->portals_lock);
return ret;
}
int
mca_btl_portals_register(struct mca_btl_base_module_t* btl,
mca_btl_portals_register(struct mca_btl_base_module_t* btl_base,
mca_btl_base_tag_t tag,
mca_btl_base_module_recv_cb_fn_t cbfunc,
void* cbdata)
{
mca_btl_portals_module_t* portals_btl = (mca_btl_portals_module_t*) btl;
portals_btl->portals_reg[tag].cbfunc = cbfunc;
portals_btl->portals_reg[tag].cbdata = cbdata;
assert(&mca_btl_portals_module == (mca_btl_portals_module_t*) btl_base);
mca_btl_portals_module.portals_reg[tag].cbfunc = cbfunc;
mca_btl_portals_module.portals_reg[tag].cbdata = cbdata;
return OMPI_SUCCESS;
}
@ -222,20 +229,21 @@ mca_btl_base_descriptor_t*
mca_btl_portals_alloc(struct mca_btl_base_module_t* btl_base,
size_t size)
{
mca_btl_portals_module_t* btl = (mca_btl_portals_module_t*) btl_base;
mca_btl_portals_frag_t* frag;
int rc;
assert(&mca_btl_portals_module == (mca_btl_portals_module_t*) btl_base);
if (size <= btl->super.btl_eager_limit) {
OMPI_BTL_PORTALS_FRAG_ALLOC_EAGER(btl, frag, rc);
if (size <= mca_btl_portals_module.super.btl_eager_limit) {
OMPI_BTL_PORTALS_FRAG_ALLOC_EAGER(&mca_btl_portals_module, frag, rc);
frag->segment.seg_len =
size <= btl->super.btl_eager_limit ?
size : btl->super.btl_eager_limit ;
size <= mca_btl_portals_module.super.btl_eager_limit ?
size : mca_btl_portals_module.super.btl_eager_limit ;
} else {
OMPI_BTL_PORTALS_FRAG_ALLOC_MAX(btl, frag, rc);
OMPI_BTL_PORTALS_FRAG_ALLOC_MAX(&mca_btl_portals_module, frag, rc);
frag->segment.seg_len =
size <= btl->super.btl_max_send_size ?
size : btl->super.btl_max_send_size ;
size <= mca_btl_portals_module.super.btl_max_send_size ?
size : mca_btl_portals_module.super.btl_max_send_size ;
}
frag->base.des_flags = 0;
@ -248,15 +256,16 @@ int
mca_btl_portals_free(struct mca_btl_base_module_t* btl_base,
mca_btl_base_descriptor_t* des)
{
mca_btl_portals_module_t* btl = (mca_btl_portals_module_t*) btl_base;
mca_btl_portals_frag_t* frag = (mca_btl_portals_frag_t*) des;
assert(&mca_btl_portals_module == (mca_btl_portals_module_t*) btl_base);
if (frag->size == 0) {
OMPI_BTL_PORTALS_FRAG_RETURN_USER(&btl->super, frag);
} else if (frag->size == btl->super.btl_eager_limit){
OMPI_BTL_PORTALS_FRAG_RETURN_EAGER(&btl->super, frag);
} else if (frag->size == btl->super.btl_max_send_size) {
OMPI_BTL_PORTALS_FRAG_RETURN_MAX(&btl->super, frag);
OMPI_BTL_PORTALS_FRAG_RETURN_USER(&mca_btl_portals_module.super, frag);
} else if (frag->size == mca_btl_portals_module.super.btl_eager_limit){
OMPI_BTL_PORTALS_FRAG_RETURN_EAGER(&mca_btl_portals_module.super, frag);
} else if (frag->size == mca_btl_portals_module.super.btl_max_send_size) {
OMPI_BTL_PORTALS_FRAG_RETURN_MAX(&mca_btl_portals_module.super, frag);
} else {
return OMPI_ERR_BAD_PARAM;
}
@ -273,7 +282,6 @@ mca_btl_portals_prepare_src(struct mca_btl_base_module_t* btl_base,
size_t reserve,
size_t* size)
{
mca_btl_portals_module_t* btl = (mca_btl_portals_module_t*) btl_base;
mca_btl_portals_frag_t* frag;
size_t max_data = *size;
struct iovec iov;
@ -281,10 +289,12 @@ mca_btl_portals_prepare_src(struct mca_btl_base_module_t* btl_base,
int32_t free_after;
int ret;
assert(&mca_btl_portals_module == (mca_btl_portals_module_t*) btl_base);
if (0 == reserve && 0 == ompi_convertor_need_buffers(convertor)) {
/* we can send right out of the buffer (woo!). */
OMPI_BTL_PORTALS_FRAG_ALLOC_USER(&btl->super, frag, ret);
OMPI_BTL_PORTALS_FRAG_ALLOC_USER(&mca_btl_portals_module.super, frag, ret);
if(NULL == frag){
return NULL;
}
@ -297,13 +307,13 @@ mca_btl_portals_prepare_src(struct mca_btl_base_module_t* btl_base,
frag->segment.seg_len = max_data;
frag->segment.seg_addr.pval = iov.iov_base;
} else if (max_data+reserve <= btl->super.btl_eager_limit) {
} else if (max_data+reserve <= mca_btl_portals_module.super.btl_eager_limit) {
/*
* if we can't send out of the buffer directly and the
* requested size is less than the eager limit, pack into a
* fragment from the eager pool
*/
OMPI_BTL_PORTALS_FRAG_ALLOC_EAGER(btl, frag, ret);
OMPI_BTL_PORTALS_FRAG_ALLOC_EAGER(&mca_btl_portals_module, frag, ret);
if (NULL == frag) {
return NULL;
}
@ -314,7 +324,7 @@ mca_btl_portals_prepare_src(struct mca_btl_base_module_t* btl_base,
&max_data, &free_after);
*size = max_data;
if (ret < 0) {
OMPI_BTL_PORTALS_FRAG_RETURN_EAGER(btl, frag);
OMPI_BTL_PORTALS_FRAG_RETURN_EAGER(&mca_btl_portals_module, frag);
return NULL;
}
frag->segment.seg_len = max_data + reserve;
@ -324,12 +334,12 @@ mca_btl_portals_prepare_src(struct mca_btl_base_module_t* btl_base,
* otherwise pack as much data as we can into a fragment
* that is the max send size.
*/
OMPI_BTL_PORTALS_FRAG_ALLOC_MAX(btl, frag, ret);
OMPI_BTL_PORTALS_FRAG_ALLOC_MAX(&mca_btl_portals_module, frag, ret);
if (NULL == frag) {
return NULL;
}
if (max_data + reserve > btl->super.btl_max_send_size){
max_data = btl->super.btl_max_send_size - reserve;
if (max_data + reserve > mca_btl_portals_module.super.btl_max_send_size){
max_data = mca_btl_portals_module.super.btl_max_send_size - reserve;
}
iov.iov_len = max_data;
iov.iov_base = (unsigned char*) frag->segment.seg_addr.pval + reserve;
@ -337,7 +347,7 @@ mca_btl_portals_prepare_src(struct mca_btl_base_module_t* btl_base,
&max_data, &free_after);
*size = max_data;
if ( ret < 0 ) {
OMPI_BTL_PORTALS_FRAG_RETURN_MAX(btl, frag);
OMPI_BTL_PORTALS_FRAG_RETURN_MAX(&mca_btl_portals_module, frag);
return NULL;
}
frag->segment.seg_len = max_data + reserve;
@ -361,37 +371,38 @@ mca_btl_portals_prepare_dst(struct mca_btl_base_module_t* btl_base,
size_t reserve,
size_t* size)
{
struct mca_btl_portals_module_t *btl =
(struct mca_btl_portals_module_t *) btl_base;
mca_btl_portals_frag_t* frag;
ptl_md_t md;
ptl_handle_me_t me_h;
ptl_handle_md_t md_h;
int ret;
OMPI_BTL_PORTALS_FRAG_ALLOC_USER(&btl->super, frag, ret);
assert(&mca_btl_portals_module == (mca_btl_portals_module_t*) btl_base);
OMPI_BTL_PORTALS_FRAG_ALLOC_USER(&mca_btl_portals_module.super, frag, ret);
if(NULL == frag) {
return NULL;
}
frag->segment.seg_len = *size;
frag->segment.seg_addr.pval = convertor->pBaseBuf + convertor->bConverted;
frag->segment.seg_key.key64 = OPAL_THREAD_ADD64(&(btl->portals_rdma_key), 1);
frag->segment.seg_key.key64 = OPAL_THREAD_ADD64(&(mca_btl_portals_module.portals_rdma_key), 1);
frag->base.des_src = NULL;
frag->base.des_src_cnt = 0;
frag->base.des_dst = &frag->segment;
frag->base.des_dst_cnt = 1;
frag->base.des_flags = 0;
frag->type = mca_btl_portals_frag_type_rdma;
OPAL_OUTPUT_VERBOSE((90, mca_btl_portals_component.portals_output,
"rdma dest posted for frag 0x%x, callback 0x%x, bits %lld",
frag, frag->base.des_cbfunc, frag->segment.seg_key.key64));
/* create a match entry */
ret = PtlMEAttach(btl->portals_ni_h,
ret = PtlMEAttach(mca_btl_portals_module.portals_ni_h,
OMPI_BTL_PORTALS_RDMA_TABLE_ID,
peer->endpoint_ptl_id,
*((mca_btl_base_endpoint_t*) peer),
frag->segment.seg_key.key64, /* match */
0, /* ignore */
PTL_UNLINK,
@ -400,7 +411,7 @@ mca_btl_portals_prepare_dst(struct mca_btl_base_module_t* btl_base,
if (PTL_OK != ret) {
opal_output(mca_btl_portals_component.portals_output,
"Error creating rdma dest ME: %d", ret);
OMPI_BTL_PORTALS_FRAG_RETURN_USER(&btl->super, frag);
OMPI_BTL_PORTALS_FRAG_RETURN_USER(&mca_btl_portals_module.super, frag);
return NULL;
}
@ -410,11 +421,11 @@ mca_btl_portals_prepare_dst(struct mca_btl_base_module_t* btl_base,
later :) */
md.start = frag->segment.seg_addr.pval;
md.length = frag->segment.seg_len;
md.threshold = 1; /* unlink after START / END */
md.threshold = 1; /* unlink after put */
md.max_size = 0;
md.options = PTL_MD_OP_PUT | PTL_MD_OP_GET;
md.options = PTL_MD_OP_PUT | PTL_MD_OP_GET | PTL_MD_EVENT_START_DISABLE;
md.user_ptr = frag; /* keep a pointer to ourselves */
md.eq_handle = btl->portals_eq_handles[OMPI_BTL_PORTALS_EQ_RDMA];
md.eq_handle = mca_btl_portals_module.portals_eq_handles[OMPI_BTL_PORTALS_EQ];
ret = PtlMDAttach(me_h,
md,
@ -424,7 +435,7 @@ mca_btl_portals_prepare_dst(struct mca_btl_base_module_t* btl_base,
opal_output(mca_btl_portals_component.portals_output,
"Error creating rdma dest MD: %d", ret);
PtlMEUnlink(me_h);
OMPI_BTL_PORTALS_FRAG_RETURN_USER(&btl->super, frag);
OMPI_BTL_PORTALS_FRAG_RETURN_USER(&mca_btl_portals_module.super, frag);
return NULL;
}
@ -435,55 +446,42 @@ mca_btl_portals_prepare_dst(struct mca_btl_base_module_t* btl_base,
int
mca_btl_portals_finalize(struct mca_btl_base_module_t *btl_base)
{
struct mca_btl_portals_module_t *btl =
(struct mca_btl_portals_module_t *) btl_base;
int ret, i;
opal_list_item_t *item;
int ret;
assert(&mca_btl_portals_module == (mca_btl_portals_module_t*) btl_base);
/* finalize all communication */
while (btl->portals_outstanding_sends > 0) {
while (mca_btl_portals_module.portals_outstanding_sends > 0) {
mca_btl_portals_component_progress();
}
if (0 != opal_list_get_size(&(btl->portals_queued_sends))) {
if (0 != opal_list_get_size(&(mca_btl_portals_module.portals_queued_sends))) {
opal_output(mca_btl_portals_component.portals_output,
"Warning: there were %d queued sends not sent",
opal_list_get_size(&(btl->portals_queued_sends)));
opal_list_get_size(&(mca_btl_portals_module.portals_queued_sends)));
}
OPAL_THREAD_LOCK(&btl->portals_lock);
if (mca_btl_portals_module.portals_num_procs != 0) {
int i;
if (0 != opal_list_get_size(&btl->portals_endpoint_list)) {
OPAL_THREAD_LOCK(&btl->portals_lock);
while (NULL !=
(item = opal_list_remove_first(&btl->portals_endpoint_list))) {
OBJ_RELEASE(item);
}
/* only do this if there was something in the endpoint list.
otherwise, it has alredy been done. */
/* shut down recv queues */
ret = mca_btl_portals_recv_disable(btl);
ret = mca_btl_portals_recv_disable(&mca_btl_portals_module);
/* destroy eqs */
for (i = 0 ; i < OMPI_BTL_PORTALS_EQ_SIZE ; ++i) {
int ptl_ret = PtlEQFree(btl->portals_eq_handles[i]);
int ptl_ret = PtlEQFree(mca_btl_portals_module.portals_eq_handles[i]);
if (PTL_OK != ptl_ret) {
opal_output(mca_btl_portals_component.portals_output,
"Error freeing EQ %d: %d", i, ptl_ret);
}
}
}
OBJ_DESTRUCT(&btl->portals_endpoint_list);
OBJ_DESTRUCT(&btl->portals_recv_chunks);
OBJ_DESTRUCT(&btl->portals_queued_sends);
}
OPAL_THREAD_UNLOCK(&btl->portals_lock);
OBJ_DESTRUCT(&mca_btl_portals_module.portals_recv_blocks);
OBJ_DESTRUCT(&mca_btl_portals_module.portals_queued_sends);
if (PTL_INVALID_HANDLE != btl->portals_ni_h) {
ret = PtlNIFini(btl->portals_ni_h);
if (PTL_INVALID_HANDLE != mca_btl_portals_module.portals_ni_h) {
ret = PtlNIFini(mca_btl_portals_module.portals_ni_h);
if (PTL_OK != ret) {
opal_output_verbose(20, mca_btl_portals_component.portals_output,
"PtlNIFini returned %d", ret);
@ -491,8 +489,6 @@ mca_btl_portals_finalize(struct mca_btl_base_module_t *btl_base)
}
}
OBJ_DESTRUCT(&btl->portals_lock);
opal_output_verbose(20, mca_btl_portals_component.portals_output,
"successfully finalized module");

View File

@ -58,30 +58,19 @@ struct mca_btl_portals_component_t {
char *portals_ifname;
#endif
/* Number of currently active portals modules. We assume these
never change between init and finalize, so these aren't thread
locked */
uint32_t portals_num_modules;
/* List of currently available modules */
struct mca_btl_portals_module_t *portals_modules;
/* initial size of free lists */
int portals_free_list_init_num;
/* max size of free lists */
int portals_free_list_max_num;
/* numer of elements to grow free lists */
int portals_free_list_inc_num;
/* lock for accessing component */
opal_mutex_t portals_lock;
};
typedef struct mca_btl_portals_component_t mca_btl_portals_component_t;
#define OMPI_BTL_PORTALS_EQ_RECV 0
#define OMPI_BTL_PORTALS_EQ_SEND 1
#define OMPI_BTL_PORTALS_EQ_RDMA 2
#define OMPI_BTL_PORTALS_EQ_SIZE 3
#define OMPI_BTL_PORTALS_EQ_SEND 0
#define OMPI_BTL_PORTALS_EQ 1
#define OMPI_BTL_PORTALS_EQ_SIZE 2
struct mca_btl_portals_module_t {
/* base BTL module interface */
@ -90,23 +79,24 @@ struct mca_btl_portals_module_t {
/* registered callbacks */
mca_btl_base_recv_reg_t portals_reg[MCA_BTL_TAG_MAX];
/* list of connected procs */
opal_list_t portals_endpoint_list;
/* number of processes we're actively connected to. Needed to
know when to do activation / shutdown */
int32_t portals_num_procs;
/* fragment free lists */
ompi_free_list_t portals_frag_eager;
ompi_free_list_t portals_frag_max;
ompi_free_list_t portals_frag_user;
ompi_free_list_t portals_frag_recv;
/* number of mds for recv frags */
/* incoming send message receive memory descriptors */
int portals_recv_mds_num;
/* size of each md for first frags */
int portals_recv_mds_size;
/* list of recv chunks */
opal_list_t portals_recv_chunks;
opal_list_t portals_recv_blocks;
/* size for event queue */
/* event queues. Keep sends on own eq, since we can't control
space for the ack otherwise */
int portals_eq_sizes[OMPI_BTL_PORTALS_EQ_SIZE];
/* frag receive event queue */
ptl_handle_eq_t portals_eq_handles[OMPI_BTL_PORTALS_EQ_SIZE];
/* "reject" entry for recv match list */
@ -124,14 +114,12 @@ struct mca_btl_portals_module_t {
/* our portals network interface */
ptl_handle_ni_t portals_ni_h;
/* the limits returned from PtlNIInit for interface */
ptl_ni_limits_t portals_ni_limits;
/* number of dropped messages */
ptl_sr_value_t portals_sr_dropped;
/* lock for accessing module */
opal_mutex_t portals_lock;
/* descriptors for send */
ptl_md_t md_send;
};
typedef struct mca_btl_portals_module_t mca_btl_portals_module_t;
@ -178,9 +166,6 @@ int mca_btl_portals_del_procs(struct mca_btl_base_module_t* btl_base,
struct ompi_proc_t **procs,
struct mca_btl_base_endpoint_t** peers);
/*
* stubbed functions
*/
int mca_btl_portals_register(struct mca_btl_base_module_t* btl_base,
mca_btl_base_tag_t tag,
mca_btl_base_module_recv_cb_fn_t cbfunc,
@ -228,7 +213,6 @@ int mca_btl_portals_get(struct mca_btl_base_module_t* btl_base,
* global structures
*/
extern mca_btl_portals_component_t mca_btl_portals_component;
/* don't use, except as base for creating module instances */
extern mca_btl_portals_module_t mca_btl_portals_module;
#endif

View File

@ -46,9 +46,7 @@ mca_btl_portals_init_compat(mca_btl_portals_component_t *comp)
{
ptl_process_id_t info;
int ret, max_interfaces;
uint32_t i;
struct mca_btl_portals_module_t *btl;
#if 0 /* send all the portals internal debug to a file or stderr */
#if 1 /* send all the portals internal debug to a file or stderr */
FILE *output;
char *tmp;
@ -74,28 +72,6 @@ mca_btl_portals_init_compat(mca_btl_portals_component_t *comp)
use_modex = false;
}
/* with the utcp interface, only ever one "NIC" */
comp->portals_num_modules = 1;
comp->portals_modules = calloc(comp->portals_num_modules,
sizeof(mca_btl_portals_module_t));
if (NULL == comp->portals_modules) {
opal_output_verbose(10, mca_btl_portals_component.portals_output,
"malloc failed in mca_btl_portals_init");
return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
}
btl = &(comp->portals_modules[0]);
/* compat code is responsible for copying over the "template" onto
each module instance. The calling code will create the free
lists and the like - we're only responsible for the
Portals-specific entries */
for (i = 0 ; i < comp->portals_num_modules ; ++i) {
memcpy(&(comp->portals_modules[i]),
&mca_btl_portals_module,
sizeof(mca_btl_portals_module_t));
/* the defaults are good enough for the rest */
}
if (use_modex) {
/* post our contact info in the registry */
info.nid = htonl(utcp_my_nid(mca_btl_portals_component.portals_ifname));
@ -128,8 +104,8 @@ mca_btl_portals_init_compat(mca_btl_portals_component_t *comp)
ret = PtlNIInit(PTL_IFACE_DEFAULT, /* interface to initialize */
PTL_PID_ANY, /* let library assign our pid */
NULL, /* no desired limits */
&(btl->portals_ni_limits), /* save our limits somewhere */
&(btl->portals_ni_h) /* our interface handle */
NULL, /* no need to have limits around */
&mca_btl_portals_module.portals_ni_h /* our interface handle */
);
if (PTL_OK != ret) {
opal_output_verbose(10, mca_btl_portals_component.portals_output,
@ -252,8 +228,8 @@ mca_btl_portals_add_procs_compat(struct mca_btl_portals_module_t* btl,
ret = PtlNIInit(PTL_IFACE_DEFAULT, /* interface to initialize */
PTL_PID_ANY, /* let library assign our pid */
NULL, /* no desired limits */
&(btl->portals_ni_limits), /* save our limits somewhere */
&(btl->portals_ni_h) /* our interface handle */
NULL, /* save our limits somewhere */
&(mca_btl_portals_module.portals_ni_h) /* our interface handle */
);
if (PTL_OK != ret) {
opal_output_verbose(10, mca_btl_portals_component.portals_output,
@ -300,7 +276,7 @@ mca_btl_portals_add_procs_compat(struct mca_btl_portals_module_t* btl,
}
#if 0
PtlNIDebug(btl->portals_ni_h, PTL_DBG_ALL | PTL_DBG_NI_ALL);
PtlNIDebug(mca_btl_portals_module.portals_ni_h, PTL_DBG_NI_ALL);
#endif
return OMPI_SUCCESS;

View File

@ -84,15 +84,23 @@ mca_btl_portals_component_open(void)
int i;
int dummy;
/* initialize component state */
mca_btl_portals_component.portals_num_modules = 0;
mca_btl_portals_component.portals_modules = NULL;
/*
* get configured state for component
*/
/* initalize component objects */
OBJ_CONSTRUCT(&mca_btl_portals_component.portals_lock,
opal_mutex_t);
/* start up debugging output */
mca_base_param_reg_int(&mca_btl_portals_component.super.btl_version,
"debug_level",
"Debugging verbosity (0 - 100)",
false,
false,
OMPI_BTL_PORTALS_DEFAULT_DEBUG_LEVEL,
&(portals_output_stream.lds_verbose_level));
asprintf(&(portals_output_stream.lds_prefix),
"btl: portals (%5d): ", getpid());
mca_btl_portals_component.portals_output =
opal_output_open(&portals_output_stream);
/* get configured state for component */
#if OMPI_BTL_PORTALS_UTCP
mca_base_param_reg_string(&mca_btl_portals_component.super.btl_version,
"ifname",
@ -125,20 +133,9 @@ mca_btl_portals_component_open(void)
OMPI_BTL_PORTALS_DEFAULT_FREE_LIST_INC_NUM,
&(mca_btl_portals_component.portals_free_list_inc_num));
/* start up debugging output */
mca_base_param_reg_int(&mca_btl_portals_component.super.btl_version,
"debug_level",
"Debugging verbosity (0 - 100)",
false,
false,
OMPI_BTL_PORTALS_DEFAULT_DEBUG_LEVEL,
&(portals_output_stream.lds_verbose_level));
asprintf(&(portals_output_stream.lds_prefix),
"btl: portals (%5d): ", getpid());
mca_btl_portals_component.portals_output =
opal_output_open(&portals_output_stream);
/* fill default module state */
/*
* fill default module state
*/
mca_base_param_reg_int(&mca_btl_portals_component.super.btl_version,
"eager_limit",
"Maximum size for eager frag",
@ -206,8 +203,9 @@ mca_btl_portals_component_open(void)
&dummy);
mca_btl_portals_module.super.btl_bandwidth = dummy;
mca_btl_portals_module.super.btl_flags = MCA_BTL_FLAGS_RDMA;
mca_btl_portals_module.super.btl_flags = MCA_BTL_FLAGS_SEND;
mca_btl_portals_module.portals_num_procs = 0;
bzero(&(mca_btl_portals_module.portals_reg),
sizeof(mca_btl_portals_module.portals_reg));
@ -218,12 +216,12 @@ mca_btl_portals_component_open(void)
/* eq handles will be created when the module is instantiated.
Set sizes here */
mca_base_param_reg_int(&mca_btl_portals_component.super.btl_version,
"eq_recv_size",
"Size of the receive event queue",
"eq_size",
"Size of the event queue",
false,
false,
OMPI_BTL_PORTALS_DEFAULT_RECV_QUEUE_SIZE,
&(mca_btl_portals_module.portals_eq_sizes[OMPI_BTL_PORTALS_EQ_RECV]));
&(mca_btl_portals_module.portals_eq_sizes[OMPI_BTL_PORTALS_EQ]));
mca_base_param_reg_int(&mca_btl_portals_component.super.btl_version,
"eq_send_max_pending",
@ -232,17 +230,9 @@ mca_btl_portals_component_open(void)
false,
OMPI_BTL_PORTALS_MAX_SENDS_PENDING,
&(mca_btl_portals_module.portals_max_outstanding_sends));
/* sends_pending * 3 for start, end, ack */
/* sends_pending * 2 for end, ack */
mca_btl_portals_module.portals_eq_sizes[OMPI_BTL_PORTALS_EQ_SEND] =
mca_btl_portals_module.portals_max_outstanding_sends * 3;
mca_base_param_reg_int(&mca_btl_portals_component.super.btl_version,
"eq_rdma_size",
"Size of the rdma event queue",
false,
false,
512,
&(mca_btl_portals_module.portals_eq_sizes[OMPI_BTL_PORTALS_EQ_RDMA]));
mca_btl_portals_module.portals_max_outstanding_sends * 2;
mca_btl_portals_module.portals_recv_reject_me_h = PTL_INVALID_HANDLE;
@ -274,12 +264,6 @@ int
mca_btl_portals_component_close(void)
{
/* release resources */
OBJ_DESTRUCT(&mca_btl_portals_component.portals_lock);
if (NULL != mca_btl_portals_component.portals_modules) {
free(mca_btl_portals_component.portals_modules);
}
#if OMPI_BTL_PORTALS_UTCP
if (NULL != mca_btl_portals_component.portals_ifname) {
free(mca_btl_portals_component.portals_ifname);
@ -303,12 +287,10 @@ mca_btl_portals_component_init(int *num_btls,
bool enable_progress_threads,
bool enable_mpi_threads)
{
mca_btl_base_module_t** btls;
uint32_t i;
mca_btl_base_module_t ** btls = malloc(sizeof(mca_btl_base_module_t*));
btls[0] = (mca_btl_base_module_t*) &mca_btl_portals_module;
*num_btls = 0;
if (enable_progress_threads) {
if (enable_progress_threads || enable_mpi_threads) {
opal_output_verbose(20, mca_btl_portals_component.portals_output,
"disabled because threads enabled");
return NULL;
@ -322,68 +304,60 @@ mca_btl_portals_component_init(int *num_btls,
return NULL;
}
/* create an array of btl* to return */
btls = malloc(mca_btl_portals_component.portals_num_modules *
sizeof(mca_btl_portals_module_t*));
/* fill in all the portable parts of the module structs - the
compat code filled in the other bits already */
for (i = 0 ; i < mca_btl_portals_component.portals_num_modules ; ++i) {
mca_btl_portals_module_t* ptl_btl =
(mca_btl_portals_component.portals_modules + i);
btls[i] = (mca_btl_base_module_t*) ptl_btl;
OBJ_CONSTRUCT(&(mca_btl_portals_module.portals_frag_eager), ompi_free_list_t);
OBJ_CONSTRUCT(&(mca_btl_portals_module.portals_frag_max), ompi_free_list_t);
OBJ_CONSTRUCT(&(mca_btl_portals_module.portals_frag_user), ompi_free_list_t);
OBJ_CONSTRUCT(&(mca_btl_portals_module.portals_frag_recv), ompi_free_list_t);
OBJ_CONSTRUCT(&(ptl_btl->portals_frag_eager), ompi_free_list_t);
OBJ_CONSTRUCT(&(ptl_btl->portals_frag_max), ompi_free_list_t);
OBJ_CONSTRUCT(&(ptl_btl->portals_frag_user), ompi_free_list_t);
/* eager frags */
ompi_free_list_init(&(mca_btl_portals_module.portals_frag_eager),
sizeof(mca_btl_portals_frag_eager_t) +
mca_btl_portals_module.super.btl_eager_limit,
OBJ_CLASS(mca_btl_portals_frag_eager_t),
mca_btl_portals_component.portals_free_list_init_num,
mca_btl_portals_component.portals_free_list_max_num,
mca_btl_portals_component.portals_free_list_inc_num,
NULL);
/* eager frags */
ompi_free_list_init(&(ptl_btl->portals_frag_eager),
sizeof(mca_btl_portals_frag_eager_t) +
ptl_btl->super.btl_eager_limit,
OBJ_CLASS(mca_btl_portals_frag_eager_t),
mca_btl_portals_component.portals_free_list_init_num,
mca_btl_portals_component.portals_free_list_max_num,
mca_btl_portals_component.portals_free_list_inc_num,
NULL);
/* send frags */
ompi_free_list_init(&(mca_btl_portals_module.portals_frag_max),
sizeof(mca_btl_portals_frag_max_t) +
mca_btl_portals_module.super.btl_max_send_size,
OBJ_CLASS(mca_btl_portals_frag_max_t),
mca_btl_portals_component.portals_free_list_init_num,
mca_btl_portals_component.portals_free_list_max_num,
mca_btl_portals_component.portals_free_list_inc_num,
NULL);
/* send frags */
ompi_free_list_init(&(ptl_btl->portals_frag_max),
sizeof(mca_btl_portals_frag_max_t) +
ptl_btl->super.btl_max_send_size,
OBJ_CLASS(mca_btl_portals_frag_max_t),
mca_btl_portals_component.portals_free_list_init_num,
mca_btl_portals_component.portals_free_list_max_num,
mca_btl_portals_component.portals_free_list_inc_num,
NULL);
/* user frags */
ompi_free_list_init(&(mca_btl_portals_module.portals_frag_user),
sizeof(mca_btl_portals_frag_user_t),
OBJ_CLASS(mca_btl_portals_frag_user_t),
mca_btl_portals_component.portals_free_list_init_num,
mca_btl_portals_component.portals_free_list_max_num,
mca_btl_portals_component.portals_free_list_inc_num,
NULL);
/* user frags */
ompi_free_list_init(&(ptl_btl->portals_frag_user),
sizeof(mca_btl_portals_frag_user_t),
OBJ_CLASS(mca_btl_portals_frag_user_t),
mca_btl_portals_component.portals_free_list_init_num,
mca_btl_portals_component.portals_free_list_max_num,
mca_btl_portals_component.portals_free_list_inc_num,
NULL);
/* recv frags */
ompi_free_list_init(&(mca_btl_portals_module.portals_frag_recv),
sizeof(mca_btl_portals_frag_recv_t),
OBJ_CLASS(mca_btl_portals_frag_recv_t),
mca_btl_portals_component.portals_free_list_init_num,
mca_btl_portals_component.portals_free_list_max_num,
mca_btl_portals_component.portals_free_list_inc_num,
NULL);
/* endpoint list */
OBJ_CONSTRUCT(&(ptl_btl->portals_endpoint_list), opal_list_t);
/* receive block list */
OBJ_CONSTRUCT(&(mca_btl_portals_module.portals_recv_blocks), opal_list_t);
/* receive chunk list */
OBJ_CONSTRUCT(&(ptl_btl->portals_recv_chunks), opal_list_t);
/* pending sends */
OBJ_CONSTRUCT(&(ptl_btl->portals_queued_sends), opal_list_t);
/* lock */
OBJ_CONSTRUCT(&(ptl_btl->portals_lock), opal_mutex_t);
}
*num_btls = mca_btl_portals_component.portals_num_modules;
/* pending sends */
OBJ_CONSTRUCT(&(mca_btl_portals_module.portals_queued_sends), opal_list_t);
*num_btls = 1;
opal_output_verbose(20, mca_btl_portals_component.portals_output,
"initialized %d modules",
*num_btls);
"initialized Portals module");
return btls;
}
@ -393,72 +367,246 @@ int
mca_btl_portals_component_progress(void)
{
int num_progressed = 0;
size_t i;
int ret, which;
static ptl_event_t ev;
mca_btl_portals_frag_t *frag = NULL;
mca_btl_portals_recv_block_t *block = NULL;
mca_btl_base_tag_t tag;
for (i = 0 ; i < mca_btl_portals_component.portals_num_modules ; ++i) {
struct mca_btl_portals_module_t *module =
&(mca_btl_portals_component.portals_modules)[i];
ptl_event_t ev;
ptl_sr_value_t numdropped;
int which;
int ret;
if (0 == mca_btl_portals_module.portals_num_procs) {
return 0;
}
if (module->portals_eq_handles[OMPI_BTL_PORTALS_EQ_SIZE - 1] ==
PTL_EQ_NONE) continue; /* they are all initialized at once */
#if OMPI_ENABLE_DEBUG
/* check for dropped packets. In theory, our protocol covers
this, but it can't hurt to check while we're debugging */
PtlNIStatus(module->portals_ni_h,
PTL_SR_DROP_COUNT,
&numdropped);
if (numdropped != module->portals_sr_dropped) {
opal_output_verbose(30, mca_btl_portals_component.portals_output,
"*** Dropped message count changed. %lld, %lld",
module->portals_sr_dropped, numdropped);
module->portals_sr_dropped = numdropped;
}
while (true) {
ret = PtlEQPoll(mca_btl_portals_module.portals_eq_handles,
OMPI_BTL_PORTALS_EQ_SIZE,
#if OMPI_BTL_PORTALS_REDSTORM
0, /* timeout */
#else
1, /* timeout */
#endif
&ev,
&which);
switch (ret) {
case PTL_OK:
frag = ev.md.user_ptr;
num_progressed++;
while (true) {
ret = PtlEQPoll(module->portals_eq_handles,
OMPI_BTL_PORTALS_EQ_SIZE, /* number of eq handles */
0, /* poll time */
&ev,
&which);
if (PTL_EQ_EMPTY == ret) {
/* nothing to see here - move along */
mca_btl_portals_progress_queued_sends(module);
break;
} else if (!(PTL_OK == ret || PTL_EQ_DROPPED == ret)) {
/* BWB - how can we report errors? */
opal_output(mca_btl_portals_component.portals_output,
"*** Error calling PtlEQGet: %d ***", ret);
break;
} else if (PTL_EQ_DROPPED == ret) {
opal_output_verbose(10, mca_btl_portals_component.portals_output,
"*** Event queue entries were dropped ***");
}
switch (which) {
case OMPI_BTL_PORTALS_EQ_RECV:
mca_btl_portals_process_recv(module, &ev);
break;
case OMPI_BTL_PORTALS_EQ_SEND:
mca_btl_portals_process_send(module, &ev);
break;
case OMPI_BTL_PORTALS_EQ_RDMA:
mca_btl_portals_process_rdma(module, &ev);
break;
default:
switch (ev.type) {
case PTL_EVENT_GET_START:
/* BWB - FIX ME - need to fill in */
abort();
break;
}
num_progressed++;
case PTL_EVENT_GET_END:
/* BWB - FIX ME - need to fill in */
abort();
break;
case PTL_EVENT_PUT_START:
OPAL_OUTPUT_VERBOSE((900, mca_btl_portals_component.portals_output,
"PTL_EVENT_PUT_START for 0x%x, %d",
frag, (int) ev.hdr_data));
#if OMPI_ENABLE_DEBUG
if (ev.ni_fail_type != PTL_NI_OK) {
opal_output(mca_btl_portals_component.portals_output,
"Failure to start event\n");
return OMPI_ERROR;
}
#endif
if (ev.hdr_data < MCA_BTL_TAG_MAX) {
block = ev.md.user_ptr;
OPAL_THREAD_ADD32(&(block->pending), 1);
}
break;
case PTL_EVENT_PUT_END:
OPAL_OUTPUT_VERBOSE((900, mca_btl_portals_component.portals_output,
"PTL_EVENT_PUT_END for 0x%x, %d",
frag, (int) ev.hdr_data));
#if OMPI_ENABLE_DEBUG
if (ev.ni_fail_type != PTL_NI_OK) {
opal_output(mca_btl_portals_component.portals_output,
"Failure to end event\n");
mca_btl_portals_return_block_part(&mca_btl_portals_module,
block);
return OMPI_ERROR;
}
#endif
if (ev.hdr_data < MCA_BTL_TAG_MAX) {
block = ev.md.user_ptr;
tag = ev.hdr_data;
OMPI_BTL_PORTALS_FRAG_ALLOC_RECV(&mca_btl_portals_module, frag, ret);
frag->segment.seg_addr.pval = (((char*) ev.md.start) + ev.offset);
frag->segment.seg_len = ev.mlength;
if (ev.md.length - (ev.offset + ev.mlength) < ev.md.max_size) {
/* the block is full. It's deactivated automagically, but we
can't start it up again until everyone is done with it.
The actual reactivation and all that will happen after the
free completes the last operation... */
OPAL_OUTPUT_VERBOSE((90, mca_btl_portals_component.portals_output,
"marking block 0x%x as full", block->start));
block->full = true;
}
assert(NULL != mca_btl_portals_module.portals_reg[tag].cbfunc);
mca_btl_portals_module.portals_reg[tag].cbfunc(
&mca_btl_portals_module.super,
tag,
&frag->base,
mca_btl_portals_module.portals_reg[tag].cbdata);
OMPI_BTL_PORTALS_FRAG_RETURN_RECV(&mca_btl_portals_module.super,
frag);
mca_btl_portals_return_block_part(&mca_btl_portals_module, block);
}
break;
case PTL_EVENT_REPLY_START:
/* BWB - FIX ME - need to fill in */
abort();
break;
case PTL_EVENT_REPLY_END:
/* BWB - FIX ME - need to fill in */
abort();
break;
case PTL_EVENT_SEND_START:
#if OMPI_ENABLE_DEBUG
OPAL_OUTPUT_VERBOSE((900, mca_btl_portals_component.portals_output,
"PTL_EVENT_SEND_START for 0x%x, %d, %d",
frag, (int) frag->type, (int) ev.hdr_data));
if (ev.ni_fail_type != PTL_NI_OK) {
opal_output(mca_btl_portals_component.portals_output,
"Failure to start send event\n");
if (ev.hdr_data < MCA_BTL_TAG_MAX) {
OPAL_THREAD_ADD32(&mca_btl_portals_module.portals_outstanding_sends,
-1);
/* unlink, since we don't expect to get an end or ack */
}
PtlMDUnlink(ev.md_handle);
frag->base.des_cbfunc(&mca_btl_portals_module.super,
frag->endpoint,
&frag->base,
OMPI_ERROR);
}
#endif
break;
case PTL_EVENT_SEND_END:
#if OMPI_ENABLE_DEBUG
OPAL_OUTPUT_VERBOSE((90, mca_btl_portals_component.portals_output,
"PTL_EVENT_SEND_END for 0x%x, %d, %d",
frag, (int) frag->type, (int) ev.hdr_data));
if (ev.ni_fail_type != PTL_NI_OK) {
opal_output(mca_btl_portals_component.portals_output,
"Failure to end send event\n");
if (ev.hdr_data < MCA_BTL_TAG_MAX) {
/* unlink, since we don't expect to get an ack */
OPAL_THREAD_ADD32(&mca_btl_portals_module.portals_outstanding_sends,
-1);
PtlMDUnlink(ev.md_handle);
}
frag->base.des_cbfunc(&mca_btl_portals_module.super,
frag->endpoint,
&frag->base,
OMPI_ERROR);
}
#endif
break;
case PTL_EVENT_ACK:
/* ACK for either send or RDMA put. Either way, we
just call the callback function on goodness.
Requeue the put on badness */
OPAL_OUTPUT_VERBOSE((90, mca_btl_portals_component.portals_output,
"PTL_EVENT_ACK for 0x%x, %d",
frag, (int) frag->type));
if (frag->type == mca_btl_portals_frag_type_send) {
OPAL_THREAD_ADD32(&mca_btl_portals_module.portals_outstanding_sends,
-1);
}
#if OMPI_ENABLE_DEBUG
if (ev.ni_fail_type != PTL_NI_OK) {
opal_output(mca_btl_portals_component.portals_output,
"Failure to ack event\n");
/* unlink, since we don't expect to get an ack */
PtlMDUnlink(ev.md_handle);
frag->base.des_cbfunc(&mca_btl_portals_module.super,
frag->endpoint,
&frag->base,
OMPI_ERROR);
} else
#endif
if (0 == ev.mlength) {
/* other side did not receive the message */
opal_output_verbose(50,
mca_btl_portals_component.portals_output,
"message was dropped. Adding to front of queue list");
opal_list_prepend(&(mca_btl_portals_module.portals_queued_sends),
(opal_list_item_t*) frag);
} else {
/* other side did receive the message */
assert(ev.mlength == frag->segment.seg_len);
/* let the PML know we're done */
frag->base.des_cbfunc(&mca_btl_portals_module.super,
frag->endpoint,
&frag->base,
OMPI_SUCCESS);
}
if (frag->type == mca_btl_portals_frag_type_send) {
MCA_BTL_PORTALS_PROGRESS_QUEUED_SENDS();
}
break;
default:
break;
}
break;
case PTL_EQ_EMPTY:
/* there's nothing in the queue. This is actually the
common case, so the easiest way to make the compiler
emit something that doesn't completely blow here is to
just to go back to a good old goto */
goto done;
break;
case PTL_EQ_DROPPED:
opal_output(mca_btl_portals_component.portals_output,
"WARNING: EQ events dropped. Too many messages pending.");
opal_output(mca_btl_portals_component.portals_output,
"WARNING: Giving up in dispair");
abort();
break;
default:
opal_output(mca_btl_portals_component.portals_output,
"WARNING: Error in PtlEQPoll (%d). This shouldn't happen",
ret);
abort();
break;
}
}
done:
return num_progressed;
}

View File

@ -20,25 +20,4 @@
#include "btl_portals.h"
#include "btl_portals_endpoint.h"
/*
* Initialize state of the endpoint instance.
*
*/
static void mca_btl_portals_endpoint_construct(mca_btl_base_endpoint_t* endpoint)
{
endpoint->endpoint_btl = NULL;
endpoint->endpoint_proc = NULL;
endpoint->endpoint_ptl_id.nid = 0;
endpoint->endpoint_ptl_id.pid = 0;
}
OBJ_CLASS_INSTANCE(
mca_btl_portals_endpoint_t,
opal_list_item_t,
mca_btl_portals_endpoint_construct,
NULL);
/* BWB - FIX ME - delete this file */

View File

@ -31,25 +31,8 @@ extern "C" {
* and BTL pair at startup. However, connections to the endpoint
* are established dynamically on an as-needed basis:
*/
struct mca_btl_base_endpoint_t {
opal_list_item_t super;
/** BTL instance that created this connection */
struct mca_btl_portals_module_t* endpoint_btl;
/** proc structure corresponding to endpoint */
struct ompi_proc_t *endpoint_proc;
/** Portals address for endpoint */
ptl_process_id_t endpoint_ptl_id;
};
typedef struct mca_btl_base_endpoint_t mca_btl_base_endpoint_t;
typedef mca_btl_base_endpoint_t mca_btl_portals_endpoint_t;
OBJ_CLASS_DECLARATION(mca_btl_portals_endpoint_t);
typedef ptl_process_id_t mca_btl_base_endpoint_t;
typedef mca_btl_base_endpoint_t mca_btl_portals_endpoint_t;
#if defined(c_plusplus) || defined(__cplusplus)
}

View File

@ -61,6 +61,18 @@ mca_btl_portals_frag_user_constructor(mca_btl_portals_frag_t* frag)
frag->size = 0;
}
static void
mca_btl_portals_frag_recv_constructor(mca_btl_portals_frag_t* frag)
{
frag->base.des_flags = 0;
frag->base.des_dst = &frag->segment;
frag->base.des_dst_cnt = 1;
frag->base.des_src = NULL;
frag->base.des_src_cnt = 0;
frag->size = 0;
frag->type = mca_btl_portals_frag_type_recv;
}
OBJ_CLASS_INSTANCE(
mca_btl_portals_frag_t,
@ -86,3 +98,9 @@ OBJ_CLASS_INSTANCE(
mca_btl_portals_frag_user_constructor,
NULL);
OBJ_CLASS_INSTANCE(
mca_btl_portals_frag_recv_t,
mca_btl_base_descriptor_t,
mca_btl_portals_frag_recv_constructor,
NULL);

View File

@ -29,9 +29,12 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_btl_portals_frag_t);
struct mca_btl_portals_frag_t {
mca_btl_base_descriptor_t base;
mca_btl_base_segment_t segment;
struct mca_btl_portals_module_t *btl;
/* needed for retransmit case */
struct mca_btl_base_endpoint_t *endpoint;
mca_btl_base_header_t hdr;
enum { mca_btl_portals_frag_type_send,
mca_btl_portals_frag_type_recv,
mca_btl_portals_frag_type_rdma} type;
size_t size;
};
@ -40,30 +43,27 @@ OBJ_CLASS_DECLARATION(mca_btl_portals_frag_t);
typedef struct mca_btl_portals_frag_t mca_btl_portals_frag_eager_t;
OBJ_CLASS_DECLARATION(mca_btl_portals_frag_eager_t);
typedef struct mca_btl_portals_frag_t mca_btl_portals_frag_max_t;
OBJ_CLASS_DECLARATION(mca_btl_portals_frag_max_t);
typedef struct mca_btl_portals_frag_t mca_btl_portals_frag_user_t;
OBJ_CLASS_DECLARATION(mca_btl_portals_frag_user_t);
typedef struct mca_btl_portals_frag_t mca_btl_portals_frag_recv_t;
OBJ_CLASS_DECLARATION(mca_btl_portals_frag_recv_t);
/*
* Macros to allocate/return descriptors from module specific
* free list(s).
*/
#define OMPI_BTL_PORTALS_FRAG_ALLOC_EAGER(btl_macro, frag, rc) \
{ \
\
opal_list_item_t *item; \
OMPI_FREE_LIST_WAIT(&((mca_btl_portals_module_t*)btl_macro)->portals_frag_eager, item, rc); \
frag = (mca_btl_portals_frag_t*) item; \
frag->btl = (mca_btl_portals_module_t*) btl_macro; \
}
#define OMPI_BTL_PORTALS_FRAG_RETURN_EAGER(btl_macro, frag) \
@ -72,13 +72,13 @@ OBJ_CLASS_DECLARATION(mca_btl_portals_frag_user_t);
(opal_list_item_t*)(frag)); \
}
#define OMPI_BTL_PORTALS_FRAG_ALLOC_MAX(btl_macro, frag, rc) \
{ \
\
opal_list_item_t *item; \
OMPI_FREE_LIST_WAIT(&((mca_btl_portals_module_t*)btl_macro)->portals_frag_max, item, rc); \
frag = (mca_btl_portals_frag_t*) item; \
frag->btl = (mca_btl_portals_module_t*) btl_macro; \
}
#define OMPI_BTL_PORTALS_FRAG_RETURN_MAX(btl_macro, frag) \
@ -93,7 +93,6 @@ OBJ_CLASS_DECLARATION(mca_btl_portals_frag_user_t);
opal_list_item_t *item; \
OMPI_FREE_LIST_WAIT(&((mca_btl_portals_module_t*)btl_macro)->portals_frag_user, item, rc); \
frag = (mca_btl_portals_frag_t*) item; \
frag->btl = (mca_btl_portals_module_t*) btl_macro; \
}
#define OMPI_BTL_PORTALS_FRAG_RETURN_USER(btl_macro, frag) \
@ -103,6 +102,20 @@ OBJ_CLASS_DECLARATION(mca_btl_portals_frag_user_t);
}
#define OMPI_BTL_PORTALS_FRAG_ALLOC_RECV(btl_macro, frag, rc) \
{ \
opal_list_item_t *item; \
OMPI_FREE_LIST_WAIT(&((mca_btl_portals_module_t*)btl_macro)->portals_frag_recv, item, rc); \
frag = (mca_btl_portals_frag_t*) item; \
}
#define OMPI_BTL_PORTALS_FRAG_RETURN_RECV(btl_macro, frag) \
{ \
OMPI_FREE_LIST_RETURN(&((mca_btl_portals_module_t*)btl_macro)->portals_frag_recv, \
(opal_list_item_t*)(frag)); \
}
#if defined(c_plusplus) || defined(__cplusplus)
}

View File

@ -23,116 +23,6 @@
#include "btl_portals_rdma.h"
#include "btl_portals_frag.h"
int
mca_btl_portals_process_rdma(mca_btl_portals_module_t *btl,
ptl_event_t *ev)
{
mca_btl_portals_frag_t *frag =
(mca_btl_portals_frag_t*) ev->md.user_ptr;
switch (ev->type) {
case PTL_EVENT_SEND_START:
OPAL_OUTPUT_VERBOSE((90, mca_btl_portals_component.portals_output,
"rdma: PTL_EVENT_SEND_START for 0x%x",
frag));
if (ev->ni_fail_type != PTL_NI_OK) {
opal_output(mca_btl_portals_component.portals_output,
"Failure to start rdma send event\n");
/* unlink, since we don't expect to get an end or ack */
PtlMDUnlink(ev->md_handle);
frag->base.des_cbfunc(&btl->super,
frag->endpoint,
&frag->base,
OMPI_ERROR);
}
break;
case PTL_EVENT_SEND_END:
OPAL_OUTPUT_VERBOSE((90, mca_btl_portals_component.portals_output,
"rdma: PTL_EVENT_SEND_END for 0x%x",
frag));
if (ev->ni_fail_type != PTL_NI_OK) {
opal_output(mca_btl_portals_component.portals_output,
"Failure to end rdma send event\n");
/* unlink, since we don't expect to get an ack */
PtlMDUnlink(ev->md_handle);
frag->base.des_cbfunc(&btl->super,
frag->endpoint,
&frag->base,
OMPI_ERROR);
}
break;
case PTL_EVENT_ACK:
/* ok, this is the real work - the message has been received
on the other side. */
OPAL_OUTPUT_VERBOSE((90, mca_btl_portals_component.portals_output,
"rdma: PTL_EVENT_ACK for 0x%x, Ox%x",
frag, frag->base.des_cbfunc));
if (ev->ni_fail_type != PTL_NI_OK) {
opal_output(mca_btl_portals_component.portals_output,
"Failure in rdma send event ack\n");
frag->base.des_cbfunc(&btl->super,
frag->endpoint,
&frag->base,
OMPI_ERROR);
} else {
assert(ev->mlength == frag->segment.seg_len);
/* let the PML know we're done... */
frag->base.des_cbfunc(&btl->super,
frag->endpoint,
&frag->base,
OMPI_SUCCESS);
}
break;
case PTL_EVENT_PUT_START:
OPAL_OUTPUT_VERBOSE((90, mca_btl_portals_component.portals_output,
"rdma: PTL_EVENT_PUT_START for 0x%x",
frag));
if (ev->ni_fail_type != PTL_NI_OK) {
opal_output(mca_btl_portals_component.portals_output,
"Failure in rdma put start\n");
/* unlink, since we don't expect to get an end */
PtlMDUnlink(ev->md_handle);
frag->base.des_cbfunc(&btl->super,
frag->endpoint,
&frag->base,
OMPI_ERROR);
}
break;
case PTL_EVENT_PUT_END:
OPAL_OUTPUT_VERBOSE((90, mca_btl_portals_component.portals_output,
"rdma: PTL_EVENT_PUT_END for 0x%x, Ox%x",
frag, frag->base.des_cbfunc));
if (ev->ni_fail_type != PTL_NI_OK) {
opal_output(mca_btl_portals_component.portals_output,
"Failure in rdma put end\n");
} else {
assert(ev->mlength == frag->segment.seg_len);
}
break;
default:
OPAL_OUTPUT_VERBOSE((90, mca_btl_portals_component.portals_output,
"rdma: unexpected event %d for 0x%x",
ev->type, frag));
break;
}
return OMPI_SUCCESS;
}
int
mca_btl_portals_put(struct mca_btl_base_module_t* btl_base,
struct mca_btl_base_endpoint_t* btl_peer,
@ -143,19 +33,23 @@ mca_btl_portals_put(struct mca_btl_base_module_t* btl_base,
ptl_handle_md_t md_h;
int ret;
assert(&mca_btl_portals_module == (mca_btl_portals_module_t*) btl_base);
frag->endpoint = btl_peer;
frag->hdr.tag = MCA_BTL_TAG_MAX;
frag->type = mca_btl_portals_frag_type_rdma;
/* setup the send */
md.start = frag->segment.seg_addr.pval;
md.length = frag->segment.seg_len;
md.threshold = 2; /* unlink after send, ack */
md.threshold = 2; /* unlink after send & ack */
md.max_size = 0;
md.options = 0;
md.options = PTL_MD_EVENT_START_DISABLE;
md.user_ptr = frag; /* keep a pointer to ourselves */
md.eq_handle = frag->btl->portals_eq_handles[OMPI_BTL_PORTALS_EQ_RDMA];
md.eq_handle = mca_btl_portals_module.portals_eq_handles[OMPI_BTL_PORTALS_EQ];
/* make a free-floater */
ret = PtlMDBind(frag->btl->portals_ni_h,
ret = PtlMDBind(mca_btl_portals_module.portals_ni_h,
md,
PTL_UNLINK,
&md_h);
@ -165,18 +59,14 @@ mca_btl_portals_put(struct mca_btl_base_module_t* btl_base,
return OMPI_ERROR;
}
OPAL_OUTPUT_VERBOSE((90, mca_btl_portals_component.portals_output,
"rdma put called for frag 0x%x, callback 0x%xbits %lld",
frag, frag->base.des_cbfunc, frag->base.des_dst[0].seg_key.key64));
ret = PtlPut(md_h,
PTL_ACK_REQ,
btl_peer->endpoint_ptl_id,
*((mca_btl_base_endpoint_t*) btl_peer),
OMPI_BTL_PORTALS_RDMA_TABLE_ID,
0, /* ac_index - not used*/
frag->base.des_dst[0].seg_key.key64, /* match bits */
0, /* remote offset - not used */
frag->hdr.tag); /* hdr_data - tag */
MCA_BTL_TAG_MAX); /* hdr_data - invalid tag */
if (ret != PTL_OK) {
opal_output(mca_btl_portals_component.portals_output,
"PtlPut failed with error %d", ret);

View File

@ -17,8 +17,7 @@
#ifndef MCA_BTL_PORTALS_RDMA_H
#define MCA_BTL_PORTALS_RDMA_H
int mca_btl_portals_process_rdma(mca_btl_portals_module_t *module,
ptl_event_t *ev);
/* BWB - FIX ME - delete this file */
#endif /* MCA_BTL_PORTALS_RDMA_H */

View File

@ -24,7 +24,7 @@
#include "btl_portals_frag.h"
OBJ_CLASS_INSTANCE(mca_btl_portals_recv_chunk_t,
OBJ_CLASS_INSTANCE(mca_btl_portals_recv_block_t,
opal_list_item_t,
NULL, NULL);
@ -73,17 +73,17 @@ mca_btl_portals_recv_enable(mca_btl_portals_module_t *btl)
return OMPI_ERROR;
}
/* create the recv chunks */
/* create the recv blocks */
for (i = 0 ; i < btl->portals_recv_mds_num ; ++i) {
mca_btl_portals_recv_chunk_t *chunk =
mca_btl_portals_recv_chunk_init(btl);
if (NULL == chunk) {
mca_btl_portals_recv_block_t *block =
mca_btl_portals_recv_block_init(btl);
if (NULL == block) {
mca_btl_portals_recv_disable(btl);
return OMPI_ERROR;
}
opal_list_append(&(btl->portals_recv_chunks),
(opal_list_item_t*) chunk);
mca_btl_portals_activate_chunk(chunk);
opal_list_append(&(btl->portals_recv_blocks),
(opal_list_item_t*) block);
mca_btl_portals_activate_block(block);
}
return OMPI_SUCCESS;
@ -95,12 +95,12 @@ mca_btl_portals_recv_disable(mca_btl_portals_module_t *btl)
{
opal_list_item_t *item;
if (opal_list_get_size(&btl->portals_recv_chunks) > 0) {
if (opal_list_get_size(&btl->portals_recv_blocks) > 0) {
while (NULL !=
(item = opal_list_remove_first(&btl->portals_recv_chunks))) {
mca_btl_portals_recv_chunk_t *chunk =
(mca_btl_portals_recv_chunk_t*) item;
mca_btl_portals_recv_chunk_free(chunk);
(item = opal_list_remove_first(&btl->portals_recv_blocks))) {
mca_btl_portals_recv_block_t *block =
(mca_btl_portals_recv_block_t*) item;
mca_btl_portals_recv_block_free(block);
}
}
@ -114,126 +114,49 @@ mca_btl_portals_recv_disable(mca_btl_portals_module_t *btl)
}
mca_btl_portals_recv_chunk_t*
mca_btl_portals_recv_chunk_init(mca_btl_portals_module_t *btl)
mca_btl_portals_recv_block_t*
mca_btl_portals_recv_block_init(mca_btl_portals_module_t *btl)
{
mca_btl_portals_recv_chunk_t *chunk;
mca_btl_portals_recv_block_t *block;
chunk = OBJ_NEW(mca_btl_portals_recv_chunk_t);
chunk->btl = btl;
chunk->length = btl->portals_recv_mds_size;
chunk->start = malloc(chunk->length);
if (chunk->start == NULL) return NULL;
block = OBJ_NEW(mca_btl_portals_recv_block_t);
block->btl = btl;
block->length = btl->portals_recv_mds_size;
block->start = malloc(block->length);
if (block->start == NULL) return NULL;
chunk->me_h = PTL_INVALID_HANDLE;
chunk->md_h = PTL_INVALID_HANDLE;
block->me_h = PTL_INVALID_HANDLE;
block->md_h = PTL_INVALID_HANDLE;
chunk->full = false;
chunk->pending = 0;
block->full = false;
block->pending = 0;
return chunk;
return block;
}
int
mca_btl_portals_recv_chunk_free(mca_btl_portals_recv_chunk_t *chunk)
mca_btl_portals_recv_block_free(mca_btl_portals_recv_block_t *block)
{
/* need to clear out the md */
while (chunk->pending != 0) {
while (block->pending != 0) {
mca_btl_portals_component_progress();
}
if (PTL_INVALID_HANDLE != chunk->md_h) {
PtlMDUnlink(chunk->md_h);
chunk->md_h = PTL_INVALID_HANDLE;
if (PTL_INVALID_HANDLE != block->md_h) {
PtlMDUnlink(block->md_h);
block->md_h = PTL_INVALID_HANDLE;
}
if (NULL != chunk->start) {
free(chunk->start);
chunk->start = NULL;
if (NULL != block->start) {
free(block->start);
block->start = NULL;
}
chunk->length = 0;
chunk->full = false;
block->length = 0;
block->full = false;
return OMPI_SUCCESS;
}
int
mca_btl_portals_process_recv(mca_btl_portals_module_t *btl,
ptl_event_t *ev)
{
mca_btl_portals_frag_t *frag = NULL;
mca_btl_portals_recv_chunk_t *chunk = ev->md.user_ptr;
mca_btl_base_tag_t tag = (mca_btl_base_tag_t) ev->hdr_data;
int ret;
switch (ev->type) {
case PTL_EVENT_PUT_START:
OPAL_OUTPUT_VERBOSE((90, mca_btl_portals_component.portals_output,
"recv: PTL_EVENT_PUT_START for tag %d, link %d",
tag, (int) ev->link));
if (ev->ni_fail_type != PTL_NI_OK) {
opal_output(mca_btl_portals_component.portals_output,
"Failure to start event\n");
} else {
/* increase reference count on the memory chunk */
OPAL_THREAD_ADD32(&(chunk->pending), 1);
}
break;
case PTL_EVENT_PUT_END:
OPAL_OUTPUT_VERBOSE((90, mca_btl_portals_component.portals_output,
"recv: PTL_EVENT_PUT_END for tag %d, link %d",
tag, (int) ev->link));
if (ev->ni_fail_type != PTL_NI_OK) {
opal_output(mca_btl_portals_component.portals_output,
"Failure to end event\n");
mca_btl_portals_return_chunk_part(btl, chunk);
return OMPI_ERROR;
}
/* ok, we've got data */
OPAL_OUTPUT_VERBOSE((95, mca_btl_portals_component.portals_output,
"received data for tag %d\n", tag));
/* grab a user fragment (since memory is already allocated in
as part of the chunk), fill in the right bits, and call the
callback */
OMPI_BTL_PORTALS_FRAG_ALLOC_USER(btl, frag, ret);
frag->base.des_dst = &frag->segment;
frag->base.des_dst_cnt = 1;
frag->base.des_src = NULL;
frag->base.des_src_cnt = 0;
frag->segment.seg_addr.pval = (((char*) ev->md.start) + ev->offset);
frag->segment.seg_len = ev->mlength;
if (ev->md.length - (ev->offset + ev->mlength) < ev->md.max_size) {
/* the chunk is full. It's deactivated automagically, but we
can't start it up again until everyone is done with it.
The actual reactivation and all that will happen after the
free completes the last operation... */
OPAL_OUTPUT_VERBOSE((90, mca_btl_portals_component.portals_output,
"marking chunk 0x%x as full", chunk->start));
chunk->full = true;
opal_atomic_mb();
}
btl->portals_reg[tag].cbfunc(&btl->super,
tag,
&frag->base,
btl->portals_reg[tag].cbdata);
OMPI_BTL_PORTALS_FRAG_RETURN_USER(&btl->super, frag);
mca_btl_portals_return_chunk_part(btl, chunk);
break;
default:
break;
}
return OMPI_SUCCESS;
}

View File

@ -19,7 +19,7 @@
#include "btl_portals_frag.h"
struct mca_btl_portals_recv_chunk_t {
struct mca_btl_portals_recv_block_t {
opal_list_item_t base;
mca_btl_portals_module_t *btl;
@ -32,45 +32,42 @@ struct mca_btl_portals_recv_chunk_t {
volatile bool full;
volatile int32_t pending;
};
typedef struct mca_btl_portals_recv_chunk_t mca_btl_portals_recv_chunk_t;
OBJ_CLASS_DECLARATION(mca_btl_portals_recv_chunk_t);
typedef struct mca_btl_portals_recv_block_t mca_btl_portals_recv_block_t;
OBJ_CLASS_DECLARATION(mca_btl_portals_recv_block_t);
int mca_btl_portals_recv_enable(mca_btl_portals_module_t *btl);
int mca_btl_portals_recv_disable(mca_btl_portals_module_t *btl);
int mca_btl_portals_process_recv(mca_btl_portals_module_t *btl,
ptl_event_t *ev);
/**
* Create a chunk of memory for receiving send messages. Must call
* activate_chunk on the returned chunk of memory before it will be
* Create a block of memory for receiving send messages. Must call
* activate_block on the returned block of memory before it will be
* active with the POrtals library
*
* Module lock must be held before calling this function
*/
mca_btl_portals_recv_chunk_t*
mca_btl_portals_recv_chunk_init(mca_btl_portals_module_t *btl);
mca_btl_portals_recv_block_t*
mca_btl_portals_recv_block_init(mca_btl_portals_module_t *btl);
/**
* Free a chunk of memory. Will remove the match entry, then progress
* Free a block of memory. Will remove the match entry, then progress
* Portals until the pending count is returned to 0. Will then free
* all resources associated with chunk.
* all resources associated with block.
*
* Module lock must be held before calling this function
*/
int mca_btl_portals_recv_chunk_free(mca_btl_portals_recv_chunk_t *chunk);
int mca_btl_portals_recv_block_free(mca_btl_portals_recv_block_t *block);
/**
* activate a chunk. Chunks that are full (have gone inactive) can be
* activate a block. Blocks that are full (have gone inactive) can be
* re-activated with this call. There is no need to hold the lock
* before calling this function
*/
static inline int
mca_btl_portals_activate_chunk(mca_btl_portals_recv_chunk_t *chunk)
mca_btl_portals_activate_block(mca_btl_portals_recv_block_t *block)
{
int ret;
ptl_process_id_t any_proc = { PTL_NID_ANY, PTL_PID_ANY };
@ -78,43 +75,43 @@ mca_btl_portals_activate_chunk(mca_btl_portals_recv_chunk_t *chunk)
/* if we have pending operations, something very, very, very bad
has happened... */
assert(chunk->pending == 0);
assert(block->pending == 0);
OPAL_OUTPUT_VERBOSE((90, mca_btl_portals_component.portals_output,
"reactivating chunk 0x%x", chunk->start));
"reactivating block 0x%x", block->start));
if (NULL == chunk->start) return OMPI_ERROR;
if (NULL == block->start) return OMPI_ERROR;
/* create match entry */
ret = PtlMEInsert(chunk->btl->portals_recv_reject_me_h,
ret = PtlMEInsert(block->btl->portals_recv_reject_me_h,
any_proc,
0, /* match bits */
0, /* ignore bits */
PTL_UNLINK,
PTL_INS_BEFORE,
&(chunk->me_h));
&(block->me_h));
if (PTL_OK != ret) return OMPI_ERROR;
/* and the memory descriptor */
md.start = chunk->start;
md.length = chunk->length;
md.start = block->start;
md.length = block->length;
md.threshold = PTL_MD_THRESH_INF;
md.max_size = chunk->btl->super.btl_max_send_size;
md.max_size = block->btl->super.btl_max_send_size;
md.options = PTL_MD_OP_PUT | PTL_MD_MAX_SIZE;
md.user_ptr = chunk;
md.eq_handle = chunk->btl->portals_eq_handles[OMPI_BTL_PORTALS_EQ_RECV];
md.user_ptr = block;
md.eq_handle = block->btl->portals_eq_handles[OMPI_BTL_PORTALS_EQ];
chunk->pending = 0;
chunk->full = false;
block->pending = 0;
block->full = false;
/* make sure that everyone sees the update on full value */
opal_atomic_mb();
ret = PtlMDAttach(chunk->me_h,
ret = PtlMDAttach(block->me_h,
md,
PTL_UNLINK,
&(chunk->md_h));
&(block->md_h));
if (PTL_OK != ret) {
PtlMEUnlink(chunk->me_h);
PtlMEUnlink(block->me_h);
return OMPI_ERROR;
}
@@ -126,18 +123,18 @@ mca_btl_portals_activate_chunk(mca_btl_portals_recv_chunk_t *chunk)
static inline void
mca_btl_portals_return_chunk_part(mca_btl_portals_module_t *btl,
mca_btl_portals_recv_chunk_t *chunk)
mca_btl_portals_return_block_part(mca_btl_portals_module_t *btl,
mca_btl_portals_recv_block_t *block)
{
int ret;
OPAL_OUTPUT_VERBOSE((100, mca_btl_portals_component.portals_output,
"*** return chunk called %d %d ***",
chunk->full, chunk->pending));
OPAL_THREAD_ADD32(&(chunk->pending), -1);
if (chunk->full == true) {
if (chunk->pending == 0) {
ret = mca_btl_portals_activate_chunk(chunk);
"*** return block called %d %d ***",
block->full, block->pending));
OPAL_THREAD_ADD32(&(block->pending), -1);
if (block->full == true) {
if (block->pending == 0) {
ret = mca_btl_portals_activate_block(block);
if (OMPI_SUCCESS != ret) {
/* BWB - now what? */
}
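
The declarations and comments in this header add up to a simple lifecycle for a receive block. The following usage sketch is built only from the names declared above; the include paths are assumptions about the source layout, and locking is left as comments because the lock field itself is not shown in this hunk:

#include "btl_portals.h"        /* assumed include path */
#include "btl_portals_recv.h"

static int
setup_one_recv_block(void)
{
    mca_btl_portals_recv_block_t *block;

    /* module lock must be held here, per the comment on _init() */
    block = mca_btl_portals_recv_block_init(&mca_btl_portals_module);
    if (NULL == block) return OMPI_ERROR;
    /* module lock may be released here */

    /* no lock needed: posts the match entry and attaches the MD */
    if (OMPI_SUCCESS != mca_btl_portals_activate_block(block)) {
        return OMPI_ERROR;
    }

    /* every fragment later carved out of the block must be paired with
       one mca_btl_portals_return_block_part() call; that call
       re-activates the block by itself once it is both marked full and
       drained back to pending == 0 */
    return OMPI_SUCCESS;
}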

View file

@@ -25,139 +25,71 @@
#include "btl_portals_send.h"
int
mca_btl_portals_process_send(mca_btl_portals_module_t *btl,
ptl_event_t *ev)
{
mca_btl_portals_frag_t *frag =
(mca_btl_portals_frag_t*) ev->md.user_ptr;
switch (ev->type) {
case PTL_EVENT_SEND_START:
OPAL_OUTPUT_VERBOSE((90, mca_btl_portals_component.portals_output,
"send: PTL_EVENT_SEND_START for 0x%x",
frag));
if (ev->ni_fail_type != PTL_NI_OK) {
opal_output(mca_btl_portals_component.portals_output,
"Failure to start send event\n");
/* unlink, since we don't expect to get an end or ack */
OPAL_THREAD_ADD32(&btl->portals_outstanding_sends, -1);
PtlMDUnlink(ev->md_handle);
frag->base.des_cbfunc(&btl->super,
frag->endpoint,
&frag->base,
OMPI_ERROR);
}
break;
case PTL_EVENT_SEND_END:
OPAL_OUTPUT_VERBOSE((90, mca_btl_portals_component.portals_output,
"send: PTL_EVENT_SEND_END for 0x%x",
frag));
if (ev->ni_fail_type != PTL_NI_OK) {
opal_output(mca_btl_portals_component.portals_output,
"Failure to end send event\n");
/* unlink, since we don't expect to get an ack */
OPAL_THREAD_ADD32(&btl->portals_outstanding_sends, -1);
PtlMDUnlink(ev->md_handle);
frag->base.des_cbfunc(&btl->super,
frag->endpoint,
&frag->base,
OMPI_ERROR);
}
break;
case PTL_EVENT_ACK:
/* ok, this is the real work - the message has been received
on the other side. If mlength == 0, that means that we hit
the reject md and we need to try to retransmit */
OPAL_OUTPUT_VERBOSE((90, mca_btl_portals_component.portals_output,
"send: PTL_EVENT_ACK for 0x%x, Ox%x",
frag, frag->base.des_cbfunc));
OPAL_THREAD_ADD32(&btl->portals_outstanding_sends, -1);
if (ev->ni_fail_type != PTL_NI_OK) {
opal_output(mca_btl_portals_component.portals_output,
"Failure in send event ack\n");
frag->base.des_cbfunc(&btl->super,
frag->endpoint,
&frag->base,
OMPI_ERROR);
} else if (0 == ev->mlength) {
/* other side did not receive the message */
opal_output_verbose(50,
mca_btl_portals_component.portals_output,
"message was dropped. Adding to front of queue list");
opal_list_prepend(&(btl->portals_queued_sends),
(opal_list_item_t*) frag);
} else {
/* the other side received the message */
assert(ev->mlength == frag->segment.seg_len);
/* let the PML know we're done... */
frag->base.des_cbfunc(&btl->super,
frag->endpoint,
&frag->base,
OMPI_SUCCESS);
}
/* see if we can send someone else */
mca_btl_portals_progress_queued_sends(btl);
break;
default:
OPAL_OUTPUT_VERBOSE((90, mca_btl_portals_component.portals_output,
"send: unexpected event %d for 0x%x",
ev->type, frag));
break;
}
return OMPI_SUCCESS;
}
int
mca_btl_portals_send(struct mca_btl_base_module_t* btl_base,
struct mca_btl_base_endpoint_t* endpoint,
struct mca_btl_base_descriptor_t* descriptor,
mca_btl_base_tag_t tag)
{
mca_btl_portals_module_t *btl = (mca_btl_portals_module_t*) btl_base;
mca_btl_portals_frag_t *frag = (mca_btl_portals_frag_t*) descriptor;
int32_t num_sends;
int ret;
assert(&mca_btl_portals_module == (mca_btl_portals_module_t*) btl_base);
frag->endpoint = endpoint;
frag->hdr.tag = tag;
frag->type = mca_btl_portals_frag_type_send;
num_sends = OPAL_THREAD_ADD32(&btl->portals_outstanding_sends, 1);
num_sends = OPAL_THREAD_ADD32(&mca_btl_portals_module.portals_outstanding_sends, 1);
OPAL_OUTPUT_VERBOSE((90, mca_btl_portals_component.portals_output,
"send called for frag 0x%x, 0x%x",
frag, frag->base.des_cbfunc));
if (num_sends >= btl->portals_max_outstanding_sends) {
if (num_sends >= mca_btl_portals_module.portals_max_outstanding_sends) {
opal_output_verbose(50, mca_btl_portals_component.portals_output,
"no space for message 0x%x. Adding to back of queue",
frag);
opal_list_append(&(btl->portals_queued_sends),
opal_list_append(&(mca_btl_portals_module.portals_queued_sends),
(opal_list_item_t*) frag);
OPAL_THREAD_ADD32(&btl->portals_outstanding_sends, -1);
OPAL_THREAD_ADD32(&mca_btl_portals_module.portals_outstanding_sends, -1);
ret = OMPI_SUCCESS;
} else {
ret = mca_btl_portals_send_frag(frag);
/* try to progress some events before we return */
ptl_handle_md_t md_h;
int ret;
/* setup the send */
mca_btl_portals_module.md_send.start = frag->segment.seg_addr.pval;
mca_btl_portals_module.md_send.length = frag->segment.seg_len;
mca_btl_portals_module.md_send.user_ptr = frag; /* keep a pointer to ourselves */
/* make a free-floater */
ret = PtlMDBind(mca_btl_portals_module.portals_ni_h,
mca_btl_portals_module.md_send,
PTL_UNLINK,
&md_h);
if (ret != PTL_OK) {
opal_output(mca_btl_portals_component.portals_output,
"PtlMDBind failed with error %d", ret);
return OMPI_ERROR;
}
ret = PtlPut(md_h,
PTL_ACK_REQ,
*((mca_btl_base_endpoint_t*) endpoint),
OMPI_BTL_PORTALS_SEND_TABLE_ID,
0, /* ac_index - not used */
0, /* match bits */
0, /* remote offset - not used */
frag->hdr.tag); /* hdr_data - tag */
if (ret != PTL_OK) {
opal_output(mca_btl_portals_component.portals_output,
"PtlPut failed with error %d", ret);
PtlMDUnlink(md_h);
return OMPI_ERROR;
}
return OMPI_SUCCESS;
}
mca_btl_portals_component_progress();
return ret;
}
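
The event handling removed from this file (and, per the commit message, folded into a single progress function) is where the retransmit decision lives: an ACK with mlength == 0 means the put was swallowed by the receiver's reject match entry because no receive block had room. The sketch below condenses that decision using only fields and symbols that appear in this commit; the function name is illustrative:

static void
handle_send_ack(ptl_event_t *ev)
{
    mca_btl_portals_frag_t *frag =
        (mca_btl_portals_frag_t*) ev->md.user_ptr;

    /* the outstanding-send slot is released either way */
    OPAL_THREAD_ADD32(&mca_btl_portals_module.portals_outstanding_sends, -1);

    if (0 == ev->mlength) {
        /* zero bytes accepted: the reject ME caught the put, so no
           receive block had space.  Requeue at the head of the list
           for retransmission. */
        opal_list_prepend(&(mca_btl_portals_module.portals_queued_sends),
                          (opal_list_item_t*) frag);
    } else {
        /* delivered in full: hand completion back to the upper layer */
        frag->base.des_cbfunc(&mca_btl_portals_module.super,
                              frag->endpoint,
                              &frag->base,
                              OMPI_SUCCESS);
    }
}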

View file

@@ -19,74 +19,22 @@
#include "btl_portals_frag.h"
int mca_btl_portals_process_send(mca_btl_portals_module_t *module,
ptl_event_t *ev);
static inline int
mca_btl_portals_send_frag(mca_btl_portals_frag_t *frag)
{
ptl_md_t md;
ptl_handle_md_t md_h;
int ret;
/* setup the send */
md.start = frag->segment.seg_addr.pval;
md.length = frag->segment.seg_len;
md.threshold = 2; /* unlink after start, end, ack */
md.max_size = 0;
md.options = 0; /* BWB - can we optimize? */
md.user_ptr = frag; /* keep a pointer to ourselves */
md.eq_handle = frag->btl->portals_eq_handles[OMPI_BTL_PORTALS_EQ_SEND];
/* make a free-floater */
ret = PtlMDBind(frag->btl->portals_ni_h,
md,
PTL_UNLINK,
&md_h);
if (ret != PTL_OK) {
opal_output(mca_btl_portals_component.portals_output,
"PtlMDBind failed with error %d", ret);
return OMPI_ERROR;
}
ret = PtlPut(md_h,
PTL_ACK_REQ,
frag->endpoint->endpoint_ptl_id,
OMPI_BTL_PORTALS_SEND_TABLE_ID,
0, /* ac_index - not used */
frag->segment.seg_key.key64, /* match bits */
0, /* remote offset - not used */
frag->hdr.tag); /* hdr_data - tag */
if (ret != PTL_OK) {
opal_output(mca_btl_portals_component.portals_output,
"PtlPut failed with error %d", ret);
PtlMDUnlink(md_h);
return OMPI_ERROR;
}
#define MCA_BTL_PORTALS_PROGRESS_QUEUED_SENDS() \
if ((0 != opal_list_get_size(&(mca_btl_portals_module.portals_queued_sends))) && \
(mca_btl_portals_module.portals_outstanding_sends < \
mca_btl_portals_module.portals_max_outstanding_sends)) { \
mca_btl_portals_frag_t *qfrag = (mca_btl_portals_frag_t*) \
opal_list_remove_first(&(mca_btl_portals_module.portals_queued_sends)); \
OPAL_OUTPUT_VERBOSE((90, mca_btl_portals_component.portals_output, \
"retransmit for frag 0x%x, 0x%x", \
qfrag, qfrag->base.des_cbfunc)); \
return mca_btl_portals_send(&mca_btl_portals_module.super, \
qfrag->endpoint, \
&(qfrag->base), \
qfrag->hdr.tag); \
} \
return OMPI_SUCCESS;
}
static inline int
mca_btl_portals_progress_queued_sends(struct mca_btl_portals_module_t *btl)
{
if ((0 != opal_list_get_size(&(btl->portals_queued_sends))) &&
(btl->portals_outstanding_sends <
btl->portals_max_outstanding_sends)) {
mca_btl_portals_frag_t *frag = (mca_btl_portals_frag_t*)
opal_list_remove_first(&(btl->portals_queued_sends));
OPAL_OUTPUT_VERBOSE((90, mca_btl_portals_component.portals_output,
"retransmit for frag 0x%x, 0x%x",
frag, frag->base.des_cbfunc));
return mca_btl_portals_send(&btl->super,
frag->endpoint,
&(frag->base),
frag->hdr.tag);
}
return OMPI_SUCCESS;
}
#endif /* OMPI_BTL_PORTALS_SEND_H */
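
As transcribed above, the MCA_BTL_PORTALS_PROGRESS_QUEUED_SENDS macro that replaces the old inline helper ends in a return statement, so it is intended to be the last statement of a function returning an int status. A hypothetical caller, just to make that control flow explicit:

static int
finish_send_event(void)
{
    /* ... event-specific bookkeeping would happen here ... */

    /* retransmit one queued fragment if we are under the
       outstanding-send limit; the macro then returns either the
       status of that send or OMPI_SUCCESS, so nothing follows it */
    MCA_BTL_PORTALS_PROGRESS_QUEUED_SENDS();
}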