Added send descriptor management code
This commit was SVN r2639.
Этот коммит содержится в:
родитель
388ac0dd4e
Коммит
4c122c6649
@ -119,8 +119,21 @@ int mca_ptl_ib_finalize(struct mca_ptl_base_module_t* ptl)
|
||||
int mca_ptl_ib_request_init( struct mca_ptl_base_module_t* ptl,
|
||||
struct mca_pml_base_send_request_t* request)
|
||||
{
|
||||
mca_ptl_ib_send_frag_t *ib_send_frag;
|
||||
|
||||
D_PRINT("");
|
||||
OBJ_CONSTRUCT(request+1, mca_ptl_ib_send_frag_t);
|
||||
|
||||
ib_send_frag = mca_ptl_ib_alloc_send_frag(ptl,
|
||||
request);
|
||||
|
||||
if(NULL == ib_send_frag) {
|
||||
D_PRINT("Unable to allocate ib_send_frag");
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
} else {
|
||||
((mca_ptl_ib_send_request_t *)request)->req_frag =
|
||||
ib_send_frag;
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
@ -146,12 +159,16 @@ int mca_ptl_ib_send( struct mca_ptl_base_module_t* ptl,
|
||||
int flags)
|
||||
{
|
||||
mca_ptl_ib_send_frag_t* sendfrag;
|
||||
int rc;
|
||||
int rc = OMPI_SUCCESS;
|
||||
|
||||
D_PRINT("");
|
||||
|
||||
if (0 == offset) {
|
||||
sendfrag = &((mca_ptl_ib_send_request_t*)sendreq)->req_frag;
|
||||
} else {
|
||||
ompi_list_item_t* item;
|
||||
|
||||
/* TODO: Implementation for messages > frag size */
|
||||
ompi_list_item_t* item;
|
||||
OMPI_FREE_LIST_GET(&mca_ptl_ib_component.ib_send_frags, item, rc);
|
||||
if(NULL == (sendfrag = (mca_ptl_ib_send_frag_t*)item)) {
|
||||
return rc;
|
||||
|
@ -88,11 +88,18 @@ extern mca_ptl_ib_component_t mca_ptl_ib_component;
|
||||
* IB PTL Interface
|
||||
*/
|
||||
struct mca_ptl_ib_module_t {
|
||||
mca_ptl_base_module_t super; /**< base PTL interface */
|
||||
mca_ptl_base_module_t super;
|
||||
/**< base PTL interface */
|
||||
|
||||
mca_ptl_ib_state_t *ib_state;
|
||||
/* IB state holds info about queue handles, HCA handles,
|
||||
* protection domain etc. which are private to this module */
|
||||
mca_ptl_ib_state_t *ib_state;
|
||||
|
||||
ompi_free_list_t send_free;
|
||||
/**< free list of send buffer descriptors */
|
||||
|
||||
ompi_free_list_t recv_free;
|
||||
/**< free list of recv buffer descriptors */
|
||||
};
|
||||
|
||||
typedef struct mca_ptl_ib_module_t mca_ptl_ib_module_t;
|
||||
|
@ -236,8 +236,21 @@ mca_ptl_base_module_t** mca_ptl_ib_component_init(int *num_ptl_modules,
|
||||
!= OMPI_SUCCESS) {
|
||||
return NULL;
|
||||
}
|
||||
DUMP_IB_STATE(ib_modules[i].ib_state);
|
||||
|
||||
/* Find a better place for this */
|
||||
OBJ_CONSTRUCT(&(ib_modules[i].send_free), ompi_free_list_t);
|
||||
|
||||
OBJ_CONSTRUCT(&(ib_modules[i].recv_free), ompi_free_list_t);
|
||||
|
||||
ompi_free_list_init(&(ib_modules[i].send_free),
|
||||
sizeof(mca_ptl_ib_send_frag_t),
|
||||
OBJ_CLASS(mca_ptl_ib_send_frag_t),
|
||||
mca_ptl_ib_component.ib_free_list_num,
|
||||
mca_ptl_ib_component.ib_free_list_max,
|
||||
mca_ptl_ib_component.ib_free_list_inc,
|
||||
NULL);
|
||||
|
||||
DUMP_IB_STATE(ib_modules[i].ib_state);
|
||||
}
|
||||
|
||||
/* Post OOB receives */
|
||||
|
@ -122,6 +122,8 @@ static int mca_ptl_ib_peer_send_conn_info(mca_ptl_base_peer_t* peer)
|
||||
ompi_process_name_t *name;
|
||||
char sendbuf[50];
|
||||
|
||||
D_PRINT("");
|
||||
|
||||
name = &peer->peer_proc->proc_guid;
|
||||
|
||||
/* Zero out the send buffer */
|
||||
@ -264,6 +266,23 @@ static int mca_ptl_ib_peer_reply_start_connect(mca_ptl_ib_peer_t *peer,
|
||||
return rc;
|
||||
}
|
||||
|
||||
/*
|
||||
*
|
||||
*/
|
||||
|
||||
static void mca_ptl_ib_peer_connected(mca_ptl_ib_peer_t *peer)
|
||||
{
|
||||
peer->peer_state = MCA_PTL_IB_CONNECTED;
|
||||
|
||||
if(ompi_list_get_size(&peer->peer_frags) > 0) {
|
||||
if(NULL == peer->peer_send_frag) {
|
||||
|
||||
peer->peer_send_frag = (mca_ptl_ib_send_frag_t *)
|
||||
ompi_list_remove_first(&peer->peer_frags);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Non blocking OOB recv callback.
|
||||
* Read incoming QP and other info, and if this peer
|
||||
@ -324,7 +343,8 @@ static void mca_ptl_ib_peer_connect_recv_callback(int status,
|
||||
D_PRINT("Connect Error");
|
||||
}
|
||||
|
||||
ib_peer->peer_state = MCA_PTL_IB_CONNECTED;
|
||||
/* Setup state as connected */
|
||||
mca_ptl_ib_peer_connected(ib_peer);
|
||||
|
||||
break;
|
||||
|
||||
@ -346,7 +366,8 @@ static void mca_ptl_ib_peer_connect_recv_callback(int status,
|
||||
D_PRINT("Connect Error");
|
||||
}
|
||||
|
||||
ib_peer->peer_state = MCA_PTL_IB_CONNECTED;
|
||||
/* Setup state as connected */
|
||||
mca_ptl_ib_peer_connected(ib_peer);
|
||||
|
||||
break;
|
||||
case MCA_PTL_IB_CONNECTED :
|
||||
@ -401,10 +422,9 @@ int mca_ptl_ib_peer_send(mca_ptl_base_peer_t* peer,
|
||||
|
||||
D_PRINT("Connection to peer closed ... connecting ...");
|
||||
|
||||
//ompi_list_append(&peer->peer_frags, (ompi_list_item_t*)frag);
|
||||
ompi_list_append(&peer->peer_frags, (ompi_list_item_t*)frag);
|
||||
|
||||
rc = mca_ptl_ib_peer_start_connect(peer);
|
||||
|
||||
break;
|
||||
|
||||
case MCA_PTL_IB_FAILED:
|
||||
@ -413,6 +433,29 @@ int mca_ptl_ib_peer_send(mca_ptl_base_peer_t* peer,
|
||||
break;
|
||||
|
||||
case MCA_PTL_IB_CONNECTED:
|
||||
|
||||
/* Send the frag off */
|
||||
if(NULL != peer->peer_send_frag) {
|
||||
|
||||
/* Some other frag is being processed */
|
||||
ompi_list_append(&peer->peer_frags, (ompi_list_item_t*)frag);
|
||||
|
||||
} else {
|
||||
|
||||
/* No other frag is being processed */
|
||||
|
||||
if(1) {
|
||||
|
||||
D_PRINT("I have to send it now ...");
|
||||
|
||||
} else {
|
||||
|
||||
/* Set the current frag being processed as
|
||||
* THIS frag */
|
||||
peer->peer_send_frag = frag;
|
||||
}
|
||||
|
||||
}
|
||||
break;
|
||||
default:
|
||||
rc = OMPI_ERR_UNREACH;
|
||||
|
@ -20,12 +20,10 @@ OBJ_CLASS_INSTANCE(mca_ptl_ib_send_frag_t,
|
||||
|
||||
static void mca_ptl_ib_send_frag_construct(mca_ptl_ib_send_frag_t* frag)
|
||||
{
|
||||
D_PRINT("\n");
|
||||
}
|
||||
|
||||
static void mca_ptl_ib_send_frag_destruct(mca_ptl_ib_send_frag_t* frag)
|
||||
{
|
||||
D_PRINT("\n");
|
||||
}
|
||||
|
||||
int mca_ptl_ib_send_frag_init(mca_ptl_ib_send_frag_t* sendfrag,
|
||||
@ -35,14 +33,16 @@ int mca_ptl_ib_send_frag_init(mca_ptl_ib_send_frag_t* sendfrag,
|
||||
size_t* size,
|
||||
int flags)
|
||||
{
|
||||
/* message header */
|
||||
size_t size_in = *size;
|
||||
size_t size_out;
|
||||
mca_ptl_base_header_t *hdr;
|
||||
struct iovec iov;
|
||||
int header_length;
|
||||
|
||||
D_PRINT("");
|
||||
#if 0
|
||||
|
||||
mca_ptl_base_header_t* hdr = &sendfrag->frag_header;
|
||||
/* Start of the IB buffer */
|
||||
hdr = (mca_ptl_base_header_t *) &sendfrag->ib_buf.buf[0];
|
||||
|
||||
if(offset == 0) {
|
||||
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_MATCH;
|
||||
@ -59,6 +59,9 @@ int mca_ptl_ib_send_frag_init(mca_ptl_ib_send_frag_t* sendfrag,
|
||||
hdr->hdr_match.hdr_tag = sendreq->req_base.req_tag;
|
||||
hdr->hdr_match.hdr_msg_length = sendreq->req_bytes_packed;
|
||||
hdr->hdr_match.hdr_msg_seq = sendreq->req_base.req_sequence;
|
||||
|
||||
header_length = sizeof(mca_ptl_base_match_header_t);
|
||||
|
||||
} else {
|
||||
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG;
|
||||
hdr->hdr_common.hdr_flags = flags;
|
||||
@ -68,6 +71,8 @@ int mca_ptl_ib_send_frag_init(mca_ptl_ib_send_frag_t* sendfrag,
|
||||
hdr->hdr_frag.hdr_src_ptr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
|
||||
hdr->hdr_frag.hdr_src_ptr.pval = sendfrag;
|
||||
hdr->hdr_frag.hdr_dst_ptr = sendreq->req_peer_match;
|
||||
|
||||
header_length = sizeof(mca_ptl_base_frag_header_t);
|
||||
}
|
||||
|
||||
/* initialize convertor */
|
||||
@ -84,7 +89,7 @@ int mca_ptl_ib_send_frag_init(mca_ptl_ib_send_frag_t* sendfrag,
|
||||
convertor = &sendreq->req_convertor;
|
||||
} else {
|
||||
|
||||
convertor = &sendfrag->frag_convertor;
|
||||
convertor = &sendfrag->frag_send.frag_base.frag_convertor;
|
||||
ompi_convertor_copy(&sendreq->req_convertor, convertor);
|
||||
ompi_convertor_init_for_send(
|
||||
convertor,
|
||||
@ -100,33 +105,72 @@ int mca_ptl_ib_send_frag_init(mca_ptl_ib_send_frag_t* sendfrag,
|
||||
* into users buffer - otherwise will return an allocated buffer
|
||||
* that holds the packed data
|
||||
*/
|
||||
sendfrag->frag_vec[1].iov_base = NULL;
|
||||
sendfrag->frag_vec[1].iov_len = size_in;
|
||||
if((rc = ompi_convertor_pack(convertor, &sendfrag->frag_vec[1], 1)) < 0)
|
||||
iov.iov_base = &sendfrag->ib_buf.buf[header_length];
|
||||
iov.iov_len = size_in;
|
||||
|
||||
if((rc = ompi_convertor_pack(convertor,
|
||||
&iov, 1))
|
||||
< 0) {
|
||||
|
||||
ompi_output(0, "Unable to pack data");
|
||||
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
/* adjust size and request offset to reflect actual
|
||||
* number of bytes packed by convertor */
|
||||
size_out = sendfrag->frag_vec[1].iov_len;
|
||||
size_out = iov.iov_len;
|
||||
} else {
|
||||
size_out = size_in;
|
||||
}
|
||||
|
||||
hdr->hdr_frag.hdr_frag_length = size_out;
|
||||
|
||||
/* fragment state */
|
||||
sendfrag->frag_owner = &ptl_peer->peer_ptl->super;
|
||||
sendfrag->frag_send.frag_base.frag_owner =
|
||||
&ptl_peer->peer_module->super;
|
||||
sendfrag->frag_send.frag_request = sendreq;
|
||||
sendfrag->frag_send.frag_base.frag_addr = sendfrag->frag_vec[1].iov_base;
|
||||
|
||||
sendfrag->frag_send.frag_base.frag_addr = iov.iov_base;
|
||||
sendfrag->frag_send.frag_base.frag_size = size_out;
|
||||
|
||||
sendfrag->frag_peer = ptl_peer;
|
||||
sendfrag->frag_vec_ptr = sendfrag->frag_vec;
|
||||
sendfrag->frag_vec_cnt = (size_out == 0) ? 1 : 2;
|
||||
sendfrag->frag_vec[0].iov_base = (ompi_iov_base_ptr_t)hdr;
|
||||
sendfrag->frag_vec[0].iov_len = sizeof(mca_ptl_base_header_t);
|
||||
sendfrag->frag_send.frag_base.frag_peer = ptl_peer;
|
||||
sendfrag->frag_progressed = 0;
|
||||
|
||||
*size = size_out;
|
||||
#endif
|
||||
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
/*
|
||||
* Allocate a IB send descriptor
|
||||
*
|
||||
*/
|
||||
mca_ptl_ib_send_frag_t* mca_ptl_ib_alloc_send_frag(
|
||||
mca_ptl_base_module_t* ptl,
|
||||
mca_pml_base_send_request_t* request)
|
||||
{
|
||||
ompi_free_list_t *flist;
|
||||
ompi_list_item_t *item;
|
||||
mca_ptl_ib_send_frag_t *ib_send_frag;
|
||||
|
||||
D_PRINT("");
|
||||
|
||||
flist = &((mca_ptl_ib_module_t *)ptl)->send_free;
|
||||
|
||||
item = ompi_list_remove_first(&((flist)->super));
|
||||
|
||||
while(NULL == item) {
|
||||
mca_ptl_tstamp_t tstamp = 0;
|
||||
|
||||
D_PRINT("Gone one NULL descriptor ... trying again");
|
||||
|
||||
ptl->ptl_component->ptlm_progress (tstamp);
|
||||
item = ompi_list_remove_first (&((flist)->super));
|
||||
}
|
||||
|
||||
ib_send_frag = (mca_ptl_ib_send_frag_t *)item;
|
||||
|
||||
return ib_send_frag;
|
||||
}
|
||||
|
@ -5,7 +5,7 @@
|
||||
#include "mca/pml/base/pml_base_sendreq.h"
|
||||
#include "mca/ptl/base/ptl_base_sendfrag.h"
|
||||
|
||||
#include "ptl_ib_vapi.h"
|
||||
#include "ptl_ib_priv.h"
|
||||
|
||||
OBJ_CLASS_DECLARATION(mca_ptl_ib_send_frag_t);
|
||||
|
||||
@ -13,11 +13,14 @@ OBJ_CLASS_DECLARATION(mca_ptl_ib_send_frag_t);
|
||||
* IB send fragment derived type.
|
||||
*/
|
||||
struct mca_ptl_ib_send_frag_t {
|
||||
mca_ptl_base_send_frag_t frag_send; /**< base send fragment descriptor */
|
||||
struct iovec *frag_vec_ptr;
|
||||
size_t frag_vec_cnt;
|
||||
struct iovec frag_vec[2];
|
||||
volatile int frag_progressed;
|
||||
mca_ptl_base_send_frag_t frag_send;
|
||||
/**< base send fragment descriptor */
|
||||
|
||||
ib_buffer_t ib_buf;
|
||||
/**< IB buffer attached to this frag */
|
||||
|
||||
volatile int frag_progressed;
|
||||
bool frag_ack_pending;
|
||||
};
|
||||
typedef struct mca_ptl_ib_send_frag_t mca_ptl_ib_send_frag_t;
|
||||
|
||||
@ -40,5 +43,17 @@ int mca_ptl_ib_send_frag_init(
|
||||
size_t* size,
|
||||
int flags);
|
||||
|
||||
/**
|
||||
* Initialize a fragment descriptor.
|
||||
*
|
||||
* request (IN) PML base send request
|
||||
* ptl (IN) PTL module
|
||||
* RETURN mca_ptl_ib_send_frag_t*
|
||||
*
|
||||
*/
|
||||
|
||||
mca_ptl_ib_send_frag_t* mca_ptl_ib_alloc_send_frag(
|
||||
mca_ptl_base_module_t* ptl,
|
||||
mca_pml_base_send_request_t* request);
|
||||
|
||||
#endif
|
||||
|
@ -18,8 +18,10 @@ OBJ_CLASS_DECLARATION(mca_ptl_ib_send_request_t);
|
||||
* fragment on every send request.
|
||||
*/
|
||||
struct mca_ptl_ib_send_request_t {
|
||||
mca_pml_base_send_request_t super;
|
||||
mca_ptl_ib_send_frag_t req_frag; /* first fragment */
|
||||
mca_pml_base_send_request_t super;
|
||||
|
||||
mca_ptl_ib_send_frag_t *req_frag;
|
||||
/* first fragment */
|
||||
};
|
||||
typedef struct mca_ptl_ib_send_request_t mca_ptl_ib_send_request_t;
|
||||
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user