Long overdue commit.. many changes.
In short, I'm very close to having connection establishment and eager send/recv working. Part of the connection process involves sending address information from the client to server. For some reason, I am never receiving an event indicating completetion of the send on the client side. Otherwise, connection establishment is working and eager send/recv should be trivial from here. Some more detailed changes: - Send partially implemented, just handles starting up new connections. - Several support functions implemented for establishing connection. Client side code went in btl_udapl_endpoint.c, server side in btl_udapl_component.c - Frags list and send/recv locks added to the endpoint structure. - BTL sets up a public service point, which listens for new connections. Steps over ports that are already bound, iterating through a range of ports. - Remove any traces of recv frags, don't think I need them after all. - Pieces of component_progress() implemented for connection establishment. - Frags have two new types for connection establishment - CONN_SEND and CONN_RECV. - Many other minor cleanups not affecting functionality This commit was SVN r9345.
Этот коммит содержится в:
родитель
200bb7d59b
Коммит
cf9246f7b9
@ -30,10 +30,10 @@
|
||||
#include "ompi/datatype/convertor.h"
|
||||
#include "ompi/datatype/datatype.h"
|
||||
#include "ompi/mca/mpool/base/base.h"
|
||||
/*#include "ompi/mca/mpool/mpool.h"*/
|
||||
#include "ompi/mca/mpool/udapl/mpool_udapl.h"
|
||||
#include "ompi/proc/proc.h"
|
||||
|
||||
|
||||
mca_btl_udapl_module_t mca_btl_udapl_module = {
|
||||
{
|
||||
&mca_btl_udapl_component.super,
|
||||
@ -67,9 +67,10 @@ mca_btl_udapl_module_t mca_btl_udapl_module = {
|
||||
*/
|
||||
|
||||
int
|
||||
mca_btl_udapl_init(DAT_NAME_PTR ia_name, mca_btl_udapl_module_t * btl)
|
||||
mca_btl_udapl_init(DAT_NAME_PTR ia_name, mca_btl_udapl_module_t* btl)
|
||||
{
|
||||
mca_mpool_base_resources_t res;
|
||||
DAT_CONN_QUAL port;
|
||||
DAT_IA_ATTR attr;
|
||||
DAT_RETURN rc;
|
||||
|
||||
@ -86,7 +87,7 @@ mca_btl_udapl_init(DAT_NAME_PTR ia_name, mca_btl_udapl_module_t * btl)
|
||||
rc = dat_pz_create(btl->udapl_ia, &btl->udapl_pz);
|
||||
if(DAT_SUCCESS != rc) {
|
||||
mca_btl_udapl_error(rc, "dat_pz_create");
|
||||
return OMPI_ERROR;
|
||||
goto failure;
|
||||
}
|
||||
|
||||
/* query to get address information */
|
||||
@ -95,8 +96,7 @@ mca_btl_udapl_init(DAT_NAME_PTR ia_name, mca_btl_udapl_module_t * btl)
|
||||
DAT_IA_FIELD_IA_ADDRESS_PTR, &attr, DAT_IA_FIELD_NONE, NULL);
|
||||
if(DAT_SUCCESS != rc) {
|
||||
mca_btl_udapl_error(rc, "dat_ia_query");
|
||||
dat_ia_close(btl->udapl_ia, DAT_CLOSE_GRACEFUL_FLAG);
|
||||
return OMPI_ERROR;
|
||||
goto failure;
|
||||
}
|
||||
|
||||
memcpy(&btl->udapl_addr.addr, attr.ia_address_ptr, sizeof(DAT_SOCK_ADDR));
|
||||
@ -107,8 +107,7 @@ mca_btl_udapl_init(DAT_NAME_PTR ia_name, mca_btl_udapl_module_t * btl)
|
||||
DAT_EVD_DTO_FLAG | DAT_EVD_RMR_BIND_FLAG, &btl->udapl_evd_dto);
|
||||
if(DAT_SUCCESS != rc) {
|
||||
mca_btl_udapl_error(rc, "dat_evd_create (dto)");
|
||||
dat_ia_close(btl->udapl_ia, DAT_CLOSE_GRACEFUL_FLAG);
|
||||
return OMPI_ERROR;
|
||||
goto failure;
|
||||
}
|
||||
|
||||
rc = dat_evd_create(btl->udapl_ia,
|
||||
@ -116,11 +115,32 @@ mca_btl_udapl_init(DAT_NAME_PTR ia_name, mca_btl_udapl_module_t * btl)
|
||||
DAT_EVD_CR_FLAG | DAT_EVD_CONNECTION_FLAG, &btl->udapl_evd_conn);
|
||||
if(DAT_SUCCESS != rc) {
|
||||
mca_btl_udapl_error(rc, "dat_evd_create (conn)");
|
||||
dat_evd_free(btl->udapl_evd_dto);
|
||||
dat_ia_close(btl->udapl_ia, DAT_CLOSE_GRACEFUL_FLAG);
|
||||
return OMPI_ERROR;
|
||||
goto failure;
|
||||
}
|
||||
|
||||
/* create our public service point */
|
||||
/* We have to specify a port, so we go through a range until we
|
||||
find a port that works */
|
||||
for(port = mca_btl_udapl_component.udapl_port_low;
|
||||
port <= mca_btl_udapl_component.udapl_port_high; port++) {
|
||||
|
||||
rc = dat_psp_create(btl->udapl_ia, port, btl->udapl_evd_conn,
|
||||
DAT_PSP_CONSUMER_FLAG, &btl->udapl_psp);
|
||||
if(DAT_SUCCESS == rc) {
|
||||
break;
|
||||
} else if(DAT_CONN_QUAL_IN_USE != rc) {
|
||||
mca_btl_udapl_error(rc, "dat_psp_create");
|
||||
goto failure;
|
||||
}
|
||||
}
|
||||
|
||||
if(port == mca_btl_udapl_component.udapl_port_high) {
|
||||
goto failure;
|
||||
}
|
||||
|
||||
/* Save the port with the address information */
|
||||
btl->udapl_addr.port = port;
|
||||
|
||||
/* initialize the memory pool */
|
||||
res.udapl_ia = btl->udapl_ia;
|
||||
res.udapl_pz = btl->udapl_pz;
|
||||
@ -165,22 +185,13 @@ mca_btl_udapl_init(DAT_NAME_PTR ia_name, mca_btl_udapl_module_t * btl)
|
||||
mca_btl_udapl_component.udapl_free_list_inc,
|
||||
NULL);
|
||||
|
||||
ompi_free_list_init(&btl->udapl_frag_recv,
|
||||
sizeof(mca_btl_udapl_frag_recv_t),
|
||||
OBJ_CLASS(mca_btl_udapl_frag_recv_t),
|
||||
mca_btl_udapl_component.udapl_free_list_num,
|
||||
mca_btl_udapl_component.udapl_free_list_max,
|
||||
mca_btl_udapl_component.udapl_free_list_inc,
|
||||
btl->super.btl_mpool);
|
||||
|
||||
/* Connections are done lazily - the process doing the send acts as a client
|
||||
when initiating the connect. progress should always be checking for
|
||||
incoming connections, and establishing them when they arrive. When
|
||||
connection is established, recv's are posted. */
|
||||
|
||||
/* TODO - post receives */
|
||||
/* TODO - can I always use SRQ, or just on new enough uDAPLs? */
|
||||
return OMPI_SUCCESS;
|
||||
|
||||
failure:
|
||||
dat_ia_close(btl->udapl_ia, DAT_CLOSE_ABRUPT_FLAG);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
|
||||
@ -243,12 +254,12 @@ int mca_btl_udapl_add_procs(
|
||||
/*
|
||||
* Check to make sure that the peer has at least as many interface
|
||||
* addresses exported as we are trying to use. If not, then
|
||||
* don't bind this PTL instance to the proc.
|
||||
* don't bind this BTL instance to the proc.
|
||||
*/
|
||||
|
||||
OPAL_THREAD_LOCK(&udapl_proc->proc_lock);
|
||||
|
||||
/* The btl_proc datastructure is shared by all uDAPL PTL
|
||||
/* The btl_proc datastructure is shared by all uDAPL BTL
|
||||
* instances that are trying to reach this destination.
|
||||
* Cache the peer instance on the btl_proc.
|
||||
*/
|
||||
@ -265,10 +276,12 @@ int mca_btl_udapl_add_procs(
|
||||
OPAL_THREAD_UNLOCK(&udapl_proc->proc_lock);
|
||||
continue;
|
||||
}
|
||||
|
||||
ompi_bitmap_set_bit(reachable, i);
|
||||
OPAL_THREAD_UNLOCK(&udapl_proc->proc_lock);
|
||||
peers[i] = udapl_endpoint;
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
@ -323,14 +336,22 @@ mca_btl_base_descriptor_t* mca_btl_udapl_alloc(
|
||||
MCA_BTL_UDAPL_FRAG_ALLOC_EAGER(udapl_btl, frag, rc);
|
||||
frag->segment.seg_len =
|
||||
size <= btl->btl_eager_limit ?
|
||||
size : btl->btl_eager_limit ;
|
||||
size : btl->btl_eager_limit;
|
||||
} else {
|
||||
MCA_BTL_UDAPL_FRAG_ALLOC_MAX(udapl_btl, frag, rc);
|
||||
frag->segment.seg_len =
|
||||
size <= btl->btl_max_send_size ?
|
||||
size : btl->btl_max_send_size ;
|
||||
size : btl->btl_max_send_size;
|
||||
}
|
||||
|
||||
/* TODO - this the right place for this? */
|
||||
if(OMPI_SUCCESS != mca_mpool_udapl_register(btl->btl_mpool,
|
||||
frag->segment.seg_addr.pval, size, 0, &frag->registration)) {
|
||||
/* TODO - handle this fully */
|
||||
return NULL;
|
||||
}
|
||||
|
||||
frag->btl = udapl_btl;
|
||||
frag->base.des_src = &frag->segment;
|
||||
frag->base.des_src_cnt = 1;
|
||||
frag->base.des_dst = NULL;
|
||||
@ -416,9 +437,7 @@ mca_btl_base_descriptor_t* mca_btl_udapl_prepare_src(
|
||||
/* bump reference count as so that the registration
|
||||
* doesn't go away when the operation completes
|
||||
*/
|
||||
btl->btl_mpool->mpool_retain(btl->btl_mpool,
|
||||
(mca_mpool_base_registration_t*) registration);
|
||||
|
||||
btl->btl_mpool->mpool_retain(btl->btl_mpool, registration);
|
||||
frag->registration = registration;
|
||||
|
||||
/*
|
||||
@ -615,16 +634,7 @@ int mca_btl_udapl_send(
|
||||
frag->hdr->tag = tag;
|
||||
frag->type = MCA_BTL_UDAPL_SEND;
|
||||
|
||||
/* Check if we are connected to this peer.
|
||||
Should be three states we care about -
|
||||
connected, connecting, disconnected.
|
||||
If no connection exists, request the connection and queue the send.
|
||||
If a connection is pending, queue the send
|
||||
If the connection is established, fire off the send.
|
||||
need to consider locking around the connection state and queue.
|
||||
*/
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
return mca_btl_udapl_endpoint_send(endpoint, frag);
|
||||
}
|
||||
|
||||
|
||||
|
@ -56,11 +56,15 @@ struct mca_btl_udapl_component_t {
|
||||
size_t udapl_num_mru;
|
||||
size_t udapl_evd_qlen;
|
||||
int32_t udapl_num_repost;
|
||||
int32_t udapl_timeout; /**< connection timeout, in microseconds */
|
||||
int udapl_debug; /**< turn on debug output */
|
||||
|
||||
size_t udapl_eager_frag_size;
|
||||
size_t udapl_max_frag_size;
|
||||
|
||||
DAT_CONN_QUAL udapl_port_low; /**< first port for binding service point */
|
||||
DAT_CONN_QUAL udapl_port_high; /**< last port for binding service point */
|
||||
|
||||
int udapl_free_list_num; /**< initial size of free lists */
|
||||
int udapl_free_list_max; /**< maximum size of free lists */
|
||||
int udapl_free_list_inc; /**< number of elements to alloc when growing */
|
||||
@ -84,11 +88,12 @@ struct mca_btl_udapl_module_t {
|
||||
mca_btl_base_recv_reg_t udapl_reg[256];
|
||||
mca_btl_udapl_addr_t udapl_addr;
|
||||
|
||||
/* interface handle and protection zone */
|
||||
/* uDAPL interface and other handles */
|
||||
DAT_IA_HANDLE udapl_ia;
|
||||
DAT_PZ_HANDLE udapl_pz;
|
||||
DAT_PSP_HANDLE udapl_psp;
|
||||
|
||||
/* event dispatchers - default, data transfer, connection negotiation */
|
||||
/* event dispatchers - async, data transfer, connection negotiation */
|
||||
DAT_EVD_HANDLE udapl_evd_async;
|
||||
DAT_EVD_HANDLE udapl_evd_dto;
|
||||
DAT_EVD_HANDLE udapl_evd_conn;
|
||||
@ -141,6 +146,7 @@ extern mca_btl_base_module_t** mca_btl_udapl_component_init(
|
||||
/**
|
||||
* uDAPL component progress.
|
||||
*/
|
||||
|
||||
extern int mca_btl_udapl_component_progress(void);
|
||||
|
||||
|
||||
|
@ -29,9 +29,11 @@
|
||||
#include "opal/mca/base/mca_base_param.h"
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "ompi/mca/mpool/base/base.h"
|
||||
#include "ompi/mca/mpool/udapl/mpool_udapl.h"
|
||||
#include "btl_udapl.h"
|
||||
#include "btl_udapl_frag.h"
|
||||
#include "btl_udapl_endpoint.h"
|
||||
#include "btl_udapl_proc.h"
|
||||
#include "ompi/mca/btl/base/base.h"
|
||||
#include "ompi/mca/btl/base/btl_base_error.h"
|
||||
#include "ompi/datatype/convertor.h"
|
||||
@ -39,6 +41,15 @@
|
||||
#include "orte/util/proc_info.h"
|
||||
#include "ompi/mca/pml/base/pml_base_module_exchange.h"
|
||||
|
||||
|
||||
/*
|
||||
* Local functions
|
||||
*/
|
||||
|
||||
static int mca_btl_udapl_finish_connect(mca_btl_udapl_module_t* btl,
|
||||
mca_btl_udapl_frag_t* frag,
|
||||
DAT_EP_HANDLE endpoint);
|
||||
|
||||
mca_btl_udapl_component_t mca_btl_udapl_component = {
|
||||
{
|
||||
/* First, the mca_base_component_t struct containing meta information
|
||||
@ -160,6 +171,12 @@ int mca_btl_udapl_component_open(void)
|
||||
mca_btl_udapl_param_register_int("num_repost", 4);
|
||||
mca_btl_udapl_component.udapl_num_mru =
|
||||
mca_btl_udapl_param_register_int("num_mru", 64);
|
||||
mca_btl_udapl_component.udapl_port_low =
|
||||
mca_btl_udapl_param_register_int("port_low", 45000);
|
||||
mca_btl_udapl_component.udapl_port_high =
|
||||
mca_btl_udapl_param_register_int("port_high", 47000);
|
||||
mca_btl_udapl_component.udapl_timeout =
|
||||
mca_btl_udapl_param_register_int("timeout", 10000000);
|
||||
|
||||
/* register uDAPL module parameters */
|
||||
mca_btl_udapl_module.super.btl_exclusivity =
|
||||
@ -177,7 +194,7 @@ int mca_btl_udapl_component_open(void)
|
||||
mca_btl_udapl_module.super.btl_bandwidth =
|
||||
mca_btl_udapl_param_register_int("bandwidth", 225);
|
||||
|
||||
/* TODO - computer udapl_eager_frag_size and udapl_max_frag_size */
|
||||
/* cmpute udapl_eager_frag_size and udapl_max_frag_size */
|
||||
mca_btl_udapl_component.udapl_eager_frag_size =
|
||||
mca_btl_udapl_module.super.btl_eager_limit;
|
||||
mca_btl_udapl_component.udapl_max_frag_size =
|
||||
@ -221,9 +238,6 @@ mca_btl_udapl_modex_send(void)
|
||||
size = sizeof(mca_btl_udapl_addr_t) *
|
||||
mca_btl_udapl_component.udapl_num_btls;
|
||||
|
||||
OPAL_OUTPUT((0, "udapl_modex_send %d addrs %d bytes\n",
|
||||
mca_btl_udapl_component.udapl_num_btls, size));
|
||||
|
||||
if (0 != size) {
|
||||
addrs = (mca_btl_udapl_addr_t *)malloc(size);
|
||||
if (NULL == addrs) {
|
||||
@ -299,8 +313,6 @@ mca_btl_udapl_component_init (int *num_btl_modules,
|
||||
/* initialize this BTL */
|
||||
/* TODO - make use of the thread-safety info in datinfo also */
|
||||
if(OMPI_SUCCESS != mca_btl_udapl_init(datinfo[i].ia_name, btl)) {
|
||||
opal_output(0, "udapl module init for %s failed\n",
|
||||
datinfo[i].ia_name);
|
||||
free(btl);
|
||||
continue;
|
||||
}
|
||||
@ -345,19 +357,101 @@ mca_btl_udapl_component_init (int *num_btl_modules,
|
||||
}
|
||||
|
||||
|
||||
static int mca_btl_udapl_finish_connect(mca_btl_udapl_module_t* btl,
|
||||
mca_btl_udapl_frag_t* frag,
|
||||
DAT_EP_HANDLE endpoint)
|
||||
{
|
||||
mca_btl_udapl_proc_t* proc;
|
||||
mca_btl_base_endpoint_t* ep;
|
||||
mca_btl_udapl_addr_t* addr;
|
||||
size_t i;
|
||||
|
||||
addr = (mca_btl_udapl_addr_t*)frag->hdr;
|
||||
|
||||
OPAL_THREAD_LOCK(&mca_btl_udapl_component.udapl_lock);
|
||||
for(proc = (mca_btl_udapl_proc_t*)
|
||||
opal_list_get_first(&mca_btl_udapl_component.udapl_procs);
|
||||
proc != (mca_btl_udapl_proc_t*)
|
||||
opal_list_get_end(&mca_btl_udapl_component.udapl_procs);
|
||||
proc = (mca_btl_udapl_proc_t*)opal_list_get_next(proc)) {
|
||||
|
||||
for(i = 0; i < proc->proc_endpoint_count; i++) {
|
||||
ep = proc->proc_endpoints[i];
|
||||
|
||||
/* Does this endpoint match? */
|
||||
if(ep->endpoint_btl == btl &&
|
||||
!memcmp(addr, &ep->endpoint_addr,
|
||||
sizeof(mca_btl_udapl_addr_t))) {
|
||||
ep->endpoint_ep = endpoint;
|
||||
OPAL_THREAD_UNLOCK(&mca_btl_udapl_component.udapl_lock);
|
||||
OPAL_OUTPUT((0, "btl_udapl matched endpoint! HAPPY DANCE!!!\n"));
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* If this point is reached, no matching endpoint was found */
|
||||
OPAL_THREAD_UNLOCK(&mca_btl_udapl_component.udapl_lock);
|
||||
OPAL_OUTPUT((0, "btl_udapl ERROR could not match endpoint\n"));
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
|
||||
static int mca_btl_udapl_accept_connect(mca_btl_udapl_module_t* btl,
|
||||
DAT_CR_HANDLE cr_handle)
|
||||
{
|
||||
mca_btl_udapl_frag_t* frag;
|
||||
DAT_DTO_COOKIE cookie;
|
||||
DAT_EP_HANDLE endpoint;
|
||||
int rc;
|
||||
|
||||
rc = dat_ep_create(btl->udapl_ia, btl->udapl_pz,
|
||||
btl->udapl_evd_dto, btl->udapl_evd_dto,
|
||||
btl->udapl_evd_conn, NULL, &endpoint);
|
||||
if(DAT_SUCCESS != rc) {
|
||||
mca_btl_udapl_error(rc, "dat_ep_create");
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
rc = dat_cr_accept(cr_handle, endpoint, 0, NULL);
|
||||
if(DAT_SUCCESS != rc) {
|
||||
mca_btl_udapl_error(rc, "dat_cr_accept");
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
/* Post a receive to get the address data */
|
||||
frag = (mca_btl_udapl_frag_t*)mca_btl_udapl_alloc(
|
||||
(mca_btl_base_module_t*)btl, sizeof(mca_btl_udapl_addr_t));
|
||||
|
||||
memcpy(frag->hdr, &btl->udapl_addr, sizeof(mca_btl_udapl_addr_t));
|
||||
frag->endpoint = NULL;
|
||||
frag->type = MCA_BTL_UDAPL_CONN_RECV;
|
||||
cookie.as_ptr = frag;
|
||||
|
||||
rc = dat_ep_post_recv(endpoint, 1,
|
||||
&((mca_mpool_udapl_registration_t*)frag->registration)->lmr_triplet,
|
||||
cookie, DAT_COMPLETION_DEFAULT_FLAG);
|
||||
if(DAT_SUCCESS != rc) {
|
||||
mca_btl_udapl_error(rc, "dat_ep_post_send");
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* uDAPL component progress.
|
||||
*/
|
||||
|
||||
|
||||
int mca_btl_udapl_component_progress()
|
||||
{
|
||||
mca_btl_udapl_module_t* btl;
|
||||
mca_btl_udapl_frag_t* frag;
|
||||
static int32_t inprogress = 0;
|
||||
DAT_EVENT event;
|
||||
int count = 0;
|
||||
size_t i;
|
||||
int rc;
|
||||
|
||||
/* prevent deadlock - only one thread should be 'progressing' at a time */
|
||||
if(OPAL_THREAD_ADD32(&inprogress, 1) > 1) {
|
||||
@ -365,8 +459,6 @@ int mca_btl_udapl_component_progress()
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
OPAL_OUTPUT((0, "udapl_component_progress\n"));
|
||||
|
||||
/* check for work to do on each uDAPL btl */
|
||||
for(i = 0; i < mca_btl_udapl_component.udapl_num_btls; i++) {
|
||||
btl = mca_btl_udapl_component.udapl_btls[i];
|
||||
@ -375,35 +467,93 @@ int mca_btl_udapl_component_progress()
|
||||
/* Check DTO EVD */
|
||||
while(DAT_SUCCESS ==
|
||||
dat_evd_dequeue(btl->udapl_evd_dto, &event)) {
|
||||
DAT_DTO_COMPLETION_EVENT_DATA* dto;
|
||||
|
||||
switch(event.event_number) {
|
||||
case DAT_DTO_COMPLETION_EVENT:
|
||||
OPAL_OUTPUT((0, "btl_udapl DTO completion\n"));
|
||||
/* questions to answer:
|
||||
should i use separate endpoints for eager/max frags?
|
||||
i need to do this if i only want to post recv's for
|
||||
the exact eager/max size, and uDAPL won't just pick
|
||||
a large enough buffer
|
||||
|
||||
how about just worrying about eager frags for now?
|
||||
*/
|
||||
dto = &event.event_data.dto_completion_event_data;
|
||||
|
||||
/* Was the DTO successful? */
|
||||
if(DAT_DTO_SUCCESS != dto->status) {
|
||||
OPAL_OUTPUT((0,
|
||||
"btl_udapl DTO error %d\n", dto->status));
|
||||
break;
|
||||
}
|
||||
|
||||
frag = dto->user_cookie.as_ptr;
|
||||
|
||||
switch(frag->type) {
|
||||
case MCA_BTL_UDAPL_SEND:
|
||||
/* TODO - write me */
|
||||
break;
|
||||
case MCA_BTL_UDAPL_CONN_SEND:
|
||||
/* Set the endpoint state to connected */
|
||||
OPAL_OUTPUT((0,
|
||||
"btl_udapl SEND SIDE CONNECT COMPLETED!!\n"));
|
||||
frag->endpoint->endpoint_state =
|
||||
MCA_BTL_UDAPL_CONNECTED;
|
||||
|
||||
/* TODO - fire off any queued sends */
|
||||
|
||||
/* Retire the fragment */
|
||||
MCA_BTL_UDAPL_FRAG_RETURN_EAGER(btl, frag);
|
||||
break;
|
||||
case MCA_BTL_UDAPL_CONN_RECV:
|
||||
/* Just got the address data we need for completing
|
||||
a new connection - match endpoints */
|
||||
mca_btl_udapl_finish_connect(btl, frag, dto->ep_handle);
|
||||
|
||||
/* Retire the fragment */
|
||||
MCA_BTL_UDAPL_FRAG_RETURN_EAGER(btl, frag);
|
||||
break;
|
||||
#ifdef OMPI_ENABLE_DEBUG
|
||||
default:
|
||||
OPAL_OUTPUT((0, "WARNING unknown frag type: %d\n",
|
||||
frag->type));
|
||||
#endif
|
||||
}
|
||||
count++;
|
||||
break;
|
||||
#ifdef OMPI_ENABLE_DEBUG
|
||||
default:
|
||||
OPAL_OUTPUT((0, "WARNING unknown dto event: %d\n",
|
||||
event.event_number));
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
/* Check connection EVD */
|
||||
while(DAT_SUCCESS ==
|
||||
dat_evd_dequeue(btl->udapl_evd_conn, &event)) {
|
||||
|
||||
switch(event.event_number) {
|
||||
case DAT_CONNECTION_REQUEST_EVENT:
|
||||
/* Accept a new connection */
|
||||
rc = dat_cr_accept(
|
||||
event.event_data.cr_arrival_event_data.cr_handle,
|
||||
DAT_HANDLE_NULL, 0, NULL);
|
||||
if(DAT_SUCCESS != rc) {
|
||||
mca_btl_udapl_error(rc, "dat_cr_accept");
|
||||
}
|
||||
OPAL_OUTPUT((0, "btl_udapl accepting connection\n"));
|
||||
|
||||
mca_btl_udapl_accept_connect(btl,
|
||||
event.event_data.cr_arrival_event_data.cr_handle);
|
||||
count++;
|
||||
break;
|
||||
case DAT_CONNECTION_EVENT_ESTABLISHED:
|
||||
/* TODO - at this point we have a uDPAL enpoint in
|
||||
event.event_data.connect_event_data.ep_handle,
|
||||
need to figure out how to tie back into the BTL */
|
||||
OPAL_OUTPUT((0, "btl_udapl connection established\n"));
|
||||
|
||||
/* Both the client and server side of a connection generate
|
||||
this event */
|
||||
/* Really shouldn't do anything here, as we won't have the
|
||||
address data we need to match a uDAPL EP to a BTL EP.
|
||||
Connections are finished when DTOs are completed for
|
||||
the address transfer */
|
||||
|
||||
count++;
|
||||
break;
|
||||
case DAT_CONNECTION_EVENT_PEER_REJECTED:
|
||||
@ -412,11 +562,17 @@ int mca_btl_udapl_component_progress()
|
||||
case DAT_CONNECTION_EVENT_DISCONNECTED:
|
||||
case DAT_CONNECTION_EVENT_BROKEN:
|
||||
case DAT_CONNECTION_EVENT_TIMED_OUT:
|
||||
/* handle this case specially? if we have finite timeout,
|
||||
we might want to try connecting again here. */
|
||||
case DAT_CONNECTION_EVENT_UNREACHABLE:
|
||||
/* Need to set the BTL endpoint to MCA_BTL_UDAPL_FAILED
|
||||
See dat_ep_connect documentation pdf pg 198 */
|
||||
break;
|
||||
#ifdef OMPI_ENABLE_DEBUG
|
||||
default:
|
||||
OPAL_OUTPUT((0, "WARNING unknown conn event: %d\n",
|
||||
event.event_number));
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
@ -430,9 +586,11 @@ int mca_btl_udapl_component_progress()
|
||||
case DAT_ASYNC_ERROR_TIMED_OUT:
|
||||
case DAT_ASYNC_ERROR_PROVIDER_INTERNAL_ERROR:
|
||||
break;
|
||||
#ifdef OMPI_ENABLE_DEBUG
|
||||
default:
|
||||
OPAL_OUTPUT((0, "WARNING unknown async event: %d\n",
|
||||
event.event_number));
|
||||
#endif
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -26,12 +26,106 @@
|
||||
#include "orte/mca/rml/rml.h"
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "orte/dss/dss.h"
|
||||
#include "ompi/mca/mpool/udapl/mpool_udapl.h"
|
||||
#include "btl_udapl.h"
|
||||
#include "btl_udapl_endpoint.h"
|
||||
#include "btl_udapl_proc.h"
|
||||
#include "btl_udapl_frag.h"
|
||||
|
||||
|
||||
static int mca_btl_udapl_start_connect(mca_btl_base_endpoint_t* endpoint);
|
||||
|
||||
|
||||
int mca_btl_udapl_endpoint_send(mca_btl_base_endpoint_t* endpoint,
|
||||
mca_btl_udapl_frag_t* frag)
|
||||
{
|
||||
int rc = OMPI_SUCCESS;
|
||||
|
||||
OPAL_THREAD_LOCK(&endpoint->endpoint_send_lock);
|
||||
switch(endpoint->endpoint_state) {
|
||||
case MCA_BTL_UDAPL_CONNECTED:
|
||||
/* just send it already.. */
|
||||
break;
|
||||
case MCA_BTL_UDAPL_CLOSED:
|
||||
/* Initiate a new connection, add this send to a queue */
|
||||
rc = mca_btl_udapl_start_connect(endpoint);
|
||||
if(OMPI_SUCCESS != rc) {
|
||||
break;
|
||||
}
|
||||
|
||||
/* Fall through on purpose to queue the send */
|
||||
case MCA_BTL_UDAPL_CONNECTING:
|
||||
/* Add this send to a queue */
|
||||
opal_list_append(&endpoint->endpoint_frags,
|
||||
(opal_list_item_t*)frag);
|
||||
break;
|
||||
case MCA_BTL_UDAPL_FAILED:
|
||||
rc = OMPI_ERR_UNREACH;
|
||||
break;
|
||||
}
|
||||
OPAL_THREAD_UNLOCK(&endpoint->endpoint_send_lock);
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
||||
static int mca_btl_udapl_start_connect(mca_btl_base_endpoint_t* endpoint)
|
||||
{
|
||||
mca_btl_udapl_module_t* btl = endpoint->endpoint_btl;
|
||||
mca_btl_udapl_frag_t* frag;
|
||||
DAT_DTO_COOKIE cookie;
|
||||
int rc;
|
||||
|
||||
/* Create a new uDAPL endpoint and start the connection process */
|
||||
rc = dat_ep_create(btl->udapl_ia, btl->udapl_pz,
|
||||
btl->udapl_evd_dto, btl->udapl_evd_dto, btl->udapl_evd_conn,
|
||||
NULL, &endpoint->endpoint_ep);
|
||||
if(DAT_SUCCESS != rc) {
|
||||
mca_btl_udapl_error(rc, "dat_ep_create");
|
||||
goto failure_create;
|
||||
}
|
||||
|
||||
rc = dat_ep_connect(endpoint->endpoint_ep, &endpoint->endpoint_addr.addr,
|
||||
endpoint->endpoint_addr.port, mca_btl_udapl_component.udapl_timeout,
|
||||
0, NULL, 0, DAT_CONNECT_DEFAULT_FLAG);
|
||||
if(DAT_SUCCESS != rc) {
|
||||
mca_btl_udapl_error(rc, "dat_ep_connect");
|
||||
goto failure;
|
||||
}
|
||||
|
||||
/* Send our local address data over this EP */
|
||||
/* Can't use btl_udapl_send here, the send will just get queued */
|
||||
frag = (mca_btl_udapl_frag_t*)mca_btl_udapl_alloc(
|
||||
(mca_btl_base_module_t*)btl, sizeof(mca_btl_udapl_addr_t));
|
||||
|
||||
memcpy(frag->hdr, &btl->udapl_addr, sizeof(mca_btl_udapl_addr_t));
|
||||
frag->endpoint = endpoint;
|
||||
frag->type = MCA_BTL_UDAPL_CONN_SEND;
|
||||
cookie.as_ptr = frag;
|
||||
|
||||
/* Do the actual send now.. */
|
||||
OPAL_OUTPUT((0, "posting send!\n"));
|
||||
rc = dat_ep_post_send(endpoint->endpoint_ep, 1,
|
||||
&((mca_mpool_udapl_registration_t*)frag->registration)->lmr_triplet,
|
||||
cookie, DAT_COMPLETION_DEFAULT_FLAG);
|
||||
if(DAT_SUCCESS != rc) {
|
||||
mca_btl_udapl_error(rc, "dat_ep_post_send");
|
||||
goto failure;
|
||||
}
|
||||
OPAL_OUTPUT((0, "after post send\n"));
|
||||
|
||||
endpoint->endpoint_state = MCA_BTL_UDAPL_CONNECTING;
|
||||
return OMPI_SUCCESS;
|
||||
|
||||
failure:
|
||||
dat_ep_free(endpoint->endpoint_ep);
|
||||
failure_create:
|
||||
endpoint->endpoint_ep = DAT_HANDLE_NULL;
|
||||
endpoint->endpoint_state = MCA_BTL_UDAPL_FAILED;
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Initialize state of the endpoint instance.
|
||||
*
|
||||
@ -41,8 +135,15 @@ static void mca_btl_udapl_endpoint_construct(mca_btl_base_endpoint_t* endpoint)
|
||||
{
|
||||
endpoint->endpoint_btl = 0;
|
||||
endpoint->endpoint_proc = 0;
|
||||
endpoint->endpoint_state = MCA_BTL_UDAPL_CLOSED;
|
||||
endpoint->endpoint_ep = DAT_HANDLE_NULL;
|
||||
|
||||
OBJ_CONSTRUCT(&endpoint->endpoint_frags, opal_list_t);
|
||||
OBJ_CONSTRUCT(&endpoint->endpoint_send_lock, opal_mutex_t);
|
||||
OBJ_CONSTRUCT(&endpoint->endpoint_recv_lock, opal_mutex_t);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Destroy a endpoint
|
||||
*
|
||||
@ -50,6 +151,9 @@ static void mca_btl_udapl_endpoint_construct(mca_btl_base_endpoint_t* endpoint)
|
||||
|
||||
static void mca_btl_udapl_endpoint_destruct(mca_btl_base_endpoint_t* endpoint)
|
||||
{
|
||||
OBJ_DESTRUCT(&endpoint->endpoint_frags);
|
||||
OBJ_DESTRUCT(&endpoint->endpoint_send_lock);
|
||||
OBJ_DESTRUCT(&endpoint->endpoint_recv_lock);
|
||||
}
|
||||
|
||||
|
||||
|
@ -34,6 +34,7 @@ extern "C" {
|
||||
* Structure used to publish uDAPL id information to peers.
|
||||
*/
|
||||
struct mca_btl_udapl_addr_t {
|
||||
DAT_CONN_QUAL port;
|
||||
DAT_SOCK_ADDR addr;
|
||||
};
|
||||
typedef struct mca_btl_udapl_addr_t mca_btl_udapl_addr_t;
|
||||
@ -56,7 +57,7 @@ typedef enum {
|
||||
* An instance of mca_btl_base_endpoint_t is associated w/ each process
|
||||
* and BTL pair at startup. However, connections to the endpoint
|
||||
* are established dynamically on an as-needed basis:
|
||||
*/
|
||||
*/
|
||||
|
||||
struct mca_btl_base_endpoint_t {
|
||||
opal_list_item_t super;
|
||||
@ -70,10 +71,19 @@ struct mca_btl_base_endpoint_t {
|
||||
mca_btl_udapl_endpoint_state_t endpoint_state;
|
||||
/**< current state of the endpoint connection */
|
||||
|
||||
opal_list_t pending_frags;
|
||||
opal_list_t endpoint_frags;
|
||||
/**< pending send frags on this endpoint */
|
||||
|
||||
opal_mutex_t endpoint_send_lock;
|
||||
/**< lock for concurrent access to endpoint state */
|
||||
|
||||
opal_mutex_t endpoint_recv_lock;
|
||||
/**< lock for concurrent access to endpoint state */
|
||||
|
||||
mca_btl_udapl_addr_t endpoint_addr;
|
||||
|
||||
DAT_EP_HANDLE endpoint_ep;
|
||||
/**< uDAPL endpoint handle */
|
||||
};
|
||||
|
||||
typedef struct mca_btl_base_endpoint_t mca_btl_base_endpoint_t;
|
||||
@ -81,6 +91,10 @@ typedef mca_btl_base_endpoint_t mca_btl_udapl_endpoint_t;
|
||||
|
||||
OBJ_CLASS_DECLARATION(mca_btl_udapl_endpoint_t);
|
||||
|
||||
|
||||
int mca_btl_udapl_endpoint_send(mca_btl_base_endpoint_t* endpoint,
|
||||
mca_btl_udapl_frag_t* frag);
|
||||
|
||||
#if defined(c_plusplus) || defined(__cplusplus)
|
||||
}
|
||||
#endif
|
||||
|
@ -1,3 +1,22 @@
|
||||
/*
|
||||
* Copyright (c) 2004-2006 The Trustees of Indiana University and Indiana
|
||||
* University Research and Technology
|
||||
* Corporation. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The University of Tennessee and The University
|
||||
* of Tennessee Research Foundation. All rights
|
||||
* reserved.
|
||||
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
|
||||
* University of Stuttgart. All rights reserved.
|
||||
* Copyright (c) 2004-2005 The Regents of the University of California.
|
||||
* All rights reserved.
|
||||
* $COPYRIGHT$
|
||||
*
|
||||
* Additional copyrights may follow
|
||||
*
|
||||
* $HEADER$
|
||||
*/
|
||||
|
||||
#include "btl_udapl.h"
|
||||
#include "btl_udapl_frag.h"
|
||||
|
||||
|
||||
|
@ -22,7 +22,6 @@
|
||||
|
||||
#define MCA_BTL_UDAPL_FRAG_ALIGN (8)
|
||||
#include "ompi_config.h"
|
||||
#include "btl_udapl.h"
|
||||
|
||||
#if defined(c_plusplus) || defined(__cplusplus)
|
||||
extern "C" {
|
||||
@ -32,6 +31,8 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_btl_udapl_frag_t);
|
||||
|
||||
typedef enum {
|
||||
MCA_BTL_UDAPL_SEND,
|
||||
MCA_BTL_UDAPL_CONN_SEND,
|
||||
MCA_BTL_UDAPL_CONN_RECV,
|
||||
MCA_BTL_UDAPL_PUT,
|
||||
MCA_BTL_UDAPL_GET
|
||||
} mca_btl_udapl_frag_type_t;
|
||||
@ -68,10 +69,6 @@ typedef struct mca_btl_udapl_frag_t mca_btl_udapl_frag_user_t;
|
||||
|
||||
OBJ_CLASS_DECLARATION(mca_btl_udapl_frag_user_t);
|
||||
|
||||
typedef struct mca_btl_udapl_frag_t mca_btl_udapl_frag_recv_t;
|
||||
|
||||
OBJ_CLASS_DECLARATION(mca_btl_udapl_frag_recv_t);
|
||||
|
||||
|
||||
/*
|
||||
* Macros to allocate/return descriptors from module specific
|
||||
|
@ -38,12 +38,14 @@ void mca_btl_udapl_proc_construct(mca_btl_udapl_proc_t* proc)
|
||||
proc->proc_endpoints = 0;
|
||||
proc->proc_endpoint_count = 0;
|
||||
OBJ_CONSTRUCT(&proc->proc_lock, opal_mutex_t);
|
||||
|
||||
/* add to list of all proc instance */
|
||||
OPAL_THREAD_LOCK(&mca_btl_udapl_component.udapl_lock);
|
||||
opal_list_append(&mca_btl_udapl_component.udapl_procs, &proc->super);
|
||||
OPAL_THREAD_UNLOCK(&mca_btl_udapl_component.udapl_lock);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Cleanup uDAPL proc instance
|
||||
*/
|
||||
@ -170,8 +172,6 @@ int mca_btl_udapl_proc_insert(
|
||||
if(udapl_proc->proc_endpoint_count > udapl_proc->proc_addr_count)
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
|
||||
opal_output(0, "udapl_proc_insert\n");
|
||||
|
||||
udapl_endpoint->endpoint_proc = udapl_proc;
|
||||
udapl_endpoint->endpoint_addr =
|
||||
udapl_proc->proc_addrs[udapl_proc->proc_endpoint_count];
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user