More send frag management code.
This commit was SVN r2673.
Этот коммит содержится в:
родитель
148580acad
Коммит
c46f5556c1
@ -121,8 +121,6 @@ int mca_ptl_ib_request_init( struct mca_ptl_base_module_t* ptl,
|
||||
{
|
||||
mca_ptl_ib_send_frag_t *ib_send_frag;
|
||||
|
||||
D_PRINT("");
|
||||
|
||||
ib_send_frag = mca_ptl_ib_alloc_send_frag(ptl,
|
||||
request);
|
||||
|
||||
@ -134,6 +132,10 @@ int mca_ptl_ib_request_init( struct mca_ptl_base_module_t* ptl,
|
||||
ib_send_frag;
|
||||
}
|
||||
|
||||
D_PRINT("sendfrag = %p, lkey = %d",
|
||||
ib_send_frag,
|
||||
ib_send_frag->ib_buf.hndl.lkey);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
@ -161,8 +163,6 @@ int mca_ptl_ib_send( struct mca_ptl_base_module_t* ptl,
|
||||
mca_ptl_ib_send_frag_t* sendfrag;
|
||||
int rc = OMPI_SUCCESS;
|
||||
|
||||
D_PRINT("");
|
||||
|
||||
if (0 == offset) {
|
||||
sendfrag = (mca_ptl_ib_send_frag_t *)
|
||||
&((mca_ptl_ib_send_request_t*)sendreq)->req_frag;
|
||||
@ -171,11 +171,16 @@ int mca_ptl_ib_send( struct mca_ptl_base_module_t* ptl,
|
||||
/* TODO: Implementation for messages > frag size */
|
||||
ompi_list_item_t* item;
|
||||
OMPI_FREE_LIST_GET(&mca_ptl_ib_component.ib_send_frags, item, rc);
|
||||
|
||||
if(NULL == (sendfrag = (mca_ptl_ib_send_frag_t*)item)) {
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
|
||||
D_PRINT("Sendfrag = %p, Lkey = %d",
|
||||
sendfrag,
|
||||
sendfrag->ib_buf.hndl.lkey);
|
||||
|
||||
rc = mca_ptl_ib_send_frag_init(sendfrag, ptl_peer,
|
||||
sendreq, offset, &size, flags);
|
||||
if(rc != OMPI_SUCCESS) {
|
||||
|
@ -101,12 +101,6 @@ struct mca_ptl_ib_module_t {
|
||||
|
||||
ompi_free_list_t recv_free;
|
||||
/**< free list of recv buffer descriptors */
|
||||
|
||||
ompi_list_t pending_send_frags;
|
||||
/**< list of all pending send fragments */
|
||||
|
||||
ompi_list_t pending_recv_frags;
|
||||
/**< list of all pending recv fragments */
|
||||
};
|
||||
|
||||
typedef struct mca_ptl_ib_module_t mca_ptl_ib_module_t;
|
||||
|
@ -238,9 +238,6 @@ mca_ptl_base_module_t** mca_ptl_ib_component_init(int *num_ptl_modules,
|
||||
}
|
||||
|
||||
/* Find a better place for this */
|
||||
OBJ_CONSTRUCT(&(ib_modules[i].pending_send_frags), ompi_list_t);
|
||||
OBJ_CONSTRUCT(&(ib_modules[i].pending_recv_frags), ompi_list_t);
|
||||
|
||||
OBJ_CONSTRUCT(&(ib_modules[i].send_free), ompi_free_list_t);
|
||||
|
||||
OBJ_CONSTRUCT(&(ib_modules[i].recv_free), ompi_free_list_t);
|
||||
@ -332,24 +329,108 @@ int mca_ptl_ib_component_control(int param, void* value, size_t size)
|
||||
|
||||
int mca_ptl_ib_component_progress(mca_ptl_tstamp_t tstamp)
|
||||
{
|
||||
uint32_t num_modules, i;
|
||||
int i, j, num_procs, num_frags, num_modules;
|
||||
ompi_list_item_t *item, *frag_item;
|
||||
mca_ptl_ib_peer_t *peer;
|
||||
mca_ptl_ib_proc_t *proc;
|
||||
mca_ptl_ib_send_frag_t *sendfrag;
|
||||
VAPI_ret_t ret;
|
||||
VAPI_wc_desc_t comp;
|
||||
mca_ptl_ib_module_t *module;
|
||||
|
||||
num_procs = ompi_list_get_size(&(mca_ptl_ib_component.ib_procs));
|
||||
|
||||
/* Traverse the list of procs associated with the
|
||||
* IB component */
|
||||
|
||||
item = ompi_list_get_first(&(mca_ptl_ib_component.ib_procs));
|
||||
|
||||
for(i = 0; i < num_procs;
|
||||
item = ompi_list_get_next(item), i++) {
|
||||
|
||||
|
||||
proc = (mca_ptl_ib_proc_t *) item;
|
||||
|
||||
/* We only have one peer per proc right now */
|
||||
peer = (mca_ptl_ib_peer_t *) proc->proc_peers[0];
|
||||
|
||||
if(!ompi_list_is_empty(&(peer->pending_send_frags))) {
|
||||
|
||||
/*Check if peer is connected */
|
||||
if(peer->peer_state != MCA_PTL_IB_CONNECTED) {
|
||||
break;
|
||||
}
|
||||
|
||||
/* Go over all the frags */
|
||||
num_frags =
|
||||
ompi_list_get_size(&(peer->pending_send_frags));
|
||||
|
||||
frag_item =
|
||||
ompi_list_get_first(&(peer->pending_send_frags));
|
||||
|
||||
for(j = 0; j < num_frags;
|
||||
frag_item = ompi_list_get_next(frag_item), j++) {
|
||||
|
||||
sendfrag = (mca_ptl_ib_send_frag_t *) frag_item;
|
||||
|
||||
if(sendfrag->frag_progressed) {
|
||||
/* We've already posted this one */
|
||||
continue;
|
||||
} else {
|
||||
/* We need to post this one */
|
||||
if(mca_ptl_ib_post_send(peer->peer_module->ib_state,
|
||||
peer->peer_conn, &sendfrag->ib_buf)
|
||||
!= OMPI_SUCCESS) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
sendfrag->frag_progressed = 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Poll for completions */
|
||||
|
||||
num_modules = mca_ptl_ib_component.ib_num_ptl_modules;
|
||||
|
||||
/* Make progress on outstanding sends for
|
||||
* all IB modules */
|
||||
for(i = 0; i < num_modules; i++) {
|
||||
|
||||
|
||||
module = mca_ptl_ib_component.ib_ptl_modules[i];
|
||||
|
||||
if(!ompi_list_is_empty(&((module)->pending_send_frags))) {
|
||||
D_PRINT("Frag to progress");
|
||||
ret = VAPI_poll_cq(mca_ptl_ib_component.ib_ptl_modules[0]->ib_state->nic,
|
||||
mca_ptl_ib_component.ib_ptl_modules[0]->ib_state->cq_hndl,
|
||||
&comp);
|
||||
if(VAPI_OK == ret) {
|
||||
if(comp.status != VAPI_SUCCESS) {
|
||||
ompi_output(0, "Got error : %s, Vendor code : %d\n",
|
||||
VAPI_wc_status_sym(comp.status),
|
||||
comp.vendor_err_syndrome);
|
||||
|
||||
} else {
|
||||
if(VAPI_CQE_SQ_SEND_DATA == comp.opcode) {
|
||||
D_PRINT("Send completion, id:%d\n",
|
||||
comp.id);
|
||||
}
|
||||
else if(VAPI_CQE_RQ_SEND_DATA == comp.opcode) {
|
||||
D_PRINT("Received message completion len = %d, id : %d\n",
|
||||
comp.byte_len, comp.id);
|
||||
}
|
||||
else {
|
||||
D_PRINT("Got Unknown completion! Opcode : %d\n",
|
||||
comp.opcode);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
#if 0
|
||||
VAPI_ret_t ret;
|
||||
VAPI_wc_desc_t comp;
|
||||
|
||||
mca_ptl_base_header_t *header;
|
||||
mca_ptl_ib_recv_buf_t *recv_buf;
|
||||
@ -409,6 +490,3 @@ int mca_ptl_ib_component_progress(mca_ptl_tstamp_t tstamp)
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
@ -78,6 +78,7 @@ static void mca_ptl_ib_peer_construct(mca_ptl_base_peer_t* module_peer)
|
||||
module_peer->peer_retries = 0;
|
||||
OBJ_CONSTRUCT(&module_peer->peer_send_lock, ompi_mutex_t);
|
||||
OBJ_CONSTRUCT(&module_peer->peer_recv_lock, ompi_mutex_t);
|
||||
OBJ_CONSTRUCT(&module_peer->pending_send_frags, ompi_list_t);
|
||||
}
|
||||
|
||||
/*
|
||||
@ -401,15 +402,15 @@ int mca_ptl_ib_peer_send(mca_ptl_base_peer_t* peer,
|
||||
switch(peer->peer_state) {
|
||||
case MCA_PTL_IB_CONNECTING:
|
||||
|
||||
ompi_list_append(&peer->peer_module->pending_send_frags,
|
||||
(ompi_list_item_t*)frag);
|
||||
ompi_list_append(&peer->pending_send_frags,
|
||||
(ompi_list_item_t *)frag);
|
||||
break;
|
||||
case MCA_PTL_IB_CLOSED:
|
||||
|
||||
D_PRINT("Connection to peer closed ... connecting ...");
|
||||
|
||||
ompi_list_append(&peer->peer_module->pending_send_frags,
|
||||
(ompi_list_item_t*)frag);
|
||||
ompi_list_append(&peer->pending_send_frags,
|
||||
(ompi_list_item_t *)frag);
|
||||
|
||||
rc = mca_ptl_ib_peer_start_connect(peer);
|
||||
break;
|
||||
|
@ -67,6 +67,9 @@ struct mca_ptl_base_peer_t {
|
||||
|
||||
ompi_mutex_t peer_recv_lock;
|
||||
/**< lock for concurrent access to peer state */
|
||||
|
||||
ompi_list_t pending_send_frags;
|
||||
/**< list of pending send frags for this peer */
|
||||
};
|
||||
|
||||
typedef struct mca_ptl_base_peer_t mca_ptl_base_peer_t;
|
||||
|
@ -504,3 +504,25 @@ int mca_ptl_ib_peer_connect(mca_ptl_ib_state_t *ib_state,
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
int mca_ptl_ib_post_send(mca_ptl_ib_state_t *ib_state,
|
||||
mca_ptl_ib_peer_conn_t *peer_conn,
|
||||
ib_buffer_t *ib_buf)
|
||||
{
|
||||
VAPI_ret_t ret;
|
||||
|
||||
IB_PREPARE_SEND_DESC(ib_buf, (peer_conn->rres->qp_num));
|
||||
|
||||
D_PRINT("lkey = %d", ib_buf->hndl.lkey);
|
||||
|
||||
ret = VAPI_post_sr(ib_state->nic,
|
||||
peer_conn->lres->qp_hndl,
|
||||
&ib_buf->desc.sr);
|
||||
|
||||
if(VAPI_OK != ret) {
|
||||
MCA_PTL_IB_VAPI_RET(ret, "VAPI_post_sr");
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
@ -171,5 +171,8 @@ int mca_ptl_ib_peer_connect(mca_ptl_ib_state_t*,
|
||||
mca_ptl_ib_peer_conn_t*);
|
||||
int mca_ptl_ib_register_mem(VAPI_hca_hndl_t nic, VAPI_pd_hndl_t ptag,
|
||||
void* buf, int len, vapi_memhandle_t* memhandle);
|
||||
int mca_ptl_ib_post_send(mca_ptl_ib_state_t *ib_state,
|
||||
mca_ptl_ib_peer_conn_t *peer_conn,
|
||||
ib_buffer_t *ib_buf);
|
||||
|
||||
#endif /* MCA_PTL_IB_PRIV_H */
|
||||
|
@ -20,6 +20,8 @@ OBJ_CLASS_INSTANCE(mca_ptl_ib_send_frag_t,
|
||||
|
||||
static void mca_ptl_ib_send_frag_construct(mca_ptl_ib_send_frag_t* frag)
|
||||
{
|
||||
frag->frag_progressed = 0;
|
||||
frag->frag_ack_pending = 0;
|
||||
}
|
||||
|
||||
static void mca_ptl_ib_send_frag_destruct(mca_ptl_ib_send_frag_t* frag)
|
||||
@ -44,6 +46,7 @@ int mca_ptl_ib_send_frag_init(mca_ptl_ib_send_frag_t* sendfrag,
|
||||
/* Start of the IB buffer */
|
||||
hdr = (mca_ptl_base_header_t *) &sendfrag->ib_buf.buf[0];
|
||||
|
||||
/* Fill up the header for PML to make a match */
|
||||
if(offset == 0) {
|
||||
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_MATCH;
|
||||
hdr->hdr_common.hdr_flags = flags;
|
||||
@ -139,7 +142,6 @@ int mca_ptl_ib_send_frag_init(mca_ptl_ib_send_frag_t* sendfrag,
|
||||
|
||||
*size = size_out;
|
||||
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
@ -155,8 +157,6 @@ mca_ptl_ib_send_frag_t* mca_ptl_ib_alloc_send_frag(
|
||||
ompi_list_item_t *item;
|
||||
mca_ptl_ib_send_frag_t *ib_send_frag;
|
||||
|
||||
D_PRINT("");
|
||||
|
||||
flist = &((mca_ptl_ib_module_t *)ptl)->send_free;
|
||||
|
||||
item = ompi_list_remove_first(&((flist)->super));
|
||||
@ -193,13 +193,16 @@ int mca_ptl_ib_register_send_frags(mca_ptl_base_module_t *ptl)
|
||||
|
||||
num_send_frags = ompi_list_get_size(&(flist->super));
|
||||
|
||||
/* Register the buffers */
|
||||
for(i = 0; i < num_send_frags; i++) {
|
||||
item = ompi_list_get_first(&((flist)->super));
|
||||
|
||||
item = ompi_list_remove_first (&((flist)->super));
|
||||
/* Register the buffers */
|
||||
for(i = 0; i < num_send_frags;
|
||||
item = ompi_list_get_next(item), i++) {
|
||||
|
||||
ib_send_frag = (mca_ptl_ib_send_frag_t *) item;
|
||||
|
||||
ib_send_frag->frag_progressed = 0;
|
||||
|
||||
ib_buf_ptr = (ib_buffer_t *) &ib_send_frag->ib_buf;
|
||||
|
||||
rc = mca_ptl_ib_register_mem(ib_state->nic, ib_state->ptag,
|
||||
@ -209,9 +212,11 @@ int mca_ptl_ib_register_send_frags(mca_ptl_base_module_t *ptl)
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
IB_PREPARE_SEND_DESC(ib_buf_ptr, 0);
|
||||
if(i == 0) {
|
||||
D_PRINT("lkey = %d", ib_buf_ptr->hndl.lkey);
|
||||
}
|
||||
|
||||
ompi_list_append(&((flist)->super), item);
|
||||
IB_PREPARE_SEND_DESC(ib_buf_ptr, 0);
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user