
Dynamic connection management is fully set up now. MPI_Init works with
dynamic connection management.

This commit was SVN r2679.
This commit is contained in:
Sayantan Sur 2004-09-15 16:45:31 +00:00
parent 70dae461e4
Commit 0d85c2b5b9
10 changed files with 240 additions and 136 deletions
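
The core of the change: a send fragment destined for a peer whose queue pair is not yet connected is queued on peer->pending_send_frags, and the new mca_ptl_ib_progress_send_frags() posts the backlog once the connection completes. Below is a minimal, self-contained sketch of that deferred-send pattern, not the OMPI code itself; peer_t, frag_t, and post_to_nic are hypothetical simplifications.

#include <stddef.h>

enum peer_state { CLOSED, CONNECTING, CONNECTED };

struct frag_t {
    struct frag_t *next;
    int posted;              /* mirrors sendfrag->frag_progressed */
};

struct peer_t {
    enum peer_state state;
    struct frag_t *pending;  /* mirrors peer->pending_send_frags */
};

/* Stand-in for mca_ptl_ib_post_send(): pretend the post succeeds. */
static int post_to_nic(struct peer_t *peer, struct frag_t *frag)
{
    (void)peer; (void)frag;
    return 0;
}

/* Send path: queue the frag if the connection is not ready yet. */
static int peer_send(struct peer_t *peer, struct frag_t *frag)
{
    if (peer->state != CONNECTED) {
        frag->next = peer->pending;
        peer->pending = frag;
        return 0;            /* posted later by progress_sends() */
    }
    return post_to_nic(peer, frag);
}

/* Progress path: drain the queue once the peer connects. */
static void progress_sends(struct peer_t *peer)
{
    struct frag_t *frag;
    if (peer->state != CONNECTED)
        return;
    for (frag = peer->pending; frag != NULL; frag = frag->next) {
        if (!frag->posted) {
            post_to_nic(peer, frag);
            frag->posted = 1;  /* never post the same frag twice */
        }
    }
}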

View file

@@ -14,7 +14,6 @@
 #include "mca/ptl/base/ptl_base_recvfrag.h"
 #include "mca/base/mca_base_module_exchange.h"
 #include "ptl_ib.h"
-//#include "ptl_ib_sendfrag.h"

 mca_ptl_ib_module_t mca_ptl_ib_module = {
     {

View file

@@ -329,14 +329,13 @@ int mca_ptl_ib_component_control(int param, void* value, size_t size)
 int mca_ptl_ib_component_progress(mca_ptl_tstamp_t tstamp)
 {
-    int i, j, num_procs, num_frags, num_modules;
-    ompi_list_item_t *item, *frag_item;
+    int i, num_procs, num_modules;
+    ompi_list_item_t *item;
     mca_ptl_ib_peer_t *peer;
     mca_ptl_ib_proc_t *proc;
-    mca_ptl_ib_send_frag_t *sendfrag;
     VAPI_ret_t ret;
     VAPI_wc_desc_t comp;
     mca_ptl_ib_module_t *module;
+    int comp_type;
+    void* comp_addr;

     num_procs = ompi_list_get_size(&(mca_ptl_ib_component.ib_procs));
@@ -348,7 +347,6 @@ int mca_ptl_ib_component_progress(mca_ptl_tstamp_t tstamp)
     for(i = 0; i < num_procs;
             item = ompi_list_get_next(item), i++) {
         proc = (mca_ptl_ib_proc_t *) item;

         /* We only have one peer per proc right now */
@@ -356,37 +354,7 @@ int mca_ptl_ib_component_progress(mca_ptl_tstamp_t tstamp)
         if(!ompi_list_is_empty(&(peer->pending_send_frags))) {
-            /*Check if peer is connected */
-            if(peer->peer_state != MCA_PTL_IB_CONNECTED) {
-                break;
-            }
-            /* Go over all the frags */
-            num_frags =
-                ompi_list_get_size(&(peer->pending_send_frags));
-            frag_item =
-                ompi_list_get_first(&(peer->pending_send_frags));
-            for(j = 0; j < num_frags;
-                    frag_item = ompi_list_get_next(frag_item), j++) {
-                sendfrag = (mca_ptl_ib_send_frag_t *) frag_item;
-                if(sendfrag->frag_progressed) {
-                    /* We've already posted this one */
-                    continue;
-                } else {
-                    /* We need to post this one */
-                    if(mca_ptl_ib_post_send(peer->peer_module->ib_state,
-                                peer->peer_conn, &sendfrag->ib_buf)
-                            != OMPI_SUCCESS) {
-                        return OMPI_ERROR;
-                    }
-                    sendfrag->frag_progressed = 1;
-                }
-            }
+            mca_ptl_ib_progress_send_frags(peer);
         }
     }
@@ -398,95 +366,36 @@ int mca_ptl_ib_component_progress(mca_ptl_tstamp_t tstamp)
         module = mca_ptl_ib_component.ib_ptl_modules[i];
-        ret = VAPI_poll_cq(mca_ptl_ib_component.ib_ptl_modules[0]->ib_state->nic,
-                mca_ptl_ib_component.ib_ptl_modules[0]->ib_state->cq_hndl,
-                &comp);
-        if(VAPI_OK == ret) {
-            if(comp.status != VAPI_SUCCESS) {
-                ompi_output(0, "Got error : %s, Vendor code : %d\n",
-                        VAPI_wc_status_sym(comp.status),
-                        comp.vendor_err_syndrome);
+        mca_ptl_ib_drain_network(module->ib_state->nic,
+                module->ib_state->cq_hndl,
+                &comp_type, &comp_addr);
-            } else {
-                if(VAPI_CQE_SQ_SEND_DATA == comp.opcode) {
-                    D_PRINT("Send completion, id:%d\n",
-                            comp.id);
-                }
-                else if(VAPI_CQE_RQ_SEND_DATA == comp.opcode) {
-                    D_PRINT("Received message completion len = %d, id : %d\n",
-                            comp.byte_len, comp.id);
-                }
-                else {
-                    D_PRINT("Got Unknown completion! Opcode : %d\n",
-                            comp.opcode);
-                }
-            }
-        }
+        /* Handle n/w completions */
+        switch(comp_type) {
+            case IB_COMP_SEND :
+                D_PRINT("Caught a send completion");
+                /* Process a completed send */
+                mca_ptl_ib_process_send_comp((mca_ptl_base_module_t *) module,
+                        comp_addr);
+                break;
+            case IB_COMP_RECV :
+                D_PRINT("Caught a recv completion");
+                /* Process incoming receives */
+                mca_ptl_ib_process_recv((mca_ptl_base_module_t *)module,
+                        comp_addr);
+                break;
+            case IB_COMP_NOTHING:
+                break;
+            default:
+                ompi_output(0, "Errorneous network completion");
+                break;
+        }
     }
     return OMPI_SUCCESS;
 }
-
-#if 0
-    mca_ptl_base_header_t *header;
-    mca_ptl_ib_recv_buf_t *recv_buf;
-    mca_ptl_ib_send_buf_t *send_buf;
-    mca_pml_base_request_t *req;
-
-    D_PRINT("Checking completions ... \n");
-    ret = VAPI_poll_cq(mca_ptl_ib_component.ib_ptl_modules[0]->nic,
-            mca_ptl_ib_component.ib_ptl_modules[0]->cq_hndl,
-            &comp);
-    if(VAPI_OK == ret) {
-        if(comp.status != VAPI_SUCCESS) {
-            fprintf(stderr,"Got error : %s, Vendor code : %d\n",
-                    VAPI_wc_status_sym(comp.status),
-                    comp.vendor_err_syndrome);
-        }
-        if(VAPI_CQE_SQ_SEND_DATA == comp.opcode) {
-            D_PRINT("Send completion, id:%d\n",
-                    comp.id);
-            send_buf = (mca_ptl_ib_send_buf_t*) (unsigned int)comp.id;
-            header = (mca_ptl_base_header_t*) send_buf->buf;
-            req = (mca_pml_base_request_t *) send_buf->req;
-            mca_ptl_ib_component.ib_ptl_modules[0]->super.ptl_send_progress(
-                    mca_ptl_ib_component.ib_ptl_modules[0],
-                    req,
-                    header->hdr_frag.hdr_frag_length);
-        }
-        else if(VAPI_CQE_RQ_SEND_DATA == comp.opcode) {
-            D_PRINT("Received message completion len = %d, id : %d\n",
-                    comp.byte_len, comp.id);
-            recv_buf = (mca_ptl_ib_recv_buf_t*) (unsigned int)comp.id;
-            header = (mca_ptl_base_header_t*) recv_buf->buf;
-            switch(header->hdr_common.hdr_type) {
-                case MCA_PTL_HDR_TYPE_MATCH:
-                    D_PRINT("Header type match\n");
-                    mca_ptl_ib_frag(mca_ptl_ib_component.ib_ptl_modules[0],
-                            header);
-                    break;
-                case MCA_PTL_HDR_TYPE_FRAG:
-                    D_PRINT("Header type frag\n");
-                    break;
-                default :
-                    D_PRINT("Header, what header?\n");
-                    break;
-            }
-        }
-        else {
-            D_PRINT("Got Unknown completion! Opcode : %d\n",
-                    comp.opcode);
-        }
-    }
-#endif

View file

@@ -409,6 +409,8 @@ int mca_ptl_ib_peer_send(mca_ptl_base_peer_t* peer,
             ompi_list_append(&peer->pending_send_frags,
                     (ompi_list_item_t *)frag);
+            rc = OMPI_SUCCESS;
             break;
         case MCA_PTL_IB_CLOSED:
@@ -427,6 +429,11 @@ int mca_ptl_ib_peer_send(mca_ptl_base_peer_t* peer,
+        case MCA_PTL_IB_CONNECTED:
+            /* Send the frag off */
+            rc = mca_ptl_ib_post_send(peer->peer_module->ib_state,
+                    peer->peer_conn,
+                    &frag->ib_buf, (void*) frag);
+            break;
         default:
             rc = OMPI_ERR_UNREACH;
@@ -435,3 +442,41 @@ int mca_ptl_ib_peer_send(mca_ptl_base_peer_t* peer,
     return rc;
 }
+
+void mca_ptl_ib_progress_send_frags(mca_ptl_ib_peer_t* peer)
+{
+    int num_frags, i;
+    ompi_list_item_t *frag_item;
+    mca_ptl_ib_send_frag_t *sendfrag;
+
+    /*Check if peer is connected */
+    if(peer->peer_state != MCA_PTL_IB_CONNECTED) {
+        return;
+    }
+
+    /* Go over all the frags */
+    num_frags =
+        ompi_list_get_size(&(peer->pending_send_frags));
+    frag_item =
+        ompi_list_get_first(&(peer->pending_send_frags));
+
+    for(i = 0; i < num_frags;
+            frag_item = ompi_list_get_next(frag_item), i++) {
+        sendfrag = (mca_ptl_ib_send_frag_t *) frag_item;
+        if(!sendfrag->frag_progressed) {
+            /* We need to post this one */
+            if(mca_ptl_ib_post_send(peer->peer_module->ib_state,
+                        peer->peer_conn, &sendfrag->ib_buf,
+                        (void*) sendfrag)
+                    != OMPI_SUCCESS) {
+                ompi_output(0, "Error in posting send");
+            }
+            sendfrag->frag_progressed = 1;
+        }
+    }
+}

View file

@@ -78,6 +78,8 @@ typedef struct mca_ptl_base_peer_t mca_ptl_ib_peer_t;
 int mca_ptl_ib_peer_send(mca_ptl_base_peer_t*, mca_ptl_ib_send_frag_t*);
 void mca_ptl_ib_post_oob_recv_nb(void);
+void mca_ptl_ib_progress_send_frags(mca_ptl_ib_peer_t*);
+
 #define DUMP_PEER(peer_ptr) { \
     ompi_output(0, "[%s:%d] ", __FILE__, __LINE__); \
     ompi_output(0, "Dumping peer state"); \

View file

@@ -507,12 +507,14 @@ int mca_ptl_ib_peer_connect(mca_ptl_ib_state_t *ib_state,
 int mca_ptl_ib_post_send(mca_ptl_ib_state_t *ib_state,
         mca_ptl_ib_peer_conn_t *peer_conn,
-        ib_buffer_t *ib_buf)
+        ib_buffer_t *ib_buf, void* addr)
 {
     VAPI_ret_t ret;

     IB_SET_REMOTE_QP_NUM(ib_buf, (peer_conn->rres->qp_num));
+    IB_SET_SEND_DESC_ID(ib_buf, addr);

     D_PRINT("length : %d", ib_buf->desc.sg_entry.len);

     ret = VAPI_post_sr(ib_state->nic,
@@ -526,3 +528,50 @@ int mca_ptl_ib_post_send(mca_ptl_ib_state_t *ib_state,
     return OMPI_SUCCESS;
 }
+
+void mca_ptl_ib_drain_network(VAPI_hca_hndl_t nic,
+        VAPI_cq_hndl_t cq_hndl, int* comp_type, void** comp_addr)
+{
+    VAPI_ret_t ret;
+    VAPI_wc_desc_t comp;
+
+    ret = VAPI_poll_cq(nic, cq_hndl, &comp);
+    if(VAPI_OK == ret) {
+        if(comp.status != VAPI_SUCCESS) {
+            ompi_output(0, "Got error : %s, Vendor code : %d\n",
+                    VAPI_wc_status_sym(comp.status),
+                    comp.vendor_err_syndrome);
+            *comp_type = IB_COMP_ERROR;
+            *comp_addr = NULL;
+        } else {
+            if(VAPI_CQE_SQ_SEND_DATA == comp.opcode) {
+                D_PRINT("Send completion, id:%d\n",
+                        comp.id);
+                *comp_type = IB_COMP_SEND;
+                *comp_addr = (void*) (unsigned int) comp.id;
+            }
+            else if(VAPI_CQE_RQ_SEND_DATA == comp.opcode) {
+                D_PRINT("Received message completion len = %d, id : %d\n",
+                        comp.byte_len, comp.id);
+                *comp_type = IB_COMP_RECV;
+                *comp_addr = (void*) (unsigned int) comp.id;
+            }
+            else {
+                D_PRINT("Got Unknown completion! Opcode : %d\n",
+                        comp.opcode);
+                *comp_type = IB_COMP_ERROR;
+                *comp_addr = NULL;
+            }
+        }
+    } else {
+        /* No completions from the network */
+        *comp_type = IB_COMP_NOTHING;
+        *comp_addr = NULL;
+    }
+}
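
The new mca_ptl_ib_drain_network() and the IB_SET_SEND_DESC_ID macro rely on the 64-bit work-request id field to carry a host pointer through the HCA and back out of the completion queue; that is how comp_addr is recovered above. Here is a minimal sketch of that round-trip, assuming a plain 64-bit id type in place of VAPI's; note that the (unsigned int) casts in the code above assume 32-bit pointers, whereas uintptr_t is the portable spelling.

#include <stdint.h>

typedef uint64_t wr_id_t;   /* stand-in for the 64-bit VAPI id field */

/* Stash a host pointer in the work-request id before posting. */
static wr_id_t ptr_to_wr_id(void *addr)
{
    return (wr_id_t)(uintptr_t)addr;
}

/* Recover the pointer from the completion's id on the way out. */
static void *wr_id_to_ptr(wr_id_t id)
{
    return (void *)(uintptr_t)id;
}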

View file

@@ -36,6 +36,13 @@ typedef enum {
     IB_SEND
 } IB_wr_t;

+typedef enum {
+    IB_COMP_ERROR,
+    IB_COMP_RECV,
+    IB_COMP_SEND,
+    IB_COMP_NOTHING
+} IB_comp_t;
+
 struct vapi_memhandle_t {
     VAPI_mr_hndl_t hndl;
     /* Memory region handle */
@@ -168,6 +175,10 @@ typedef struct mca_ptl_ib_peer_conn_t mca_ptl_ib_peer_conn_t;
     ib_buf_ptr->desc.sr.remote_qp = qp; \
 }

+#define IB_SET_SEND_DESC_ID(ib_buf_ptr, addr) { \
+    ib_buf_ptr->desc.sr.id = (VAPI_virt_addr_t) \
+        (MT_virt_addr_t) addr; \
+}
+
 int mca_ptl_ib_init_module(mca_ptl_ib_state_t*, int);
 int mca_ptl_ib_get_num_hcas(uint32_t*);
 int mca_ptl_ib_init_peer(mca_ptl_ib_state_t*, mca_ptl_ib_peer_conn_t*);
@@ -177,6 +188,8 @@ int mca_ptl_ib_register_mem(VAPI_hca_hndl_t nic, VAPI_pd_hndl_t ptag,
         void* buf, int len, vapi_memhandle_t* memhandle);
 int mca_ptl_ib_post_send(mca_ptl_ib_state_t *ib_state,
         mca_ptl_ib_peer_conn_t *peer_conn,
-        ib_buffer_t *ib_buf);
+        ib_buffer_t *ib_buf, void*);
+void mca_ptl_ib_drain_network(VAPI_hca_hndl_t nic,
+        VAPI_cq_hndl_t cq_hndl, int* comp_type, void** comp_addr);

 #endif /* MCA_PTL_IB_PRIV_H */

View file

@@ -13,7 +13,7 @@ OBJ_CLASS_INSTANCE(mca_ptl_ib_recv_frag_t,
         mca_ptl_ib_recv_frag_destruct);

 /*
- * TCP fragment constructor
+ * IB fragment constructor
  */

 static void mca_ptl_ib_recv_frag_construct(mca_ptl_ib_recv_frag_t* frag)
@@ -22,7 +22,7 @@ static void mca_ptl_ib_recv_frag_construct(mca_ptl_ib_recv_frag_t* frag)

 /*
- * TCP fragment destructor
+ * IB fragment destructor
  */

 static void mca_ptl_ib_recv_frag_destruct(mca_ptl_ib_recv_frag_t* frag)
@@ -31,14 +31,14 @@ static void mca_ptl_ib_recv_frag_destruct(mca_ptl_ib_recv_frag_t* frag)
 void
 mca_ptl_ib_recv_frag_done (mca_ptl_base_header_t *header,
-        mca_ptl_ib_recv_frag_t* frag,
+        mca_ptl_base_recv_frag_t* frag,
         mca_pml_base_recv_request_t *request)
 {
-    frag->super.frag_base.frag_owner->ptl_recv_progress (
-            frag->super.frag_base.frag_owner,
+    frag->frag_base.frag_owner->ptl_recv_progress (
+            frag->frag_base.frag_owner,
             request,
-            frag->super.frag_base.frag_size,
-            frag->super.frag_base.frag_size);
+            frag->frag_base.frag_size,
+            frag->frag_base.frag_size);

     OMPI_FREE_LIST_RETURN(&mca_ptl_ib_component.ib_recv_frags,
             (ompi_list_item_t*)frag);
@@ -59,3 +59,70 @@ mca_ptl_ib_recv_frag_done (mca_ptl_base_header_t *header,
 #endif
 }
+
+/*
+ * Process incoming receive fragments
+ *
+ */
+void mca_ptl_ib_process_recv(mca_ptl_base_module_t *module, void* addr)
+{
+    bool matched;
+    int rc;
+    ib_buffer_t *ib_buf;
+    mca_ptl_base_header_t *header;
+    ompi_list_item_t *item;
+    mca_ptl_ib_recv_frag_t *recv_frag;
+
+    ib_buf = (ib_buffer_t *) (unsigned int) addr;
+    header = (mca_ptl_base_header_t *) &ib_buf->buf[0];
+
+    OMPI_FREE_LIST_GET(&mca_ptl_ib_component.ib_recv_frags,
+            item, rc);
+    while (OMPI_SUCCESS != rc) {
+        /* TODO: progress the recv state machine */
+        D_PRINT("Retry to allocate a recv fragment\n");
+        OMPI_FREE_LIST_GET (&mca_ptl_ib_component.ib_recv_frags,
+                item, rc);
+    }
+
+    recv_frag = (mca_ptl_ib_recv_frag_t *) item;
+    recv_frag->super.frag_base.frag_owner =
+        (mca_ptl_base_module_t *) module;
+    recv_frag->super.frag_base.frag_peer = NULL;
+    recv_frag->super.frag_request = NULL;
+    recv_frag->super.frag_is_buffered = false;
+
+    /* Copy the header, mca_ptl_base_match()
+     * does not do what it claims */
+    recv_frag->super.frag_base.frag_header = *header;
+
+    /* Taking the data starting point be
+     * default */
+    recv_frag->super.frag_base.frag_addr =
+        (char *) header + sizeof (mca_ptl_base_header_t);
+    recv_frag->super.frag_base.frag_size = header->hdr_frag.hdr_frag_length;
+
+    /* match with preposted
+     * requests */
+    matched = module->ptl_match(
+            recv_frag->super.frag_base.frag_owner,
+            &recv_frag->super,
+            &recv_frag->super.frag_base.frag_header.hdr_match);
+
+    if (!matched) {
+        /* Oh my GOD
+         * !!! */
+        D_PRINT("Can't match buffer. Mama is unhappy\n");
+        memcpy (recv_frag->unex_buf,
+                (char *) header + sizeof (mca_ptl_base_header_t),
+                header->hdr_frag.hdr_frag_length);
+        recv_frag->super.frag_is_buffered = true;
+        recv_frag->super.frag_base.frag_addr = recv_frag->unex_buf;
+    } else {
+        D_PRINT("Message matched!");
+    }
+}
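
mca_ptl_ib_process_recv() copies the payload into recv_frag->unex_buf when no posted receive matches, because the IB buffer the data arrived in must be returned to the HCA before the matching receive is eventually posted. A minimal sketch of that unexpected-message handling follows, with hypothetical names and an arbitrary stash size.

#include <stdbool.h>
#include <string.h>

struct recv_frag {
    const char *data;        /* points into the wire buffer on match   */
    char stash[8192];        /* private copy when unmatched (unex_buf) */
    bool buffered;
};

static void accept_frag(struct recv_frag *frag,
                        const char *wire_payload, size_t len,
                        bool matched)
{
    if (!matched) {
        /* Wire buffer will be reposted to the HCA; keep our own copy
         * (assumes len fits in the stash). */
        memcpy(frag->stash, wire_payload, len);
        frag->data = frag->stash;
        frag->buffered = true;
    } else {
        frag->data = wire_payload;
        frag->buffered = false;
    }
}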

View file

@@ -17,6 +17,7 @@ typedef struct mca_ptl_ib_recv_frag_t mca_ptl_ib_recv_frag_t;
 void mca_ptl_ib_recv_frag_done (mca_ptl_base_header_t*,
-        mca_ptl_ib_recv_frag_t*, mca_pml_base_recv_request_t*);
+        mca_ptl_base_recv_frag_t*, mca_pml_base_recv_request_t*);
+void mca_ptl_ib_process_recv(mca_ptl_base_module_t* , void*);

 #endif
#endif

View file

@@ -42,8 +42,6 @@ int mca_ptl_ib_send_frag_init(mca_ptl_ib_send_frag_t* sendfrag,
     struct iovec iov;
     int header_length;

-    D_PRINT("");
-
     /* Start of the IB buffer */
     hdr = (mca_ptl_base_header_t *) &sendfrag->ib_buf.buf[0];
@@ -220,3 +218,22 @@ int mca_ptl_ib_register_send_frags(mca_ptl_base_module_t *ptl)
     return OMPI_SUCCESS;
 }
+
+/*
+ * Process send completions
+ *
+ */
+void mca_ptl_ib_process_send_comp(mca_ptl_base_module_t *module,
+        void* addr)
+{
+    mca_ptl_ib_send_frag_t *sendfrag;
+    mca_ptl_base_header_t *header;
+
+    sendfrag = (mca_ptl_ib_send_frag_t *) (unsigned int) addr;
+    header = (mca_ptl_base_header_t *) sendfrag->ib_buf.buf;
+
+    module->ptl_send_progress(module,
+            sendfrag->frag_send.frag_request,
+            header->hdr_frag.hdr_frag_length);
+}

View file

@@ -58,4 +58,6 @@ mca_ptl_ib_send_frag_t* mca_ptl_ib_alloc_send_frag(
 int mca_ptl_ib_register_send_frags(mca_ptl_base_module_t *ptl);
+void mca_ptl_ib_process_send_comp(mca_ptl_base_module_t *, void*);

 #endif