1
1

Sends seem to work as well as for sm - still seeing segfaults in various

IBM tests, but I see the same segfaults / asserts in the sm btl.

* Add prepare_src implementation so that we can send multiple fragments of
  large messages
* Add queuing of sends if either there are too many outstanding sends
  (we have to limit this so that we don't have more sends pending than
  we could get acks for) or if we get an ack with a 0 byte mlength,
  which means the remote side dropped the message on us.

Still need to run valgrind to make sure I'm not leaking resources.

This commit was SVN r6508.
Этот коммит содержится в:
Brian Barrett 2005-07-15 01:43:47 +00:00
родитель 94160da4c0
Коммит fe21bc111a
9 изменённых файлов: 164 добавлений и 35 удалений

Просмотреть файл

@ -25,6 +25,7 @@
#include "opal/util/output.h"
#include "mca/pml/pml.h"
#include "mca/btl/btl.h"
#include "ompi/datatype/convertor.h"
#include "btl_portals.h"
#include "btl_portals_compat.h"
@ -239,9 +240,8 @@ mca_btl_portals_alloc(struct mca_btl_base_module_t* btl,
}
frag->base.des_flags = 0;
frag->type = MCA_BTL_PORTALS_FRAG_SEND;
return (mca_btl_base_descriptor_t*) frag;
return &frag->base;
}
@ -271,6 +271,76 @@ mca_btl_portals_free(struct mca_btl_base_module_t* btl,
}
/* BWB - fix me - this needs to do RDMA when we get there... */
/*
 * Pack up to *size bytes of user data (preceded by "reserve" bytes of
 * header space left unpacked for the caller) into a newly allocated send
 * fragment and return its descriptor.
 *
 * On return, *size is updated to the number of user-data bytes actually
 * packed; for messages larger than the max send size this is less than
 * requested, which allows large messages to be sent as multiple
 * fragments.  Returns NULL if no fragment is available or if the
 * convertor pack fails.
 *
 * NOTE(review): peer and registration are unused here -- presumably they
 * become relevant once RDMA support is implemented; confirm when that
 * lands.
 */
mca_btl_base_descriptor_t*
mca_btl_portals_prepare_src(struct mca_btl_base_module_t* btl,
                            struct mca_btl_base_endpoint_t* peer,
                            mca_mpool_base_registration_t* registration,
                            struct ompi_convertor_t* convertor,
                            size_t reserve,
                            size_t* size)
{
    mca_btl_portals_frag_t* frag;
    size_t max_data = *size;    /* bytes of user data we will try to pack */
    struct iovec iov;
    uint32_t iov_count = 1;
    int32_t free_after;
    int rc;

    if (max_data+reserve <= btl->btl_eager_limit) {
        /*
         * if we aren't pinning the data and the requested size is less
         * than the eager limit pack into a fragment from the eager pool
         */
        MCA_BTL_PORTALS_FRAG_ALLOC_EAGER(btl, frag, rc);
        if (NULL == frag) {
            return NULL;
        }

        /* pack directly into the fragment buffer, past the reserved
           header space */
        iov.iov_len = max_data;
        iov.iov_base = (unsigned char*) frag->segment.seg_addr.pval + reserve;
        rc = ompi_convertor_pack(convertor, &iov, &iov_count,
                                 &max_data, &free_after);
        /* report how much was actually packed, even on failure */
        *size = max_data;
        if (rc < 0) {
            /* a negative return is treated as a pack failure; give the
               fragment back to the eager pool */
            MCA_BTL_PORTALS_FRAG_RETURN_EAGER(btl, frag);
            return NULL;
        }
        frag->segment.seg_len = max_data + reserve;
    } else {
        /*
         * otherwise pack as much data as we can into a fragment
         * that is the max send size.
         */
        MCA_BTL_PORTALS_FRAG_ALLOC_MAX(btl, frag, rc);
        if (NULL == frag) {
            return NULL;
        }
        /* clamp so header + data never exceed the max send size; the
           remainder of the message goes in later fragments */
        if (max_data + reserve > btl->btl_max_send_size){
            max_data = btl->btl_max_send_size - reserve;
        }

        iov.iov_len = max_data;
        iov.iov_base = (unsigned char*) frag->segment.seg_addr.pval + reserve;
        rc = ompi_convertor_pack(convertor, &iov, &iov_count,
                                 &max_data, &free_after);
        /* report how much was actually packed, even on failure */
        *size = max_data;

        if ( rc < 0 ) {
            MCA_BTL_PORTALS_FRAG_RETURN_MAX(btl, frag);
            return NULL;
        }
        frag->segment.seg_len = max_data + reserve;
    }

    /* send-only descriptor: one source segment, no destination (RDMA
       not implemented yet) */
    frag->base.des_src = &frag->segment;
    frag->base.des_src_cnt = 1;
    frag->base.des_dst = NULL;
    frag->base.des_dst_cnt = 0;
    frag->base.des_flags = 0;

    return &frag->base;
}
int
mca_btl_portals_finalize(struct mca_btl_base_module_t *btl_base)
{
@ -279,6 +349,17 @@ mca_btl_portals_finalize(struct mca_btl_base_module_t *btl_base)
int ret, i;
opal_list_item_t *item;
/* finalize all communication */
while (btl->portals_outstanding_sends > 0) {
mca_btl_portals_component_progress();
}
if (0 != opal_list_get_size(&(btl->portals_queued_sends))) {
opal_output(mca_btl_portals_component.portals_output,
"Warning: there were %d queued sends not sent",
opal_list_get_size(&(btl->portals_queued_sends)));
}
OPAL_THREAD_LOCK(&ptl_btl->portals_lock);
if (0 != opal_list_get_size(&btl->portals_endpoint_list)) {
@ -306,7 +387,7 @@ mca_btl_portals_finalize(struct mca_btl_base_module_t *btl_base)
OBJ_DESTRUCT(&btl->portals_endpoint_list);
OBJ_DESTRUCT(&btl->portals_recv_chunks);
OBJ_DESTRUCT(&btl->portals_queued_sends);
OPAL_THREAD_UNLOCK(&btl->portals_lock);

Просмотреть файл

@ -112,6 +112,10 @@ struct mca_btl_portals_module_t {
/* number outstanding sends */
volatile int32_t portals_outstanding_sends;
int32_t portals_max_outstanding_sends;
/* queued sends */
opal_list_t portals_queued_sends;
/* our portals network interface */
ptl_handle_ni_t portals_ni_h;

Просмотреть файл

@ -178,9 +178,13 @@ mca_btl_portals_component_open(void)
Set sizes here */
mca_btl_portals_module.portals_eq_sizes[MCA_BTL_PORTALS_EQ_RECV] =
param_register_int("eq_recv_size", BTL_PORTALS_DEFAULT_RECV_QUEUE_SIZE);
mca_btl_portals_module.portals_max_outstanding_sends =
param_register_int("eq_send_max_pending", BTL_PORTALS_MAX_SENDS_PENDING) * 3;
/* sends_pending * 3 for start, end, ack */
mca_btl_portals_module.portals_eq_sizes[MCA_BTL_PORTALS_EQ_SEND] =
param_register_int("eq_send_max_pending", BTL_PORTALS_MAX_SENDS_PENDING) * 3;
mca_btl_portals_module.portals_max_outstanding_sends * 3;
mca_btl_portals_module.portals_eq_sizes[MCA_BTL_PORTALS_EQ_RDMA] =
param_register_int("eq_rdma_size", 512); /* BWB - FIXME - make param */
@ -302,6 +306,9 @@ mca_btl_portals_component_init(int *num_btls,
/* receive chunk list */
OBJ_CONSTRUCT(&(ptl_btl->portals_recv_chunks), opal_list_t);
/* pending sends */
OBJ_CONSTRUCT(&(ptl_btl->portals_queued_sends), opal_list_t);
/* lock */
OBJ_CONSTRUCT(&(ptl_btl->portals_lock), opal_mutex_t);
}
@ -353,6 +360,7 @@ mca_btl_portals_component_progress(void)
&which);
if (PTL_EQ_EMPTY == ret) {
/* nothing to see here - move along */
mca_btl_portals_progress_queued_sends(module);
continue;
} else if (!(PTL_OK == ret || PTL_EQ_DROPPED == ret)) {
/* BWB - how can we report errors? */
@ -366,6 +374,7 @@ mca_btl_portals_component_progress(void)
switch (which) {
case MCA_BTL_PORTALS_EQ_RECV:
mca_btl_portals_progress_queued_sends(module);
mca_btl_portals_process_recv(module, &ev);
break;
case MCA_BTL_PORTALS_EQ_SEND:

Просмотреть файл

@ -28,7 +28,7 @@ mca_btl_portals_frag_common_send_constructor(mca_btl_portals_frag_t* frag)
frag->base.des_src = &frag->segment;
frag->base.des_src_cnt = 1;
frag->segment.seg_addr.pval = frag + sizeof(mca_btl_portals_frag_t);
frag->segment.seg_addr.pval = frag + 1;
frag->segment.seg_len = frag->size;
frag->segment.seg_key.key64 = 0;
@ -61,6 +61,8 @@ mca_btl_portals_frag_user_constructor(mca_btl_portals_frag_t* frag)
frag->base.des_src = 0;
frag->base.des_src_cnt = 0;
frag->size = 0;
frag->type = MCA_BTL_PORTALS_FRAG_SEND;
}

Просмотреть файл

@ -42,7 +42,7 @@ mca_btl_portals_recv_enable(mca_btl_portals_module_t *module)
md.length = 0;
md.threshold = PTL_MD_THRESH_INF;
md.max_size = 0;
md.options = PTL_MD_TRUNCATE;
md.options = PTL_MD_TRUNCATE | PTL_MD_OP_PUT;
md.user_ptr = NULL;
md.eq_handle = PTL_EQ_NONE;
@ -211,7 +211,7 @@ mca_btl_portals_process_recv(mca_btl_portals_module_t *module,
frag->u.recv_frag.chunk = chunk;
if (ev->md.length - (ev->offset + ev->mlength) < ev->md.max_size) {
/* the chunk is full. It's deactivated, automagically but we
/* the chunk is full. It's deactivated automagically, but we
can't start it up again until everyone is done with it.
The actual reactivation and all that will happen after the
free completes the last operation... */

Просмотреть файл

@ -73,7 +73,7 @@ static inline int
mca_btl_portals_activate_chunk(mca_btl_portals_recv_chunk_t *chunk)
{
int ret;
ptl_process_id_t proc = { PTL_NID_ANY, PTL_PID_ANY };
ptl_process_id_t any_proc = { PTL_NID_ANY, PTL_PID_ANY };
ptl_md_t md;
/* if we have pending operations, something very, very, very bad
@ -83,13 +83,12 @@ mca_btl_portals_activate_chunk(mca_btl_portals_recv_chunk_t *chunk)
if (NULL == chunk->start) return OMPI_ERROR;
/* create match entry */
ret = PtlMEAttach(chunk->btl->portals_ni_h,
BTL_PORTALS_SEND_TABLE_ID,
proc,
ret = PtlMEInsert(chunk->btl->portals_recv_reject_me_h,
any_proc,
0, /* match bits */
0, /* ignore bits */
PTL_UNLINK,
PTL_INS_AFTER,
PTL_INS_BEFORE,
&(chunk->me_h));
if (PTL_OK != ret) return OMPI_ERROR;

Просмотреть файл

@ -65,19 +65,24 @@ mca_btl_portals_process_send(mca_btl_portals_module_t *module,
the reject md and we need to try to retransmit */
opal_output_verbose(90, mca_btl_portals_component.portals_output,
"send: PTL_EVENT_ACK for 0x%x",
frag);
if (0 == ev->mlength) {
/* other side did not receive the message */
/* BWB - implement check for retransmit */
"send: PTL_EVENT_ACK for 0x%x, Ox%x",
frag, frag->base.des_cbfunc);
if (ev->ni_fail_type != PTL_NI_OK) {
opal_output(mca_btl_portals_component.portals_output,
"message was dropped and retransmits not implemented");
"Failure to end send event\n");
frag->base.des_cbfunc(&module->super,
frag->u.send_frag.endpoint,
&frag->base,
OMPI_ERROR);
} else if (0 == ev->mlength) {
/* other side did not receive the message */
/* BWB - implement check for retransmit */
opal_output(mca_btl_portals_component.portals_output,
"message was dropped. Adding to front of queue list");
opal_list_prepend(&(module->portals_queued_sends),
(opal_list_item_t*) frag);
} else {
/* the other side received the message */
OPAL_THREAD_ADD32(&module->portals_outstanding_sends, -1);
@ -86,12 +91,14 @@ mca_btl_portals_process_send(mca_btl_portals_module_t *module,
gets more resources (ie, what's currently in this
md) */
PtlMDUnlink(ev->md_handle);
/* let the PML know we're done... */
frag->base.des_cbfunc(&module->super,
frag->u.send_frag.endpoint,
&frag->base,
OMPI_SUCCESS);
/* see if we can send someone else */
mca_btl_portals_progress_queued_sends(module);
}
break;
default:
@ -117,6 +124,7 @@ mca_btl_portals_send(struct mca_btl_base_module_t* btl,
mca_btl_portals_module_t *ptl_btl = (mca_btl_portals_module_t*) btl;
mca_btl_portals_frag_t *frag = (mca_btl_portals_frag_t*) descriptor;
int32_t num_sends;
int ret;
frag->u.send_frag.endpoint = endpoint;
frag->u.send_frag.hdr.tag = tag;
@ -126,7 +134,24 @@ mca_btl_portals_send(struct mca_btl_base_module_t* btl,
/* BWB - implement check for too many pending messages */
opal_output_verbose(90, mca_btl_portals_component.portals_output,
"send called for frag 0x%x", frag);
"send called for frag 0x%x, 0x%x",
frag, frag->base.des_cbfunc);
return mca_btl_portals_send_frag(frag);
if (num_sends >= ptl_btl->portals_max_outstanding_sends) {
opal_output(mca_btl_portals_component.portals_output,
"no space for message 0x%x. Adding to back of queue",
frag);
opal_list_append(&(ptl_btl->portals_queued_sends),
(opal_list_item_t*) frag);
OPAL_THREAD_ADD32(&ptl_btl->portals_outstanding_sends, 1);
ret = OMPI_SUCCESS;
} else {
ret = mca_btl_portals_send_frag(frag);
/* try to progress some events before we return */
}
mca_btl_portals_component_progress();
return ret;
}

Просмотреть файл

@ -68,4 +68,25 @@ mca_btl_portals_send_frag(mca_btl_portals_frag_t *frag)
return OMPI_SUCCESS;
}
/*
 * If a send was previously queued (too many outstanding sends, or the
 * remote side dropped a message) and a send slot has since opened up,
 * pull the first queued fragment off the list and resubmit it through
 * the normal send path.  At most one fragment is restarted per call.
 */
static inline int
mca_btl_portals_progress_queued_sends(struct mca_btl_portals_module_t *module)
{
    mca_btl_portals_frag_t *queued_frag;

    /* nothing queued, or no room for another outstanding send: done */
    if (0 == opal_list_get_size(&(module->portals_queued_sends)) ||
        module->portals_outstanding_sends >=
        module->portals_max_outstanding_sends) {
        return OMPI_SUCCESS;
    }

    queued_frag = (mca_btl_portals_frag_t*)
        opal_list_remove_first(&(module->portals_queued_sends));

    opal_output_verbose(90, mca_btl_portals_component.portals_output,
                        "retransmit for frag 0x%x, 0x%x",
                        queued_frag, queued_frag->base.des_cbfunc);

    return mca_btl_portals_send(&module->super,
                                queued_frag->u.send_frag.endpoint,
                                &(queued_frag->base),
                                queued_frag->u.send_frag.hdr.tag);
}
#endif /* MCA_BTL_PORTALS_SEND_H */

Просмотреть файл

@ -29,18 +29,6 @@
* BWB - README - BWB - README - BWB - README - BWB - README - BWB */
mca_btl_base_descriptor_t*
mca_btl_portals_prepare_src(struct mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* peer,
mca_mpool_base_registration_t* registration,
struct ompi_convertor_t* convertor,
size_t reserve,
size_t* size)
{
printf("btl prepare src\n");
return NULL;
}
mca_btl_base_descriptor_t*
mca_btl_portals_prepare_dst(struct mca_btl_base_module_t* btl,