diff --git a/src/mca/ptl/ib/src/Makefile.am b/src/mca/ptl/ib/src/Makefile.am index 6ed0cf45df..12c6074041 100644 --- a/src/mca/ptl/ib/src/Makefile.am +++ b/src/mca/ptl/ib/src/Makefile.am @@ -22,4 +22,5 @@ libmca_ptl_ib_la_SOURCES = \ ptl_ib_sendfrag.h \ ptl_ib_sendreq.c \ ptl_ib_sendreq.h \ + ptl_ib_ud.h \ ptl_ib_vapi.h diff --git a/src/mca/ptl/ib/src/ptl_ib.c b/src/mca/ptl/ib/src/ptl_ib.c index 8bcf8ecead..a9f38cbbcf 100644 --- a/src/mca/ptl/ib/src/ptl_ib.c +++ b/src/mca/ptl/ib/src/ptl_ib.c @@ -32,16 +32,11 @@ mca_ptl_ib_module_t mca_ptl_ib_module = { mca_ptl_ib_del_procs, mca_ptl_ib_finalize, mca_ptl_ib_send, + mca_ptl_ib_send, NULL, -#if 0 mca_ptl_ib_matched, - mca_ptl_ib_request_alloc, - mca_ptl_ib_request_return -#else - NULL, /* Sayantan: need to update matched */ - NULL, /* Sayantan: need request_init */ - NULL, /* Sayantan: need request_fini */ -#endif + mca_ptl_ib_request_init, + mca_ptl_ib_request_fini } }; @@ -124,16 +119,15 @@ int mca_ptl_ib_finalize(struct mca_ptl_base_module_t* ptl) return OMPI_SUCCESS; } -int mca_ptl_ib_request_alloc(struct mca_ptl_base_module_t* ptl, - struct mca_pml_base_send_request_t** request) +int mca_ptl_ib_request_init( struct mca_ptl_base_module_t* ptl, + struct mca_pml_base_send_request_t* request) { /* Stub */ D_PRINT("Stub\n"); return OMPI_SUCCESS; } - -void mca_ptl_ib_request_return(struct mca_ptl_base_module_t* ptl, +void mca_ptl_ib_request_fini( struct mca_ptl_base_module_t* ptl, struct mca_pml_base_send_request_t* request) { /* Stub */ @@ -155,8 +149,12 @@ int mca_ptl_ib_send( struct mca_ptl_base_module_t* ptl, int flags) { int rc; + VAPI_ret_t ret; mca_ptl_ib_send_frag_t* sendfrag; ompi_list_item_t* item; + vapi_descriptor_t* desc; + mca_ptl_base_header_t *hdr; + int header_length; if (0 == offset) { sendfrag = &((mca_ptl_ib_send_request_t*)sendreq)->req_frag; @@ -167,32 +165,136 @@ int mca_ptl_ib_send( struct mca_ptl_base_module_t* ptl, } } - rc = mca_ptl_ib_send_frag_init(sendfrag, ptl_peer, sendreq, offset, &size, flags); + hdr = (mca_ptl_base_header_t *) + ptl_peer->peer_module->send_buf[0].buf; - if(rc != OMPI_SUCCESS) { - return rc; + if(offset == 0) { + hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_MATCH; + hdr->hdr_common.hdr_flags = flags; + hdr->hdr_common.hdr_size = sizeof (mca_ptl_base_match_header_t); + hdr->hdr_frag.hdr_frag_offset = offset; + hdr->hdr_frag.hdr_frag_seq = 0; + /* Frag descriptor, so that incoming ack + * will locate it */ + hdr->hdr_frag.hdr_src_ptr.lval = 0; + hdr->hdr_frag.hdr_src_ptr.pval = sendfrag; + hdr->hdr_frag.hdr_dst_ptr.pval = 0; + hdr->hdr_frag.hdr_dst_ptr.lval = 0; + + hdr->hdr_match.hdr_contextid = sendreq->req_base.req_comm->c_contextid; + hdr->hdr_match.hdr_src = sendreq->req_base.req_comm->c_my_rank; + hdr->hdr_match.hdr_dst = sendreq->req_base.req_peer; + hdr->hdr_match.hdr_tag = sendreq->req_base.req_tag; + hdr->hdr_match.hdr_msg_length = sendreq->req_bytes_packed; + hdr->hdr_match.hdr_msg_seq = sendreq->req_base.req_sequence; + header_length = sizeof (mca_ptl_base_match_header_t); + } else { + hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG; + hdr->hdr_common.hdr_flags = flags; + hdr->hdr_common.hdr_size = sizeof (mca_ptl_base_frag_header_t); + hdr->hdr_frag.hdr_frag_offset = offset; + hdr->hdr_frag.hdr_frag_seq = 0; + hdr->hdr_frag.hdr_src_ptr.lval = 0; + hdr->hdr_frag.hdr_src_ptr.pval = sendfrag; /* Frag descriptor */ + hdr->hdr_frag.hdr_dst_ptr = sendreq->req_peer_match; + header_length = sizeof (mca_ptl_base_frag_header_t); } - /* must update the offset after actual fragment - * size is 
determined -- and very important --
-     * before attempting to send the fragment
-     */
-    sendreq->req_offset += size;
-    rc = mca_ptl_ib_peer_send(ptl_peer, sendfrag);
+    ptl_peer->peer_module->send_buf[0].req = &sendreq->req_base;

-    return rc;
+    desc = &ptl_peer->peer_module->send_buf[0].desc;
+
+    desc->sr.comp_type = VAPI_SIGNALED;
+    desc->sr.opcode = VAPI_SEND;
+    desc->sr.remote_qkey = 0;
+
+    desc->sr.remote_qp = ptl_peer->peer_proc->proc_addrs[0].qp_num;
+
+    desc->sr.id = &ptl_peer->peer_module->send_buf[0];
+    desc->sr.sg_lst_len = 1;
+    desc->sr.sg_lst_p = &(desc->sg_entry);
+
+    /* Copy the user data in after the header */
+    /* TODO: check this later on ... */
+    memcpy((void*)((char*)ptl_peer->peer_module->send_buf[0].buf +
+                header_length), sendreq->req_base.req_addr,
+            size);
+
+    desc->sg_entry.len = header_length + size;
+    D_PRINT("Sent length : %d\n", desc->sg_entry.len);
+
+    desc->sg_entry.lkey = ptl_peer->peer_module->send_buf_hndl.lkey;
+    desc->sg_entry.addr = (VAPI_virt_addr_t) (MT_virt_addr_t)
+        (ptl_peer->peer_module->send_buf[0].buf);
+
+    ret = VAPI_post_sr(ptl_peer->peer_module->nic,
+            ptl_peer->peer_module->my_qp_hndl,
+            &desc->sr);
+
+    if(VAPI_OK != ret) {
+        MCA_PTL_IB_VAPI_RET(ret, "VAPI_post_sr");
+        return OMPI_ERROR;
+    }
+
+    return OMPI_SUCCESS;
 }

 /*
  * A posted receive has been matched - if required send an
- * ack back to the peer and process the fragment.
+ * ack back to the peer and process the fragment. Copy the
+ * data to the user buffer.
  */
-void mca_ptl_ib_matched(
-    mca_ptl_base_module_t* ptl,
+void mca_ptl_ib_matched(mca_ptl_base_module_t* module,
     mca_ptl_base_recv_frag_t* frag)
 {
-    /* Stub */
-    D_PRINT("Stub\n");
+    mca_pml_base_recv_request_t *request;
+    mca_ptl_base_header_t *header;
+    mca_ptl_ib_recv_frag_t *recv_frag;
+
+    header = &frag->frag_base.frag_header;
+    request = frag->frag_request;
+    recv_frag = (mca_ptl_ib_recv_frag_t*) frag;
+
+    D_PRINT("Matched frag\n");
+
+    if (header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) {
+        D_PRINT("ACK requested, but sending acks is not implemented yet!\n");
+    }
+
+    /* Process the fragment */
+
+    /* In the TCP case, an iovec is allocated first, then the data
+     * is received into it and copied only if needed.  In the Elan
+     * case, the data is saved into an unexpected buffer whenever
+     * the recv descriptor has not been posted in time (TODO);
+     * we then need to copy from the unexpected buffer to the
+     * application buffer. */
+
+    if (header->hdr_frag.hdr_frag_length > 0) {
+
+        struct iovec iov;
+        ompi_proc_t *proc;
+
+        iov.iov_base = frag->frag_base.frag_addr;
+        iov.iov_len = frag->frag_base.frag_size;
+
+        proc = ompi_comm_peer_lookup(request->req_base.req_comm,
+                request->req_base.req_peer);
+
+        ompi_convertor_copy(proc->proc_convertor,
+                &frag->frag_base.frag_convertor);
+
+        ompi_convertor_init_for_recv(
+                &frag->frag_base.frag_convertor,
+                0,
+                request->req_base.req_datatype,
+                request->req_base.req_count,
+                request->req_base.req_addr,
+                header->hdr_frag.hdr_frag_offset);
+        ompi_convertor_unpack(&frag->frag_base.frag_convertor, &iov, 1);
+    }
+    mca_ptl_ib_recv_frag_done(header, frag, request);
 }
diff --git a/src/mca/ptl/ib/src/ptl_ib.h b/src/mca/ptl/ib/src/ptl_ib.h
index 9c9b971e7c..ab0a90d46b 100644
--- a/src/mca/ptl/ib/src/ptl_ib.h
+++ b/src/mca/ptl/ib/src/ptl_ib.h
@@ -24,6 +24,7 @@
 #include "ptl_ib_proc.h"
 #include "ptl_ib_peer.h"
 #include "ptl_ib_priv.h"
+#include "ptl_ib_ud.h"

 /* Other IB ptl includes */
 #include "ptl_ib_sendreq.h"
@@ -66,21 +67,32 @@ struct mca_ptl_ib_module_t {
     VAPI_pd_hndl_t ptag; /* Protection Domain tag */
     VAPI_cq_hndl_t cq_hndl; /* Completion Queue handle */
-    VAPI_qp_hndl_t *qp_hndl; /* Array of Queue Pair handles */
     EVAPI_async_handler_hndl_t async_handler;
     /* Async event handler used to detect weird events */
     VAPI_cq_hndl_t ud_scq_hndl;/* UD send completion queue handle */
     VAPI_cq_hndl_t ud_rcq_hndl;/* UD recv completion queue handle */
-    mca_ptl_ib_ud_buf_t* ud_recv_buf;/* Link to UD recv buffer structures */
-    mca_ptl_ib_ud_buf_t* ud_send_buf;/* Link to UD bufs which are used for sending */
+    mca_ptl_ib_ud_buf_ctrl_t* ud_recv; /* Link to UD recv buffer structures */
+    mca_ptl_ib_ud_buf_ctrl_t* ud_send; /* Link to UD bufs which are used for sending */
     VAPI_qp_hndl_t ud_qp_hndl; /* UD queue pair handle */
     VAPI_qp_prop_t ud_qp_prop; /* UD queue pair properties */
     VAPI_rr_desc_t* ud_rr_hndl; /* UD receive descriptor pool */
     VAPI_completion_event_handler_t ud_comp_ev_handler; /* UD completion handler */
     EVAPI_compl_handler_hndl_t ud_comp_ev_hndl; /* UD completion handler handle */
+
+    /* Temporary fields; remove after dynamic connection
+     * management is in place */
+    VAPI_qp_hndl_t my_qp_hndl;
+    VAPI_qp_prop_t my_qp_prop;
+    /* Circular buffers */
+    mca_ptl_ib_send_buf_t* send_buf;
+    int send_index;
+    mca_ptl_ib_recv_buf_t* recv_buf;
+    int recv_index;
+    vapi_memhandle_t send_buf_hndl;
+    vapi_memhandle_t recv_buf_hndl;
 };
 typedef struct mca_ptl_ib_module_t mca_ptl_ib_module_t;
@@ -184,17 +196,28 @@ extern int mca_ptl_ib_del_procs(
 );

 /**
- * PML->PTL Allocate a send request from the PTL modules free list.
+ * PML->PTL Initialize a send request in the send request cache.
  *
  * @param ptl (IN) PTL instance
- * @param request (OUT) Pointer to allocated request.
- * @return Status indicating if allocation was successful.
+ * @param request (IN) Pointer to allocated request.
  *
- */
-extern int mca_ptl_ib_request_alloc(
-    struct mca_ptl_base_module_t* ptl,
-    struct mca_pml_base_send_request_t**
-);
+ **/
+extern int mca_ptl_ib_request_init(
+    struct mca_ptl_base_module_t* ptl,
+    struct mca_pml_base_send_request_t*
+    );
+
+/**
+ * PML->PTL Cleanup a send request that is being removed from the cache.
+ *
+ * @param ptl (IN) PTL instance
+ * @param request (IN) Pointer to allocated request.
+ *
+ **/
+extern void mca_ptl_ib_request_fini(
+    struct mca_ptl_base_module_t* ptl,
+    struct mca_pml_base_send_request_t*
+    );

 /**
  * PML->PTL Return a send request to the PTL modules free list.
diff --git a/src/mca/ptl/ib/src/ptl_ib_addr.h b/src/mca/ptl/ib/src/ptl_ib_addr.h
index 81cdb44c64..a4f08bf5fd 100644
--- a/src/mca/ptl/ib/src/ptl_ib_addr.h
+++ b/src/mca/ptl/ib/src/ptl_ib_addr.h
@@ -7,13 +7,14 @@
  * Unreliable Datagram address to peers
  */
 struct mca_ptl_ib_ud_addr_t {
-    VAPI_qp_hndl_t ud_qp; /* UD qp hndl to be published */
-    IB_lid_t lid; /* Local identifier */
+    VAPI_qp_num_t qp_num; /* UD QP number to be published */
+    IB_lid_t lid; /* Local identifier */
 };
 typedef struct mca_ptl_ib_ud_addr_t mca_ptl_ib_ud_addr_t;

 struct mca_ptl_ib_addr_t {
-    VAPI_qp_hndl_t rc_qp; /* RC qp hndl */
+    VAPI_qp_num_t rc_qp; /* RC QP number */
+    IB_lid_t lid; /* LID of the peer */
 };
 typedef struct mca_ptl_ib_addr_t mca_ptl_ib_addr_t;
diff --git a/src/mca/ptl/ib/src/ptl_ib_component.c b/src/mca/ptl/ib/src/ptl_ib_component.c
index 42fb820a61..06a8093351 100644
--- a/src/mca/ptl/ib/src/ptl_ib_component.c
+++ b/src/mca/ptl/ib/src/ptl_ib_component.c
@@ -90,8 +90,6 @@ static inline int mca_ptl_ib_param_register_int(

 int mca_ptl_ib_component_open(void)
 {
-    OBJ_CONSTRUCT(&mca_ptl_ib_component.ib_procs, ompi_list_t);
-
     /* register super component parameters */
     mca_ptl_ib_module.super.ptl_exclusivity =
         mca_ptl_ib_param_register_int ("exclusivity", 0);
@@ -112,6 +110,10 @@ int mca_ptl_ib_component_open(void)
     mca_ptl_ib_component.ib_free_list_inc =
         mca_ptl_ib_param_register_int ("free_list_inc", 32);

+    OBJ_CONSTRUCT(&mca_ptl_ib_component.ib_procs, ompi_list_t);
+
+    OBJ_CONSTRUCT (&mca_ptl_ib_component.ib_recv_frags, ompi_free_list_t);
+
     return OMPI_SUCCESS;
 }
@@ -146,12 +148,18 @@ static int mca_ptl_ib_component_send(void)
     for(i = 0; i < mca_ptl_ib_component.ib_num_ptl_modules; i++) {
         mca_ptl_ib_module_t* ptl = mca_ptl_ib_component.ib_ptl_modules[i];
-        ud_qp_addr[i].ud_qp = ptl->ud_qp_hndl;
+        /* This is for the UD dynamic connection interface
+        ud_qp_addr[i].qp_num = ptl->ud_qp_prop.qp_num;
+        ud_qp_addr[i].lid = ptl->port.lid;
+        */
+
+        /* Just a quick hack for 1-to-1 communications */
+        ud_qp_addr[i].qp_num = ptl->my_qp_prop.qp_num;
         ud_qp_addr[i].lid = ptl->port.lid;
     }

-    D_PRINT("ud_qp_addr[0].ud_qp = %d\n",(int)ud_qp_addr[0].ud_qp);
-    D_PRINT("ud_qp_addr[0].lid = %d\n", (int)ud_qp_addr[0].lid);
+    D_PRINT("QP num sent = %d, LID sent = %d\n",
+            ud_qp_addr[0].qp_num, ud_qp_addr[0].lid);

     rc = mca_base_modex_send(&mca_ptl_ib_component.super.ptlm_version,
             ud_qp_addr, size);
@@ -198,6 +206,15 @@ mca_ptl_base_module_t** mca_ptl_ib_component_init(int *num_ptl_modules,
         return NULL;
     }

+    /* Initialize the receive fragment free list */
+    ompi_free_list_init (&(mca_ptl_ib_component.ib_recv_frags),
+            sizeof (mca_ptl_ib_recv_frag_t),
+            OBJ_CLASS (mca_ptl_ib_recv_frag_t),
+            mca_ptl_ib_component.ib_free_list_num,
+            mca_ptl_ib_component.ib_free_list_max,
+            mca_ptl_ib_component.ib_free_list_inc, NULL);
+
     ret = mca_ptl_ib_get_num_hcas(&num_hcas);

     if ((0 == num_hcas) || (OMPI_SUCCESS != ret)) {
         return NULL;
     }
@@ -238,7 +255,7 @@ mca_ptl_base_module_t** mca_ptl_ib_component_init(int *num_ptl_modules,
                 sizeof(mca_ptl_ib_module));
     }

-    /* For each ptl, do this */
+    /* For each module, do this */
     for(i = 0; i < mca_ptl_ib_component.ib_num_ptl_modules; i++) {

         if(mca_ptl_ib_get_hca_id(i, &ib_modules[i].hca_id)
@@ -255,6 +272,13 @@ mca_ptl_base_module_t** mca_ptl_ib_component_init(int *num_ptl_modules,

         D_PRINT("hca_hndl: %d\n", ib_modules[i].nic);

+
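+        /* Bring-up order matters here: the protection domain comes
+         * first because every memory registration and queue pair
+         * below hangs off it, and the completion queues must exist
+         * before the queue pair that posts to them.  In sketch form,
+         * using this file's own helpers (error paths omitted):
+         *
+         *   mca_ptl_ib_alloc_pd(nic, &ptag);
+         *   mca_ptl_ib_create_cq(nic, &cq_hndl);
+         *   mca_ptl_ib_create_qp(nic, ptag, cq_hndl, cq_hndl,
+         *           &qp_hndl, &qp_prop, VAPI_TS_RC);
+         */
+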
if(mca_ptl_ib_alloc_pd(ib_modules[i].nic, &ib_modules[i].ptag) + != OMPI_SUCCESS) { + return NULL; + } + + D_PRINT("Protection Domain: %d\n", ib_modules[i].ptag); + /* Each HCA uses only port 1. Need to change * this so that each ptl can choose different * ports */ @@ -266,29 +290,39 @@ mca_ptl_base_module_t** mca_ptl_ib_component_init(int *num_ptl_modules, D_PRINT("LID: %d\n", ib_modules[i].port.lid); - if(mca_ptl_ib_alloc_pd(ib_modules[i].nic, &ib_modules[i].ptag) - != OMPI_SUCCESS) { - return NULL; - } - - D_PRINT("Protection Domain: %d\n", (int)ib_modules[i].ptag); if(mca_ptl_ib_create_cq(ib_modules[i].nic, &ib_modules[i].cq_hndl) != OMPI_SUCCESS) { return NULL; } - D_PRINT("CQ handle: %d\n", (int)ib_modules[i].cq_hndl); + D_PRINT("CQ handle: %d\n", ib_modules[i].cq_hndl); - if(mca_ptl_ib_ud_cq_init(ib_modules[i].nic, &ib_modules[i].ud_scq_hndl, - &ib_modules[i].ud_rcq_hndl) + if(mca_ptl_ib_create_cq(ib_modules[i].nic, &ib_modules[i].ud_scq_hndl) != OMPI_SUCCESS) { return NULL; } - if(mca_ptl_ib_ud_qp_init(ib_modules[i].nic, ib_modules[i].ud_rcq_hndl, - ib_modules[i].ud_scq_hndl, ib_modules[i].ptag, - &ib_modules[i].ud_qp_hndl, &ib_modules[i].ud_qp_prop) + if(mca_ptl_ib_create_cq(ib_modules[i].nic, &ib_modules[i].ud_rcq_hndl) + != OMPI_SUCCESS) { + return NULL; + } + + D_PRINT("UD_SCQ handle: %d, UD_RCQ handle: %d\n", + ib_modules[i].ud_scq_hndl, ib_modules[i].ud_rcq_hndl); + + if(mca_ptl_ib_create_qp(ib_modules[i].nic, ib_modules[i].ptag, + ib_modules[i].ud_rcq_hndl, ib_modules[i].ud_scq_hndl, + &ib_modules[i].ud_qp_hndl, &ib_modules[i].ud_qp_prop, + VAPI_TS_UD) + != OMPI_SUCCESS) { + return NULL; + } + + D_PRINT("UD Qp handle: %d, Qp num: %d\n", + ib_modules[i].ud_qp_hndl, ib_modules[i].ud_qp_prop.qp_num); + + if(mca_ptl_ib_ud_qp_init(ib_modules[i].nic, ib_modules[i].ud_qp_hndl) != OMPI_SUCCESS) { return NULL; } @@ -300,35 +334,7 @@ mca_ptl_base_module_t** mca_ptl_ib_component_init(int *num_ptl_modules, return NULL; } - /* Allocate the UD buffers */ - - ib_modules[i].ud_recv_buf = NULL; - - ib_modules[i].ud_recv_buf = malloc(MAX_UD_PREPOST_DEPTH * - sizeof(mca_ptl_ib_ud_buf_t)); - - if(NULL == ib_modules[i].ud_recv_buf) { - return NULL; - } - - /* Prepare the UD buffers for communication: - * - * 1. register - * 2. 
fill up descriptors
-     */
-    if(mca_ptl_ib_prep_ud_bufs(ib_modules[i].nic, ib_modules[i].ud_recv_buf,
-                IB_RECV, MAX_UD_PREPOST_DEPTH)
-            != OMPI_SUCCESS) {
-        return NULL;
-    }
-
-    /* Post the UD recv descriptors */
-    if(mca_ptl_ib_post_ud_recv(ib_modules[i].nic, ib_modules[i].ud_qp_hndl,
-                ib_modules[i].ud_recv_buf, MAX_UD_PREPOST_DEPTH)
-            != OMPI_SUCCESS) {
-        return NULL;
-    }
-
+#if 0
     if(mca_ptl_ib_get_comp_ev_hndl(&ib_modules[i].ud_comp_ev_handler)
             != OMPI_SUCCESS) {
         return NULL;
@@ -336,7 +342,7 @@ mca_ptl_base_module_t** mca_ptl_ib_component_init(int *num_ptl_modules,

     /* Set the completion event handler for the UD recv queue */
     if(mca_ptl_ib_set_comp_ev_hndl(ib_modules[i].nic,
-                ib_modules[i].ud_rcq_hndl,
+                ib_modules[i].cq_hndl,
                 ib_modules[i].ud_comp_ev_handler,
                 (void*)NULL, &ib_modules[i].ud_comp_ev_hndl)
             != OMPI_SUCCESS) {
         return NULL;
     }

     /* Request for interrupts on the UD recv queue */
-    if(mca_ptl_ib_req_comp_notif(ib_modules[i].nic, ib_modules[i].ud_rcq_hndl)
+    if(mca_ptl_ib_req_comp_notif(ib_modules[i].nic,
+                ib_modules[i].cq_hndl)
             != OMPI_SUCCESS) {
         return NULL;
     }
+#endif
+
+    /* Just for point-to-point communication,
+     * until dynamic connection management comes */
+
+    /* Create the QP I am going to use to communicate
+     * with this peer */
+    if(mca_ptl_ib_create_qp(ib_modules[i].nic,
+                ib_modules[i].ptag,
+                ib_modules[i].cq_hndl,
+                ib_modules[i].cq_hndl,
+                &ib_modules[i].my_qp_hndl,
+                &ib_modules[i].my_qp_prop,
+                VAPI_TS_RC)
+            != OMPI_SUCCESS) {
+        return NULL;
+    }
+
+    D_PRINT("QP hndl:%d, num: %d for 1-to-1 communication\n",
+            ib_modules[i].my_qp_hndl,
+            ib_modules[i].my_qp_prop.qp_num);
+
+    ib_modules[i].send_index = 0;
+    ib_modules[i].recv_index = 0;
+
     }

     /* Allocate list of IB ptl pointers */
@@ -405,8 +436,6 @@ int mca_ptl_ib_component_control(int param, void* value, size_t size)
         }
     }
 #endif
-    /* Stub */
-    D_PRINT("Stub\n");

     return OMPI_SUCCESS;
 }
@@ -417,7 +446,66 @@

 int mca_ptl_ib_component_progress(mca_ptl_tstamp_t tstamp)
 {
-    /* Stub */
-    D_PRINT("Stub\n");
+    VAPI_ret_t ret;
+    VAPI_wc_desc_t comp;
+
+    mca_ptl_base_header_t *header;
+    mca_ptl_ib_recv_buf_t *recv_buf;
+    mca_ptl_ib_send_buf_t *send_buf;
+    mca_pml_base_request_t *req;
+
+    D_PRINT("Checking completions ... 
\n"); + + ret = VAPI_poll_cq(mca_ptl_ib_component.ib_ptl_modules[0]->nic, + mca_ptl_ib_component.ib_ptl_modules[0]->cq_hndl, + &comp); + if(VAPI_OK == ret) { + if(comp.status != VAPI_SUCCESS) { + fprintf(stderr,"Got error : %s, Vendor code : %d\n", + VAPI_wc_status_sym(comp.status), + comp.vendor_err_syndrome); + + } + if(VAPI_CQE_SQ_SEND_DATA == comp.opcode) { + D_PRINT("Send completion, id:%d\n", + comp.id); + send_buf = (mca_ptl_ib_send_buf_t*) (unsigned int)comp.id; + header = (mca_ptl_base_header_t*) send_buf->buf; + + req = (mca_pml_base_request_t *) send_buf->req; + + mca_ptl_ib_component.ib_ptl_modules[0]->super.ptl_send_progress( + mca_ptl_ib_component.ib_ptl_modules[0], + req, + header->hdr_frag.hdr_frag_length); + } + else if(VAPI_CQE_RQ_SEND_DATA == comp.opcode) { + D_PRINT("Received message completion len = %d, id : %d\n", + comp.byte_len, comp.id); + + recv_buf = (mca_ptl_ib_recv_buf_t*) (unsigned int)comp.id; + header = (mca_ptl_base_header_t*) recv_buf->buf; + + switch(header->hdr_common.hdr_type) { + case MCA_PTL_HDR_TYPE_MATCH: + D_PRINT("Header type match\n"); + + mca_ptl_ib_frag(mca_ptl_ib_component.ib_ptl_modules[0], + header); + break; + case MCA_PTL_HDR_TYPE_FRAG: + D_PRINT("Header type frag\n"); + break; + default : + D_PRINT("Header, what header?\n"); + break; + } + } + else { + D_PRINT("Got Unknown completion! Opcode : %d\n", + comp.opcode); + } + } + return OMPI_SUCCESS; } diff --git a/src/mca/ptl/ib/src/ptl_ib_peer.c b/src/mca/ptl/ib/src/ptl_ib_peer.c index 8b67d340a3..d9596642f6 100644 --- a/src/mca/ptl/ib/src/ptl_ib_peer.c +++ b/src/mca/ptl/ib/src/ptl_ib_peer.c @@ -57,62 +57,21 @@ static double mca_ptl_ib_get_us(void) static int mca_ptl_ib_peer_start_connect(mca_ptl_base_peer_t* peer) { - VAPI_ret_t ret; - VAPI_ud_av_t av; - VAPI_ud_av_hndl_t av_hndl; - mca_ptl_ib_module_t* peer_module; - - peer_module = peer->peer_module; + int remote_qp_num; - peer_module->ud_send_buf = malloc(sizeof(mca_ptl_ib_ud_buf_t) * - MAX_UD_PREPOST_DEPTH); + peer->peer_addr = (mca_ptl_ib_addr_t*) + malloc(sizeof(mca_ptl_ib_addr_t)); - mca_ptl_ib_prep_ud_bufs(peer_module->nic, peer_module->ud_send_buf, - IB_SEND, MAX_UD_PREPOST_DEPTH); - peer_module->ud_send_buf[0].buf_data->qp_hndl = 101; + D_PRINT("QP num:%d for rank %d:", + peer->peer_qp_prop.qp_num, + peer->peer_proc->proc_ompi->proc_name.vpid); + scanf("%d", &remote_qp_num); - av.dgid[0] = 0; - av.dlid = peer->peer_proc->proc_addrs[0].lid; - av.grh_flag = TRUE; - av.flow_label = DEFAULT_FLOW_LABEL; - av.hop_limit = DEFAULT_HOP_LIMIT; - av.sgid_index = 0; - av.sl = DEFAULT_SERVICE_LEVEL; - av.port = DEFAULT_PORT; - av.src_path_bits = DEFAULT_SRC_PATH_BITS; - av.static_rate = DEFAULT_STATIC_RATE; - av.traffic_class = DEFAULT_TRAFFIC_CLASS; + peer->peer_addr->rc_qp = remote_qp_num; - ret = VAPI_create_addr_hndl(peer_module->nic, peer_module->ptag, - &av, &av_hndl); + D_PRINT("You entered: %d\n", peer->peer_addr->rc_qp); - if (VAPI_OK != ret) { - return OMPI_ERROR; - } - - peer_module->ud_send_buf[0].desc.sr.remote_qkey = 0; - peer_module->ud_send_buf[0].desc.sr.remote_qp = peer->peer_proc->proc_addrs[0].ud_qp; - peer_module->ud_send_buf[0].desc.sr.remote_ah = av_hndl; - peer_module->ud_send_buf[0].desc.sr.id = (VAPI_virt_addr_t) (MT_virt_addr_t)&peer_module->ud_send_buf[0]; - peer_module->ud_send_buf[0].desc.sg_entry.addr = (VAPI_virt_addr_t) (MT_virt_addr_t) - peer_module->ud_send_buf[0].buf_data; - peer_module->ud_send_buf[0].desc.sg_entry.len = 4; - peer_module->ud_send_buf[0].desc.sg_entry.lkey = 
peer_module->ud_send_buf[0].memhandle.lkey; - peer_module->ud_send_buf[0].desc.sr.set_se = FALSE; - peer_module->ud_send_buf[0].desc.sr.fence = FALSE; - - ret = VAPI_post_sr(peer_module->nic, - peer_module->ud_qp_hndl, - &(peer_module->ud_send_buf[0].desc.sr)); - - D_PRINT("Remote QP: %d, Remote LID : %d, Posted sr: %s\n", - peer_module->ud_send_buf[0].desc.sr.remote_qp, - av.dlid, VAPI_strerror(ret)); - - if(VAPI_OK != ret) { - return OMPI_ERROR; - } return OMPI_SUCCESS; } diff --git a/src/mca/ptl/ib/src/ptl_ib_peer.h b/src/mca/ptl/ib/src/ptl_ib_peer.h index c2772b0275..87f348d446 100644 --- a/src/mca/ptl/ib/src/ptl_ib_peer.h +++ b/src/mca/ptl/ib/src/ptl_ib_peer.h @@ -56,6 +56,8 @@ struct mca_ptl_base_peer_t { ompi_mutex_t peer_recv_lock; /**< lock for concurrent access to peer state */ ompi_event_t peer_send_event; /**< event for async processing of send frags */ ompi_event_t peer_recv_event; /**< event for async processing of recv frags */ + VAPI_qp_hndl_t peer_qp_hndl; /**< My QP for the peer */ + VAPI_qp_prop_t peer_qp_prop; /**< My QP properties */ }; typedef struct mca_ptl_base_peer_t mca_ptl_base_peer_t; typedef struct mca_ptl_base_peer_t mca_ptl_ib_peer_t; diff --git a/src/mca/ptl/ib/src/ptl_ib_priv.c b/src/mca/ptl/ib/src/ptl_ib_priv.c index 1d61c44ba4..70ac63230e 100644 --- a/src/mca/ptl/ib/src/ptl_ib_priv.c +++ b/src/mca/ptl/ib/src/ptl_ib_priv.c @@ -55,87 +55,178 @@ static void ud_completion_handler(VAPI_hca_hndl_t nic, } } -int mca_ptl_ib_ud_cq_init(VAPI_hca_hndl_t nic, - VAPI_cq_hndl_t* ud_scq_hndl, - VAPI_cq_hndl_t* ud_rcq_hndl) -{ - VAPI_ret_t ret; - VAPI_cqe_num_t act_num_cqe = 0; - - ret = VAPI_create_cq(nic, DEFAULT_CQ_SIZE, - ud_scq_hndl, &act_num_cqe); - - D_PRINT("UD Send CQ handle :%d\n", (int)*ud_scq_hndl); - - if((VAPI_OK != ret) || (0 == act_num_cqe)) { - MCA_PTL_IB_VAPI_RET(ret, "VAPI_create_cq"); - return OMPI_ERROR; - } - - /* Send completion queue was allocated successfully, - * proceed to allocate receive completion queue */ - - act_num_cqe = 0; - - ret = VAPI_create_cq(nic, DEFAULT_CQ_SIZE, - ud_rcq_hndl, &act_num_cqe); - - D_PRINT("UD Recv CQ handle :%d\n", (int)*ud_rcq_hndl); - - if((VAPI_OK != ret) || (act_num_cqe == 0)) { - MCA_PTL_IB_VAPI_RET(ret, "VAPI_create_cq"); - return OMPI_ERROR; - } - - return OMPI_SUCCESS; -} - -/* Set up UD Completion Queue and Queue pair */ - -int mca_ptl_ib_ud_qp_init(VAPI_hca_hndl_t nic, - VAPI_cq_hndl_t ud_rcq_hndl, - VAPI_cq_hndl_t ud_scq_hndl, +int mca_ptl_ib_create_qp(VAPI_hca_hndl_t nic, VAPI_pd_hndl_t ptag, - VAPI_qp_hndl_t* ud_qp_hndl, - VAPI_qp_prop_t* ud_qp_prop) + VAPI_cq_hndl_t recv_cq, + VAPI_cq_hndl_t send_cq, + VAPI_qp_hndl_t* qp_hndl, + VAPI_qp_prop_t* qp_prop, + int transport_type) { - VAPI_qp_init_attr_t qp_init_attr; - VAPI_qp_attr_t qp_attr; - VAPI_qp_cap_t qp_cap; - VAPI_qp_attr_mask_t qp_attr_mask; - VAPI_ret_t ret; + VAPI_ret_t ret; + VAPI_qp_init_attr_t qp_init_attr; - qp_init_attr.cap.max_oust_wr_rq = DEFAULT_UD_WQ_SIZE; - qp_init_attr.cap.max_oust_wr_sq = DEFAULT_UD_WQ_SIZE; - qp_init_attr.cap.max_sg_size_rq = DEFAULT_UD_SG_LIST; - qp_init_attr.cap.max_sg_size_sq = DEFAULT_UD_SG_LIST; - qp_init_attr.pd_hndl = ptag; + switch(transport_type) { - /* We don't have Reliable Datagram Handle right now */ - qp_init_attr.rdd_hndl = 0; + case VAPI_TS_UD: /* Set up UD qp parameters */ + qp_init_attr.cap.max_oust_wr_rq = DEFAULT_UD_WQ_SIZE; + qp_init_attr.cap.max_oust_wr_sq = DEFAULT_UD_WQ_SIZE; + qp_init_attr.cap.max_sg_size_rq = DEFAULT_UD_SG_LIST; + qp_init_attr.cap.max_sg_size_sq = DEFAULT_UD_SG_LIST; + 
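+            /* Note: the UD and RC cases share the DEFAULT_UD_* sizing
+             * constants for work-queue depth and scatter/gather list
+             * length; only the transport service type differs. */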
+            qp_init_attr.pd_hndl = ptag;

-    /* Set Send and Recv completion queues */
-    qp_init_attr.rq_cq_hndl = ud_rcq_hndl;
-    qp_init_attr.sq_cq_hndl = ud_scq_hndl;
-
-    /* Signal all work requests on this queue pair */
-    qp_init_attr.rq_sig_type = VAPI_SIGNAL_REQ_WR;
-    qp_init_attr.sq_sig_type = VAPI_SIGNAL_REQ_WR;
+            /* We don't have Reliable Datagram Handle right now */
+            qp_init_attr.rdd_hndl = 0;

-    /* Use Unreliable Datagram transport service */
-    qp_init_attr.ts_type = VAPI_TS_UD;
+            /* Set Send and Recv completion queues */
+            qp_init_attr.rq_cq_hndl = recv_cq;
+            qp_init_attr.sq_cq_hndl = send_cq;
+
+            /* Signal all work requests on this queue pair */
+            qp_init_attr.rq_sig_type = VAPI_SIGNAL_REQ_WR;
+            qp_init_attr.sq_sig_type = VAPI_SIGNAL_REQ_WR;
+
+            /* Use Unreliable Datagram transport service */
+            qp_init_attr.ts_type = VAPI_TS_UD;
+
+            break;
+        case VAPI_TS_RC: /* Set up RC qp parameters */
+            qp_init_attr.cap.max_oust_wr_rq = DEFAULT_UD_WQ_SIZE;
+            qp_init_attr.cap.max_oust_wr_sq = DEFAULT_UD_WQ_SIZE;
+            qp_init_attr.cap.max_sg_size_rq = DEFAULT_UD_SG_LIST;
+            qp_init_attr.cap.max_sg_size_sq = DEFAULT_UD_SG_LIST;
+            qp_init_attr.pd_hndl = ptag;
+            /* We don't have Reliable Datagram Handle right now */
+            qp_init_attr.rdd_hndl = 0;
+
+            /* Set Send and Recv completion queues */
+            qp_init_attr.rq_cq_hndl = recv_cq;
+            qp_init_attr.sq_cq_hndl = send_cq;
+
+            /* Signal all work requests on this queue pair */
+            qp_init_attr.rq_sig_type = VAPI_SIGNAL_REQ_WR;
+            qp_init_attr.sq_sig_type = VAPI_SIGNAL_REQ_WR;
+
+            /* Use Reliable Connection transport service */
+            qp_init_attr.ts_type = VAPI_TS_RC;
+            break;
+        default:
+            return OMPI_ERR_NOT_IMPLEMENTED;
+    }

     ret = VAPI_create_qp(nic, &qp_init_attr,
-            ud_qp_hndl, ud_qp_prop);
+            qp_hndl, qp_prop);

     if(VAPI_OK != ret) {
         MCA_PTL_IB_VAPI_RET(ret, "VAPI_create_qp");
         return OMPI_ERROR;
     }
+    return OMPI_SUCCESS;
+}

-    D_PRINT("UD QP[%d] created ..hndl=%d\n",
-            ud_qp_prop->qp_num,
-            (int)*ud_qp_hndl);
+int mca_ptl_ib_rc_qp_init(VAPI_hca_hndl_t nic,
+        VAPI_qp_hndl_t qp_hndl,
+        VAPI_qp_num_t remote_qp,
+        IB_lid_t remote_lid)
+{
+    VAPI_ret_t ret;
+    VAPI_qp_attr_t qp_attr;
+    VAPI_qp_attr_mask_t qp_attr_mask;
+    VAPI_qp_cap_t qp_cap;
+
+    /* Modifying QP to INIT */
+    QP_ATTR_MASK_CLR_ALL(qp_attr_mask);
+    qp_attr.qp_state = VAPI_INIT;
+    QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_QP_STATE);
+    qp_attr.pkey_ix = DEFAULT_PKEY_IX;
+    QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_PKEY_IX);
+    qp_attr.port = DEFAULT_PORT;
+    QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_PORT);
+    qp_attr.remote_atomic_flags = VAPI_EN_REM_WRITE | VAPI_EN_REM_READ;
+    QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_REMOTE_ATOMIC_FLAGS);
+
+    ret = VAPI_modify_qp(nic, qp_hndl,
+            &qp_attr, &qp_attr_mask, &qp_cap);
+
+    if(VAPI_OK != ret) {
+        MCA_PTL_IB_VAPI_RET(ret, "VAPI_modify_qp");
+        return OMPI_ERROR;
+    }
+
+    D_PRINT("Modified to INIT..Qp %d", qp_hndl);
+
+    /********************** INIT --> RTR ************************/
+    QP_ATTR_MASK_CLR_ALL(qp_attr_mask);
+    qp_attr.qp_state = VAPI_RTR;
+    QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_QP_STATE);
+    qp_attr.qp_ous_rd_atom = DEFAULT_QP_OUS_RD_ATOM;
+    QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_QP_OUS_RD_ATOM);
+    qp_attr.path_mtu = DEFAULT_MTU;
+    QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_PATH_MTU);
+    qp_attr.rq_psn = DEFAULT_PSN;
+    QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_RQ_PSN);
+    qp_attr.pkey_ix = DEFAULT_PKEY_IX;
+    QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_PKEY_IX);
+    qp_attr.min_rnr_timer = DEFAULT_MIN_RNR_TIMER;
+    QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_MIN_RNR_TIMER);
+
+    qp_attr.av.sl = DEFAULT_SERVICE_LEVEL;
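+    /* The remaining address-vector fields, together with the remote
+     * QP number and LID below, are what actually identify the peer
+     * for this RC connection: */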
+ qp_attr.av.grh_flag = FALSE; + qp_attr.av.static_rate = DEFAULT_STATIC_RATE; + qp_attr.av.src_path_bits = DEFAULT_SRC_PATH_BITS; + + qp_attr.dest_qp_num = remote_qp; + QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_DEST_QP_NUM); + qp_attr.av.dlid = remote_lid; + QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_AV); + + ret = VAPI_modify_qp(nic, qp_hndl, + &qp_attr, &qp_attr_mask, &qp_cap); + + if(VAPI_OK != ret) { + MCA_PTL_IB_VAPI_RET(ret, "VAPI_modify_qp"); + return OMPI_ERROR; + } + + D_PRINT("Modified to RTR..Qp %d", qp_hndl); + + /************** RTS *******************/ + QP_ATTR_MASK_CLR_ALL(qp_attr_mask); + qp_attr.qp_state = VAPI_RTS; + QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_QP_STATE); + qp_attr.sq_psn = DEFAULT_PSN; + QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_SQ_PSN); + qp_attr.timeout = DEFAULT_TIME_OUT; + QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_TIMEOUT); + qp_attr.retry_count = DEFAULT_RETRY_COUNT; + QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_RETRY_COUNT); + qp_attr.rnr_retry = DEFAULT_RNR_RETRY; + QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_RNR_RETRY); + qp_attr.ous_dst_rd_atom = DEFAULT_MAX_RDMA_DST_OPS; + QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_OUS_DST_RD_ATOM); + + ret = VAPI_modify_qp(nic, qp_hndl, + &qp_attr, &qp_attr_mask, &qp_cap); + + if(VAPI_OK != ret) { + MCA_PTL_IB_VAPI_RET(ret, "VAPI_modify_qp"); + return OMPI_ERROR; + } + D_PRINT("Modified to RTS..Qp %d", qp_hndl); + + return OMPI_SUCCESS; +} + + +/* Initialize UD queue pairs */ + +int mca_ptl_ib_ud_qp_init(VAPI_hca_hndl_t nic, + VAPI_qp_hndl_t ud_qp_hndl) +{ + VAPI_qp_attr_t qp_attr; + VAPI_qp_cap_t qp_cap; + VAPI_qp_attr_mask_t qp_attr_mask; + VAPI_ret_t ret; /* Modifying QP to INIT */ QP_ATTR_MASK_CLR_ALL(qp_attr_mask); @@ -149,7 +240,7 @@ int mca_ptl_ib_ud_qp_init(VAPI_hca_hndl_t nic, QP_ATTR_MASK_SET(qp_attr_mask,QP_ATTR_QKEY); ret = VAPI_modify_qp(nic, - (VAPI_qp_hndl_t)*ud_qp_hndl, &qp_attr, + ud_qp_hndl, &qp_attr, &qp_attr_mask, &qp_cap); if(VAPI_OK != ret) { @@ -166,8 +257,7 @@ int mca_ptl_ib_ud_qp_init(VAPI_hca_hndl_t nic, qp_attr.qp_state = VAPI_RTR; QP_ATTR_MASK_SET(qp_attr_mask,QP_ATTR_QP_STATE); - ret = VAPI_modify_qp(nic, - (VAPI_qp_hndl_t)*ud_qp_hndl, &qp_attr, + ret = VAPI_modify_qp(nic, ud_qp_hndl, &qp_attr, &qp_attr_mask, &qp_cap); if(VAPI_OK != ret) { @@ -186,14 +276,14 @@ int mca_ptl_ib_ud_qp_init(VAPI_hca_hndl_t nic, qp_attr.sq_psn = DEFAULT_PSN; QP_ATTR_MASK_SET(qp_attr_mask,QP_ATTR_SQ_PSN); - ret = VAPI_modify_qp(nic, - (VAPI_qp_hndl_t)*ud_qp_hndl, &qp_attr, + ret = VAPI_modify_qp(nic, ud_qp_hndl, &qp_attr, &qp_attr_mask, &qp_cap); if(VAPI_OK != ret) { MCA_PTL_IB_VAPI_RET(ret, "VAPI_modify_qp"); return OMPI_ERROR; } + D_PRINT("Modified UD to RTS..Qp\n"); /* Everything was fine ... return success! 
*/
@@ -340,10 +430,11 @@ int mca_ptl_ib_set_async_handler(VAPI_hca_hndl_t nic,
     return OMPI_SUCCESS;
 }

-int mca_ptl_ib_prep_ud_bufs(VAPI_hca_hndl_t nic,
+int mca_ptl_ib_prep_ud_bufs(VAPI_hca_hndl_t nic, VAPI_pd_hndl_t ptag,
         mca_ptl_ib_ud_buf_t* ud_buf,
         IB_wr_t wr_type, int num_bufs)
 {
+#if 0
     int i;
     vapi_descriptor_t* desc;
@@ -358,7 +449,7 @@
         }

         if(mca_ptl_ib_register_mem(nic,
-                    (void*) ud_buf[i].buf_data,
+                    (void*) ud_buf[i].buf_data, ptag,
                     sizeof(mca_ptl_ib_ud_buf_data_t),
                     &ud_buf[i].memhandle)
                 != OMPI_SUCCESS) {
@@ -398,11 +489,12 @@
                 sizeof(mca_ptl_ib_ud_buf_data_t),
                 ud_buf[i].memhandle.lkey);
     }
+#endif

     return OMPI_SUCCESS;
 }

-int mca_ptl_ib_register_mem(VAPI_hca_hndl_t nic,
+int mca_ptl_ib_register_mem(VAPI_hca_hndl_t nic, VAPI_pd_hndl_t ptag,
         void* buf, int len, vapi_memhandle_t* memhandle)
 {
     VAPI_ret_t ret;
@@ -412,7 +504,7 @@
     mr_in.acl = VAPI_EN_LOCAL_WRITE | VAPI_EN_REMOTE_WRITE;
     mr_in.l_key = 0;
     mr_in.r_key = 0;
-    mr_in.pd_hndl = nic;
+    mr_in.pd_hndl = ptag;
     mr_in.size = len;
     mr_in.start = (VAPI_virt_addr_t) (MT_virt_addr_t) buf;
     mr_in.type = VAPI_MR;
@@ -497,3 +589,57 @@
     return OMPI_SUCCESS;
 }
+
+void mca_ptl_ib_frag(struct mca_ptl_ib_module_t* module,
+        mca_ptl_base_header_t * header)
+{
+    /* Allocate a recv frag descriptor */
+    mca_ptl_ib_recv_frag_t *recv_frag;
+    ompi_list_item_t *item;
+
+    bool matched;
+    int rc = OMPI_SUCCESS;
+
+    OMPI_FREE_LIST_GET(&mca_ptl_ib_component.ib_recv_frags,
+            item, rc);
+
+    while (OMPI_SUCCESS != rc) {
+        /* TODO: progress the recv state machine */
+        D_PRINT("Retrying to allocate a recv fragment\n");
+        OMPI_FREE_LIST_GET (&mca_ptl_ib_component.ib_recv_frags,
+                item, rc);
+    }
+
+    recv_frag = (mca_ptl_ib_recv_frag_t *) item;
+    recv_frag->super.frag_base.frag_owner =
+        (mca_ptl_base_module_t *) module;
+
+    recv_frag->super.frag_base.frag_peer = NULL;
+    recv_frag->super.frag_request = NULL;
+    recv_frag->super.frag_is_buffered = false;
+
+    /* Copy the header; mca_ptl_base_match() does not do what it claims */
+    recv_frag->super.frag_base.frag_header = *header;
+
+    /* Take the data starting point by default */
+    recv_frag->super.frag_base.frag_addr =
+        (char *) header + sizeof (mca_ptl_base_header_t);
+    recv_frag->super.frag_base.frag_size = header->hdr_frag.hdr_frag_length;
+
+    /* match with preposted requests */
+    matched = module->super.ptl_match(
+            recv_frag->super.frag_base.frag_owner,
+            &recv_frag->super,
+            &recv_frag->super.frag_base.frag_header.hdr_match);
+
+    if (!matched) {
+        /* No posted receive matched; save the data into the
+         * unexpected buffer until one shows up */
+        D_PRINT("Could not match the fragment, buffering it\n");
+        memcpy (recv_frag->unex_buf,
+                (char *) header + sizeof (mca_ptl_base_header_t),
+                header->hdr_frag.hdr_frag_length);
+        recv_frag->super.frag_is_buffered = true;
+        recv_frag->super.frag_base.frag_addr = recv_frag->unex_buf;
+    }
+}
diff --git a/src/mca/ptl/ib/src/ptl_ib_priv.h b/src/mca/ptl/ib/src/ptl_ib_priv.h
index 64e5051e6e..5043d66c0c 100644
--- a/src/mca/ptl/ib/src/ptl_ib_priv.h
+++ b/src/mca/ptl/ib/src/ptl_ib_priv.h
@@ -9,6 +9,8 @@
  * queue pair handles?
*/ #define MAX_UD_PREPOST_DEPTH (1) +#define BUFSIZE (4096) +#define NUM_BUFS (5000) typedef enum { IB_RECV, @@ -28,25 +30,27 @@ struct vapi_descriptor_t { VAPI_rr_desc_t rr; VAPI_sr_desc_t sr; }; + VAPI_sg_lst_entry_t sg_entry; }; typedef struct vapi_descriptor_t vapi_descriptor_t; -struct mca_ptl_ib_ud_buf_data_t { - VAPI_qp_hndl_t qp_hndl; /* Remote QP handle */ -}; - -typedef struct mca_ptl_ib_ud_buf_data_t mca_ptl_ib_ud_buf_data_t; - -struct mca_ptl_ib_ud_buf_t { +struct mca_ptl_ib_send_buf_t { + mca_pml_base_request_t *req; vapi_descriptor_t desc; - vapi_memhandle_t memhandle; - mca_ptl_ib_ud_buf_data_t* buf_data; + char buf[4096]; }; -typedef struct mca_ptl_ib_ud_buf_t mca_ptl_ib_ud_buf_t; +typedef struct mca_ptl_ib_send_buf_t mca_ptl_ib_send_buf_t; +struct mca_ptl_ib_recv_buf_t { + mca_pml_base_request_t *req; + vapi_descriptor_t desc; + char buf[4096]; +}; + +typedef struct mca_ptl_ib_recv_buf_t mca_ptl_ib_recv_buf_t; #define MCA_PTL_IB_UD_RECV_DESC(ud_buf, len) { \ desc->rr.comp_type = VAPI_SIGNALED; \ @@ -74,9 +78,7 @@ typedef struct mca_ptl_ib_ud_buf_t mca_ptl_ib_ud_buf_t; int mca_ptl_ib_ud_cq_init(VAPI_hca_hndl_t, VAPI_cq_hndl_t*, VAPI_cq_hndl_t*); -int mca_ptl_ib_ud_qp_init(VAPI_hca_hndl_t, VAPI_cq_hndl_t, - VAPI_cq_hndl_t, VAPI_pd_hndl_t, VAPI_qp_hndl_t*, - VAPI_qp_prop_t*); +int mca_ptl_ib_ud_qp_init(VAPI_hca_hndl_t, VAPI_qp_hndl_t); int mca_ptl_ib_get_num_hcas(uint32_t*); int mca_ptl_ib_get_hca_id(int, VAPI_hca_id_t*); int mca_ptl_ib_get_hca_hndl(VAPI_hca_id_t, VAPI_hca_hndl_t*); @@ -85,10 +87,7 @@ int mca_ptl_ib_alloc_pd(VAPI_hca_hndl_t, VAPI_pd_hndl_t*); int mca_ptl_ib_create_cq(VAPI_hca_hndl_t, VAPI_cq_hndl_t*); int mca_ptl_ib_set_async_handler(VAPI_hca_hndl_t, EVAPI_async_handler_hndl_t*); -int mca_ptl_ib_post_ud_recv(VAPI_hca_hndl_t, VAPI_qp_hndl_t, - mca_ptl_ib_ud_buf_t*, int); -int mca_ptl_ib_prep_ud_bufs(VAPI_hca_hndl_t, mca_ptl_ib_ud_buf_t*, IB_wr_t, int); -int mca_ptl_ib_register_mem(VAPI_hca_hndl_t, void*, int, +int mca_ptl_ib_register_mem(VAPI_hca_hndl_t, VAPI_pd_hndl_t, void*, int, vapi_memhandle_t*); int mca_ptl_ib_set_comp_ev_hndl(VAPI_hca_hndl_t, VAPI_cq_hndl_t, VAPI_completion_event_handler_t, void*, @@ -96,6 +95,12 @@ int mca_ptl_ib_set_comp_ev_hndl(VAPI_hca_hndl_t, VAPI_cq_hndl_t, int mca_ptl_ib_req_comp_notif(VAPI_hca_hndl_t,VAPI_cq_hndl_t); int mca_ptl_ib_get_comp_ev_hndl(VAPI_completion_event_handler_t*); int mca_ptl_ib_init_send(void*, VAPI_qp_hndl_t, int); - +int mca_ptl_ib_create_qp(VAPI_hca_hndl_t, VAPI_pd_hndl_t, + VAPI_cq_hndl_t, VAPI_cq_hndl_t, VAPI_qp_hndl_t*, + VAPI_qp_prop_t*, int); +int mca_ptl_ib_rc_qp_init(VAPI_hca_hndl_t, VAPI_qp_hndl_t, + VAPI_qp_num_t, IB_lid_t); +void mca_ptl_ib_frag(struct mca_ptl_ib_module_t* module, + mca_ptl_base_header_t * header); #endif /* MCA_PTL_IB_PRIV_H */ diff --git a/src/mca/ptl/ib/src/ptl_ib_proc.c b/src/mca/ptl/ib/src/ptl_ib_proc.c index 02ea402dda..a6bf55d698 100644 --- a/src/mca/ptl/ib/src/ptl_ib_proc.c +++ b/src/mca/ptl/ib/src/ptl_ib_proc.c @@ -1,6 +1,5 @@ #include "ompi_config.h" -#include "include/sys/atomic.h" #include "class/ompi_hash_table.h" #include "mca/base/mca_base_module_exchange.h" @@ -85,8 +84,12 @@ static mca_ptl_ib_proc_t* mca_ptl_ib_proc_lookup_ompi(ompi_proc_t* ompi_proc) mca_ptl_ib_proc_t* mca_ptl_ib_proc_create(ompi_proc_t* ompi_proc) { - int rc; + int rc, my_rank, i; size_t size; + char* str_rank; + VAPI_ret_t ret; + + mca_ptl_ib_module_t* module = NULL; mca_ptl_ib_proc_t* module_proc = NULL; @@ -125,8 +128,8 @@ mca_ptl_ib_proc_t* mca_ptl_ib_proc_create(ompi_proc_t* ompi_proc) } 
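+    /* The send_buf/recv_buf arrays allocated below carry ring cursors
+     * (send_index/recv_index in mca_ptl_ib_module_t), although this
+     * first cut always uses slot 0.  The intended indexing is a plain
+     * circular buffer -- a sketch, assuming a hypothetical
+     * next_send_slot() helper that is not part of this patch:
+     *
+     *   static mca_ptl_ib_send_buf_t* next_send_slot(mca_ptl_ib_module_t* module)
+     *   {
+     *       mca_ptl_ib_send_buf_t* slot = &module->send_buf[module->send_index];
+     *       module->send_index = (module->send_index + 1) % NUM_BUFS;
+     *       return slot;
+     *   }
+     */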
D_PRINT("UD q.p. obtained is: %d, Lid : %d\n", - (int)module_proc->proc_addrs[0].ud_qp, - (int)module_proc->proc_addrs[0].lid); + module_proc->proc_addrs[0].qp_num, + module_proc->proc_addrs[0].lid); if(0 != (size % sizeof(mca_ptl_ib_ud_addr_t))) { ompi_output(0, "mca_ptl_ib_proc_create: mca_base_modex_recv: " @@ -148,6 +151,100 @@ mca_ptl_ib_proc_t* mca_ptl_ib_proc_create(ompi_proc_t* ompi_proc) return NULL; } + /* HACK: Till dyn. connection management comes through, + * just establish the RC connection here */ + + str_rank = getenv("OMPI_MCA_pcm_cofs_procid"); + if(NULL != str_rank) { + my_rank = atoi(str_rank); + } else { + D_PRINT("Rank, what rank?"); + } + + if(my_rank != ompi_proc->proc_name.vpid) { + D_PRINT("I %d Have to create connection for %d", + my_rank, ompi_proc->proc_name.vpid); + + module = mca_ptl_ib_component.ib_ptl_modules[0]; + + /* Make the RC QP transitions */ + if(mca_ptl_ib_rc_qp_init(module->nic, + module->my_qp_hndl, + module_proc->proc_addrs[0].qp_num, + module_proc->proc_addrs[0].lid) + != OMPI_SUCCESS) { + return NULL; + } + + /* Allocate the send and recv buffers */ + + module->send_buf = + malloc(sizeof(mca_ptl_ib_send_buf_t) * NUM_BUFS); + + if(NULL == module->send_buf) { + return NULL; + } + memset(module->send_buf, + 0, sizeof(mca_ptl_ib_send_buf_t) * NUM_BUFS); + + if(mca_ptl_ib_register_mem(module->nic, + module->ptag, + module->send_buf, + sizeof(mca_ptl_ib_send_buf_t) * NUM_BUFS, + &module->send_buf_hndl) + != OMPI_SUCCESS) { + return NULL; + } + + module->recv_buf = + malloc(sizeof(mca_ptl_ib_recv_buf_t) * NUM_BUFS); + + if(NULL == module->recv_buf) { + return NULL; + } + + memset(module->recv_buf, + 0, sizeof(mca_ptl_ib_recv_buf_t) * NUM_BUFS); + + if(mca_ptl_ib_register_mem(module->nic, + module->ptag, + module->recv_buf, + sizeof(mca_ptl_ib_recv_buf_t) * NUM_BUFS, + &module->recv_buf_hndl) + != OMPI_SUCCESS) { + return NULL; + } + + /* Prepare the receivs */ + for(i = 0; i < NUM_BUFS; i++) { + module->recv_buf[i].desc.rr.comp_type = VAPI_SIGNALED; + module->recv_buf[i].desc.rr.opcode = VAPI_RECEIVE; + module->recv_buf[i].desc.rr.id = (VAPI_virt_addr_t) + (MT_virt_addr_t) &module->recv_buf[i]; + module->recv_buf[i].desc.rr.sg_lst_len = 1; + module->recv_buf[i].desc.rr.sg_lst_p = &(module->recv_buf[i].desc.sg_entry); + module->recv_buf[i].desc.sg_entry.len = 4096; + module->recv_buf[i].desc.sg_entry.lkey = module->recv_buf_hndl.lkey; + module->recv_buf[i].desc.sg_entry.addr = + (VAPI_virt_addr_t) (MT_virt_addr_t) (module->recv_buf[i].buf); + } + + /* Post the receives */ + for(i = 0; i < NUM_BUFS; i++) { + ret = VAPI_post_rr(module->nic, + module->my_qp_hndl, + &module->recv_buf[i].desc.rr); + if(VAPI_OK != ret) { + MCA_PTL_IB_VAPI_RET(ret, "VAPI_post_rr"); + return NULL; + } + } + } + + if(1 == my_rank) { + sleep(2); + } + return module_proc; } diff --git a/src/mca/ptl/ib/src/ptl_ib_recvfrag.c b/src/mca/ptl/ib/src/ptl_ib_recvfrag.c index 9138d325f2..397b04bb1f 100644 --- a/src/mca/ptl/ib/src/ptl_ib_recvfrag.c +++ b/src/mca/ptl/ib/src/ptl_ib_recvfrag.c @@ -28,3 +28,34 @@ static void mca_ptl_ib_recv_frag_construct(mca_ptl_ib_recv_frag_t* frag) static void mca_ptl_ib_recv_frag_destruct(mca_ptl_ib_recv_frag_t* frag) { } + +void +mca_ptl_ib_recv_frag_done (mca_ptl_base_header_t *header, + mca_ptl_ib_recv_frag_t* frag, + mca_pml_base_recv_request_t *request) +{ + frag->super.frag_base.frag_owner->ptl_recv_progress ( + frag->super.frag_base.frag_owner, + request, + frag->super.frag_base.frag_size, + frag->super.frag_base.frag_size); + + 
+    OMPI_FREE_LIST_RETURN(&mca_ptl_ib_component.ib_recv_frags,
+            (ompi_list_item_t*)frag);
+
+#if 0
+    mca_ptl_ib_recv_frag_return(
+            frag->super.frag_base.frag_owner, frag);
+    /* FIXME:
+     * To support the required ACK, do not return
+     * until the ack is out */
+    if (frag->frag_ack_pending == false) {
+    } else {
+        /* XXX: Chain it into the list of completion-pending
+         * recv frags; until the ack frag is sent out,
+         * they stay in the list */
+    }
+#endif
+}
+
diff --git a/src/mca/ptl/ib/src/ptl_ib_recvfrag.h b/src/mca/ptl/ib/src/ptl_ib_recvfrag.h
index 1be9174cc4..3da4820dcb 100644
--- a/src/mca/ptl/ib/src/ptl_ib_recvfrag.h
+++ b/src/mca/ptl/ib/src/ptl_ib_recvfrag.h
@@ -1,7 +1,6 @@
 #ifndef MCA_PTL_IB_RECV_FRAG_H
 #define MCA_PTL_IB_RECV_FRAG_H

-#include "include/sys/atomic.h"
 #include "mca/ptl/ptl.h"
 #include "mca/ptl/base/ptl_base_recvfrag.h"
@@ -12,7 +11,12 @@ OBJ_CLASS_DECLARATION(mca_ptl_ib_recv_frag_t);
  */
 struct mca_ptl_ib_recv_frag_t {
     mca_ptl_base_recv_frag_t super; /**< base receive fragment descriptor */
+    char unex_buf[4096]; /* Buffer for unexpected fragments */
 };
 typedef struct mca_ptl_ib_recv_frag_t mca_ptl_ib_recv_frag_t;
+
+void mca_ptl_ib_recv_frag_done (mca_ptl_base_header_t*,
+        mca_ptl_ib_recv_frag_t*, mca_pml_base_recv_request_t*);
+
 #endif
diff --git a/src/mca/ptl/ib/src/ptl_ib_sendfrag.h b/src/mca/ptl/ib/src/ptl_ib_sendfrag.h
index 8acc737990..7c9b50ea2f 100644
--- a/src/mca/ptl/ib/src/ptl_ib_sendfrag.h
+++ b/src/mca/ptl/ib/src/ptl_ib_sendfrag.h
@@ -1,8 +1,6 @@
-
 #ifndef MCA_PTL_IB_SEND_FRAG_H
 #define MCA_PTL_IB_SEND_FRAG_H

-#include "include/sys/atomic.h"
 #include "ompi_config.h"
 #include "mca/pml/base/pml_base_sendreq.h"
 #include "mca/ptl/base/ptl_base_sendfrag.h"
diff --git a/src/mca/ptl/ib/src/ptl_ib_ud.h b/src/mca/ptl/ib/src/ptl_ib_ud.h
new file mode 100644
index 0000000000..d5dc00fb14
--- /dev/null
+++ b/src/mca/ptl/ib/src/ptl_ib_ud.h
@@ -0,0 +1,29 @@
+#include "ptl_ib_priv.h"
+
+
+struct mca_ptl_ib_ud_buf_data_t {
+    VAPI_qp_num_t qp_num; /* Remote QP number */
+};
+
+typedef struct mca_ptl_ib_ud_buf_data_t mca_ptl_ib_ud_buf_data_t;
+
+struct mca_ptl_ib_ud_buf_t {
+    uint32_t in_use; /* Set to 1 while in use; reset to 0 on completion */
+    vapi_descriptor_t desc;
+    vapi_memhandle_t memhandle;
+    void* buf_data;
+};
+
+typedef struct mca_ptl_ib_ud_buf_t mca_ptl_ib_ud_buf_t;
+
+struct mca_ptl_ib_ud_buf_ctrl_t {
+    uint32_t index; /* Index of the next buffer to use (circular fashion) */
+    mca_ptl_ib_ud_buf_t* bufs; /* Array of UD buffer structures */
+};
+
+typedef struct mca_ptl_ib_ud_buf_ctrl_t mca_ptl_ib_ud_buf_ctrl_t;
+
+int mca_ptl_ib_post_ud_recv(VAPI_hca_hndl_t, VAPI_qp_hndl_t,
+        mca_ptl_ib_ud_buf_t*, int);
+int mca_ptl_ib_prep_ud_bufs(VAPI_hca_hndl_t, VAPI_pd_hndl_t,
+        mca_ptl_ib_ud_buf_t*, IB_wr_t, int);
diff --git a/src/mca/ptl/ib/src/ptl_ib_vapi.h b/src/mca/ptl/ib/src/ptl_ib_vapi.h
index a13fb5116b..3d404458af 100644
--- a/src/mca/ptl/ib/src/ptl_ib_vapi.h
+++ b/src/mca/ptl/ib/src/ptl_ib_vapi.h
@@ -16,6 +16,13 @@
 #define DEFAULT_UD_SG_LIST (1)
 #define DEFAULT_PKEY_IX (0)
 #define DEFAULT_PSN (0)
+#define DEFAULT_QP_OUS_RD_ATOM (1)
+#define DEFAULT_MTU (MTU1024)
+#define DEFAULT_MIN_RNR_TIMER (5)
+#define DEFAULT_TIME_OUT (10)
+#define DEFAULT_RETRY_COUNT (7)
+#define DEFAULT_RNR_RETRY (7)
+#define DEFAULT_MAX_RDMA_DST_OPS (16)

 #define DEFAULT_TRAFFIC_CLASS (0)
 #define DEFAULT_HOP_LIMIT (63)
@@ -24,6 +31,9 @@
 #define DEFAULT_STATIC_RATE (0)
 #define DEFAULT_SRC_PATH_BITS (0)

+/* UD receives are preceded by a 40-byte Global Routing Header */
+#define UD_RECV_BUF_OFF (40)
+
 /* This is a convenience macro.
  *
  * ret : The value to return if call failed