1
1

Various pieces all over to make basic small message send/recv work. Next step

is clean up the code.. it is in need of refactoring and testing.

Thanks to Brian for help in troubleshooting!

This commit was SVN r9466.
Этот коммит содержится в:
Andrew Friedley 2006-03-29 21:55:41 +00:00
родитель 142f59a435
Коммит 0eba366b07
6 изменённых файлов: 126 добавлений и 37 удалений

Просмотреть файл

@ -310,7 +310,7 @@ int mca_btl_udapl_register(
udapl_btl->udapl_reg[tag].cbfunc = cbfunc;
udapl_btl->udapl_reg[tag].cbdata = cbdata;
OPAL_OUTPUT((0, "udapl_register\n"));
OPAL_OUTPUT((0, "udapl_register %d %p\n", tag, cbfunc));
return OMPI_SUCCESS;
}
@ -332,6 +332,7 @@ mca_btl_base_descriptor_t* mca_btl_udapl_alloc(
OPAL_OUTPUT((0, "udapl_alloc\n"));
/* TODO - note we allocate 'size' but we also have the header */
if(size <= btl->btl_eager_limit) {
MCA_BTL_UDAPL_FRAG_ALLOC_EAGER(udapl_btl, frag, rc);
frag->segment.seg_len =
@ -347,8 +348,8 @@ mca_btl_base_descriptor_t* mca_btl_udapl_alloc(
/* Set up the LMR triplet from the frag segment */
/* Note that this triplet defines a sub-region of a registered LMR */
frag->triplet.lmr_context = frag->segment.seg_key.key32[0];
frag->triplet.virtual_address = (DAT_VADDR)frag->segment.seg_addr.pval;
frag->triplet.segment_length = frag->segment.seg_len;
frag->triplet.virtual_address = (DAT_VADDR)frag->hdr;
frag->triplet.segment_length = frag->segment.seg_len + sizeof(mca_btl_base_header_t);
frag->btl = udapl_btl;
frag->base.des_src = &frag->segment;
@ -626,7 +627,7 @@ int mca_btl_udapl_send(
mca_btl_udapl_module_t* udapl_btl = (mca_btl_udapl_module_t*)btl;
mca_btl_udapl_frag_t* frag = (mca_btl_udapl_frag_t*)des;
OPAL_OUTPUT((0, "udapl_send\n"));
OPAL_OUTPUT((0, "udapl_send %d\n", tag));
frag->btl = udapl_btl;
frag->endpoint = endpoint;

Просмотреть файл

@ -104,8 +104,8 @@ struct mca_btl_udapl_module_t {
ompi_free_list_t udapl_frag_user;
ompi_free_list_t udapl_frag_recv;
opal_list_t udapl_pending; /**< list of pending send descriptors */
opal_list_t udapl_repost; /**< list of pending fragments */
opal_list_t udapl_pending; /**< list of pending send fragments */
opal_list_t udapl_repost; /**< list of pending recv fragments */
opal_list_t udapl_mru_reg; /**< list of most recently used registrations */
opal_mutex_t udapl_lock; /* lock for accessing module state */
};

Просмотреть файл

@ -357,6 +357,44 @@ mca_btl_udapl_component_init (int *num_btl_modules,
}
static int mca_btl_udapl_endpoint_post_recv(mca_btl_udapl_endpoint_t* endpoint)
{
mca_btl_udapl_frag_t* frag;
int rc;
int i;
/* TODO - only posting eager frags for now. */
for(i = 0; i < mca_btl_udapl_component.udapl_num_repost; i++) {
MCA_BTL_UDAPL_FRAG_ALLOC_EAGER(endpoint->endpoint_btl, frag, rc);
/* Set up the LMR triplet from the frag segment */
/* Note that this triplet defines a sub-region of a registered LMR */
frag->triplet.lmr_context = frag->segment.seg_key.key32[0];
frag->triplet.virtual_address = (DAT_VADDR)frag->hdr;
frag->triplet.segment_length = frag->segment.seg_len + sizeof(mca_btl_base_header_t);
frag->btl = endpoint->endpoint_btl;
frag->endpoint = endpoint;
frag->base.des_src = NULL;
frag->base.des_src_cnt = 0;
frag->base.des_dst = &frag->segment;
frag->base.des_dst_cnt = 1;
frag->base.des_flags = 0;
frag->type = MCA_BTL_UDAPL_RECV;
rc = dat_ep_post_recv(endpoint->endpoint_ep, 1, &frag->triplet,
(DAT_DTO_COOKIE)(void*)frag, DAT_COMPLETION_DEFAULT_FLAG);
if(DAT_SUCCESS != rc) {
mca_btl_udapl_error(rc, "dat_ep_post_recv");
return OMPI_ERROR;
}
}
return OMPI_SUCCESS;
}
static int mca_btl_udapl_finish_connect(mca_btl_udapl_module_t* btl,
mca_btl_udapl_frag_t* frag,
DAT_EP_HANDLE endpoint)
@ -366,10 +404,8 @@ static int mca_btl_udapl_finish_connect(mca_btl_udapl_module_t* btl,
mca_btl_udapl_addr_t* addr;
size_t i;
/*addr = (mca_btl_udapl_addr_t*)frag->hdr;*/
addr = (mca_btl_udapl_addr_t*)frag->segment.seg_addr.pval;
OPAL_THREAD_LOCK(&mca_btl_udapl_component.udapl_lock);
for(proc = (mca_btl_udapl_proc_t*)
opal_list_get_first(&mca_btl_udapl_component.udapl_procs);
proc != (mca_btl_udapl_proc_t*)
@ -383,16 +419,16 @@ static int mca_btl_udapl_finish_connect(mca_btl_udapl_module_t* btl,
if(ep->endpoint_btl == btl &&
!memcmp(addr, &ep->endpoint_addr,
sizeof(mca_btl_udapl_addr_t))) {
OPAL_OUTPUT((0, "btl_udapl matched endpoint! HAPPY DANCE!!\n"));
ep->endpoint_ep = endpoint;
OPAL_THREAD_UNLOCK(&mca_btl_udapl_component.udapl_lock);
OPAL_OUTPUT((0, "btl_udapl matched endpoint! HAPPY DANCE!!!\n"));
ep->endpoint_state = MCA_BTL_UDAPL_CONNECTED;
mca_btl_udapl_endpoint_post_recv(ep);
return OMPI_SUCCESS;
}
}
}
/* If this point is reached, no matching endpoint was found */
OPAL_THREAD_UNLOCK(&mca_btl_udapl_component.udapl_lock);
OPAL_OUTPUT((0, "btl_udapl ERROR could not match endpoint\n"));
return OMPI_ERROR;
}
@ -429,7 +465,7 @@ static int mca_btl_udapl_accept_connect(mca_btl_udapl_module_t* btl,
rc = dat_ep_post_recv(endpoint, 1, &frag->triplet,
(DAT_DTO_COOKIE)(void*)frag, DAT_COMPLETION_DEFAULT_FLAG);
if(DAT_SUCCESS != rc) {
mca_btl_udapl_error(rc, "dat_ep_post_send");
mca_btl_udapl_error(rc, "dat_ep_post_recv");
return OMPI_ERROR;
}
@ -437,6 +473,28 @@ static int mca_btl_udapl_accept_connect(mca_btl_udapl_module_t* btl,
}
static int mca_btl_udapl_flush_send_queue(mca_btl_udapl_endpoint_t* endpoint)
{
mca_btl_udapl_frag_t* frag;
int rc = OMPI_SUCCESS;
OPAL_THREAD_LOCK(&endpoint->endpoint_send_lock);
while(NULL != (frag = (mca_btl_udapl_frag_t*)
opal_list_remove_first(&endpoint->endpoint_frags))) {
rc = dat_ep_post_send(endpoint->endpoint_ep, 1, &frag->triplet,
(DAT_DTO_COOKIE)(void*)frag, DAT_COMPLETION_DEFAULT_FLAG);
if(DAT_SUCCESS != rc) {
mca_btl_udapl_error(rc, "dat_ep_post_send");
rc = OMPI_ERROR;
break;
}
}
OPAL_THREAD_UNLOCK(&endpoint->endpoint_send_lock);
return rc;
}
/*
* uDAPL component progress.
*/
@ -444,7 +502,6 @@ static int mca_btl_udapl_accept_connect(mca_btl_udapl_module_t* btl,
int mca_btl_udapl_component_progress()
{
mca_btl_udapl_module_t* btl;
mca_btl_udapl_frag_t* frag;
static int32_t inprogress = 0;
DAT_EVENT event;
int count = 0;
@ -457,28 +514,27 @@ int mca_btl_udapl_component_progress()
}
/* check for work to do on each uDAPL btl */
OPAL_THREAD_LOCK(&mca_btl_udapl_component.udapl_lock);
for(i = 0; i < mca_btl_udapl_component.udapl_num_btls; i++) {
btl = mca_btl_udapl_component.udapl_btls[i];
/* TODO - lock this properly */
/* Check DTO EVD */
while(DAT_SUCCESS ==
dat_evd_dequeue(btl->udapl_evd_dto, &event)) {
DAT_DTO_COMPLETION_EVENT_DATA* dto;
mca_btl_udapl_frag_t* frag;
switch(event.event_number) {
case DAT_DTO_COMPLETION_EVENT:
OPAL_OUTPUT((0, "btl_udapl DTO completion\n"));
/* questions to answer:
should i use separate endpoints for eager/max frags?
i need to do this if i only want to post recv's for
the exact eager/max size, and uDAPL won't just pick
a large enough buffer
how about just worrying about eager frags for now?
*/
dto = &event.event_data.dto_completion_event_data;
OPAL_OUTPUT((0, "DTO transferred %d bytes\n", dto->transfered_length));
OPAL_OUTPUT((0, "btl_udapl DTO transferred %d bytes\n",
dto->transfered_length));
/* Was the DTO successful? */
if(DAT_DTO_SUCCESS != dto->status) {
@ -491,18 +547,46 @@ int mca_btl_udapl_component_progress()
switch(frag->type) {
case MCA_BTL_UDAPL_SEND:
/* TODO - write me */
OPAL_OUTPUT((0, "btl_udapl UDAPL_SEND"));
frag->base.des_cbfunc(&btl->super, frag->endpoint,
&frag->base, OMPI_SUCCESS);
/* TODO - anything else to do here? */
break;
case MCA_BTL_UDAPL_RECV:
{
mca_btl_base_recv_reg_t* reg =
&btl->udapl_reg[frag->hdr->tag];
OPAL_OUTPUT((0, "btl_udapl UDAPL_RECV\n"));
frag->segment.seg_addr.pval = frag->hdr + 1;
frag->segment.seg_len = dto->transfered_length -
sizeof(mca_btl_base_header_t);
OPAL_THREAD_UNLOCK(&mca_btl_udapl_component.udapl_lock);
reg->cbfunc(&btl->super,
frag->hdr->tag, &frag->base, reg->cbdata);
OPAL_THREAD_LOCK(&mca_btl_udapl_component.udapl_lock);
/* Repost the frag */
frag->segment.seg_addr.pval = frag->hdr;
frag->segment.seg_len = frag->size;
dat_ep_post_recv(frag->endpoint->endpoint_ep,
1, &frag->triplet, (DAT_DTO_COOKIE)(void*)frag,
DAT_COMPLETION_DEFAULT_FLAG);
break;
}
case MCA_BTL_UDAPL_CONN_SEND:
/* Set the endpoint state to connected */
/* Client (send) side connection established */
OPAL_OUTPUT((0,
"btl_udapl SEND SIDE CONNECT COMPLETED!!\n"));
frag->endpoint->endpoint_state =
MCA_BTL_UDAPL_CONNECTED;
/* TODO - fire off any queued sends */
mca_btl_udapl_endpoint_post_recv(frag->endpoint);
mca_btl_udapl_flush_send_queue(frag->endpoint);
/* Retire the fragment */
MCA_BTL_UDAPL_FRAG_RETURN_EAGER(btl, frag);
break;
case MCA_BTL_UDAPL_CONN_RECV:
@ -510,10 +594,9 @@ int mca_btl_udapl_component_progress()
a new connection - match endpoints */
mca_btl_udapl_finish_connect(btl, frag, dto->ep_handle);
/* Retire the fragment */
MCA_BTL_UDAPL_FRAG_RETURN_EAGER(btl, frag);
break;
#ifdef OMPI_ENABLE_DEBUG
#if OMPI_ENABLE_DEBUG
default:
OPAL_OUTPUT((0, "WARNING unknown frag type: %d\n",
frag->type));
@ -521,7 +604,7 @@ int mca_btl_udapl_component_progress()
}
count++;
break;
#ifdef OMPI_ENABLE_DEBUG
#if OMPI_ENABLE_DEBUG
default:
OPAL_OUTPUT((0, "WARNING unknown dto event: %d\n",
event.event_number));
@ -536,15 +619,11 @@ int mca_btl_udapl_component_progress()
switch(event.event_number) {
case DAT_CONNECTION_REQUEST_EVENT:
/* Accept a new connection */
OPAL_OUTPUT((0, "btl_udapl accepting connection\n"));
mca_btl_udapl_accept_connect(btl,
event.event_data.cr_arrival_event_data.cr_handle);
count++;
break;
case DAT_CONNECTION_EVENT_ESTABLISHED:
OPAL_OUTPUT((0, "btl_udapl connection established\n"));
/* Both the client and server side of a connection generate
this event */
/* Really shouldn't do anything here, as we won't have the
@ -566,7 +645,7 @@ int mca_btl_udapl_component_progress()
/* Need to set the BTL endpoint to MCA_BTL_UDAPL_FAILED
See dat_ep_connect documentation pdf pg 198 */
break;
#ifdef OMPI_ENABLE_DEBUG
#if OMPI_ENABLE_DEBUG
default:
OPAL_OUTPUT((0, "WARNING unknown conn event: %d\n",
event.event_number));
@ -584,7 +663,7 @@ int mca_btl_udapl_component_progress()
case DAT_ASYNC_ERROR_TIMED_OUT:
case DAT_ASYNC_ERROR_PROVIDER_INTERNAL_ERROR:
break;
#ifdef OMPI_ENABLE_DEBUG
#if OMPI_ENABLE_DEBUG
default:
OPAL_OUTPUT((0, "WARNING unknown async event: %d\n",
event.event_number));
@ -594,6 +673,7 @@ int mca_btl_udapl_component_progress()
}
/* unlock and return */
OPAL_THREAD_UNLOCK(&mca_btl_udapl_component.udapl_lock);
OPAL_THREAD_ADD32(&inprogress, -1);
return count;
}

Просмотреть файл

@ -36,6 +36,7 @@
static int mca_btl_udapl_start_connect(mca_btl_base_endpoint_t* endpoint);
/* TODO - do we need to pass the endpoint? It's in the frag */
int mca_btl_udapl_endpoint_send(mca_btl_base_endpoint_t* endpoint,
mca_btl_udapl_frag_t* frag)
{
@ -45,6 +46,13 @@ int mca_btl_udapl_endpoint_send(mca_btl_base_endpoint_t* endpoint,
switch(endpoint->endpoint_state) {
case MCA_BTL_UDAPL_CONNECTED:
/* just send it already.. */
rc = dat_ep_post_send(endpoint->endpoint_ep, 1, &frag->triplet,
(DAT_DTO_COOKIE)(void*)frag, DAT_COMPLETION_DEFAULT_FLAG);
if(DAT_SUCCESS != rc) {
mca_btl_udapl_error(rc, "dat_ep_post_send");
rc = OMPI_ERROR;
}
break;
case MCA_BTL_UDAPL_CLOSED:
/* Initiate a new connection, add this send to a queue */

Просмотреть файл

@ -30,16 +30,17 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_btl_udapl_frag_t);
typedef enum {
MCA_BTL_UDAPL_SEND,
MCA_BTL_UDAPL_CONN_SEND,
MCA_BTL_UDAPL_CONN_RECV,
MCA_BTL_UDAPL_SEND,
MCA_BTL_UDAPL_RECV,
MCA_BTL_UDAPL_PUT,
MCA_BTL_UDAPL_GET
} mca_btl_udapl_frag_type_t;
/**
* uDAPL send fragment derived type.
* uDAPL fragment derived type.
*/
struct mca_btl_udapl_frag_t {
mca_btl_base_descriptor_t base;
@ -132,7 +133,6 @@ do { \
} while(0)
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif