From 83dc2da533e221b5daebc6f7f56a413e8aada541 Mon Sep 17 00:00:00 2001 From: Sayantan Sur Date: Mon, 20 Sep 2004 05:37:27 +0000 Subject: [PATCH] Arbitrary size message implementation. Implemented start->ack->put. still working on fin. This commit was SVN r2769. --- src/mca/ptl/ib/src/ptl_ib.c | 164 ++++++++++++++++++++++++-- src/mca/ptl/ib/src/ptl_ib.h | 19 +++ src/mca/ptl/ib/src/ptl_ib_component.c | 17 ++- src/mca/ptl/ib/src/ptl_ib_peer.c | 5 +- src/mca/ptl/ib/src/ptl_ib_peer.h | 3 +- src/mca/ptl/ib/src/ptl_ib_priv.c | 85 +++++++++++-- src/mca/ptl/ib/src/ptl_ib_priv.h | 29 ++++- src/mca/ptl/ib/src/ptl_ib_recvfrag.c | 84 ++++++++++--- src/mca/ptl/ib/src/ptl_ib_recvfrag.h | 4 +- src/mca/ptl/ib/src/ptl_ib_sendfrag.c | 140 ++++++++++++++++++++-- src/mca/ptl/ib/src/ptl_ib_sendfrag.h | 9 +- src/mca/ptl/ib/src/ptl_ib_sendreq.c | 2 +- src/mca/ptl/ib/src/ptl_ib_sendreq.h | 2 + src/mca/ptl/ib/src/ptl_ib_vapi.h | 8 ++ 14 files changed, 521 insertions(+), 50 deletions(-) diff --git a/src/mca/ptl/ib/src/ptl_ib.c b/src/mca/ptl/ib/src/ptl_ib.c index b1994d9e0a..8e85f5e3d8 100644 --- a/src/mca/ptl/ib/src/ptl_ib.c +++ b/src/mca/ptl/ib/src/ptl_ib.c @@ -31,7 +31,7 @@ mca_ptl_ib_module_t mca_ptl_ib_module = { mca_ptl_ib_del_procs, mca_ptl_ib_finalize, mca_ptl_ib_send, - mca_ptl_ib_send, + mca_ptl_ib_put, NULL, mca_ptl_ib_matched, mca_ptl_ib_request_init, @@ -39,6 +39,66 @@ mca_ptl_ib_module_t mca_ptl_ib_module = { } }; +/* + * 1. RDMA local buffer to remote buffer address. + * 2. Generate a FIN + */ + +int mca_ptl_ib_put( struct mca_ptl_base_module_t* ptl, + struct mca_ptl_base_peer_t* ptl_peer, + struct mca_pml_base_send_request_t* req, size_t offset, + size_t size, int flags) +{ + int rc; + mca_ptl_ib_send_frag_t *send_frag, *send_frag_fin; + mca_ptl_ib_state_t *ib_state; + mca_ptl_ib_peer_conn_t *peer_conn; + void *local_addr, *remote_addr; + VAPI_rkey_t rkey; + + /* RDMA the data over to the peer */ + send_frag = mca_ptl_ib_alloc_send_frag(ptl, req); + + if(NULL == send_frag) { + ompi_output(0, "Unable to allocate send descriptor"); + return OMPI_ERROR; + } + + A_PRINT("IB put to %p, rkey : %d", + req->req_peer_addr.pval, + *(VAPI_rkey_t *)(((mca_ptl_ib_send_request_t *)req)->req_buf)); + + ib_state = ((mca_ptl_ib_module_t *)ptl)->ib_state; + peer_conn = ((mca_ptl_ib_peer_t *)ptl_peer)->peer_conn; + local_addr = (void*) ((char*) req->req_base.req_addr + offset); + remote_addr = (void*) req->req_peer_addr.pval; + rkey = *(VAPI_rkey_t *)(((mca_ptl_ib_send_request_t *)req)->req_buf); + + rc = mca_ptl_ib_rdma_write(ib_state, peer_conn, + &send_frag->ib_buf, local_addr, size, remote_addr, rkey); + + if(rc != OMPI_SUCCESS) { + return OMPI_ERROR; + } + + /* Send FIN to receiver */ + send_frag_fin = mca_ptl_ib_alloc_send_frag(ptl, req); + + if(NULL == send_frag_fin) { + ompi_output(0, "Unable to allocate send descriptor"); + return OMPI_ERROR; + } + rc = mca_ptl_ib_put_frag_init(send_frag_fin, ptl_peer, + req, offset, &size, flags); + if(rc != OMPI_SUCCESS) { + return rc; + } + + /* Update offset */ + req->req_offset += size; + + return OMPI_SUCCESS; +} int mca_ptl_ib_add_procs(struct mca_ptl_base_module_t* base_module, size_t nprocs, struct ompi_proc_t **ompi_procs, @@ -131,6 +191,7 @@ int mca_ptl_ib_request_init( struct mca_ptl_base_module_t* ptl, ib_send_req = (mca_ptl_ib_send_request_t *) request; ib_send_req->req_frag = ib_send_frag; + memset(ib_send_req->req_buf, 7, 8); } return OMPI_SUCCESS; @@ -165,12 +226,12 @@ int mca_ptl_ib_send( struct mca_ptl_base_module_t* ptl, ((mca_ptl_ib_send_request_t*)sendreq)->req_frag; } else { - /* TODO: Implementation for messages > frag size */ - ompi_list_item_t* item; - OMPI_FREE_LIST_GET(&mca_ptl_ib_component.ib_send_frags, item, rc); + /* Implementation for messages > frag size */ + sendfrag = mca_ptl_ib_alloc_send_frag(ptl, + sendreq); - if(NULL == (sendfrag = (mca_ptl_ib_send_frag_t*)item)) { - return rc; + if(NULL == sendfrag) { + ompi_output(0,"Unable to allocate send fragment"); } } @@ -190,6 +251,84 @@ int mca_ptl_ib_send( struct mca_ptl_base_module_t* ptl, } +static void mca_ptl_ib_start_ack(mca_ptl_base_module_t *module, + mca_ptl_ib_send_frag_t *send_frag, + mca_ptl_ib_recv_frag_t *recv_frag) +{ + mca_ptl_base_header_t *hdr; + mca_pml_base_recv_request_t *request; + mca_ptl_ib_peer_t *ib_peer; + ib_buffer_t *ib_buf; + int recv_len; + int len_to_reg, len_added = 0; + void *addr_to_reg, *ack_buf; + + A_PRINT(""); + + /* Header starts at beginning of registered + * buffer space */ + + hdr = (mca_ptl_base_header_t *) + &send_frag->ib_buf.buf[0]; + + request = recv_frag->super.frag_request; + + /* Amount of data we have already received */ + recv_len = + recv_frag->super.frag_base.frag_header.hdr_frag.hdr_frag_length; + + hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_ACK; + hdr->hdr_common.hdr_flags = 0; + hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_ack_header_t); + + /* Remote side send descriptor */ + hdr->hdr_ack.hdr_src_ptr = + recv_frag->super.frag_base.frag_header.hdr_frag.hdr_src_ptr; + + /* Matched request from recv side */ + hdr->hdr_ack.hdr_dst_match.lval = 0; + hdr->hdr_ack.hdr_dst_match.pval = request; + + hdr->hdr_ack.hdr_dst_addr.lval = 0; + + addr_to_reg = (void*)((char*)request->req_base.req_addr + recv_len); + hdr->hdr_ack.hdr_dst_addr.pval = addr_to_reg; + + len_to_reg = request->req_bytes_packed - recv_len; + hdr->hdr_ack.hdr_dst_size = len_to_reg; + + A_PRINT("Dest addr : %p, RDMA Len : %d", + hdr->hdr_ack.hdr_dst_addr.pval, + hdr->hdr_ack.hdr_dst_size); + + ack_buf = (void*) ((char*) (&send_frag->ib_buf.buf[0]) + + sizeof(mca_ptl_base_ack_header_t)); + + /* Prepare ACK packet with IB specific stuff */ + mca_ptl_ib_prepare_ack(((mca_ptl_ib_module_t *)module)->ib_state, + addr_to_reg, len_to_reg, + ack_buf, &len_added); + + /* Send it right away! */ + ib_peer = (mca_ptl_ib_peer_t *) + recv_frag->super.frag_base.frag_peer; + + ib_buf = &send_frag->ib_buf; + + IB_SET_SEND_DESC_LEN(ib_buf, + (sizeof(mca_ptl_base_ack_header_t) + len_added)); + + mca_ptl_ib_post_send(((mca_ptl_ib_module_t *)module)->ib_state, + ib_peer->peer_conn, + &send_frag->ib_buf, send_frag); + + /* fragment state */ + send_frag->frag_send.frag_base.frag_owner = module; + send_frag->frag_send.frag_base.frag_peer = recv_frag->super.frag_base.frag_peer; + send_frag->frag_send.frag_base.frag_addr = NULL; + send_frag->frag_send.frag_base.frag_size = 0; +} + /* * A posted receive has been matched - if required send an * ack back to the peer and process the fragment. Copy the @@ -210,7 +349,18 @@ void mca_ptl_ib_matched(mca_ptl_base_module_t* module, D_PRINT("Matched frag\n"); if (header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) { - D_PRINT("Doh, I cannot send an ack!\n"); + mca_ptl_ib_send_frag_t *send_frag; + + send_frag = mca_ptl_ib_alloc_send_frag(module, NULL); + + if(NULL == send_frag) { + ompi_output(0, "Cannot get send descriptor"); + } else { + mca_ptl_ib_start_ack(module, send_frag, recv_frag); + } + + /* Basic ACK scheme */ + A_PRINT("Doh, I cannot send an ack!\n"); } /* Process the fragment */ diff --git a/src/mca/ptl/ib/src/ptl_ib.h b/src/mca/ptl/ib/src/ptl_ib.h index 1b1d556228..c43eeca9d7 100644 --- a/src/mca/ptl/ib/src/ptl_ib.h +++ b/src/mca/ptl/ib/src/ptl_ib.h @@ -270,6 +270,25 @@ extern int mca_ptl_ib_send( int flags ); +/** + * PML->PTL Initiate a put of the specified size. + * + * @param ptl (IN) PTL instance + * @param ptl_base_peer (IN) PTL peer addressing + * @param send_request (IN/OUT) Send request (allocated by PML via mca_ptl_base_request_alloc_fn_t) + * @param size (IN) Number of bytes PML is requesting PTL to deliver + * @param flags (IN) Flags that should be passed to the peer via the message header. + * @param request (OUT) OMPI_SUCCESS if the PTL was able to queue one or more fragments + */ +extern int mca_ptl_ib_put( + struct mca_ptl_base_module_t* ptl, + struct mca_ptl_base_peer_t* ptl_peer, + struct mca_pml_base_send_request_t*, + size_t offset, + size_t size, + int flags +); + /** * Return a recv fragment to the modules free list. * diff --git a/src/mca/ptl/ib/src/ptl_ib_component.c b/src/mca/ptl/ib/src/ptl_ib_component.c index 120549b858..929858da61 100644 --- a/src/mca/ptl/ib/src/ptl_ib_component.c +++ b/src/mca/ptl/ib/src/ptl_ib_component.c @@ -87,12 +87,16 @@ int mca_ptl_ib_component_open(void) /* register super component parameters */ mca_ptl_ib_module.super.ptl_exclusivity = mca_ptl_ib_param_register_int ("exclusivity", 0); + mca_ptl_ib_module.super.ptl_first_frag_size = mca_ptl_ib_param_register_int ("first_frag_size", - (16384 - sizeof(mca_ptl_base_header_t))/*magic*/); + (MCA_PTL_IB_FIRST_FRAG_SIZE + - sizeof(mca_ptl_base_header_t))); + mca_ptl_ib_module.super.ptl_min_frag_size = mca_ptl_ib_param_register_int ("min_frag_size", - (4096 - sizeof(mca_ptl_base_header_t))/*magic*/); + (4096 - sizeof(mca_ptl_base_header_t))); + mca_ptl_ib_module.super.ptl_max_frag_size = mca_ptl_ib_param_register_int ("max_frag_size", 2<<30); @@ -357,7 +361,7 @@ int mca_ptl_ib_component_progress(mca_ptl_tstamp_t tstamp) /* Process a completed send */ mca_ptl_ib_process_send_comp( (mca_ptl_base_module_t *) module, - comp_addr, &(module->send_free)); + comp_addr); break; case IB_COMP_RECV : @@ -370,6 +374,13 @@ int mca_ptl_ib_component_progress(mca_ptl_tstamp_t tstamp) mca_ptl_ib_buffer_repost(module->ib_state->nic, comp_addr); + break; + case IB_COMP_RDMA_W : + + mca_ptl_ib_process_rdma_w_comp( + (mca_ptl_base_module_t *) module, + comp_addr); + break; case IB_COMP_NOTHING: break; diff --git a/src/mca/ptl/ib/src/ptl_ib_peer.c b/src/mca/ptl/ib/src/ptl_ib_peer.c index ced6f5e5d4..7d480632f0 100644 --- a/src/mca/ptl/ib/src/ptl_ib_peer.c +++ b/src/mca/ptl/ib/src/ptl_ib_peer.c @@ -461,7 +461,6 @@ int mca_ptl_ib_peer_send(mca_ptl_base_peer_t* peer, { int rc; - D_PRINT(""); OMPI_THREAD_LOCK(&peer->peer_send_lock); @@ -510,6 +509,10 @@ int mca_ptl_ib_peer_send(mca_ptl_base_peer_t* peer, peer->peer_conn, &frag->ib_buf, (void*) frag); + A_PRINT("Send to : %d, len : %d", + peer->peer_proc->proc_guid.vpid, + frag->ib_buf.desc.sg_entry.len); + break; default: rc = OMPI_ERR_UNREACH; diff --git a/src/mca/ptl/ib/src/ptl_ib_peer.h b/src/mca/ptl/ib/src/ptl_ib_peer.h index 334181dec0..9e1be60f3a 100644 --- a/src/mca/ptl/ib/src/ptl_ib_peer.h +++ b/src/mca/ptl/ib/src/ptl_ib_peer.h @@ -84,7 +84,8 @@ void mca_ptl_ib_progress_send_frags(mca_ptl_ib_peer_t*); #define DUMP_PEER(peer_ptr) { \ ompi_output(0, "[%s:%d] ", __FILE__, __LINE__); \ - ompi_output(0, "Dumping peer state"); \ + ompi_output(0, "Dumping peer %d state", \ + peer->peer_proc->proc_guid.vpid); \ ompi_output(0, "Local QP hndl : %d", \ peer_ptr->peer_conn->lres->qp_hndl); \ ompi_output(0, "Local QP num : %d", \ diff --git a/src/mca/ptl/ib/src/ptl_ib_priv.c b/src/mca/ptl/ib/src/ptl_ib_priv.c index e01c9d401b..357a7dfc59 100644 --- a/src/mca/ptl/ib/src/ptl_ib_priv.c +++ b/src/mca/ptl/ib/src/ptl_ib_priv.c @@ -478,7 +478,8 @@ int mca_ptl_ib_peer_connect(mca_ptl_ib_state_t *ib_state, rc = mca_ptl_ib_register_mem(ib_state->nic, ib_state->ptag, (void*) peer_conn->lres->recv[i].buf, - 4096, &peer_conn->lres->recv[i].hndl); + MCA_PTL_IB_FIRST_FRAG_SIZE, + &peer_conn->lres->recv[i].hndl); if(rc != OMPI_SUCCESS) { return OMPI_ERROR; } @@ -517,7 +518,9 @@ int mca_ptl_ib_post_send(mca_ptl_ib_state_t *ib_state, IB_SET_SEND_DESC_ID(ib_buf, addr); - D_PRINT("length : %d", ib_buf->desc.sg_entry.len); + D_PRINT("length : %d, qp_num = %d", + ib_buf->desc.sg_entry.len, + (peer_conn->rres->qp_num)); ret = VAPI_post_sr(ib_state->nic, peer_conn->lres->qp_hndl, @@ -555,16 +558,21 @@ void mca_ptl_ib_drain_network(VAPI_hca_hndl_t nic, *comp_type = IB_COMP_SEND; *comp_addr = (void*) (unsigned int) comp.id; - } - else if(VAPI_CQE_RQ_SEND_DATA == comp.opcode) { - D_PRINT("Received message completion len = %d, id : %d\n", + } else if(VAPI_CQE_RQ_SEND_DATA == comp.opcode) { + A_PRINT("Received message completion len = %d, id : %d\n", comp.byte_len, comp.id); *comp_type = IB_COMP_RECV; *comp_addr = (void*) (unsigned int) comp.id; - } - else { - D_PRINT("Got Unknown completion! Opcode : %d\n", + + } else if(VAPI_CQE_SQ_RDMA_WRITE == comp.opcode) { + + A_PRINT("RDMA Write completion"); + *comp_type = IB_COMP_RDMA_W; + *comp_addr = (void*) (unsigned int) comp.id; + + } else { + ompi_output(0, "Got Unknown completion! Opcode : %d\n", comp.opcode); *comp_type = IB_COMP_ERROR; @@ -597,3 +605,64 @@ void mca_ptl_ib_buffer_repost(VAPI_hca_hndl_t nic, ompi_output(0, "Error in buffer reposting"); } } + +void mca_ptl_ib_prepare_ack(mca_ptl_ib_state_t *ib_state, + void* addr_to_reg, int len_to_reg, + void* ack_buf, int* len_added) +{ + int rc; + vapi_memhandle_t memhandle; + + rc = mca_ptl_ib_register_mem(ib_state->nic, ib_state->ptag, + addr_to_reg, len_to_reg, &memhandle); + + if(rc != OMPI_SUCCESS) { + ompi_output(0, "Error in registering"); + } + + A_PRINT("Sending Remote key : %d", memhandle.rkey); + + memcpy(ack_buf,(void*) &memhandle.rkey, sizeof(VAPI_rkey_t)); + + *len_added = sizeof(VAPI_lkey_t); +} + +int mca_ptl_ib_rdma_write(mca_ptl_ib_state_t *ib_state, + mca_ptl_ib_peer_conn_t *peer_conn, ib_buffer_t *ib_buf, + void* send_buf, size_t send_len, void* remote_buf, + VAPI_rkey_t remote_key) +{ + VAPI_ret_t ret; + VAPI_mrw_t mr_in, mr_out; + vapi_memhandle_t mem_handle; + + /* Register local application buffer */ + mr_in.acl = VAPI_EN_LOCAL_WRITE | VAPI_EN_REMOTE_WRITE; + mr_in.l_key = 0; + mr_in.r_key = 0; + mr_in.pd_hndl = ib_state->ptag; + mr_in.size = send_len; + mr_in.start = (VAPI_virt_addr_t) (MT_virt_addr_t) send_buf; + mr_in.type = VAPI_MR; + + ret = VAPI_register_mr(ib_state->nic, &mr_in, + &mem_handle.hndl, &mr_out); + if(VAPI_OK != ret) { + MCA_PTL_IB_VAPI_RET(ret, "VAPI_register_mr"); + return OMPI_ERROR; + } + + /* Prepare descriptor */ + IB_PREPARE_RDMA_W_DESC(ib_buf, (peer_conn->rres->qp_num), + send_len, send_buf, (mr_out.l_key), remote_key, remote_buf); + + ret = VAPI_post_sr(ib_state->nic, + peer_conn->lres->qp_hndl, + &ib_buf->desc.sr); + if(ret != VAPI_OK) { + MCA_PTL_IB_VAPI_RET(ret, "VAPI_post_sr"); + return OMPI_ERROR; + } + + return OMPI_SUCCESS; +} diff --git a/src/mca/ptl/ib/src/ptl_ib_priv.h b/src/mca/ptl/ib/src/ptl_ib_priv.h index 4184c299f3..c9d62f9695 100644 --- a/src/mca/ptl/ib/src/ptl_ib_priv.h +++ b/src/mca/ptl/ib/src/ptl_ib_priv.h @@ -8,6 +8,8 @@ #define NUM_IB_SEND_BUF (10) #define NUM_IB_RECV_BUF (10) +#define MCA_PTL_IB_FIRST_FRAG_SIZE (4096) + struct mca_ptl_ib_state_t { VAPI_hca_id_t hca_id; /* ID of HCA */ @@ -40,6 +42,7 @@ typedef enum { IB_COMP_ERROR, IB_COMP_RECV, IB_COMP_SEND, + IB_COMP_RDMA_W, IB_COMP_NOTHING } IB_comp_t; @@ -80,7 +83,7 @@ struct ib_buffer_t { vapi_memhandle_t hndl; /* Buffer handle */ - char buf[4096]; + char buf[MCA_PTL_IB_FIRST_FRAG_SIZE]; /* Buffer space */ VAPI_qp_hndl_t qp_hndl; @@ -153,7 +156,7 @@ typedef struct mca_ptl_ib_peer_conn_t mca_ptl_ib_peer_conn_t; (MT_virt_addr_t) ib_buf_ptr; \ ib_buf_ptr->desc.rr.sg_lst_len = 1; \ ib_buf_ptr->desc.rr.sg_lst_p = &ib_buf_ptr->desc.sg_entry; \ - ib_buf_ptr->desc.sg_entry.len = 4096; \ + ib_buf_ptr->desc.sg_entry.len = MCA_PTL_IB_FIRST_FRAG_SIZE; \ ib_buf_ptr->desc.sg_entry.lkey = ib_buf_ptr->hndl.lkey; \ ib_buf_ptr->desc.sg_entry.addr = (VAPI_virt_addr_t) \ (MT_virt_addr_t) ib_buf_ptr->buf; \ @@ -187,6 +190,24 @@ typedef struct mca_ptl_ib_peer_conn_t mca_ptl_ib_peer_conn_t; ib_buf_ptr->desc.sg_entry.len = msg_len; \ } +#define IB_PREPARE_RDMA_W_DESC(ib_buf_ptr, qp, \ + msg_len, user_buf, local_key, remote_key, remote_buf) { \ + ib_buf_ptr->desc.sr.comp_type = VAPI_SIGNALED; \ + ib_buf_ptr->desc.sr.opcode = VAPI_RDMA_WRITE; \ + ib_buf_ptr->desc.sr.remote_qkey = 0; \ + ib_buf_ptr->desc.sr.remote_qp = qp; \ + ib_buf_ptr->desc.sr.id = (VAPI_virt_addr_t) \ + (MT_virt_addr_t) ib_buf_ptr; \ + ib_buf_ptr->desc.sr.sg_lst_len = 1; \ + ib_buf_ptr->desc.sr.sg_lst_p = &ib_buf_ptr->desc.sg_entry; \ + ib_buf_ptr->desc.sg_entry.len = msg_len; \ + ib_buf_ptr->desc.sg_entry.lkey = local_key; \ + ib_buf_ptr->desc.sg_entry.addr = (VAPI_virt_addr_t) \ + (MT_virt_addr_t) user_buf; \ + ib_buf_ptr->desc.sr.remote_addr = remote_buf; \ + ib_buf_ptr->desc.sr.r_key = remote_key; \ +} + int mca_ptl_ib_init_module(mca_ptl_ib_state_t*, int); int mca_ptl_ib_get_num_hcas(uint32_t*); @@ -202,5 +223,7 @@ void mca_ptl_ib_drain_network(VAPI_hca_hndl_t nic, VAPI_cq_hndl_t cq_hndl, int* comp_type, void** comp_addr); void mca_ptl_ib_buffer_repost(VAPI_hca_hndl_t nic, void* addr); - +void mca_ptl_ib_prepare_ack(mca_ptl_ib_state_t *ib_state, + void* addr_to_reg, int len_to_reg, + void* ack_buf, int* len_added); #endif /* MCA_PTL_IB_PRIV_H */ diff --git a/src/mca/ptl/ib/src/ptl_ib_recvfrag.c b/src/mca/ptl/ib/src/ptl_ib_recvfrag.c index 0bfa9e87b3..e46cf51c8b 100644 --- a/src/mca/ptl/ib/src/ptl_ib_recvfrag.c +++ b/src/mca/ptl/ib/src/ptl_ib_recvfrag.c @@ -46,26 +46,15 @@ mca_ptl_ib_recv_frag_done (mca_ptl_base_header_t *header, (ompi_list_item_t*)frag); } -/* - * Process incoming receive fragments - * - */ - -void mca_ptl_ib_process_recv(mca_ptl_base_module_t *module, void* addr) +static void mca_ptl_ib_data_frag(mca_ptl_base_module_t *module, + mca_ptl_base_header_t *header) { bool matched; int rc; ib_buffer_t *ib_buf; - mca_ptl_base_header_t *header; ompi_list_item_t *item; mca_ptl_ib_recv_frag_t *recv_frag; - D_PRINT(""); - - ib_buf = (ib_buffer_t *) (unsigned int) addr; - - header = (mca_ptl_base_header_t *) &ib_buf->buf[0]; - OMPI_FREE_LIST_GET(&mca_ptl_ib_component.ib_recv_frags, item, rc); @@ -77,8 +66,7 @@ void mca_ptl_ib_process_recv(mca_ptl_base_module_t *module, void* addr) } recv_frag = (mca_ptl_ib_recv_frag_t *) item; - recv_frag->super.frag_base.frag_owner = - (mca_ptl_base_module_t *) module; + recv_frag->super.frag_base.frag_owner = module; recv_frag->super.frag_base.frag_peer = NULL; recv_frag->super.frag_request = NULL; @@ -115,3 +103,69 @@ void mca_ptl_ib_process_recv(mca_ptl_base_module_t *module, void* addr) D_PRINT("Message matched!"); } } + +static void mca_ptl_ib_ctrl_frag(mca_ptl_base_module_t *module, + mca_ptl_base_header_t *header) +{ + mca_ptl_ib_send_frag_t *send_frag; + mca_pml_base_send_request_t *req; + void *data_ptr; + + send_frag = (mca_ptl_ib_send_frag_t *) + header->hdr_ack.hdr_src_ptr.pval; + req = (mca_pml_base_send_request_t *) + send_frag->frag_send.frag_request; + + req->req_peer_match = header->hdr_ack.hdr_dst_match; + req->req_peer_addr = header->hdr_ack.hdr_dst_addr; + req->req_peer_size = header->hdr_ack.hdr_dst_size; + + /* Locate data in the ACK buffer */ + data_ptr = (void*) + ((char*) header + sizeof(mca_ptl_base_ack_header_t)); + + /* Copy over data to request buffer */ + memcpy(((mca_ptl_ib_send_request_t *) req)->req_buf, + data_ptr, sizeof(VAPI_rkey_t)); + + /* Progress & release fragments */ + mca_ptl_ib_process_send_comp(module, (void*) send_frag); +} + +static void mca_ptl_ib_last_frag(mca_ptl_base_module_t *module, + mca_ptl_base_header_t *header) +{ +} + +/* + * Process incoming receive fragments + * + */ + +void mca_ptl_ib_process_recv(mca_ptl_base_module_t *module, void* addr) +{ + ib_buffer_t *ib_buf; + mca_ptl_base_header_t *header; + + D_PRINT(""); + + ib_buf = (ib_buffer_t *) addr; + + header = (mca_ptl_base_header_t *) &ib_buf->buf[0]; + + switch(header->hdr_common.hdr_type) { + case MCA_PTL_HDR_TYPE_MATCH : + case MCA_PTL_HDR_TYPE_FRAG : + mca_ptl_ib_data_frag(module, header); + break; + case MCA_PTL_HDR_TYPE_ACK : + mca_ptl_ib_ctrl_frag(module, header); + break; + case MCA_PTL_HDR_TYPE_FIN : + mca_ptl_ib_last_frag(module, header); + break; + default : + ompi_output(0, "Unknown fragment type"); + break; + } +} diff --git a/src/mca/ptl/ib/src/ptl_ib_recvfrag.h b/src/mca/ptl/ib/src/ptl_ib_recvfrag.h index 376bd475b1..e3a7d595e6 100644 --- a/src/mca/ptl/ib/src/ptl_ib_recvfrag.h +++ b/src/mca/ptl/ib/src/ptl_ib_recvfrag.h @@ -4,6 +4,8 @@ #include "mca/ptl/ptl.h" #include "mca/ptl/base/ptl_base_recvfrag.h" +#define MCA_PTL_IB_UNEX_BUF_SIZE (4096) + OBJ_CLASS_DECLARATION(mca_ptl_ib_recv_frag_t); /** @@ -12,7 +14,7 @@ OBJ_CLASS_DECLARATION(mca_ptl_ib_recv_frag_t); struct mca_ptl_ib_recv_frag_t { mca_ptl_base_recv_frag_t super; /**< base receive fragment descriptor */ - char unex_buf[4096]; + char unex_buf[MCA_PTL_IB_UNEX_BUF_SIZE]; /**< Unexpected buffer */ }; typedef struct mca_ptl_ib_recv_frag_t mca_ptl_ib_recv_frag_t; diff --git a/src/mca/ptl/ib/src/ptl_ib_sendfrag.c b/src/mca/ptl/ib/src/ptl_ib_sendfrag.c index a39beda9ec..9db28c72e9 100644 --- a/src/mca/ptl/ib/src/ptl_ib_sendfrag.c +++ b/src/mca/ptl/ib/src/ptl_ib_sendfrag.c @@ -53,7 +53,10 @@ int mca_ptl_ib_send_frag_init(mca_ptl_ib_send_frag_t* sendfrag, hdr->hdr_frag.hdr_frag_offset = offset; hdr->hdr_frag.hdr_frag_seq = 0; hdr->hdr_frag.hdr_src_ptr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */ + + /* Ptr to send frag, so incoming ACK can locate the frag */ hdr->hdr_frag.hdr_src_ptr.pval = sendfrag; + hdr->hdr_frag.hdr_dst_ptr.lval = 0; hdr->hdr_match.hdr_contextid = sendreq->req_base.req_comm->c_contextid; hdr->hdr_match.hdr_src = sendreq->req_base.req_comm->c_my_rank; @@ -65,6 +68,7 @@ int mca_ptl_ib_send_frag_init(mca_ptl_ib_send_frag_t* sendfrag, header_length = sizeof(mca_ptl_base_match_header_t); } else { + hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG; hdr->hdr_common.hdr_flags = flags; hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_frag_header_t); @@ -216,36 +220,154 @@ int mca_ptl_ib_register_send_frags(mca_ptl_base_module_t *ptl) rc = mca_ptl_ib_register_mem(ib_state->nic, ib_state->ptag, (void*) ib_buf_ptr->buf, - 4096, &ib_buf_ptr->hndl); + MCA_PTL_IB_FIRST_FRAG_SIZE, + &ib_buf_ptr->hndl); if(rc != OMPI_SUCCESS) { return OMPI_ERROR; } - IB_PREPARE_SEND_DESC(ib_buf_ptr, 0, 4096); + IB_PREPARE_SEND_DESC(ib_buf_ptr, 0, + MCA_PTL_IB_FIRST_FRAG_SIZE); } return OMPI_SUCCESS; } +/* + * Process RDMA Write completions + * + * Just return send fragment to free list + */ + +void mca_ptl_ib_process_rdma_w_comp(mca_ptl_base_module_t *module, + void* comp_addr) +{ + mca_ptl_ib_send_frag_t *sendfrag; + ompi_free_list_t *flist; + + sendfrag = (mca_ptl_ib_send_frag_t *) comp_addr; + + flist = &(sendfrag-> + frag_send.frag_base.frag_peer-> + peer_module->send_free); + + OMPI_FREE_LIST_RETURN(flist, + ((ompi_list_item_t *) sendfrag)); +} + /* * Process send completions * */ void mca_ptl_ib_process_send_comp(mca_ptl_base_module_t *module, - void* addr, ompi_free_list_t *flist) + void* addr) { mca_ptl_ib_send_frag_t *sendfrag; mca_ptl_base_header_t *header; + mca_pml_base_send_request_t *req; + ompi_free_list_t *flist; - sendfrag = (mca_ptl_ib_send_frag_t *) (unsigned int) addr; + sendfrag = (mca_ptl_ib_send_frag_t *) addr; header = (mca_ptl_base_header_t *) sendfrag->ib_buf.buf; - module->ptl_send_progress(module, - sendfrag->frag_send.frag_request, - header->hdr_frag.hdr_frag_length); + req = (mca_pml_base_send_request_t *) + sendfrag->frag_send.frag_request; - /* Return sendfrag to free list */ + flist = &(sendfrag-> + frag_send.frag_base.frag_peer-> + peer_module->send_free); - OMPI_FREE_LIST_RETURN(flist, ((ompi_list_item_t *) sendfrag)); + if(header->hdr_common.hdr_type == MCA_PTL_HDR_TYPE_ACK) { + /* Is this an ack descriptor ? */ + A_PRINT("Completion of send_ack"); + + OMPI_FREE_LIST_RETURN(flist, + ((ompi_list_item_t *) sendfrag)); + } else if(NULL == req) { + /* An ack descriptor ? Don't know what to do! */ + OMPI_FREE_LIST_RETURN(flist, + ((ompi_list_item_t *) sendfrag)); + } else if (0 == (header->hdr_common.hdr_flags + & MCA_PTL_FLAGS_ACK_MATCHED) + || mca_pml_base_send_request_matched(req)) { + + module->ptl_send_progress(module, + sendfrag->frag_send.frag_request, + header->hdr_frag.hdr_frag_length); + /* Return sendfrag to free list */ + + OMPI_FREE_LIST_RETURN(flist, + ((ompi_list_item_t *) sendfrag)); + } else { + /* Not going to call progress on this send, + * and not free-ing descriptor */ + A_PRINT("Why should I return sendfrag?"); + } +} + +int mca_ptl_ib_put_frag_init(mca_ptl_ib_send_frag_t *sendfrag, + mca_ptl_base_peer_t *ptl_peer, + mca_pml_base_send_request_t *req, + size_t offset, size_t *size, int flags) +{ + int rc; + int size_in, size_out; + mca_ptl_base_header_t *hdr; + + size_in = *size; + + hdr = (mca_ptl_base_header_t *) + &sendfrag->ib_buf.buf[0]; + + hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FIN; + hdr->hdr_common.hdr_flags = flags; + hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_frag_header_t); + hdr->hdr_frag.hdr_frag_offset = offset; + hdr->hdr_frag.hdr_frag_seq = 0; + hdr->hdr_frag.hdr_src_ptr.lval = 0; + hdr->hdr_frag.hdr_src_ptr.pval = sendfrag; + hdr->hdr_frag.hdr_dst_ptr = req->req_peer_match; + hdr->hdr_frag.hdr_frag_length = size_in; + + if(size_in > 0 && 0) { + struct iovec iov; + ompi_convertor_t *convertor; + + if( offset <= mca_ptl_ib_module.super.ptl_first_frag_size) { + convertor = &req->req_convertor; + } else { + convertor = &sendfrag->frag_send.frag_base.frag_convertor; + ompi_convertor_copy(&req->req_convertor, convertor); + ompi_convertor_init_for_send( + convertor, + 0, + req->req_base.req_datatype, + req->req_base.req_count, + req->req_base.req_addr, + offset); + } + iov.iov_base = &sendfrag->ib_buf.buf[sizeof(mca_ptl_base_frag_header_t)]; + iov.iov_len = size_in; + + rc = ompi_convertor_pack(convertor, &iov, 1); + if (rc < 0) { + ompi_output (0, "[%s:%d] Unable to pack data\n", + __FILE__, __LINE__); + return; + } + size_out = iov.iov_len; + } else { + size_out = size_in; + } + + *size = size_out; + hdr->hdr_frag.hdr_frag_length = size_out; + + A_PRINT("size_in : %d", size_in); + + IB_SET_SEND_DESC_LEN((&sendfrag->ib_buf), + (sizeof(mca_ptl_base_frag_header_t) + size_in)); + + return OMPI_SUCCESS; } diff --git a/src/mca/ptl/ib/src/ptl_ib_sendfrag.h b/src/mca/ptl/ib/src/ptl_ib_sendfrag.h index 1818dbd210..5d4503ed06 100644 --- a/src/mca/ptl/ib/src/ptl_ib_sendfrag.h +++ b/src/mca/ptl/ib/src/ptl_ib_sendfrag.h @@ -59,6 +59,13 @@ mca_ptl_ib_send_frag_t* mca_ptl_ib_alloc_send_frag( int mca_ptl_ib_register_send_frags(mca_ptl_base_module_t *ptl); void mca_ptl_ib_process_send_comp(mca_ptl_base_module_t *, - void*, ompi_free_list_t*); + void*); +void mca_ptl_ib_process_rdma_w_comp(mca_ptl_base_module_t *module, + void* comp_addr); + +int mca_ptl_ib_put_frag_init(mca_ptl_ib_send_frag_t *sendfrag, + struct mca_ptl_base_peer_t *ptl_peer, + struct mca_pml_base_send_request_t *req, + size_t offset, size_t *size, int flags); #endif diff --git a/src/mca/ptl/ib/src/ptl_ib_sendreq.c b/src/mca/ptl/ib/src/ptl_ib_sendreq.c index 34368ffb4f..cdde74ff9d 100644 --- a/src/mca/ptl/ib/src/ptl_ib_sendreq.c +++ b/src/mca/ptl/ib/src/ptl_ib_sendreq.c @@ -14,7 +14,7 @@ OBJ_CLASS_INSTANCE(mca_ptl_ib_send_request_t, void mca_ptl_ib_send_request_construct(mca_ptl_ib_send_request_t* request) { - D_PRINT("\n"); + A_PRINT("Request Construct"); request->req_frag = NULL; /* diff --git a/src/mca/ptl/ib/src/ptl_ib_sendreq.h b/src/mca/ptl/ib/src/ptl_ib_sendreq.h index dfbeecb4b8..6365d85501 100644 --- a/src/mca/ptl/ib/src/ptl_ib_sendreq.h +++ b/src/mca/ptl/ib/src/ptl_ib_sendreq.h @@ -22,6 +22,8 @@ struct mca_ptl_ib_send_request_t { mca_ptl_ib_send_frag_t *req_frag; /* first fragment */ + char req_buf[8]; + /* temporary buffer to hold VAPI_rkey_t */ }; typedef struct mca_ptl_ib_send_request_t mca_ptl_ib_send_request_t; diff --git a/src/mca/ptl/ib/src/ptl_ib_vapi.h b/src/mca/ptl/ib/src/ptl_ib_vapi.h index 97ad365104..564dc6b588 100644 --- a/src/mca/ptl/ib/src/ptl_ib_vapi.h +++ b/src/mca/ptl/ib/src/ptl_ib_vapi.h @@ -52,5 +52,13 @@ #define D_PRINT(fmt, args...) #endif +#if 1 +#define A_PRINT(fmt, args...) { \ + ompi_output(0, "[%s:%d:%s] " fmt, __FILE__, __LINE__, __func__, \ + ##args); \ +} +#else +#define A_PRINT(fmt, args...) +#endif #endif