Arbitrary size message implementation.
Implemented start->ack->put. still working on fin. This commit was SVN r2769.
Этот коммит содержится в:
родитель
42300f7489
Коммит
83dc2da533
@ -31,7 +31,7 @@ mca_ptl_ib_module_t mca_ptl_ib_module = {
|
||||
mca_ptl_ib_del_procs,
|
||||
mca_ptl_ib_finalize,
|
||||
mca_ptl_ib_send,
|
||||
mca_ptl_ib_send,
|
||||
mca_ptl_ib_put,
|
||||
NULL,
|
||||
mca_ptl_ib_matched,
|
||||
mca_ptl_ib_request_init,
|
||||
@ -39,6 +39,66 @@ mca_ptl_ib_module_t mca_ptl_ib_module = {
|
||||
}
|
||||
};
|
||||
|
||||
/*
|
||||
* 1. RDMA local buffer to remote buffer address.
|
||||
* 2. Generate a FIN
|
||||
*/
|
||||
|
||||
int mca_ptl_ib_put( struct mca_ptl_base_module_t* ptl,
|
||||
struct mca_ptl_base_peer_t* ptl_peer,
|
||||
struct mca_pml_base_send_request_t* req, size_t offset,
|
||||
size_t size, int flags)
|
||||
{
|
||||
int rc;
|
||||
mca_ptl_ib_send_frag_t *send_frag, *send_frag_fin;
|
||||
mca_ptl_ib_state_t *ib_state;
|
||||
mca_ptl_ib_peer_conn_t *peer_conn;
|
||||
void *local_addr, *remote_addr;
|
||||
VAPI_rkey_t rkey;
|
||||
|
||||
/* RDMA the data over to the peer */
|
||||
send_frag = mca_ptl_ib_alloc_send_frag(ptl, req);
|
||||
|
||||
if(NULL == send_frag) {
|
||||
ompi_output(0, "Unable to allocate send descriptor");
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
A_PRINT("IB put to %p, rkey : %d",
|
||||
req->req_peer_addr.pval,
|
||||
*(VAPI_rkey_t *)(((mca_ptl_ib_send_request_t *)req)->req_buf));
|
||||
|
||||
ib_state = ((mca_ptl_ib_module_t *)ptl)->ib_state;
|
||||
peer_conn = ((mca_ptl_ib_peer_t *)ptl_peer)->peer_conn;
|
||||
local_addr = (void*) ((char*) req->req_base.req_addr + offset);
|
||||
remote_addr = (void*) req->req_peer_addr.pval;
|
||||
rkey = *(VAPI_rkey_t *)(((mca_ptl_ib_send_request_t *)req)->req_buf);
|
||||
|
||||
rc = mca_ptl_ib_rdma_write(ib_state, peer_conn,
|
||||
&send_frag->ib_buf, local_addr, size, remote_addr, rkey);
|
||||
|
||||
if(rc != OMPI_SUCCESS) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
/* Send FIN to receiver */
|
||||
send_frag_fin = mca_ptl_ib_alloc_send_frag(ptl, req);
|
||||
|
||||
if(NULL == send_frag_fin) {
|
||||
ompi_output(0, "Unable to allocate send descriptor");
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
rc = mca_ptl_ib_put_frag_init(send_frag_fin, ptl_peer,
|
||||
req, offset, &size, flags);
|
||||
if(rc != OMPI_SUCCESS) {
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* Update offset */
|
||||
req->req_offset += size;
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
int mca_ptl_ib_add_procs(struct mca_ptl_base_module_t* base_module,
|
||||
size_t nprocs, struct ompi_proc_t **ompi_procs,
|
||||
@ -131,6 +191,7 @@ int mca_ptl_ib_request_init( struct mca_ptl_base_module_t* ptl,
|
||||
|
||||
ib_send_req = (mca_ptl_ib_send_request_t *) request;
|
||||
ib_send_req->req_frag = ib_send_frag;
|
||||
memset(ib_send_req->req_buf, 7, 8);
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
@ -165,12 +226,12 @@ int mca_ptl_ib_send( struct mca_ptl_base_module_t* ptl,
|
||||
((mca_ptl_ib_send_request_t*)sendreq)->req_frag;
|
||||
} else {
|
||||
|
||||
/* TODO: Implementation for messages > frag size */
|
||||
ompi_list_item_t* item;
|
||||
OMPI_FREE_LIST_GET(&mca_ptl_ib_component.ib_send_frags, item, rc);
|
||||
/* Implementation for messages > frag size */
|
||||
sendfrag = mca_ptl_ib_alloc_send_frag(ptl,
|
||||
sendreq);
|
||||
|
||||
if(NULL == (sendfrag = (mca_ptl_ib_send_frag_t*)item)) {
|
||||
return rc;
|
||||
if(NULL == sendfrag) {
|
||||
ompi_output(0,"Unable to allocate send fragment");
|
||||
}
|
||||
}
|
||||
|
||||
@ -190,6 +251,84 @@ int mca_ptl_ib_send( struct mca_ptl_base_module_t* ptl,
|
||||
}
|
||||
|
||||
|
||||
static void mca_ptl_ib_start_ack(mca_ptl_base_module_t *module,
|
||||
mca_ptl_ib_send_frag_t *send_frag,
|
||||
mca_ptl_ib_recv_frag_t *recv_frag)
|
||||
{
|
||||
mca_ptl_base_header_t *hdr;
|
||||
mca_pml_base_recv_request_t *request;
|
||||
mca_ptl_ib_peer_t *ib_peer;
|
||||
ib_buffer_t *ib_buf;
|
||||
int recv_len;
|
||||
int len_to_reg, len_added = 0;
|
||||
void *addr_to_reg, *ack_buf;
|
||||
|
||||
A_PRINT("");
|
||||
|
||||
/* Header starts at beginning of registered
|
||||
* buffer space */
|
||||
|
||||
hdr = (mca_ptl_base_header_t *)
|
||||
&send_frag->ib_buf.buf[0];
|
||||
|
||||
request = recv_frag->super.frag_request;
|
||||
|
||||
/* Amount of data we have already received */
|
||||
recv_len =
|
||||
recv_frag->super.frag_base.frag_header.hdr_frag.hdr_frag_length;
|
||||
|
||||
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_ACK;
|
||||
hdr->hdr_common.hdr_flags = 0;
|
||||
hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_ack_header_t);
|
||||
|
||||
/* Remote side send descriptor */
|
||||
hdr->hdr_ack.hdr_src_ptr =
|
||||
recv_frag->super.frag_base.frag_header.hdr_frag.hdr_src_ptr;
|
||||
|
||||
/* Matched request from recv side */
|
||||
hdr->hdr_ack.hdr_dst_match.lval = 0;
|
||||
hdr->hdr_ack.hdr_dst_match.pval = request;
|
||||
|
||||
hdr->hdr_ack.hdr_dst_addr.lval = 0;
|
||||
|
||||
addr_to_reg = (void*)((char*)request->req_base.req_addr + recv_len);
|
||||
hdr->hdr_ack.hdr_dst_addr.pval = addr_to_reg;
|
||||
|
||||
len_to_reg = request->req_bytes_packed - recv_len;
|
||||
hdr->hdr_ack.hdr_dst_size = len_to_reg;
|
||||
|
||||
A_PRINT("Dest addr : %p, RDMA Len : %d",
|
||||
hdr->hdr_ack.hdr_dst_addr.pval,
|
||||
hdr->hdr_ack.hdr_dst_size);
|
||||
|
||||
ack_buf = (void*) ((char*) (&send_frag->ib_buf.buf[0]) +
|
||||
sizeof(mca_ptl_base_ack_header_t));
|
||||
|
||||
/* Prepare ACK packet with IB specific stuff */
|
||||
mca_ptl_ib_prepare_ack(((mca_ptl_ib_module_t *)module)->ib_state,
|
||||
addr_to_reg, len_to_reg,
|
||||
ack_buf, &len_added);
|
||||
|
||||
/* Send it right away! */
|
||||
ib_peer = (mca_ptl_ib_peer_t *)
|
||||
recv_frag->super.frag_base.frag_peer;
|
||||
|
||||
ib_buf = &send_frag->ib_buf;
|
||||
|
||||
IB_SET_SEND_DESC_LEN(ib_buf,
|
||||
(sizeof(mca_ptl_base_ack_header_t) + len_added));
|
||||
|
||||
mca_ptl_ib_post_send(((mca_ptl_ib_module_t *)module)->ib_state,
|
||||
ib_peer->peer_conn,
|
||||
&send_frag->ib_buf, send_frag);
|
||||
|
||||
/* fragment state */
|
||||
send_frag->frag_send.frag_base.frag_owner = module;
|
||||
send_frag->frag_send.frag_base.frag_peer = recv_frag->super.frag_base.frag_peer;
|
||||
send_frag->frag_send.frag_base.frag_addr = NULL;
|
||||
send_frag->frag_send.frag_base.frag_size = 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* A posted receive has been matched - if required send an
|
||||
* ack back to the peer and process the fragment. Copy the
|
||||
@ -210,7 +349,18 @@ void mca_ptl_ib_matched(mca_ptl_base_module_t* module,
|
||||
D_PRINT("Matched frag\n");
|
||||
|
||||
if (header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) {
|
||||
D_PRINT("Doh, I cannot send an ack!\n");
|
||||
mca_ptl_ib_send_frag_t *send_frag;
|
||||
|
||||
send_frag = mca_ptl_ib_alloc_send_frag(module, NULL);
|
||||
|
||||
if(NULL == send_frag) {
|
||||
ompi_output(0, "Cannot get send descriptor");
|
||||
} else {
|
||||
mca_ptl_ib_start_ack(module, send_frag, recv_frag);
|
||||
}
|
||||
|
||||
/* Basic ACK scheme */
|
||||
A_PRINT("Doh, I cannot send an ack!\n");
|
||||
}
|
||||
|
||||
/* Process the fragment */
|
||||
|
@ -270,6 +270,25 @@ extern int mca_ptl_ib_send(
|
||||
int flags
|
||||
);
|
||||
|
||||
/**
|
||||
* PML->PTL Initiate a put of the specified size.
|
||||
*
|
||||
* @param ptl (IN) PTL instance
|
||||
* @param ptl_base_peer (IN) PTL peer addressing
|
||||
* @param send_request (IN/OUT) Send request (allocated by PML via mca_ptl_base_request_alloc_fn_t)
|
||||
* @param size (IN) Number of bytes PML is requesting PTL to deliver
|
||||
* @param flags (IN) Flags that should be passed to the peer via the message header.
|
||||
* @param request (OUT) OMPI_SUCCESS if the PTL was able to queue one or more fragments
|
||||
*/
|
||||
extern int mca_ptl_ib_put(
|
||||
struct mca_ptl_base_module_t* ptl,
|
||||
struct mca_ptl_base_peer_t* ptl_peer,
|
||||
struct mca_pml_base_send_request_t*,
|
||||
size_t offset,
|
||||
size_t size,
|
||||
int flags
|
||||
);
|
||||
|
||||
/**
|
||||
* Return a recv fragment to the modules free list.
|
||||
*
|
||||
|
@ -87,12 +87,16 @@ int mca_ptl_ib_component_open(void)
|
||||
/* register super component parameters */
|
||||
mca_ptl_ib_module.super.ptl_exclusivity =
|
||||
mca_ptl_ib_param_register_int ("exclusivity", 0);
|
||||
|
||||
mca_ptl_ib_module.super.ptl_first_frag_size =
|
||||
mca_ptl_ib_param_register_int ("first_frag_size",
|
||||
(16384 - sizeof(mca_ptl_base_header_t))/*magic*/);
|
||||
(MCA_PTL_IB_FIRST_FRAG_SIZE
|
||||
- sizeof(mca_ptl_base_header_t)));
|
||||
|
||||
mca_ptl_ib_module.super.ptl_min_frag_size =
|
||||
mca_ptl_ib_param_register_int ("min_frag_size",
|
||||
(4096 - sizeof(mca_ptl_base_header_t))/*magic*/);
|
||||
(4096 - sizeof(mca_ptl_base_header_t)));
|
||||
|
||||
mca_ptl_ib_module.super.ptl_max_frag_size =
|
||||
mca_ptl_ib_param_register_int ("max_frag_size", 2<<30);
|
||||
|
||||
@ -357,7 +361,7 @@ int mca_ptl_ib_component_progress(mca_ptl_tstamp_t tstamp)
|
||||
/* Process a completed send */
|
||||
mca_ptl_ib_process_send_comp(
|
||||
(mca_ptl_base_module_t *) module,
|
||||
comp_addr, &(module->send_free));
|
||||
comp_addr);
|
||||
|
||||
break;
|
||||
case IB_COMP_RECV :
|
||||
@ -370,6 +374,13 @@ int mca_ptl_ib_component_progress(mca_ptl_tstamp_t tstamp)
|
||||
mca_ptl_ib_buffer_repost(module->ib_state->nic,
|
||||
comp_addr);
|
||||
|
||||
break;
|
||||
case IB_COMP_RDMA_W :
|
||||
|
||||
mca_ptl_ib_process_rdma_w_comp(
|
||||
(mca_ptl_base_module_t *) module,
|
||||
comp_addr);
|
||||
|
||||
break;
|
||||
case IB_COMP_NOTHING:
|
||||
break;
|
||||
|
@ -461,7 +461,6 @@ int mca_ptl_ib_peer_send(mca_ptl_base_peer_t* peer,
|
||||
{
|
||||
int rc;
|
||||
|
||||
D_PRINT("");
|
||||
|
||||
OMPI_THREAD_LOCK(&peer->peer_send_lock);
|
||||
|
||||
@ -510,6 +509,10 @@ int mca_ptl_ib_peer_send(mca_ptl_base_peer_t* peer,
|
||||
peer->peer_conn,
|
||||
&frag->ib_buf, (void*) frag);
|
||||
|
||||
A_PRINT("Send to : %d, len : %d",
|
||||
peer->peer_proc->proc_guid.vpid,
|
||||
frag->ib_buf.desc.sg_entry.len);
|
||||
|
||||
break;
|
||||
default:
|
||||
rc = OMPI_ERR_UNREACH;
|
||||
|
@ -84,7 +84,8 @@ void mca_ptl_ib_progress_send_frags(mca_ptl_ib_peer_t*);
|
||||
|
||||
#define DUMP_PEER(peer_ptr) { \
|
||||
ompi_output(0, "[%s:%d] ", __FILE__, __LINE__); \
|
||||
ompi_output(0, "Dumping peer state"); \
|
||||
ompi_output(0, "Dumping peer %d state", \
|
||||
peer->peer_proc->proc_guid.vpid); \
|
||||
ompi_output(0, "Local QP hndl : %d", \
|
||||
peer_ptr->peer_conn->lres->qp_hndl); \
|
||||
ompi_output(0, "Local QP num : %d", \
|
||||
|
@ -478,7 +478,8 @@ int mca_ptl_ib_peer_connect(mca_ptl_ib_state_t *ib_state,
|
||||
|
||||
rc = mca_ptl_ib_register_mem(ib_state->nic, ib_state->ptag,
|
||||
(void*) peer_conn->lres->recv[i].buf,
|
||||
4096, &peer_conn->lres->recv[i].hndl);
|
||||
MCA_PTL_IB_FIRST_FRAG_SIZE,
|
||||
&peer_conn->lres->recv[i].hndl);
|
||||
if(rc != OMPI_SUCCESS) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
@ -517,7 +518,9 @@ int mca_ptl_ib_post_send(mca_ptl_ib_state_t *ib_state,
|
||||
|
||||
IB_SET_SEND_DESC_ID(ib_buf, addr);
|
||||
|
||||
D_PRINT("length : %d", ib_buf->desc.sg_entry.len);
|
||||
D_PRINT("length : %d, qp_num = %d",
|
||||
ib_buf->desc.sg_entry.len,
|
||||
(peer_conn->rres->qp_num));
|
||||
|
||||
ret = VAPI_post_sr(ib_state->nic,
|
||||
peer_conn->lres->qp_hndl,
|
||||
@ -555,16 +558,21 @@ void mca_ptl_ib_drain_network(VAPI_hca_hndl_t nic,
|
||||
*comp_type = IB_COMP_SEND;
|
||||
*comp_addr = (void*) (unsigned int) comp.id;
|
||||
|
||||
}
|
||||
else if(VAPI_CQE_RQ_SEND_DATA == comp.opcode) {
|
||||
D_PRINT("Received message completion len = %d, id : %d\n",
|
||||
} else if(VAPI_CQE_RQ_SEND_DATA == comp.opcode) {
|
||||
A_PRINT("Received message completion len = %d, id : %d\n",
|
||||
comp.byte_len, comp.id);
|
||||
|
||||
*comp_type = IB_COMP_RECV;
|
||||
*comp_addr = (void*) (unsigned int) comp.id;
|
||||
}
|
||||
else {
|
||||
D_PRINT("Got Unknown completion! Opcode : %d\n",
|
||||
|
||||
} else if(VAPI_CQE_SQ_RDMA_WRITE == comp.opcode) {
|
||||
|
||||
A_PRINT("RDMA Write completion");
|
||||
*comp_type = IB_COMP_RDMA_W;
|
||||
*comp_addr = (void*) (unsigned int) comp.id;
|
||||
|
||||
} else {
|
||||
ompi_output(0, "Got Unknown completion! Opcode : %d\n",
|
||||
comp.opcode);
|
||||
|
||||
*comp_type = IB_COMP_ERROR;
|
||||
@ -597,3 +605,64 @@ void mca_ptl_ib_buffer_repost(VAPI_hca_hndl_t nic,
|
||||
ompi_output(0, "Error in buffer reposting");
|
||||
}
|
||||
}
|
||||
|
||||
void mca_ptl_ib_prepare_ack(mca_ptl_ib_state_t *ib_state,
|
||||
void* addr_to_reg, int len_to_reg,
|
||||
void* ack_buf, int* len_added)
|
||||
{
|
||||
int rc;
|
||||
vapi_memhandle_t memhandle;
|
||||
|
||||
rc = mca_ptl_ib_register_mem(ib_state->nic, ib_state->ptag,
|
||||
addr_to_reg, len_to_reg, &memhandle);
|
||||
|
||||
if(rc != OMPI_SUCCESS) {
|
||||
ompi_output(0, "Error in registering");
|
||||
}
|
||||
|
||||
A_PRINT("Sending Remote key : %d", memhandle.rkey);
|
||||
|
||||
memcpy(ack_buf,(void*) &memhandle.rkey, sizeof(VAPI_rkey_t));
|
||||
|
||||
*len_added = sizeof(VAPI_lkey_t);
|
||||
}
|
||||
|
||||
int mca_ptl_ib_rdma_write(mca_ptl_ib_state_t *ib_state,
|
||||
mca_ptl_ib_peer_conn_t *peer_conn, ib_buffer_t *ib_buf,
|
||||
void* send_buf, size_t send_len, void* remote_buf,
|
||||
VAPI_rkey_t remote_key)
|
||||
{
|
||||
VAPI_ret_t ret;
|
||||
VAPI_mrw_t mr_in, mr_out;
|
||||
vapi_memhandle_t mem_handle;
|
||||
|
||||
/* Register local application buffer */
|
||||
mr_in.acl = VAPI_EN_LOCAL_WRITE | VAPI_EN_REMOTE_WRITE;
|
||||
mr_in.l_key = 0;
|
||||
mr_in.r_key = 0;
|
||||
mr_in.pd_hndl = ib_state->ptag;
|
||||
mr_in.size = send_len;
|
||||
mr_in.start = (VAPI_virt_addr_t) (MT_virt_addr_t) send_buf;
|
||||
mr_in.type = VAPI_MR;
|
||||
|
||||
ret = VAPI_register_mr(ib_state->nic, &mr_in,
|
||||
&mem_handle.hndl, &mr_out);
|
||||
if(VAPI_OK != ret) {
|
||||
MCA_PTL_IB_VAPI_RET(ret, "VAPI_register_mr");
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
/* Prepare descriptor */
|
||||
IB_PREPARE_RDMA_W_DESC(ib_buf, (peer_conn->rres->qp_num),
|
||||
send_len, send_buf, (mr_out.l_key), remote_key, remote_buf);
|
||||
|
||||
ret = VAPI_post_sr(ib_state->nic,
|
||||
peer_conn->lres->qp_hndl,
|
||||
&ib_buf->desc.sr);
|
||||
if(ret != VAPI_OK) {
|
||||
MCA_PTL_IB_VAPI_RET(ret, "VAPI_post_sr");
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
@ -8,6 +8,8 @@
|
||||
#define NUM_IB_SEND_BUF (10)
|
||||
#define NUM_IB_RECV_BUF (10)
|
||||
|
||||
#define MCA_PTL_IB_FIRST_FRAG_SIZE (4096)
|
||||
|
||||
struct mca_ptl_ib_state_t {
|
||||
VAPI_hca_id_t hca_id;
|
||||
/* ID of HCA */
|
||||
@ -40,6 +42,7 @@ typedef enum {
|
||||
IB_COMP_ERROR,
|
||||
IB_COMP_RECV,
|
||||
IB_COMP_SEND,
|
||||
IB_COMP_RDMA_W,
|
||||
IB_COMP_NOTHING
|
||||
} IB_comp_t;
|
||||
|
||||
@ -80,7 +83,7 @@ struct ib_buffer_t {
|
||||
vapi_memhandle_t hndl;
|
||||
/* Buffer handle */
|
||||
|
||||
char buf[4096];
|
||||
char buf[MCA_PTL_IB_FIRST_FRAG_SIZE];
|
||||
/* Buffer space */
|
||||
|
||||
VAPI_qp_hndl_t qp_hndl;
|
||||
@ -153,7 +156,7 @@ typedef struct mca_ptl_ib_peer_conn_t mca_ptl_ib_peer_conn_t;
|
||||
(MT_virt_addr_t) ib_buf_ptr; \
|
||||
ib_buf_ptr->desc.rr.sg_lst_len = 1; \
|
||||
ib_buf_ptr->desc.rr.sg_lst_p = &ib_buf_ptr->desc.sg_entry; \
|
||||
ib_buf_ptr->desc.sg_entry.len = 4096; \
|
||||
ib_buf_ptr->desc.sg_entry.len = MCA_PTL_IB_FIRST_FRAG_SIZE; \
|
||||
ib_buf_ptr->desc.sg_entry.lkey = ib_buf_ptr->hndl.lkey; \
|
||||
ib_buf_ptr->desc.sg_entry.addr = (VAPI_virt_addr_t) \
|
||||
(MT_virt_addr_t) ib_buf_ptr->buf; \
|
||||
@ -187,6 +190,24 @@ typedef struct mca_ptl_ib_peer_conn_t mca_ptl_ib_peer_conn_t;
|
||||
ib_buf_ptr->desc.sg_entry.len = msg_len; \
|
||||
}
|
||||
|
||||
#define IB_PREPARE_RDMA_W_DESC(ib_buf_ptr, qp, \
|
||||
msg_len, user_buf, local_key, remote_key, remote_buf) { \
|
||||
ib_buf_ptr->desc.sr.comp_type = VAPI_SIGNALED; \
|
||||
ib_buf_ptr->desc.sr.opcode = VAPI_RDMA_WRITE; \
|
||||
ib_buf_ptr->desc.sr.remote_qkey = 0; \
|
||||
ib_buf_ptr->desc.sr.remote_qp = qp; \
|
||||
ib_buf_ptr->desc.sr.id = (VAPI_virt_addr_t) \
|
||||
(MT_virt_addr_t) ib_buf_ptr; \
|
||||
ib_buf_ptr->desc.sr.sg_lst_len = 1; \
|
||||
ib_buf_ptr->desc.sr.sg_lst_p = &ib_buf_ptr->desc.sg_entry; \
|
||||
ib_buf_ptr->desc.sg_entry.len = msg_len; \
|
||||
ib_buf_ptr->desc.sg_entry.lkey = local_key; \
|
||||
ib_buf_ptr->desc.sg_entry.addr = (VAPI_virt_addr_t) \
|
||||
(MT_virt_addr_t) user_buf; \
|
||||
ib_buf_ptr->desc.sr.remote_addr = remote_buf; \
|
||||
ib_buf_ptr->desc.sr.r_key = remote_key; \
|
||||
}
|
||||
|
||||
|
||||
int mca_ptl_ib_init_module(mca_ptl_ib_state_t*, int);
|
||||
int mca_ptl_ib_get_num_hcas(uint32_t*);
|
||||
@ -202,5 +223,7 @@ void mca_ptl_ib_drain_network(VAPI_hca_hndl_t nic,
|
||||
VAPI_cq_hndl_t cq_hndl, int* comp_type, void** comp_addr);
|
||||
void mca_ptl_ib_buffer_repost(VAPI_hca_hndl_t nic,
|
||||
void* addr);
|
||||
|
||||
void mca_ptl_ib_prepare_ack(mca_ptl_ib_state_t *ib_state,
|
||||
void* addr_to_reg, int len_to_reg,
|
||||
void* ack_buf, int* len_added);
|
||||
#endif /* MCA_PTL_IB_PRIV_H */
|
||||
|
@ -46,26 +46,15 @@ mca_ptl_ib_recv_frag_done (mca_ptl_base_header_t *header,
|
||||
(ompi_list_item_t*)frag);
|
||||
}
|
||||
|
||||
/*
|
||||
* Process incoming receive fragments
|
||||
*
|
||||
*/
|
||||
|
||||
void mca_ptl_ib_process_recv(mca_ptl_base_module_t *module, void* addr)
|
||||
static void mca_ptl_ib_data_frag(mca_ptl_base_module_t *module,
|
||||
mca_ptl_base_header_t *header)
|
||||
{
|
||||
bool matched;
|
||||
int rc;
|
||||
ib_buffer_t *ib_buf;
|
||||
mca_ptl_base_header_t *header;
|
||||
ompi_list_item_t *item;
|
||||
mca_ptl_ib_recv_frag_t *recv_frag;
|
||||
|
||||
D_PRINT("");
|
||||
|
||||
ib_buf = (ib_buffer_t *) (unsigned int) addr;
|
||||
|
||||
header = (mca_ptl_base_header_t *) &ib_buf->buf[0];
|
||||
|
||||
OMPI_FREE_LIST_GET(&mca_ptl_ib_component.ib_recv_frags,
|
||||
item, rc);
|
||||
|
||||
@ -77,8 +66,7 @@ void mca_ptl_ib_process_recv(mca_ptl_base_module_t *module, void* addr)
|
||||
}
|
||||
|
||||
recv_frag = (mca_ptl_ib_recv_frag_t *) item;
|
||||
recv_frag->super.frag_base.frag_owner =
|
||||
(mca_ptl_base_module_t *) module;
|
||||
recv_frag->super.frag_base.frag_owner = module;
|
||||
|
||||
recv_frag->super.frag_base.frag_peer = NULL;
|
||||
recv_frag->super.frag_request = NULL;
|
||||
@ -115,3 +103,69 @@ void mca_ptl_ib_process_recv(mca_ptl_base_module_t *module, void* addr)
|
||||
D_PRINT("Message matched!");
|
||||
}
|
||||
}
|
||||
|
||||
static void mca_ptl_ib_ctrl_frag(mca_ptl_base_module_t *module,
|
||||
mca_ptl_base_header_t *header)
|
||||
{
|
||||
mca_ptl_ib_send_frag_t *send_frag;
|
||||
mca_pml_base_send_request_t *req;
|
||||
void *data_ptr;
|
||||
|
||||
send_frag = (mca_ptl_ib_send_frag_t *)
|
||||
header->hdr_ack.hdr_src_ptr.pval;
|
||||
req = (mca_pml_base_send_request_t *)
|
||||
send_frag->frag_send.frag_request;
|
||||
|
||||
req->req_peer_match = header->hdr_ack.hdr_dst_match;
|
||||
req->req_peer_addr = header->hdr_ack.hdr_dst_addr;
|
||||
req->req_peer_size = header->hdr_ack.hdr_dst_size;
|
||||
|
||||
/* Locate data in the ACK buffer */
|
||||
data_ptr = (void*)
|
||||
((char*) header + sizeof(mca_ptl_base_ack_header_t));
|
||||
|
||||
/* Copy over data to request buffer */
|
||||
memcpy(((mca_ptl_ib_send_request_t *) req)->req_buf,
|
||||
data_ptr, sizeof(VAPI_rkey_t));
|
||||
|
||||
/* Progress & release fragments */
|
||||
mca_ptl_ib_process_send_comp(module, (void*) send_frag);
|
||||
}
|
||||
|
||||
static void mca_ptl_ib_last_frag(mca_ptl_base_module_t *module,
|
||||
mca_ptl_base_header_t *header)
|
||||
{
|
||||
}
|
||||
|
||||
/*
|
||||
* Process incoming receive fragments
|
||||
*
|
||||
*/
|
||||
|
||||
void mca_ptl_ib_process_recv(mca_ptl_base_module_t *module, void* addr)
|
||||
{
|
||||
ib_buffer_t *ib_buf;
|
||||
mca_ptl_base_header_t *header;
|
||||
|
||||
D_PRINT("");
|
||||
|
||||
ib_buf = (ib_buffer_t *) addr;
|
||||
|
||||
header = (mca_ptl_base_header_t *) &ib_buf->buf[0];
|
||||
|
||||
switch(header->hdr_common.hdr_type) {
|
||||
case MCA_PTL_HDR_TYPE_MATCH :
|
||||
case MCA_PTL_HDR_TYPE_FRAG :
|
||||
mca_ptl_ib_data_frag(module, header);
|
||||
break;
|
||||
case MCA_PTL_HDR_TYPE_ACK :
|
||||
mca_ptl_ib_ctrl_frag(module, header);
|
||||
break;
|
||||
case MCA_PTL_HDR_TYPE_FIN :
|
||||
mca_ptl_ib_last_frag(module, header);
|
||||
break;
|
||||
default :
|
||||
ompi_output(0, "Unknown fragment type");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -4,6 +4,8 @@
|
||||
#include "mca/ptl/ptl.h"
|
||||
#include "mca/ptl/base/ptl_base_recvfrag.h"
|
||||
|
||||
#define MCA_PTL_IB_UNEX_BUF_SIZE (4096)
|
||||
|
||||
OBJ_CLASS_DECLARATION(mca_ptl_ib_recv_frag_t);
|
||||
|
||||
/**
|
||||
@ -12,7 +14,7 @@ OBJ_CLASS_DECLARATION(mca_ptl_ib_recv_frag_t);
|
||||
struct mca_ptl_ib_recv_frag_t {
|
||||
mca_ptl_base_recv_frag_t super;
|
||||
/**< base receive fragment descriptor */
|
||||
char unex_buf[4096];
|
||||
char unex_buf[MCA_PTL_IB_UNEX_BUF_SIZE];
|
||||
/**< Unexpected buffer */
|
||||
};
|
||||
typedef struct mca_ptl_ib_recv_frag_t mca_ptl_ib_recv_frag_t;
|
||||
|
@ -53,7 +53,10 @@ int mca_ptl_ib_send_frag_init(mca_ptl_ib_send_frag_t* sendfrag,
|
||||
hdr->hdr_frag.hdr_frag_offset = offset;
|
||||
hdr->hdr_frag.hdr_frag_seq = 0;
|
||||
hdr->hdr_frag.hdr_src_ptr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
|
||||
|
||||
/* Ptr to send frag, so incoming ACK can locate the frag */
|
||||
hdr->hdr_frag.hdr_src_ptr.pval = sendfrag;
|
||||
|
||||
hdr->hdr_frag.hdr_dst_ptr.lval = 0;
|
||||
hdr->hdr_match.hdr_contextid = sendreq->req_base.req_comm->c_contextid;
|
||||
hdr->hdr_match.hdr_src = sendreq->req_base.req_comm->c_my_rank;
|
||||
@ -65,6 +68,7 @@ int mca_ptl_ib_send_frag_init(mca_ptl_ib_send_frag_t* sendfrag,
|
||||
header_length = sizeof(mca_ptl_base_match_header_t);
|
||||
|
||||
} else {
|
||||
|
||||
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG;
|
||||
hdr->hdr_common.hdr_flags = flags;
|
||||
hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_frag_header_t);
|
||||
@ -216,36 +220,154 @@ int mca_ptl_ib_register_send_frags(mca_ptl_base_module_t *ptl)
|
||||
|
||||
rc = mca_ptl_ib_register_mem(ib_state->nic, ib_state->ptag,
|
||||
(void*) ib_buf_ptr->buf,
|
||||
4096, &ib_buf_ptr->hndl);
|
||||
MCA_PTL_IB_FIRST_FRAG_SIZE,
|
||||
&ib_buf_ptr->hndl);
|
||||
if(rc != OMPI_SUCCESS) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
IB_PREPARE_SEND_DESC(ib_buf_ptr, 0, 4096);
|
||||
IB_PREPARE_SEND_DESC(ib_buf_ptr, 0,
|
||||
MCA_PTL_IB_FIRST_FRAG_SIZE);
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
/*
|
||||
* Process RDMA Write completions
|
||||
*
|
||||
* Just return send fragment to free list
|
||||
*/
|
||||
|
||||
void mca_ptl_ib_process_rdma_w_comp(mca_ptl_base_module_t *module,
|
||||
void* comp_addr)
|
||||
{
|
||||
mca_ptl_ib_send_frag_t *sendfrag;
|
||||
ompi_free_list_t *flist;
|
||||
|
||||
sendfrag = (mca_ptl_ib_send_frag_t *) comp_addr;
|
||||
|
||||
flist = &(sendfrag->
|
||||
frag_send.frag_base.frag_peer->
|
||||
peer_module->send_free);
|
||||
|
||||
OMPI_FREE_LIST_RETURN(flist,
|
||||
((ompi_list_item_t *) sendfrag));
|
||||
}
|
||||
|
||||
/*
|
||||
* Process send completions
|
||||
*
|
||||
*/
|
||||
|
||||
void mca_ptl_ib_process_send_comp(mca_ptl_base_module_t *module,
|
||||
void* addr, ompi_free_list_t *flist)
|
||||
void* addr)
|
||||
{
|
||||
mca_ptl_ib_send_frag_t *sendfrag;
|
||||
mca_ptl_base_header_t *header;
|
||||
mca_pml_base_send_request_t *req;
|
||||
ompi_free_list_t *flist;
|
||||
|
||||
sendfrag = (mca_ptl_ib_send_frag_t *) (unsigned int) addr;
|
||||
sendfrag = (mca_ptl_ib_send_frag_t *) addr;
|
||||
header = (mca_ptl_base_header_t *) sendfrag->ib_buf.buf;
|
||||
|
||||
module->ptl_send_progress(module,
|
||||
sendfrag->frag_send.frag_request,
|
||||
header->hdr_frag.hdr_frag_length);
|
||||
req = (mca_pml_base_send_request_t *)
|
||||
sendfrag->frag_send.frag_request;
|
||||
|
||||
/* Return sendfrag to free list */
|
||||
flist = &(sendfrag->
|
||||
frag_send.frag_base.frag_peer->
|
||||
peer_module->send_free);
|
||||
|
||||
OMPI_FREE_LIST_RETURN(flist, ((ompi_list_item_t *) sendfrag));
|
||||
if(header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_ACK) {
|
||||
/* Is this an ack descriptor ? */
|
||||
A_PRINT("Completion of send_ack");
|
||||
|
||||
OMPI_FREE_LIST_RETURN(flist,
|
||||
((ompi_list_item_t *) sendfrag));
|
||||
} else if(NULL == req) {
|
||||
/* An ack descriptor ? Don't know what to do! */
|
||||
OMPI_FREE_LIST_RETURN(flist,
|
||||
((ompi_list_item_t *) sendfrag));
|
||||
} else if (0 == (header->hdr_common.hdr_flags
|
||||
& MCA_PTL_FLAGS_ACK_MATCHED)
|
||||
|| mca_pml_base_send_request_matched(req)) {
|
||||
|
||||
module->ptl_send_progress(module,
|
||||
sendfrag->frag_send.frag_request,
|
||||
header->hdr_frag.hdr_frag_length);
|
||||
/* Return sendfrag to free list */
|
||||
|
||||
OMPI_FREE_LIST_RETURN(flist,
|
||||
((ompi_list_item_t *) sendfrag));
|
||||
} else {
|
||||
/* Not going to call progress on this send,
|
||||
* and not free-ing descriptor */
|
||||
A_PRINT("Why should I return sendfrag?");
|
||||
}
|
||||
}
|
||||
|
||||
int mca_ptl_ib_put_frag_init(mca_ptl_ib_send_frag_t *sendfrag,
|
||||
mca_ptl_base_peer_t *ptl_peer,
|
||||
mca_pml_base_send_request_t *req,
|
||||
size_t offset, size_t *size, int flags)
|
||||
{
|
||||
int rc;
|
||||
int size_in, size_out;
|
||||
mca_ptl_base_header_t *hdr;
|
||||
|
||||
size_in = *size;
|
||||
|
||||
hdr = (mca_ptl_base_header_t *)
|
||||
&sendfrag->ib_buf.buf[0];
|
||||
|
||||
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FIN;
|
||||
hdr->hdr_common.hdr_flags = flags;
|
||||
hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_frag_header_t);
|
||||
hdr->hdr_frag.hdr_frag_offset = offset;
|
||||
hdr->hdr_frag.hdr_frag_seq = 0;
|
||||
hdr->hdr_frag.hdr_src_ptr.lval = 0;
|
||||
hdr->hdr_frag.hdr_src_ptr.pval = sendfrag;
|
||||
hdr->hdr_frag.hdr_dst_ptr = req->req_peer_match;
|
||||
hdr->hdr_frag.hdr_frag_length = size_in;
|
||||
|
||||
if(size_in > 0 && 0) {
|
||||
struct iovec iov;
|
||||
ompi_convertor_t *convertor;
|
||||
|
||||
if( offset <= mca_ptl_ib_module.super.ptl_first_frag_size) {
|
||||
convertor = &req->req_convertor;
|
||||
} else {
|
||||
convertor = &sendfrag->frag_send.frag_base.frag_convertor;
|
||||
ompi_convertor_copy(&req->req_convertor, convertor);
|
||||
ompi_convertor_init_for_send(
|
||||
convertor,
|
||||
0,
|
||||
req->req_base.req_datatype,
|
||||
req->req_base.req_count,
|
||||
req->req_base.req_addr,
|
||||
offset);
|
||||
}
|
||||
iov.iov_base = &sendfrag->ib_buf.buf[sizeof(mca_ptl_base_frag_header_t)];
|
||||
iov.iov_len = size_in;
|
||||
|
||||
rc = ompi_convertor_pack(convertor, &iov, 1);
|
||||
if (rc < 0) {
|
||||
ompi_output (0, "[%s:%d] Unable to pack data\n",
|
||||
__FILE__, __LINE__);
|
||||
return;
|
||||
}
|
||||
size_out = iov.iov_len;
|
||||
} else {
|
||||
size_out = size_in;
|
||||
}
|
||||
|
||||
*size = size_out;
|
||||
hdr->hdr_frag.hdr_frag_length = size_out;
|
||||
|
||||
A_PRINT("size_in : %d", size_in);
|
||||
|
||||
IB_SET_SEND_DESC_LEN((&sendfrag->ib_buf),
|
||||
(sizeof(mca_ptl_base_frag_header_t) + size_in));
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
@ -59,6 +59,13 @@ mca_ptl_ib_send_frag_t* mca_ptl_ib_alloc_send_frag(
|
||||
int mca_ptl_ib_register_send_frags(mca_ptl_base_module_t *ptl);
|
||||
|
||||
void mca_ptl_ib_process_send_comp(mca_ptl_base_module_t *,
|
||||
void*, ompi_free_list_t*);
|
||||
void*);
|
||||
void mca_ptl_ib_process_rdma_w_comp(mca_ptl_base_module_t *module,
|
||||
void* comp_addr);
|
||||
|
||||
int mca_ptl_ib_put_frag_init(mca_ptl_ib_send_frag_t *sendfrag,
|
||||
struct mca_ptl_base_peer_t *ptl_peer,
|
||||
struct mca_pml_base_send_request_t *req,
|
||||
size_t offset, size_t *size, int flags);
|
||||
|
||||
#endif
|
||||
|
@ -14,7 +14,7 @@ OBJ_CLASS_INSTANCE(mca_ptl_ib_send_request_t,
|
||||
|
||||
void mca_ptl_ib_send_request_construct(mca_ptl_ib_send_request_t* request)
|
||||
{
|
||||
D_PRINT("\n");
|
||||
A_PRINT("Request Construct");
|
||||
request->req_frag = NULL;
|
||||
|
||||
/*
|
||||
|
@ -22,6 +22,8 @@ struct mca_ptl_ib_send_request_t {
|
||||
|
||||
mca_ptl_ib_send_frag_t *req_frag;
|
||||
/* first fragment */
|
||||
char req_buf[8];
|
||||
/* temporary buffer to hold VAPI_rkey_t */
|
||||
};
|
||||
typedef struct mca_ptl_ib_send_request_t mca_ptl_ib_send_request_t;
|
||||
|
||||
|
@ -52,5 +52,13 @@
|
||||
#define D_PRINT(fmt, args...)
|
||||
#endif
|
||||
|
||||
#if 1
|
||||
#define A_PRINT(fmt, args...) { \
|
||||
ompi_output(0, "[%s:%d:%s] " fmt, __FILE__, __LINE__, __func__, \
|
||||
##args); \
|
||||
}
|
||||
#else
|
||||
#define A_PRINT(fmt, args...)
|
||||
#endif
|
||||
|
||||
#endif
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user