diff --git a/src/mca/ptl/ib/src/ptl_ib.c b/src/mca/ptl/ib/src/ptl_ib.c index a9f38cbbcf..a8c9e4c6fe 100644 --- a/src/mca/ptl/ib/src/ptl_ib.c +++ b/src/mca/ptl/ib/src/ptl_ib.c @@ -47,9 +47,7 @@ int mca_ptl_ib_add_procs(struct mca_ptl_base_module_t* base_module, { int i, rc; struct ompi_proc_t* ompi_proc; - mca_ptl_ib_proc_t* module_proc; - mca_ptl_base_peer_t* module_peer; D_PRINT("Adding %d procs\n", nprocs); @@ -99,6 +97,9 @@ int mca_ptl_ib_add_procs(struct mca_ptl_base_module_t* base_module, OMPI_THREAD_UNLOCK(&module_proc->proc_lock); peers[i] = module_peer; } + + D_PRINT("Added %d procs\n", nprocs); + return OMPI_SUCCESS; } @@ -122,16 +123,16 @@ int mca_ptl_ib_finalize(struct mca_ptl_base_module_t* ptl) int mca_ptl_ib_request_init( struct mca_ptl_base_module_t* ptl, struct mca_pml_base_send_request_t* request) { - /* Stub */ - D_PRINT("Stub\n"); + D_PRINT(""); + OBJ_CONSTRUCT(request+1, mca_ptl_ib_send_frag_t); return OMPI_SUCCESS; } void mca_ptl_ib_request_fini( struct mca_ptl_base_module_t* ptl, struct mca_pml_base_send_request_t* request) { - /* Stub */ - D_PRINT("Stub\n"); + D_PRINT(""); + OBJ_DESTRUCT(request+1); } /* @@ -148,95 +149,32 @@ int mca_ptl_ib_send( struct mca_ptl_base_module_t* ptl, size_t size, int flags) { - int rc; - VAPI_ret_t ret; mca_ptl_ib_send_frag_t* sendfrag; - ompi_list_item_t* item; - vapi_descriptor_t* desc; - mca_ptl_base_header_t *hdr; - int header_length; + int rc; if (0 == offset) { sendfrag = &((mca_ptl_ib_send_request_t*)sendreq)->req_frag; } else { + ompi_list_item_t* item; OMPI_FREE_LIST_GET(&mca_ptl_ib_component.ib_send_frags, item, rc); if(NULL == (sendfrag = (mca_ptl_ib_send_frag_t*)item)) { return rc; } } - hdr = (mca_ptl_base_header_t *) - ptl_peer->peer_module->send_buf[0].buf; - - if(offset == 0) { - hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_MATCH; - hdr->hdr_common.hdr_flags = flags; - hdr->hdr_common.hdr_size = sizeof (mca_ptl_base_match_header_t); - hdr->hdr_frag.hdr_frag_offset = offset; - 
hdr->hdr_frag.hdr_frag_seq = 0; - /* Frag descriptor, so that incoming ack - * will locate it */ - hdr->hdr_frag.hdr_src_ptr.lval = 0; - hdr->hdr_frag.hdr_src_ptr.pval = sendfrag; - hdr->hdr_frag.hdr_dst_ptr.pval = 0; - hdr->hdr_frag.hdr_dst_ptr.lval = 0; - - hdr->hdr_match.hdr_contextid = sendreq->req_base.req_comm->c_contextid; - hdr->hdr_match.hdr_src = sendreq->req_base.req_comm->c_my_rank; - hdr->hdr_match.hdr_dst = sendreq->req_base.req_peer; - hdr->hdr_match.hdr_tag = sendreq->req_base.req_tag; - hdr->hdr_match.hdr_msg_length = sendreq->req_bytes_packed; - hdr->hdr_match.hdr_msg_seq = sendreq->req_base.req_sequence; - header_length = sizeof (mca_ptl_base_match_header_t); - } else { - hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG; - hdr->hdr_common.hdr_flags = flags; - hdr->hdr_common.hdr_size = sizeof (mca_ptl_base_frag_header_t); - hdr->hdr_frag.hdr_frag_offset = offset; - hdr->hdr_frag.hdr_frag_seq = 0; - hdr->hdr_frag.hdr_src_ptr.lval = 0; - hdr->hdr_frag.hdr_src_ptr.pval = sendfrag; /* Frag descriptor */ - hdr->hdr_frag.hdr_dst_ptr = sendreq->req_peer_match; - header_length = sizeof (mca_ptl_base_frag_header_t); + rc = mca_ptl_ib_send_frag_init(sendfrag, ptl_peer, + sendreq, offset, &size, flags); + if(rc != OMPI_SUCCESS) { + return rc; } - ptl_peer->peer_module->send_buf[0].req = &sendreq->req_base; + /* Update the offset after actual fragment size is determined, + * and before attempting to send the fragment */ + sendreq->req_offset += size; - desc = &ptl_peer->peer_module->send_buf[0].desc; + rc = mca_ptl_ib_peer_send(ptl_peer, sendfrag); - desc->sr.comp_type = VAPI_SIGNALED; - desc->sr.opcode = VAPI_SEND; - desc->sr.remote_qkey = 0; - - desc->sr.remote_qp = ptl_peer->peer_proc->proc_addrs[0].qp_num; - - desc->sr.id = &ptl_peer->peer_module->send_buf[0]; - desc->sr.sg_lst_len = 1; - desc->sr.sg_lst_p = &(desc->sg_entry); - - /* Copy the data stuff */ - /* Check this later on ... 
*/ - memcpy((void*)((char*)ptl_peer->peer_module->send_buf[0].buf + - header_length), sendreq->req_base.req_addr, - size); - - desc->sg_entry.len = header_length + size; - D_PRINT("Sent length : %d\n", desc->sg_entry.len); - - desc->sg_entry.lkey = ptl_peer->peer_module->send_buf_hndl.lkey; - desc->sg_entry.addr = (VAPI_virt_addr_t) (MT_virt_addr_t) - (ptl_peer->peer_module->send_buf[0].buf); - - ret = VAPI_post_sr(ptl_peer->peer_module->nic, - ptl_peer->peer_module->my_qp_hndl, - &desc->sr); - - if(VAPI_OK != ret) { - MCA_PTL_IB_VAPI_RET(ret, "VAPI_post_sr"); - return OMPI_ERROR; - } - - return OMPI_SUCCESS; + return rc; } diff --git a/src/mca/ptl/ib/src/ptl_ib.h b/src/mca/ptl/ib/src/ptl_ib.h index ab0a90d46b..94445586ab 100644 --- a/src/mca/ptl/ib/src/ptl_ib.h +++ b/src/mca/ptl/ib/src/ptl_ib.h @@ -35,21 +35,50 @@ */ struct mca_ptl_ib_component_t { - mca_ptl_base_component_1_0_0_t super; /**< base PTL component */ - struct mca_ptl_ib_module_t** ib_ptl_modules; /**< array of available PTLs */ - uint32_t ib_num_ptl_modules; /**< number of ptl modules actually used */ - uint32_t ib_max_ptl_modules; /**< maximum number of ptls */ - int ib_free_list_num; /**< initial size of free lists */ - int ib_free_list_max; /**< maximum size of free lists */ - int ib_free_list_inc; /**< number of elements to alloc when growing free lists */ - ompi_free_list_t ib_send_requests; /**< free list of ib send requests -- sendreq + IB */ - ompi_free_list_t ib_send_frags; /**< free list of ib send fragments */ - ompi_free_list_t ib_recv_frags; /**< free list of ib recv fragments */ - ompi_list_t ib_procs; /**< list of ib proc structures */ - ompi_event_t ib_send_event; /**< event structure for sends */ - ompi_event_t ib_recv_event; /**< event structure for recvs */ - ompi_mutex_t ib_lock; /**< lock for accessing module state */ - uint32_t ib_num_hcas; /* number of hcas available to the IB component */ + mca_ptl_base_component_1_0_0_t super; + /**< base PTL component */ + + struct 
mca_ptl_ib_module_t **ib_ptl_modules; + /**< array of available PTLs */ + + uint32_t ib_num_ptl_modules; + /**< number of ptl modules actually used */ + + uint32_t ib_max_ptl_modules; + /**< maximum number of ptls */ + + int ib_free_list_num; + /**< initial size of free lists */ + + int ib_free_list_max; + /**< maximum size of free lists */ + + int ib_free_list_inc; + /**< number of elements to alloc when growing free lists */ + + ompi_free_list_t ib_send_requests; + /**< free list of ib send requests -- sendreq + IB */ + + ompi_free_list_t ib_send_frags; + /**< free list of ib send fragments */ + + ompi_free_list_t ib_recv_frags; + /**< free list of ib recv fragments */ + + ompi_list_t ib_procs; + /**< list of ib proc structures */ + + ompi_event_t ib_send_event; + /**< event structure for sends */ + + ompi_event_t ib_recv_event; + /**< event structure for recvs */ + + ompi_mutex_t ib_lock; + /**< lock for accessing module state */ + + uint32_t ib_num_hcas; + /* number of hcas available to the IB component */ }; typedef struct mca_ptl_ib_component_t mca_ptl_ib_component_t; struct mca_ptl_ib_recv_frag_t; @@ -61,38 +90,10 @@ extern mca_ptl_ib_component_t mca_ptl_ib_component; */ struct mca_ptl_ib_module_t { mca_ptl_base_module_t super; /**< base PTL interface */ - VAPI_hca_id_t hca_id; /* ID of HCA this PTL is tied to */ - VAPI_hca_port_t port; /* InfiniBand port of this PTL */ - VAPI_hca_hndl_t nic; /* NIC handle */ - VAPI_pd_hndl_t ptag; /* Protection Domain tag */ - VAPI_cq_hndl_t cq_hndl; /* Completion Queue handle */ - - EVAPI_async_handler_hndl_t async_handler; /* Async event handler used to detect - weird events */ - - VAPI_cq_hndl_t ud_scq_hndl;/* UD send completion queue handle */ - VAPI_cq_hndl_t ud_rcq_hndl;/* UD recv completion queue handle */ - mca_ptl_ib_ud_buf_ctrl_t* ud_recv; /* Link to UD recv buffer structures */ - mca_ptl_ib_ud_buf_ctrl_t* ud_send; /* Link to UD bufs which are used for sending */ - - VAPI_qp_hndl_t ud_qp_hndl; /* UD queue pair 
handle */ - VAPI_qp_prop_t ud_qp_prop; /* UD queue pair properties */ - VAPI_rr_desc_t* ud_rr_hndl; /* UD receive descriptor pool */ - VAPI_completion_event_handler_t ud_comp_ev_handler; /* UD completion handler */ - EVAPI_compl_handler_hndl_t ud_comp_ev_hndl; /* UD completion handler handle */ - - /* Temporary fields remove after dynamic connection - * management is in place */ - VAPI_qp_hndl_t my_qp_hndl; - VAPI_qp_prop_t my_qp_prop; - /* Circular buffers */ - mca_ptl_ib_send_buf_t* send_buf; - int send_index; - mca_ptl_ib_recv_buf_t* recv_buf; - int recv_index; - vapi_memhandle_t send_buf_hndl; - vapi_memhandle_t recv_buf_hndl; + /* IB state holds info about queue handles, HCA handles, + * protection domain etc. which are private to this module */ + mca_ptl_ib_state_t *ib_state; }; typedef struct mca_ptl_ib_module_t mca_ptl_ib_module_t; diff --git a/src/mca/ptl/ib/src/ptl_ib_component.c b/src/mca/ptl/ib/src/ptl_ib_component.c index 06a8093351..a95f8e01bc 100644 --- a/src/mca/ptl/ib/src/ptl_ib_component.c +++ b/src/mca/ptl/ib/src/ptl_ib_component.c @@ -135,6 +135,7 @@ int mca_ptl_ib_component_close(void) static int mca_ptl_ib_component_send(void) { +#if 0 int i, rc, size; mca_ptl_ib_ud_addr_t* ud_qp_addr = NULL; @@ -171,6 +172,8 @@ static int mca_ptl_ib_component_send(void) free(ud_qp_addr); return rc; +#endif + return OMPI_SUCCESS; } /* @@ -238,6 +241,8 @@ mca_ptl_base_module_t** mca_ptl_ib_component_init(int *num_ptl_modules, mca_ptl_ib_component.ib_max_ptl_modules = mca_ptl_ib_component.ib_num_hcas; + /* Allocate space for number of modules available + * to this component */ ib_modules = (mca_ptl_ib_module_t*) malloc(sizeof(mca_ptl_ib_module_t) * mca_ptl_ib_component.ib_num_ptl_modules); if(NULL == ib_modules) { @@ -255,132 +260,27 @@ mca_ptl_base_module_t** mca_ptl_ib_component_init(int *num_ptl_modules, sizeof(mca_ptl_ib_module)); } - /* For each module, do this */ + /* For each module, Initialize! 
*/ for(i = 0; i < mca_ptl_ib_component.ib_num_ptl_modules; i++) { - if(mca_ptl_ib_get_hca_id(i, &ib_modules[i].hca_id) + /* Allocate space for the state of the IB module */ + ib_modules[i].ib_state = malloc(sizeof(mca_ptl_ib_state_t)); + + if(NULL == ib_modules[i].ib_state) { + return NULL; + } + + if(mca_ptl_ib_init_module(ib_modules[i].ib_state, i) != OMPI_SUCCESS) { return NULL; } - - D_PRINT("hca_id: %s\n", ib_modules[i].hca_id); - - if(mca_ptl_ib_get_hca_hndl(ib_modules[i].hca_id, &ib_modules[i].nic) - != OMPI_SUCCESS) { - return NULL; - } - - D_PRINT("hca_hndl: %d\n", ib_modules[i].nic); - - if(mca_ptl_ib_alloc_pd(ib_modules[i].nic, &ib_modules[i].ptag) - != OMPI_SUCCESS) { - return NULL; - } - - D_PRINT("Protection Domain: %d\n", ib_modules[i].ptag); - - /* Each HCA uses only port 1. Need to change - * this so that each ptl can choose different - * ports */ - - if(mca_ptl_ib_query_hca_prop(ib_modules[i].nic, &ib_modules[i].port) - != OMPI_SUCCESS) { - return NULL; - } - - D_PRINT("LID: %d\n", ib_modules[i].port.lid); - - - if(mca_ptl_ib_create_cq(ib_modules[i].nic, &ib_modules[i].cq_hndl) - != OMPI_SUCCESS) { - return NULL; - } - - D_PRINT("CQ handle: %d\n", ib_modules[i].cq_hndl); - - if(mca_ptl_ib_create_cq(ib_modules[i].nic, &ib_modules[i].ud_scq_hndl) - != OMPI_SUCCESS) { - return NULL; - } - - if(mca_ptl_ib_create_cq(ib_modules[i].nic, &ib_modules[i].ud_rcq_hndl) - != OMPI_SUCCESS) { - return NULL; - } - - D_PRINT("UD_SCQ handle: %d, UD_RCQ handle: %d\n", - ib_modules[i].ud_scq_hndl, ib_modules[i].ud_rcq_hndl); - - if(mca_ptl_ib_create_qp(ib_modules[i].nic, ib_modules[i].ptag, - ib_modules[i].ud_rcq_hndl, ib_modules[i].ud_scq_hndl, - &ib_modules[i].ud_qp_hndl, &ib_modules[i].ud_qp_prop, - VAPI_TS_UD) - != OMPI_SUCCESS) { - return NULL; - } - - D_PRINT("UD Qp handle: %d, Qp num: %d\n", - ib_modules[i].ud_qp_hndl, ib_modules[i].ud_qp_prop.qp_num); - - if(mca_ptl_ib_ud_qp_init(ib_modules[i].nic, ib_modules[i].ud_qp_hndl) - != OMPI_SUCCESS) { - return NULL; 
- } - - /* Attach asynchronous handler */ - if(mca_ptl_ib_set_async_handler(ib_modules[i].nic, - &ib_modules[i].async_handler) - != OMPI_SUCCESS) { - return NULL; - } - -#if 0 - if(mca_ptl_ib_get_comp_ev_hndl(&ib_modules[i].ud_comp_ev_handler) - != OMPI_SUCCESS) { - return NULL; - } - - /* Set the completion event handler for the UD recv queue */ - if(mca_ptl_ib_set_comp_ev_hndl(ib_modules[i].nic, - ib_modules[i].cq_hndl, - ib_modules[i].ud_comp_ev_handler, - (void*)NULL, &ib_modules[i].ud_comp_ev_hndl) - != OMPI_SUCCESS) { - return NULL; - } - - /* Request for interrupts on the UD recv queue */ - if(mca_ptl_ib_req_comp_notif(ib_modules[i].nic, - ib_modules[i].cq_hndl) - != OMPI_SUCCESS) { - return NULL; - } -#endif - - /* Just for point-to-point communication */ - /* Till dynamic connection management comes */ - - /* Create the QP I am going to use to communicate - * with this peer */ - if(mca_ptl_ib_create_qp(ib_modules[i].nic, - ib_modules[i].ptag, - ib_modules[i].cq_hndl, - ib_modules[i].cq_hndl, - &ib_modules[i].my_qp_hndl, - &ib_modules[i].my_qp_prop, - VAPI_TS_RC) - != OMPI_SUCCESS) { - } - - D_PRINT("QP hndl:%d, num: %d for 1-to-1 communication\n", - ib_modules[i].my_qp_hndl, - ib_modules[i].my_qp_prop.qp_num); - - ib_modules[i].send_index = 0; - ib_modules[i].recv_index = 0; + DUMP_IB_STATE(ib_modules[i].ib_state); } + /* Post OOB receives */ + mca_ptl_ib_post_oob_recv_nb(); + /* Allocate list of IB ptl pointers */ mca_ptl_ib_component.ib_ptl_modules = (struct mca_ptl_ib_module_t**) malloc(mca_ptl_ib_component.ib_num_ptl_modules * @@ -399,7 +299,8 @@ mca_ptl_base_module_t** mca_ptl_ib_component_init(int *num_ptl_modules, } /* Allocate list of MCA ptl pointers */ - modules = (mca_ptl_base_module_t**) malloc(mca_ptl_ib_component.ib_num_ptl_modules * + modules = (mca_ptl_base_module_t**) + malloc(mca_ptl_ib_component.ib_num_ptl_modules * sizeof(mca_ptl_base_module_t*)); if(NULL == modules) { return NULL; @@ -446,6 +347,7 @@ int 
mca_ptl_ib_component_control(int param, void* value, size_t size) int mca_ptl_ib_component_progress(mca_ptl_tstamp_t tstamp) { +#if 0 VAPI_ret_t ret; VAPI_wc_desc_t comp; @@ -506,6 +408,7 @@ int mca_ptl_ib_component_progress(mca_ptl_tstamp_t tstamp) comp.opcode); } } +#endif return OMPI_SUCCESS; } diff --git a/src/mca/ptl/ib/src/ptl_ib_peer.c b/src/mca/ptl/ib/src/ptl_ib_peer.c index d9596642f6..eaf1ec92e2 100644 --- a/src/mca/ptl/ib/src/ptl_ib_peer.c +++ b/src/mca/ptl/ib/src/ptl_ib_peer.c @@ -4,11 +4,11 @@ #include "include/types.h" #include "mca/pml/base/pml_base_sendreq.h" #include "mca/ns/base/base.h" +#include "mca/oob/base/base.h" #include "ptl_ib.h" #include "ptl_ib_addr.h" #include "ptl_ib_peer.h" #include "ptl_ib_proc.h" -#include "ptl_ib_priv.h" #include "ptl_ib_sendfrag.h" static void mca_ptl_ib_peer_construct(mca_ptl_base_peer_t* module_peer); @@ -18,15 +18,61 @@ OBJ_CLASS_INSTANCE(mca_ptl_ib_peer_t, ompi_list_item_t, mca_ptl_ib_peer_construct, mca_ptl_ib_peer_destruct); +/* + * Callback function for OOB send completion. + * Not much to do over here right now ... + * + */ + +static void mca_ptl_ib_peer_connect_send_callback(int status, + ompi_process_name_t* peer, ompi_buffer_t buffer, + int tag, void* cbdata) +{ + D_PRINT("OOB Send to %d complete", peer->vpid); +} + +/* + * Wrapper around mca_oob_send_packed_nb + * + * Post a non-blocking OOB send request to peer with + * pre-allocated user buffer + * + */ + +static int mca_ptl_ib_post_oob_send_nb(ompi_process_name_t *name, + void* user_buf, int len) +{ + int rc; + ompi_buffer_t buffer; + + rc = ompi_buffer_init_preallocated(&buffer, user_buf, + len); + + if(rc != OMPI_SUCCESS) { + return rc; + } + + rc = mca_oob_send_packed_nb(name, buffer, + 131313, 0, + (mca_oob_callback_packed_fn_t)mca_ptl_ib_peer_connect_send_callback, + NULL); + + if(rc != OMPI_SUCCESS) { + return rc; + } + + return rc; +} + /* * Initialize state of the peer instance. 
+ * */ static void mca_ptl_ib_peer_construct(mca_ptl_base_peer_t* module_peer) { module_peer->peer_module = 0; module_peer->peer_proc = 0; - module_peer->peer_addr = 0; module_peer->peer_ts = 0.0; module_peer->peer_send_frag = 0; module_peer->peer_recv_frag = 0; @@ -39,43 +85,300 @@ static void mca_ptl_ib_peer_construct(mca_ptl_base_peer_t* module_peer) OBJ_CONSTRUCT(&module_peer->peer_recv_lock, ompi_mutex_t); } +/* + * Destroy a peer + * + */ + static void mca_ptl_ib_peer_destruct(mca_ptl_base_peer_t* module_peer) { } -static int mca_ptl_ib_peer_check_timeout(mca_ptl_base_peer_t* peer) +/* + * Allocate peer connection structures + * + */ + +static int mca_ptl_ib_alloc_peer_conn(mca_ptl_base_peer_t* peer) { + /* Allocate space for peer connection */ + peer->peer_conn = (mca_ptl_ib_peer_conn_t *) + malloc(sizeof(mca_ptl_ib_peer_conn_t)); + if(NULL == peer->peer_conn) { + return OMPI_ERR_OUT_OF_RESOURCE; + } return OMPI_SUCCESS; } -static double mca_ptl_ib_get_us(void) +/* + * Send connection information to remote peer using OOB + * + */ + +static int mca_ptl_ib_peer_send_conn_info(mca_ptl_base_peer_t* peer) { - struct timeval t; - gettimeofday(&t, NULL); - return (double) t.tv_sec * (double) 1e6 + (double) t.tv_usec; + int rc; + ompi_process_name_t *name; + char sendbuf[50]; + + name = &peer->peer_proc->proc_guid; + + /* Zero out the send buffer */ + memset(sendbuf, 0, 50); + + /* Copy the info in the send buffer */ + + /* Format: + * + * + * Ofcourse without the <'s and >'s moron! + * Size of each field is limited to maximum + * 8 characters. 
This should be enough for all + * platforms, and is internal information + */ + sprintf(sendbuf, "%08d %08d", + peer->peer_conn->lres->qp_prop.qp_num, + peer->peer_module->ib_state->port.lid); + + /* Send it off */ + rc = mca_ptl_ib_post_oob_send_nb(name, + (void*)sendbuf, 50); + + if(rc != OMPI_SUCCESS) { + return rc; + } + return OMPI_SUCCESS; } +/* + * Set remote connection info: parse the QP number (bytes 0-7) and + * the LID (bytes 9-16) out of the "%08d %08d" text found at baseptr. + * + * XXX: Currently size is unused, this shall change as soon as we + * add more info to be exchanged at connection setup. + * + */ +static void mca_ptl_ib_peer_set_remote_info(mca_ptl_base_peer_t* peer, + void* baseptr, size_t size) +{ + char tempbuf[9]; /* 8 field characters + terminating NUL for atoi() */ + + memset(tempbuf, 0, sizeof(tempbuf)); + strncpy(tempbuf, (char*)baseptr, 8); + + peer->peer_conn->rres->qp_num = atoi(tempbuf); + + memset(tempbuf, 0, sizeof(tempbuf)); + strncpy(tempbuf, (char*)baseptr + 9*sizeof(char), 8); + + peer->peer_conn->rres->lid = atoi(tempbuf); + + D_PRINT("Received QP num = %d, LID = %d", + peer->peer_conn->rres->qp_num, + peer->peer_conn->rres->lid); +} + +/* + * Start to connect to the peer. We send our Queue Pair + * information over the TCP OOB communication mechanism. + * On completion of our send, a send completion handler + * is called. 
+ * + */ + static int mca_ptl_ib_peer_start_connect(mca_ptl_base_peer_t* peer) { - int remote_qp_num; + int rc; - peer->peer_addr = (mca_ptl_ib_addr_t*) - malloc(sizeof(mca_ptl_ib_addr_t)); + /* Allocate peer connection structures */ + rc = mca_ptl_ib_alloc_peer_conn(peer); + if(rc != OMPI_SUCCESS) { + return rc; + } + /* Initialize the peer */ + rc = mca_ptl_ib_init_peer(peer->peer_module->ib_state, + peer->peer_conn); + if(rc != OMPI_SUCCESS) { + return rc; + } - D_PRINT("QP num:%d for rank %d:", - peer->peer_qp_prop.qp_num, - peer->peer_proc->proc_ompi->proc_name.vpid); - scanf("%d", &remote_qp_num); + /* Send connection info over to remote peer */ + rc = mca_ptl_ib_peer_send_conn_info(peer); + if(rc != OMPI_SUCCESS) { + return rc; + } - peer->peer_addr->rc_qp = remote_qp_num; + /* Update status of peer to as connecting */ + peer->peer_state = MCA_PTL_IB_CONNECTING; - D_PRINT("You entered: %d\n", peer->peer_addr->rc_qp); + DUMP_PEER(peer); - - return OMPI_SUCCESS; + return rc; } +/* + * Reply to a `start - connect' message + * + */ +static int mca_ptl_ib_peer_reply_start_connect(mca_ptl_ib_peer_t *peer, + void* baseptr, size_t size) +{ + int rc; + + /* Allocate peer connection structures */ + rc = mca_ptl_ib_alloc_peer_conn(peer); + if(rc != OMPI_SUCCESS) { + return rc; + } + + /* Initialize the peer */ + rc = mca_ptl_ib_init_peer(peer->peer_module->ib_state, + peer->peer_conn); + if(rc != OMPI_SUCCESS) { + return rc; + } + + /* Set the remote side info */ + mca_ptl_ib_peer_set_remote_info(peer, baseptr, size); + + /* Connect to peer */ + rc = mca_ptl_ib_peer_connect(peer->peer_module->ib_state, + peer->peer_conn); + if(rc != OMPI_SUCCESS) { + return rc; + } + + /* Register Buffers */ + + /* Post receives */ + + /* Send connection info over to remote peer */ + rc = mca_ptl_ib_peer_send_conn_info(peer); + if(rc != OMPI_SUCCESS) { + return rc; + } + + /* Update status of peer to as connected */ + peer->peer_state = MCA_PTL_IB_CONNECTED; + + DUMP_PEER(peer); + + 
return rc; +} + +/* + * Non blocking OOB recv callback. + * Read incoming QP and other info, and if this peer + * is trying to connect, reply with our QP info, + * otherwise try to modify QP's and establish + * reliable connection + * + */ + +static void mca_ptl_ib_peer_connect_recv_callback(int status, + ompi_process_name_t* peer, ompi_buffer_t buffer, + int tag, void* cbdata) +{ + size_t size; + void *baseptr, *dataptr, *fromptr; + mca_ptl_ib_proc_t *ib_proc; + mca_ptl_ib_peer_t *ib_peer; + int peer_state; + + ompi_buffer_size(buffer, &size); + + ompi_buffer_get_ptrs(buffer, &baseptr, + &dataptr, &fromptr); + + D_PRINT("Size recv: %d, Data: %s", size, baseptr); + + for(ib_proc = (mca_ptl_ib_proc_t*) + ompi_list_get_first(&mca_ptl_ib_component.ib_procs); + ib_proc != (mca_ptl_ib_proc_t*) + ompi_list_get_end(&mca_ptl_ib_component.ib_procs); + ib_proc = (mca_ptl_ib_proc_t*)ompi_list_get_next(ib_proc)) { + + if(ib_proc->proc_guid.vpid == peer->vpid) { + + /* Try to get the peer instance of this proc */ + + /* Limitation: Right now, we have only 1 peer + * for every process. Need several changes, some + * in PML/PTL interface to set this right */ + ib_peer = ib_proc->proc_peers[0]; + + peer_state = ib_peer->peer_state; + + /* Update status */ + switch(peer_state) { + case MCA_PTL_IB_CLOSED : + /* We had this connection closed before. + * The peer is trying to connect. Move the + * status of this connection to CONNECTING, + * and then reply with our QP information */ + + D_PRINT("Start Connect %d", + ib_proc->proc_guid.vpid); + + if(mca_ptl_ib_peer_reply_start_connect(ib_peer, + baseptr, size) + != OMPI_SUCCESS) { + D_PRINT("Connect Error"); + } + + ib_peer->peer_state = MCA_PTL_IB_CONNECTED; + + break; + + case MCA_PTL_IB_CONNECTING : + /* We are already connecting with this peer, + * this means that we have initiated OOB sends + * with this peer, and the peer is replying. 
+ * No need to send him any more stuff */ + + D_PRINT("Connect reply %d", + ib_proc->proc_guid.vpid); + + mca_ptl_ib_peer_set_remote_info(ib_peer, + baseptr, size); + + if(mca_ptl_ib_peer_connect(ib_peer->peer_module->ib_state, + ib_peer->peer_conn) + != OMPI_SUCCESS) { + D_PRINT("Connect Error"); + } + + ib_peer->peer_state = MCA_PTL_IB_CONNECTED; + + break; + case MCA_PTL_IB_CONNECTED : + break; + default : + D_PRINT("Connected -> Connecting not possible.\n"); + } + + break; + } + + } + + /* Okay, now that we are done receiving, + * re-post the buffer */ + mca_ptl_ib_post_oob_recv_nb(); +} + +void mca_ptl_ib_post_oob_recv_nb() +{ + D_PRINT(""); + + mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, + 131313, 0, + (mca_oob_callback_packed_fn_t)mca_ptl_ib_peer_connect_recv_callback, + NULL); +} + + /* * Attempt to send a fragment using a given peer. If the peer is not * connected, queue the fragment and start the connection as required. @@ -86,6 +389,8 @@ int mca_ptl_ib_peer_send(mca_ptl_base_peer_t* peer, { int rc; + D_PRINT(""); + OMPI_THREAD_LOCK(&peer->peer_send_lock); switch(peer->peer_state) { @@ -96,13 +401,13 @@ int mca_ptl_ib_peer_send(mca_ptl_base_peer_t* peer, * to check for timeout */ ompi_list_append(&peer->peer_frags, (ompi_list_item_t*)frag); - rc = mca_ptl_ib_peer_check_timeout(peer); - break; case MCA_PTL_IB_CLOSED: - ompi_list_append(&peer->peer_frags, (ompi_list_item_t*)frag); + D_PRINT("Connection to peer closed ... 
connecting ..."); + + //ompi_list_append(&peer->peer_frags, (ompi_list_item_t*)frag); rc = mca_ptl_ib_peer_start_connect(peer); diff --git a/src/mca/ptl/ib/src/ptl_ib_peer.h b/src/mca/ptl/ib/src/ptl_ib_peer.h index 87f348d446..8039c041fa 100644 --- a/src/mca/ptl/ib/src/ptl_ib_peer.h +++ b/src/mca/ptl/ib/src/ptl_ib_peer.h @@ -7,6 +7,7 @@ #include "mca/ptl/ptl.h" #include "ptl_ib_recvfrag.h" #include "ptl_ib_sendfrag.h" +#include "ptl_ib_priv.h" OBJ_CLASS_DECLARATION(mca_ptl_ib_peer_t); @@ -31,7 +32,7 @@ typedef enum { /* Maximum number of retries have been used. * Report failure on send to upper layer */ MCA_PTL_IB_FAILED -} mca_ptl_ib_state_t; +} mca_ptl_ib_peer_state_t; /** * An abstraction that represents a connection to a peer process. @@ -42,26 +43,64 @@ typedef enum { struct mca_ptl_base_peer_t { ompi_list_item_t super; - struct mca_ptl_ib_module_t* peer_module; /**< PTL instance that created this connection */ - struct mca_ptl_ib_proc_t* peer_proc; /**< proc structure corresponding to peer */ - struct mca_ptl_ib_addr_t* peer_addr; /**< address of peer */ - mca_ptl_ib_send_frag_t* peer_send_frag; /**< current send frag being processed */ - mca_ptl_ib_recv_frag_t* peer_recv_frag; /**< current recv frag being processed */ - mca_ptl_ib_state_t peer_state; /**< current state of the connection */ - size_t peer_retries; /**< number of connection retries attempted */ - double peer_ts; /**< timestamp of when the first - connection was attempted */ - ompi_list_t peer_frags; /**< list of pending frags to send */ - ompi_mutex_t peer_send_lock; /**< lock for concurrent access to peer state */ - ompi_mutex_t peer_recv_lock; /**< lock for concurrent access to peer state */ - ompi_event_t peer_send_event; /**< event for async processing of send frags */ - ompi_event_t peer_recv_event; /**< event for async processing of recv frags */ - VAPI_qp_hndl_t peer_qp_hndl; /**< My QP for the peer */ - VAPI_qp_prop_t peer_qp_prop; /**< My QP properties */ + + struct 
mca_ptl_ib_module_t* peer_module; + /**< PTL instance that created this connection */ + + struct mca_ptl_ib_proc_t* peer_proc; + /**< proc structure corresponding to peer */ + + mca_ptl_ib_send_frag_t* peer_send_frag; + /**< current send frag being processed */ + + mca_ptl_ib_recv_frag_t* peer_recv_frag; + /**< current recv frag being processed */ + + mca_ptl_ib_peer_state_t peer_state; + /**< current state of the connection */ + + mca_ptl_ib_peer_conn_t* peer_conn; + /**< IB specific private information about peer */ + + size_t peer_retries; + /**< number of connection retries attempted */ + + double peer_ts; + /**< timestamp of when the first connection was attempted */ + + ompi_list_t peer_frags; + /**< list of pending frags to send */ + + ompi_mutex_t peer_send_lock; + /**< lock for concurrent access to peer state */ + + ompi_mutex_t peer_recv_lock; + /**< lock for concurrent access to peer state */ + + ompi_event_t peer_send_event; + /**< event for async processing of send frags */ + + ompi_event_t peer_recv_event; + /**< event for async processing of recv frags */ }; + typedef struct mca_ptl_base_peer_t mca_ptl_base_peer_t; typedef struct mca_ptl_base_peer_t mca_ptl_ib_peer_t; int mca_ptl_ib_peer_send(mca_ptl_base_peer_t*, mca_ptl_ib_send_frag_t*); +void mca_ptl_ib_post_oob_recv_nb(void); + +#define DUMP_PEER(peer_ptr) { \ + ompi_output(0, "[%s:%d] ", __FILE__, __LINE__); \ + ompi_output(0, "Dumping peer state"); \ + ompi_output(0, "Local QP hndl : %d", \ + peer_ptr->peer_conn->lres->qp_hndl); \ + ompi_output(0, "Local QP num : %d", \ + peer_ptr->peer_conn->lres->qp_prop.qp_num); \ + ompi_output(0, "Remote QP num : %d", \ + peer_ptr->peer_conn->rres->qp_num); \ + ompi_output(0, "Remte LID : %d", \ + peer_ptr->peer_conn->rres->lid); \ +} #endif diff --git a/src/mca/ptl/ib/src/ptl_ib_priv.c b/src/mca/ptl/ib/src/ptl_ib_priv.c index 70ac63230e..9f2e07539a 100644 --- a/src/mca/ptl/ib/src/ptl_ib_priv.c +++ b/src/mca/ptl/ib/src/ptl_ib_priv.c @@ -1,7 +1,14 @@ 
#include "ptl_ib_vapi.h" -#include "ptl_ib.h" #include "ptl_ib_priv.h" +#include "ptl_ib.h" +/* + * Asynchronous event handler to detect unforseen + * events. Usually, such events are catastrophic. + * Should have a robust mechanism to handle these + * events and abort the OMPI application if necessary. + * + */ static void async_event_handler(VAPI_hca_hndl_t hca_hndl, VAPI_event_record_t * event_p, void *priv_data) @@ -40,22 +47,134 @@ static void async_event_handler(VAPI_hca_hndl_t hca_hndl, } -static void ud_completion_handler(VAPI_hca_hndl_t nic, - VAPI_cq_hndl_t cq_hndl, void* priv_data) +/* + * This function returns the hca_id for each PTL + * in a round robin manner. Each PTL gets a different + * HCA id ... + * + * If num PTLs > num HCAs, then those ptls will be + * assigned HCA ids beginning from 0 again. + * + */ + +static int mca_ptl_ib_get_hca_id(int num, VAPI_hca_id_t* hca_id) +{ + uint32_t num_hcas; + VAPI_ret_t ret; + VAPI_hca_id_t* hca_ids = NULL; + + hca_ids = (VAPI_hca_id_t*) malloc(mca_ptl_ib_component.ib_num_hcas * + sizeof(VAPI_hca_id_t)); + + if(NULL == hca_ids) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + /* Now get the hca_id from underlying VAPI layer */ + ret = EVAPI_list_hcas(mca_ptl_ib_component.ib_num_hcas, + &num_hcas, hca_ids); + + /* HACK: right now, I have put VAPI_EAGAIN as + * acceptable condition since we are trying to have + * only 1 ptl support */ + if((VAPI_OK != ret) && (VAPI_EAGAIN != ret)) { + MCA_PTL_IB_VAPI_RET(ret, "EVAPI_list_hcas"); + return OMPI_ERROR; + } else { + + num = num % num_hcas; + + memcpy(hca_id, hca_ids[num], sizeof(VAPI_hca_id_t)); + } + + free(hca_ids); + + return OMPI_SUCCESS; +} + +static int mca_ptl_ib_get_hca_hndl(VAPI_hca_id_t hca_id, + VAPI_hca_hndl_t* hca_hndl) +{ + VAPI_ret_t ret; + + /* Open the HCA */ + ret = EVAPI_get_hca_hndl(hca_id, hca_hndl); + + if(VAPI_OK != ret) { + MCA_PTL_IB_VAPI_RET(ret, "EVAPI_get_hca_hndl"); + return OMPI_ERROR; + } + + return OMPI_SUCCESS; +} + +static int 
mca_ptl_ib_query_hca_prop(VAPI_hca_hndl_t nic, + VAPI_hca_port_t* port) { VAPI_ret_t ret; - fprintf(stderr,"Got interrupt!!\n"); - fflush(stderr); - - ret = VAPI_req_comp_notif(nic, cq_hndl, VAPI_NEXT_COMP); + /* Querying for port properties */ + ret = VAPI_query_hca_port_prop(nic, + (IB_port_t)DEFAULT_PORT, + port); if(VAPI_OK != ret) { - MCA_PTL_IB_VAPI_RET(ret, "VAPI_req_comp_notif"); + MCA_PTL_IB_VAPI_RET(ret, "VAPI_query_hca_port_prop"); + return OMPI_ERROR; } + + return OMPI_SUCCESS; } -int mca_ptl_ib_create_qp(VAPI_hca_hndl_t nic, +static int mca_ptl_ib_alloc_pd(VAPI_hca_hndl_t nic, + VAPI_pd_hndl_t* ptag) +{ + VAPI_ret_t ret; + + ret = VAPI_alloc_pd(nic, ptag); + + if(ret != VAPI_OK) { + MCA_PTL_IB_VAPI_RET(ret, "VAPI_alloc_pd"); + return OMPI_ERROR; + } + + return OMPI_SUCCESS; +} + +static int mca_ptl_ib_create_cq(VAPI_hca_hndl_t nic, + VAPI_cq_hndl_t* cq_hndl) +{ + uint32_t act_num_cqe = 0; + VAPI_ret_t ret; + + ret = VAPI_create_cq(nic, DEFAULT_CQ_SIZE, + cq_hndl, &act_num_cqe); + + if( (VAPI_OK != ret) || (0 == act_num_cqe)) { + MCA_PTL_IB_VAPI_RET(ret, "VAPI_create_cq"); + return OMPI_ERROR; + } + + return OMPI_SUCCESS; +} + +static int mca_ptl_ib_set_async_handler(VAPI_hca_hndl_t nic, + EVAPI_async_handler_hndl_t *async_handler) +{ + VAPI_ret_t ret; + + ret = EVAPI_set_async_event_handler(nic, + async_event_handler, 0, async_handler); + + if(VAPI_OK != ret) { + MCA_PTL_IB_VAPI_RET(ret, "EVAPI_set_async_event_handler"); + return OMPI_ERROR; + } + + return OMPI_SUCCESS; +} + +static int mca_ptl_ib_create_qp(VAPI_hca_hndl_t nic, VAPI_pd_hndl_t ptag, VAPI_cq_hndl_t recv_cq, VAPI_cq_hndl_t send_cq, @@ -68,33 +187,11 @@ int mca_ptl_ib_create_qp(VAPI_hca_hndl_t nic, switch(transport_type) { - case VAPI_TS_UD: /* Set up UD qp parameters */ - qp_init_attr.cap.max_oust_wr_rq = DEFAULT_UD_WQ_SIZE; - qp_init_attr.cap.max_oust_wr_sq = DEFAULT_UD_WQ_SIZE; - qp_init_attr.cap.max_sg_size_rq = DEFAULT_UD_SG_LIST; - qp_init_attr.cap.max_sg_size_sq = 
DEFAULT_UD_SG_LIST; - qp_init_attr.pd_hndl = ptag; - - /* We don't have Reliable Datagram Handle right now */ - qp_init_attr.rdd_hndl = 0; - - /* Set Send and Recv completion queues */ - qp_init_attr.rq_cq_hndl = recv_cq; - qp_init_attr.sq_cq_hndl = send_cq; - - /* Signal all work requests on this queue pair */ - qp_init_attr.rq_sig_type = VAPI_SIGNAL_REQ_WR; - qp_init_attr.sq_sig_type = VAPI_SIGNAL_REQ_WR; - - /* Use Unreliable Datagram transport service */ - qp_init_attr.ts_type = VAPI_TS_UD; - - break; case VAPI_TS_RC: /* Set up RC qp parameters */ - qp_init_attr.cap.max_oust_wr_rq = DEFAULT_UD_WQ_SIZE; - qp_init_attr.cap.max_oust_wr_sq = DEFAULT_UD_WQ_SIZE; - qp_init_attr.cap.max_sg_size_rq = DEFAULT_UD_SG_LIST; - qp_init_attr.cap.max_sg_size_sq = DEFAULT_UD_SG_LIST; + qp_init_attr.cap.max_oust_wr_rq = DEFAULT_WQ_SIZE; + qp_init_attr.cap.max_oust_wr_sq = DEFAULT_WQ_SIZE; + qp_init_attr.cap.max_sg_size_rq = DEFAULT_SG_LIST; + qp_init_attr.cap.max_sg_size_sq = DEFAULT_SG_LIST; qp_init_attr.pd_hndl = ptag; /* We don't have Reliable Datagram Handle right now */ qp_init_attr.rdd_hndl = 0; @@ -110,6 +207,7 @@ int mca_ptl_ib_create_qp(VAPI_hca_hndl_t nic, /* Use Unreliable Datagram transport service */ qp_init_attr.ts_type = VAPI_TS_RC; break; + case VAPI_TS_UD: /* Set up UD qp parameters */ default: return OMPI_ERR_NOT_IMPLEMENTED; } @@ -124,7 +222,53 @@ int mca_ptl_ib_create_qp(VAPI_hca_hndl_t nic, return OMPI_SUCCESS; } -int mca_ptl_ib_rc_qp_init(VAPI_hca_hndl_t nic, +int mca_ptl_ib_init_module(mca_ptl_ib_state_t *ib_state, int module_num) +{ + /* Get the HCA id ... 
InfiniHost0, 1 etc */ + if(mca_ptl_ib_get_hca_id(module_num, &ib_state->hca_id) + != OMPI_SUCCESS) { + return OMPI_ERROR; + } + + /* Get HCA handle */ + if(mca_ptl_ib_get_hca_hndl(ib_state->hca_id, &ib_state->nic) + != OMPI_SUCCESS) { + return OMPI_ERROR; + } + + /* Allocate a protection domain for this NIC */ + if(mca_ptl_ib_alloc_pd(ib_state->nic, &ib_state->ptag) + != OMPI_SUCCESS) { + return OMPI_ERROR; + } + + /* Get the properties of the HCA, + * LID etc. are part of the properties */ + if(mca_ptl_ib_query_hca_prop(ib_state->nic, &ib_state->port) + != OMPI_SUCCESS) { + return OMPI_ERROR; + } + + /* Create Completion Q */ + /* We use a single completion Q for sends & recvs + * This saves us overhead of polling 2 separate Qs */ + if(mca_ptl_ib_create_cq(ib_state->nic, &ib_state->cq_hndl) + != OMPI_SUCCESS) { + return OMPI_ERROR; + } + + /* Attach asynchronous handler */ + if(mca_ptl_ib_set_async_handler(ib_state->nic, + &ib_state->async_handler) + != OMPI_SUCCESS) { + return OMPI_ERROR; + } + + return OMPI_SUCCESS; +} + + +static int mca_ptl_ib_rc_qp_init(VAPI_hca_hndl_t nic, VAPI_qp_hndl_t qp_hndl, VAPI_qp_num_t remote_qp, IB_lid_t remote_lid) @@ -217,79 +361,6 @@ int mca_ptl_ib_rc_qp_init(VAPI_hca_hndl_t nic, return OMPI_SUCCESS; } - -/* Initialize UD queue pairs */ - -int mca_ptl_ib_ud_qp_init(VAPI_hca_hndl_t nic, - VAPI_qp_hndl_t ud_qp_hndl) -{ - VAPI_qp_attr_t qp_attr; - VAPI_qp_cap_t qp_cap; - VAPI_qp_attr_mask_t qp_attr_mask; - VAPI_ret_t ret; - - /* Modifying QP to INIT */ - QP_ATTR_MASK_CLR_ALL(qp_attr_mask); - qp_attr.qp_state = VAPI_INIT; - QP_ATTR_MASK_SET(qp_attr_mask,QP_ATTR_QP_STATE); - qp_attr.pkey_ix = DEFAULT_PKEY_IX; - QP_ATTR_MASK_SET(qp_attr_mask,QP_ATTR_PKEY_IX); - qp_attr.port = DEFAULT_PORT; - QP_ATTR_MASK_SET(qp_attr_mask,QP_ATTR_PORT); - qp_attr.qkey = 0; - QP_ATTR_MASK_SET(qp_attr_mask,QP_ATTR_QKEY); - - ret = VAPI_modify_qp(nic, - ud_qp_hndl, &qp_attr, - &qp_attr_mask, &qp_cap); - - if(VAPI_OK != ret) { - MCA_PTL_IB_VAPI_RET(ret, 
"VAPI_modify_qp"); - return OMPI_ERROR; - } - - D_PRINT("Modified UD to init..Qp\n"); - - /***************** - * INIT --> RTR - *****************/ - QP_ATTR_MASK_CLR_ALL(qp_attr_mask); - qp_attr.qp_state = VAPI_RTR; - QP_ATTR_MASK_SET(qp_attr_mask,QP_ATTR_QP_STATE); - - ret = VAPI_modify_qp(nic, ud_qp_hndl, &qp_attr, - &qp_attr_mask, &qp_cap); - - if(VAPI_OK != ret) { - MCA_PTL_IB_VAPI_RET(ret, "VAPI_modify_qp"); - return OMPI_ERROR; - } - - D_PRINT("Modified UD to RTR..Qp\n"); - - /********************* - * RTR --> RTS - *********************/ - QP_ATTR_MASK_CLR_ALL(qp_attr_mask); - qp_attr.qp_state = VAPI_RTS; - QP_ATTR_MASK_SET(qp_attr_mask,QP_ATTR_QP_STATE); - qp_attr.sq_psn = DEFAULT_PSN; - QP_ATTR_MASK_SET(qp_attr_mask,QP_ATTR_SQ_PSN); - - ret = VAPI_modify_qp(nic, ud_qp_hndl, &qp_attr, - &qp_attr_mask, &qp_cap); - - if(VAPI_OK != ret) { - MCA_PTL_IB_VAPI_RET(ret, "VAPI_modify_qp"); - return OMPI_ERROR; - } - - D_PRINT("Modified UD to RTS..Qp\n"); - - /* Everything was fine ... return success! */ - return OMPI_SUCCESS; -} - int mca_ptl_ib_get_num_hcas(uint32_t* num_hcas) { VAPI_ret_t ret; @@ -305,196 +376,7 @@ int mca_ptl_ib_get_num_hcas(uint32_t* num_hcas) return OMPI_SUCCESS; } -/* This function returns the hca_id for each PTL - * in a round robin manner. Each PTL gets a different - * HCA id ... - * - * If num PTLs > num HCAs, then those ptls will be - * assigned HCA ids beginning from 0 again. 
- */ - -int mca_ptl_ib_get_hca_id(int num, VAPI_hca_id_t* hca_id) -{ - uint32_t num_hcas; - VAPI_ret_t ret; - VAPI_hca_id_t* hca_ids = NULL; - - hca_ids = (VAPI_hca_id_t*) malloc(mca_ptl_ib_component.ib_num_hcas * - sizeof(VAPI_hca_id_t)); - - if(NULL == hca_ids) { - return OMPI_ERR_OUT_OF_RESOURCE; - } - - /* Now get the hca_id from underlying VAPI layer */ - ret = EVAPI_list_hcas(mca_ptl_ib_component.ib_num_hcas, - &num_hcas, hca_ids); - - /* HACK: right now, I have put VAPI_EAGAIN as - * acceptable condition since we are trying to have - * only 1 ptl support */ - if((VAPI_OK != ret) && (VAPI_EAGAIN != ret)) { - MCA_PTL_IB_VAPI_RET(ret, "EVAPI_list_hcas"); - return OMPI_ERROR; - } else { - - num = num % num_hcas; - - memcpy(hca_id, hca_ids[num], sizeof(VAPI_hca_id_t)); - } - - free(hca_ids); - - return OMPI_SUCCESS; -} - -int mca_ptl_ib_get_hca_hndl(VAPI_hca_id_t hca_id, - VAPI_hca_hndl_t* hca_hndl) -{ - VAPI_ret_t ret; - - /* Open the HCA */ - ret = EVAPI_get_hca_hndl(hca_id, hca_hndl); - - if(VAPI_OK != ret) { - MCA_PTL_IB_VAPI_RET(ret, "EVAPI_get_hca_hndl"); - return OMPI_ERROR; - } - - return OMPI_SUCCESS; -} - -int mca_ptl_ib_query_hca_prop(VAPI_hca_hndl_t nic, - VAPI_hca_port_t* port) -{ - VAPI_ret_t ret; - - /* Querying for port properties */ - ret = VAPI_query_hca_port_prop(nic, - (IB_port_t)DEFAULT_PORT, - port); - - if(VAPI_OK != ret) { - MCA_PTL_IB_VAPI_RET(ret, "VAPI_query_hca_port_prop"); - return OMPI_ERROR; - } - - return OMPI_SUCCESS; -} - -int mca_ptl_ib_alloc_pd(VAPI_hca_hndl_t nic, - VAPI_pd_hndl_t* ptag) -{ - VAPI_ret_t ret; - - ret = VAPI_alloc_pd(nic, ptag); - - if(ret != VAPI_OK) { - MCA_PTL_IB_VAPI_RET(ret, "VAPI_alloc_pd"); - return OMPI_ERROR; - } - - return OMPI_SUCCESS; -} - -int mca_ptl_ib_create_cq(VAPI_hca_hndl_t nic, - VAPI_cq_hndl_t* cq_hndl) -{ - uint32_t act_num_cqe = 0; - VAPI_ret_t ret; - - ret = VAPI_create_cq(nic, DEFAULT_CQ_SIZE, - cq_hndl, &act_num_cqe); - - if( (VAPI_OK != ret) || (0 == act_num_cqe)) { - 
MCA_PTL_IB_VAPI_RET(ret, "VAPI_create_cq"); - return OMPI_ERROR; - } - - return OMPI_SUCCESS; -} - -int mca_ptl_ib_set_async_handler(VAPI_hca_hndl_t nic, - EVAPI_async_handler_hndl_t *async_handler) -{ - VAPI_ret_t ret; - - ret = EVAPI_set_async_event_handler(nic, - async_event_handler, 0, async_handler); - - if(VAPI_OK != ret) { - MCA_PTL_IB_VAPI_RET(ret, "EVAPI_set_async_event_handler"); - return OMPI_ERROR; - } - - return OMPI_SUCCESS; -} - -int mca_ptl_ib_prep_ud_bufs(VAPI_hca_hndl_t nic, VAPI_pd_hndl_t ptag, - mca_ptl_ib_ud_buf_t* ud_buf, IB_wr_t wr_type, - int num_bufs) -{ -#if 0 - int i; - vapi_descriptor_t* desc; - - for(i = 0; i < num_bufs; i++) { - - desc = &ud_buf[i].desc; - ud_buf[i].buf_data = - malloc(sizeof(mca_ptl_ib_ud_buf_data_t)); - - if(NULL == ud_buf[i].buf_data) { - return OMPI_ERR_OUT_OF_RESOURCE; - } - - if(mca_ptl_ib_register_mem(nic, - (void*) ud_buf[i].buf_data, ptag, - sizeof(mca_ptl_ib_ud_buf_data_t), - &ud_buf[i].memhandle) - != OMPI_SUCCESS) { - return OMPI_ERROR; - } - - D_PRINT("databuf = %p, len = %d, lkey = %d\n", - (void*) ud_buf[i].buf_data, - sizeof(mca_ptl_ib_ud_buf_data_t), - ud_buf[i].memhandle.lkey); - - if(IB_RECV == wr_type) { - desc->rr.comp_type = VAPI_SIGNALED; - desc->rr.opcode = VAPI_RECEIVE; - desc->rr.id = (VAPI_virt_addr_t)(MT_virt_addr_t) &(ud_buf[i]); - desc->rr.sg_lst_len = 1; - desc->rr.sg_lst_p = &(desc->sg_entry); - desc->sg_entry.len = sizeof(mca_ptl_ib_ud_buf_data_t); - desc->sg_entry.addr = (VAPI_virt_addr_t) (MT_virt_addr_t) ud_buf[i].buf_data; - desc->sg_entry.lkey = ud_buf[i].memhandle.lkey; - } else if (IB_SEND == wr_type) { - desc->sr.comp_type = VAPI_SIGNALED; - desc->sr.opcode = VAPI_SEND; - desc->sr.id = (VAPI_virt_addr_t)(MT_virt_addr_t) &(ud_buf[i]); - desc->sr.sg_lst_len = 1; - desc->sr.sg_lst_p = &(desc->sg_entry); - desc->sg_entry.len = sizeof(mca_ptl_ib_ud_buf_data_t); - desc->sg_entry.addr = (VAPI_virt_addr_t) (MT_virt_addr_t) ud_buf[i].buf_data; - desc->sg_entry.lkey = 
ud_buf[i].memhandle.lkey; - } else { - return OMPI_ERROR; - } - - - D_PRINT("databuf = %p, len = %d, lkey = %d\n", - (void*) ud_buf[i].buf_data, - sizeof(mca_ptl_ib_ud_buf_data_t), - ud_buf[i].memhandle.lkey); - } -#endif - - return OMPI_SUCCESS; -} - -int mca_ptl_ib_register_mem(VAPI_hca_hndl_t nic, VAPI_pd_hndl_t ptag, +static int mca_ptl_ib_register_mem(VAPI_hca_hndl_t nic, VAPI_pd_hndl_t ptag, void* buf, int len, vapi_memhandle_t* memhandle) { VAPI_ret_t ret; @@ -528,118 +410,48 @@ int mca_ptl_ib_register_mem(VAPI_hca_hndl_t nic, VAPI_pd_hndl_t ptag, return OMPI_SUCCESS; } -int mca_ptl_ib_post_ud_recv(VAPI_hca_hndl_t nic, - VAPI_qp_hndl_t ud_qp_hndl, - mca_ptl_ib_ud_buf_t* ud_buf, int num_bufs) +int mca_ptl_ib_init_peer(mca_ptl_ib_state_t *ib_state, + mca_ptl_ib_peer_conn_t *peer_conn) { - int i; - VAPI_ret_t ret; + /* Allocate resources for the peer connection */ - for(i = 0; i < num_bufs; i++) { - - ret = VAPI_post_rr(nic, ud_qp_hndl, &ud_buf[i].desc.rr); - - if(VAPI_OK != ret) { - MCA_PTL_IB_VAPI_RET(ret, "VAPI_post_rr"); - return OMPI_ERROR; - } - } - return OMPI_SUCCESS; -} - -int mca_ptl_ib_set_comp_ev_hndl(VAPI_hca_hndl_t nic, - VAPI_cq_hndl_t cq_hndl, VAPI_completion_event_handler_t handler, - void* priv_data, EVAPI_compl_handler_hndl_t *handler_hndl) -{ - VAPI_ret_t ret; - - ret = EVAPI_set_comp_eventh(nic, cq_hndl, handler, - priv_data, handler_hndl); - - if(VAPI_OK != ret) { - MCA_PTL_IB_VAPI_RET(ret, "EVAPI_set_comp_eventh"); - return OMPI_ERROR; + peer_conn->lres = (mca_ptl_ib_peer_local_res_t *) + malloc(sizeof(mca_ptl_ib_peer_local_res_t)); + if(NULL == peer_conn->lres) { + return OMPI_ERR_OUT_OF_RESOURCE; } - D_PRINT("Completion hander: %p, Handle = %d\n", - handler, (int)*handler_hndl); - - return OMPI_SUCCESS; -} + peer_conn->rres = (mca_ptl_ib_peer_remote_res_t *) + malloc(sizeof(mca_ptl_ib_peer_remote_res_t)); + if(NULL == peer_conn->rres) { + return OMPI_ERR_OUT_OF_RESOURCE; + } -int mca_ptl_ib_req_comp_notif(VAPI_hca_hndl_t nic, 
VAPI_cq_hndl_t cq_hndl) -{ - VAPI_ret_t ret; - - ret = VAPI_req_comp_notif(nic, cq_hndl, VAPI_NEXT_COMP); - - if(VAPI_OK != ret) { - MCA_PTL_IB_VAPI_RET(ret, "VAPI_req_comp_notif"); - return OMPI_ERROR; + /* Create the Queue Pair */ + if(mca_ptl_ib_create_qp(ib_state->nic, + ib_state->ptag, + ib_state->cq_hndl, + ib_state->cq_hndl, + &peer_conn->lres->qp_hndl, + &peer_conn->lres->qp_prop, + VAPI_TS_RC) + != OMPI_SUCCESS) { + return OMPI_ERR_OUT_OF_RESOURCE; } return OMPI_SUCCESS; } -int mca_ptl_ib_get_comp_ev_hndl(VAPI_completion_event_handler_t* handler_ptr) +/* + * Establish Reliable Connection with peer + * + */ + +int mca_ptl_ib_peer_connect(mca_ptl_ib_state_t *ib_state, + mca_ptl_ib_peer_conn_t *peer_conn) { - *handler_ptr = ud_completion_handler; - - D_PRINT("UD Completion Event Handler = %p\n", ud_completion_handler); - - return OMPI_SUCCESS; -} - -void mca_ptl_ib_frag(struct mca_ptl_ib_module_t* module, - mca_ptl_base_header_t * header) -{ - /* Allocate a recv frag descriptor */ - mca_ptl_ib_recv_frag_t *recv_frag; - ompi_list_item_t *item; - - bool matched; - int rc = OMPI_SUCCESS; - - OMPI_FREE_LIST_GET(&mca_ptl_ib_component.ib_recv_frags, - item, rc); - - while (OMPI_SUCCESS != rc) { - /* TODO: progress the recv state machine */ - D_PRINT("Retry to allocate a recv fragment\n"); - OMPI_FREE_LIST_GET (&mca_ptl_ib_component.ib_recv_frags, - item, rc); - } - - recv_frag = (mca_ptl_ib_recv_frag_t *) item; - recv_frag->super.frag_base.frag_owner = - (mca_ptl_base_module_t *) module; - - recv_frag->super.frag_base.frag_peer = NULL; - recv_frag->super.frag_request = NULL; - recv_frag->super.frag_is_buffered = false; - - /* Copy the header, mca_ptl_base_match() does not do what it claims */ - recv_frag->super.frag_base.frag_header = *header; - - /* Taking the data starting point be default */ - recv_frag->super.frag_base.frag_addr = - (char *) header + sizeof (mca_ptl_base_header_t); - recv_frag->super.frag_base.frag_size = header->hdr_frag.hdr_frag_length; - - 
/* match with preposted requests */ - matched = module->super.ptl_match( - recv_frag->super.frag_base.frag_owner, - &recv_frag->super, - &recv_frag->super.frag_base.frag_header.hdr_match); - - if (!matched) { - /* Oh my GOD !!! */ - D_PRINT("Can't match buffer. Mama is unhappy\n"); - memcpy (recv_frag->unex_buf, - (char *) header + sizeof (mca_ptl_base_header_t), - header->hdr_frag.hdr_frag_length); - recv_frag->super.frag_is_buffered = true; - recv_frag->super.frag_base.frag_addr = recv_frag->unex_buf; - - } + return(mca_ptl_ib_rc_qp_init(ib_state->nic, + peer_conn->lres->qp_hndl, + peer_conn->rres->qp_num, + peer_conn->rres->lid)); } diff --git a/src/mca/ptl/ib/src/ptl_ib_priv.h b/src/mca/ptl/ib/src/ptl_ib_priv.h index 5043d66c0c..adbe805bfa 100644 --- a/src/mca/ptl/ib/src/ptl_ib_priv.h +++ b/src/mca/ptl/ib/src/ptl_ib_priv.h @@ -1,16 +1,27 @@ #ifndef MCA_PTL_IB_PRIV_H #define MCA_PTL_IB_PRIV_H +#include #include "ptl_ib_vapi.h" -/* Posting MAX_UD_PREPOST_DEPTH number of recv - * buffers at PTL initialization. What happens when - * more than these number of procs try to send their - * queue pair handles? 
- */ -#define MAX_UD_PREPOST_DEPTH (1) -#define BUFSIZE (4096) -#define NUM_BUFS (5000) +struct mca_ptl_ib_state_t { + VAPI_hca_id_t hca_id; /* ID of HCA */ + VAPI_hca_port_t port; /* IB port of this PTL */ + VAPI_hca_hndl_t nic; /* NIC handle */ + VAPI_pd_hndl_t ptag; /* Protection Domain tag */ + + VAPI_cq_hndl_t cq_hndl; /* Completion Queue handle */ + /* At present Send & Recv + * are tied to the same + * completion queue */ + + EVAPI_async_handler_hndl_t async_handler; + /* Async event handler used + * to detect weird/unknown + * events */ +}; + +typedef struct mca_ptl_ib_state_t mca_ptl_ib_state_t; typedef enum { IB_RECV, @@ -18,89 +29,94 @@ typedef enum { } IB_wr_t; struct vapi_memhandle_t { - VAPI_mr_hndl_t hndl; - VAPI_lkey_t lkey; - VAPI_rkey_t rkey; + VAPI_mr_hndl_t hndl; + /* Memory region handle */ + + VAPI_lkey_t lkey; + /* Local key to registered memory, needed for + * posting send/recv requests */ + + VAPI_rkey_t rkey; + /* Remote key to registered memory, need to send this + * to remote processes for incoming RDMA ops */ }; typedef struct vapi_memhandle_t vapi_memhandle_t; struct vapi_descriptor_t { union { - VAPI_rr_desc_t rr; - VAPI_sr_desc_t sr; + VAPI_rr_desc_t rr; + /* Receive descriptor */ + + VAPI_sr_desc_t sr; + /* Send descriptor */ }; VAPI_sg_lst_entry_t sg_entry; + /* Scatter/Gather entry */ }; typedef struct vapi_descriptor_t vapi_descriptor_t; -struct mca_ptl_ib_send_buf_t { - mca_pml_base_request_t *req; - vapi_descriptor_t desc; - char buf[4096]; +/* mca_ptl_ib_peer_local_res_t contains information + * regarding local resources dedicated to this + * connection */ +struct mca_ptl_ib_peer_local_res_t { + + VAPI_qp_hndl_t qp_hndl; + /* Local QP handle */ + + VAPI_qp_prop_t qp_prop; + /* Local QP properties */ }; -typedef struct mca_ptl_ib_send_buf_t mca_ptl_ib_send_buf_t; +typedef struct mca_ptl_ib_peer_local_res_t mca_ptl_ib_peer_local_res_t; -struct mca_ptl_ib_recv_buf_t { - mca_pml_base_request_t *req; - vapi_descriptor_t desc; - 
char buf[4096]; +/* mca_ptl_ib_peer_remote_res_t contains information + * regarding remote resources dedicated to this + * connection */ +struct mca_ptl_ib_peer_remote_res_t { + + VAPI_qp_num_t qp_num; + /* Remote side QP number */ + + IB_lid_t lid; + /* Local identifier of the remote process */ }; -typedef struct mca_ptl_ib_recv_buf_t mca_ptl_ib_recv_buf_t; +typedef struct mca_ptl_ib_peer_remote_res_t mca_ptl_ib_peer_remote_res_t; -#define MCA_PTL_IB_UD_RECV_DESC(ud_buf, len) { \ - desc->rr.comp_type = VAPI_SIGNALED; \ - desc->rr.opcode = VAPI_RECEIVE; \ - desc->rr.id = (VAPI_virt_addr_t)(MT_virt_addr_t) &(ud_buf); \ - desc->rr.sg_lst_len = 1; \ - desc->rr.sg_lst_p = &(desc->sg_entry); \ - desc->sg_entry.len = len; \ - desc->sg_entry.addr = \ - (VAPI_virt_addr_t) (MT_virt_addr_t) ud_buf.buf_data; \ - desc->sg_entry.lkey = ud_buf.memhandle.lkey; \ +/* mca_ptl_ib_peer_conn_t contains private information + * about the peer. This information is used to describe + * the connection oriented information about this peer + * and local resources associated with it. 
*/ +struct mca_ptl_ib_peer_conn_t { + + mca_ptl_ib_peer_local_res_t* lres; + /* Local resources associated with this connection */ + + mca_ptl_ib_peer_remote_res_t* rres; + /* Remote resources associated with this connection */ +}; + +typedef struct mca_ptl_ib_peer_conn_t mca_ptl_ib_peer_conn_t; + +#define DUMP_IB_STATE(ib_state_ptr) { \ + ompi_output(0, "[%s:%d] ", __FILE__, __LINE__); \ + ompi_output(0, "Dumping IB state"); \ + ompi_output(0, "HCA ID : %s", ib_state_ptr->hca_id); \ + ompi_output(0, "LID : %d", ib_state_ptr->port.lid); \ + ompi_output(0, "HCA handle : %d", ib_state_ptr->nic); \ + ompi_output(0, "Protection Domain: %d", ib_state_ptr->ptag); \ + ompi_output(0, "Comp Q handle : %d", ib_state_ptr->cq_hndl); \ + ompi_output(0, "Async hndl : %d", ib_state_ptr->async_handler); \ } -#define MCA_PTL_IB_UD_SEND_DESC(ud_buf, len) { \ - desc->sr.comp_type = VAPI_SIGNALED; \ - desc->sr.opcode = VAPI_SEND; \ - desc->sr.id = (VAPI_virt_addr_t)(MT_virt_addr_t) ud_buf; \ - desc->sr.sg_lst_len = 1; \ - desc->sr.sg_lst_p = &(desc->sg_entry); \ - desc->sg_entry.len = len; \ - desc->sg_entry.addr = \ - (VAPI_virt_addr_t) (MT_virt_addr_t) ud_buf->buf_data; \ - desc->sg_entry.lkey = ud_buf->memhandle.lkey; \ -} -int mca_ptl_ib_ud_cq_init(VAPI_hca_hndl_t, VAPI_cq_hndl_t*, - VAPI_cq_hndl_t*); -int mca_ptl_ib_ud_qp_init(VAPI_hca_hndl_t, VAPI_qp_hndl_t); +int mca_ptl_ib_init_module(mca_ptl_ib_state_t*, int); int mca_ptl_ib_get_num_hcas(uint32_t*); -int mca_ptl_ib_get_hca_id(int, VAPI_hca_id_t*); -int mca_ptl_ib_get_hca_hndl(VAPI_hca_id_t, VAPI_hca_hndl_t*); -int mca_ptl_ib_query_hca_prop(VAPI_hca_hndl_t, VAPI_hca_port_t*); -int mca_ptl_ib_alloc_pd(VAPI_hca_hndl_t, VAPI_pd_hndl_t*); -int mca_ptl_ib_create_cq(VAPI_hca_hndl_t, VAPI_cq_hndl_t*); -int mca_ptl_ib_set_async_handler(VAPI_hca_hndl_t, - EVAPI_async_handler_hndl_t*); -int mca_ptl_ib_register_mem(VAPI_hca_hndl_t, VAPI_pd_hndl_t, void*, int, - vapi_memhandle_t*); -int mca_ptl_ib_set_comp_ev_hndl(VAPI_hca_hndl_t, 
VAPI_cq_hndl_t, - VAPI_completion_event_handler_t, void*, - EVAPI_compl_handler_hndl_t*); -int mca_ptl_ib_req_comp_notif(VAPI_hca_hndl_t,VAPI_cq_hndl_t); -int mca_ptl_ib_get_comp_ev_hndl(VAPI_completion_event_handler_t*); -int mca_ptl_ib_init_send(void*, VAPI_qp_hndl_t, int); -int mca_ptl_ib_create_qp(VAPI_hca_hndl_t, VAPI_pd_hndl_t, - VAPI_cq_hndl_t, VAPI_cq_hndl_t, VAPI_qp_hndl_t*, - VAPI_qp_prop_t*, int); -int mca_ptl_ib_rc_qp_init(VAPI_hca_hndl_t, VAPI_qp_hndl_t, - VAPI_qp_num_t, IB_lid_t); -void mca_ptl_ib_frag(struct mca_ptl_ib_module_t* module, - mca_ptl_base_header_t * header); +int mca_ptl_ib_init_peer(mca_ptl_ib_state_t*, mca_ptl_ib_peer_conn_t*); +int mca_ptl_ib_peer_connect(mca_ptl_ib_state_t*, + mca_ptl_ib_peer_conn_t*); #endif /* MCA_PTL_IB_PRIV_H */ diff --git a/src/mca/ptl/ib/src/ptl_ib_proc.c b/src/mca/ptl/ib/src/ptl_ib_proc.c index a6bf55d698..0fb42b3caf 100644 --- a/src/mca/ptl/ib/src/ptl_ib_proc.c +++ b/src/mca/ptl/ib/src/ptl_ib_proc.c @@ -17,7 +17,6 @@ OBJ_CLASS_INSTANCE(mca_ptl_ib_proc_t, void mca_ptl_ib_proc_construct(mca_ptl_ib_proc_t* proc) { proc->proc_ompi = 0; - proc->proc_addrs = 0; proc->proc_addr_count = 0; proc->proc_peers = 0; proc->proc_peer_count = 0; @@ -84,21 +83,22 @@ static mca_ptl_ib_proc_t* mca_ptl_ib_proc_lookup_ompi(ompi_proc_t* ompi_proc) mca_ptl_ib_proc_t* mca_ptl_ib_proc_create(ompi_proc_t* ompi_proc) { - int rc, my_rank, i; size_t size; - char* str_rank; - VAPI_ret_t ret; - - mca_ptl_ib_module_t* module = NULL; - mca_ptl_ib_proc_t* module_proc = NULL; + /* Check if we have already created a IB proc + * structure for this ompi process */ module_proc = mca_ptl_ib_proc_lookup_ompi(ompi_proc); if(module_proc != NULL) { + + /* Gotcha! */ return module_proc; } + /* Oops! First time, gotta create a new IB proc + * out of the ompi_proc ... 
*/ + module_proc = OBJ_NEW(mca_ptl_ib_proc_t); /* Initialize number of peer */ @@ -112,36 +112,15 @@ mca_ptl_ib_proc_t* mca_ptl_ib_proc_create(ompi_proc_t* ompi_proc) D_PRINT("Creating proc for %d\n", ompi_proc->proc_name.vpid); - /* lookup ib parameters exported by - * this proc */ - rc = mca_base_modex_recv( - &mca_ptl_ib_component.super.ptlm_version, - ompi_proc, - (void**)&module_proc->proc_addrs, - &size); + /* IB module doesn't have addresses exported at + * initialization, so the addr_count is set to one. */ + module_proc->proc_addr_count = 1; - if(rc != OMPI_SUCCESS) { - ompi_output(0, "mca_ptl_ib_proc_create: mca_base_modex_recv: " - "failed with return value=%d", rc); - OBJ_RELEASE(module_proc); - return NULL; - } - D_PRINT("UD q.p. obtained is: %d, Lid : %d\n", - module_proc->proc_addrs[0].qp_num, - module_proc->proc_addrs[0].lid); - - if(0 != (size % sizeof(mca_ptl_ib_ud_addr_t))) { - ompi_output(0, "mca_ptl_ib_proc_create: mca_base_modex_recv: " - "invalid size %d\n", size); - return NULL; - } - - module_proc->proc_addr_count = size / sizeof(mca_ptl_ib_ud_addr_t); - - /* allocate space for peer array - one for - * each exported address - */ + /* XXX: Right now, there can be only 1 peer associated + * with a proc. Needs a little bit change in + * mca_ptl_ib_proc_t to allow on demand increasing of + * number of peers for this proc */ module_proc->proc_peers = (mca_ptl_base_peer_t**) malloc(module_proc->proc_addr_count * sizeof(mca_ptl_base_peer_t*)); @@ -150,101 +129,6 @@ mca_ptl_ib_proc_t* mca_ptl_ib_proc_create(ompi_proc_t* ompi_proc) OBJ_RELEASE(module_proc); return NULL; } - - /* HACK: Till dyn. 
connection management comes through, - * just establish the RC connection here */ - - str_rank = getenv("OMPI_MCA_pcm_cofs_procid"); - if(NULL != str_rank) { - my_rank = atoi(str_rank); - } else { - D_PRINT("Rank, what rank?"); - } - - if(my_rank != ompi_proc->proc_name.vpid) { - D_PRINT("I %d Have to create connection for %d", - my_rank, ompi_proc->proc_name.vpid); - - module = mca_ptl_ib_component.ib_ptl_modules[0]; - - /* Make the RC QP transitions */ - if(mca_ptl_ib_rc_qp_init(module->nic, - module->my_qp_hndl, - module_proc->proc_addrs[0].qp_num, - module_proc->proc_addrs[0].lid) - != OMPI_SUCCESS) { - return NULL; - } - - /* Allocate the send and recv buffers */ - - module->send_buf = - malloc(sizeof(mca_ptl_ib_send_buf_t) * NUM_BUFS); - - if(NULL == module->send_buf) { - return NULL; - } - memset(module->send_buf, - 0, sizeof(mca_ptl_ib_send_buf_t) * NUM_BUFS); - - if(mca_ptl_ib_register_mem(module->nic, - module->ptag, - module->send_buf, - sizeof(mca_ptl_ib_send_buf_t) * NUM_BUFS, - &module->send_buf_hndl) - != OMPI_SUCCESS) { - return NULL; - } - - module->recv_buf = - malloc(sizeof(mca_ptl_ib_recv_buf_t) * NUM_BUFS); - - if(NULL == module->recv_buf) { - return NULL; - } - - memset(module->recv_buf, - 0, sizeof(mca_ptl_ib_recv_buf_t) * NUM_BUFS); - - if(mca_ptl_ib_register_mem(module->nic, - module->ptag, - module->recv_buf, - sizeof(mca_ptl_ib_recv_buf_t) * NUM_BUFS, - &module->recv_buf_hndl) - != OMPI_SUCCESS) { - return NULL; - } - - /* Prepare the receivs */ - for(i = 0; i < NUM_BUFS; i++) { - module->recv_buf[i].desc.rr.comp_type = VAPI_SIGNALED; - module->recv_buf[i].desc.rr.opcode = VAPI_RECEIVE; - module->recv_buf[i].desc.rr.id = (VAPI_virt_addr_t) - (MT_virt_addr_t) &module->recv_buf[i]; - module->recv_buf[i].desc.rr.sg_lst_len = 1; - module->recv_buf[i].desc.rr.sg_lst_p = &(module->recv_buf[i].desc.sg_entry); - module->recv_buf[i].desc.sg_entry.len = 4096; - module->recv_buf[i].desc.sg_entry.lkey = module->recv_buf_hndl.lkey; - 
module->recv_buf[i].desc.sg_entry.addr = - (VAPI_virt_addr_t) (MT_virt_addr_t) (module->recv_buf[i].buf); - } - - /* Post the receives */ - for(i = 0; i < NUM_BUFS; i++) { - ret = VAPI_post_rr(module->nic, - module->my_qp_hndl, - &module->recv_buf[i].desc.rr); - if(VAPI_OK != ret) { - MCA_PTL_IB_VAPI_RET(ret, "VAPI_post_rr"); - return NULL; - } - } - } - - if(1 == my_rank) { - sleep(2); - } - return module_proc; } diff --git a/src/mca/ptl/ib/src/ptl_ib_proc.h b/src/mca/ptl/ib/src/ptl_ib_proc.h index 8dd4b06233..a99e68f4f8 100644 --- a/src/mca/ptl/ib/src/ptl_ib_proc.h +++ b/src/mca/ptl/ib/src/ptl_ib_proc.h @@ -27,9 +27,6 @@ struct mca_ptl_ib_proc_t { ompi_process_name_t proc_guid; /**< globally unique identifier for the process */ - struct mca_ptl_ib_ud_addr_t* proc_addrs; - /**< array of addresses published by peer */ - size_t proc_addr_count; /**< number of addresses published by peer */ @@ -45,9 +42,6 @@ struct mca_ptl_ib_proc_t { typedef struct mca_ptl_ib_proc_t mca_ptl_ib_proc_t; mca_ptl_ib_proc_t* mca_ptl_ib_proc_create(ompi_proc_t* ompi_proc); -/* -mca_ptl_ib_proc_t* mca_ptl_ib_proc_lookup(ompi_process_name_t*); -*/ int mca_ptl_ib_proc_insert(mca_ptl_ib_proc_t*, mca_ptl_base_peer_t*); #endif diff --git a/src/mca/ptl/ib/src/ptl_ib_sendfrag.c b/src/mca/ptl/ib/src/ptl_ib_sendfrag.c index dfc5d7fd56..3872de5213 100644 --- a/src/mca/ptl/ib/src/ptl_ib_sendfrag.c +++ b/src/mca/ptl/ib/src/ptl_ib_sendfrag.c @@ -28,13 +28,105 @@ static void mca_ptl_ib_send_frag_destruct(mca_ptl_ib_send_frag_t* frag) D_PRINT("\n"); } -int mca_ptl_ib_send_frag_init(mca_ptl_ib_send_frag_t* ib_send_frag, - struct mca_ptl_base_peer_t* base_peer, - struct mca_pml_base_send_request_t* base_send_req, +int mca_ptl_ib_send_frag_init(mca_ptl_ib_send_frag_t* sendfrag, + struct mca_ptl_base_peer_t* ptl_peer, + struct mca_pml_base_send_request_t* sendreq, size_t offset, size_t* size, int flags) { - D_PRINT("\n"); + /* message header */ + size_t size_in = *size; + size_t size_out; + + D_PRINT(""); 
+#if 0 + + mca_ptl_base_header_t* hdr = &sendfrag->frag_header; + + if(offset == 0) { + hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_MATCH; + hdr->hdr_common.hdr_flags = flags; + hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_match_header_t); + hdr->hdr_frag.hdr_frag_offset = offset; + hdr->hdr_frag.hdr_frag_seq = 0; + hdr->hdr_frag.hdr_src_ptr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */ + hdr->hdr_frag.hdr_src_ptr.pval = sendfrag; + hdr->hdr_frag.hdr_dst_ptr.lval = 0; + hdr->hdr_match.hdr_contextid = sendreq->req_base.req_comm->c_contextid; + hdr->hdr_match.hdr_src = sendreq->req_base.req_comm->c_my_rank; + hdr->hdr_match.hdr_dst = sendreq->req_base.req_peer; + hdr->hdr_match.hdr_tag = sendreq->req_base.req_tag; + hdr->hdr_match.hdr_msg_length = sendreq->req_bytes_packed; + hdr->hdr_match.hdr_msg_seq = sendreq->req_base.req_sequence; + } else { + hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG; + hdr->hdr_common.hdr_flags = flags; + hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_frag_header_t); + hdr->hdr_frag.hdr_frag_offset = offset; + hdr->hdr_frag.hdr_frag_seq = 0; + hdr->hdr_frag.hdr_src_ptr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */ + hdr->hdr_frag.hdr_src_ptr.pval = sendfrag; + hdr->hdr_frag.hdr_dst_ptr = sendreq->req_peer_match; + } + + /* initialize convertor */ + if(size_in > 0) { + ompi_convertor_t *convertor; + int rc; + + /* first fragment (eager send) and first fragment of long + * protocol can use the convertor initialized on the request, + * remaining fragments must copy/reinit the convertor as the + * transfer could be in parallel. 
+ */ + if( offset <= mca_ptl_ib_module.super.ptl_first_frag_size ) { + convertor = &sendreq->req_convertor; + } else { + + convertor = &sendfrag->frag_convertor; + ompi_convertor_copy(&sendreq->req_convertor, convertor); + ompi_convertor_init_for_send( + convertor, + 0, + sendreq->req_base.req_datatype, + sendreq->req_base.req_count, + sendreq->req_base.req_addr, + offset); + } + + + /* if data is contiguous convertor will return an offset + * into user's buffer - otherwise will return an allocated buffer + * that holds the packed data + */ + sendfrag->frag_vec[1].iov_base = NULL; + sendfrag->frag_vec[1].iov_len = size_in; + if((rc = ompi_convertor_pack(convertor, &sendfrag->frag_vec[1], 1)) < 0) + return OMPI_ERROR; + + /* adjust size and request offset to reflect actual + * number of bytes packed by convertor */ + size_out = sendfrag->frag_vec[1].iov_len; + } else { + size_out = size_in; + } + hdr->hdr_frag.hdr_frag_length = size_out; + + /* fragment state */ + sendfrag->frag_owner = &ptl_peer->peer_ptl->super; + sendfrag->frag_send.frag_request = sendreq; + sendfrag->frag_send.frag_base.frag_addr = sendfrag->frag_vec[1].iov_base; + sendfrag->frag_send.frag_base.frag_size = size_out; + + sendfrag->frag_peer = ptl_peer; + sendfrag->frag_vec_ptr = sendfrag->frag_vec; + sendfrag->frag_vec_cnt = (size_out == 0) ? 1 : 2; + sendfrag->frag_vec[0].iov_base = (ompi_iov_base_ptr_t)hdr; + sendfrag->frag_vec[0].iov_len = sizeof(mca_ptl_base_header_t); + sendfrag->frag_progressed = 0; + *size = size_out; +#endif + return OMPI_SUCCESS; } diff --git a/src/mca/ptl/ib/src/ptl_ib_sendfrag.h b/src/mca/ptl/ib/src/ptl_ib_sendfrag.h index 7c9b50ea2f..0f4719ebed 100644 --- a/src/mca/ptl/ib/src/ptl_ib_sendfrag.h +++ b/src/mca/ptl/ib/src/ptl_ib_sendfrag.h @@ -13,7 +13,11 @@ OBJ_CLASS_DECLARATION(mca_ptl_ib_send_frag_t); * IB send fragment derived type. 
*/ struct mca_ptl_ib_send_frag_t { - mca_ptl_base_send_frag_t super; /**< base send fragment descriptor */ + mca_ptl_base_send_frag_t frag_send; /**< base send fragment descriptor */ + struct iovec *frag_vec_ptr; + size_t frag_vec_cnt; + struct iovec frag_vec[2]; + volatile int frag_progressed; }; typedef struct mca_ptl_ib_send_frag_t mca_ptl_ib_send_frag_t; diff --git a/src/mca/ptl/ib/src/ptl_ib_vapi.h b/src/mca/ptl/ib/src/ptl_ib_vapi.h index 3d404458af..cff9fa7ff6 100644 --- a/src/mca/ptl/ib/src/ptl_ib_vapi.h +++ b/src/mca/ptl/ib/src/ptl_ib_vapi.h @@ -10,29 +10,26 @@ * all this stuff should be runtime. Ignoring for now. */ -#define DEFAULT_PORT (1) -#define DEFAULT_CQ_SIZE (40000) -#define DEFAULT_UD_WQ_SIZE (10000) -#define DEFAULT_UD_SG_LIST (1) -#define DEFAULT_PKEY_IX (0) -#define DEFAULT_PSN (0) -#define DEFAULT_QP_OUS_RD_ATOM (1) -#define DEFAULT_MTU (MTU1024) -#define DEFAULT_MIN_RNR_TIMER (5) -#define DEFAULT_TIME_OUT (10) -#define DEFAULT_RETRY_COUNT (7) -#define DEFAULT_RNR_RETRY (7) -#define DEFAULT_MAX_RDMA_DST_OPS (16) +#define DEFAULT_PORT (1) +#define DEFAULT_CQ_SIZE (40000) +#define DEFAULT_WQ_SIZE (10000) +#define DEFAULT_SG_LIST (1) +#define DEFAULT_PKEY_IX (0) +#define DEFAULT_PSN (0) +#define DEFAULT_QP_OUS_RD_ATOM (1) +#define DEFAULT_MTU (MTU1024) +#define DEFAULT_MIN_RNR_TIMER (5) +#define DEFAULT_TIME_OUT (10) +#define DEFAULT_RETRY_COUNT (7) +#define DEFAULT_RNR_RETRY (7) +#define DEFAULT_MAX_RDMA_DST_OPS (16) -#define DEFAULT_TRAFFIC_CLASS (0) -#define DEFAULT_HOP_LIMIT (63) -#define DEFAULT_FLOW_LABEL (0) -#define DEFAULT_SERVICE_LEVEL (0) -#define DEFAULT_STATIC_RATE (0) -#define DEFAULT_SRC_PATH_BITS (0) - -/* UD has a default offset of 40 bytes */ -#define UD_RECV_BUF_OFF (40) +#define DEFAULT_TRAFFIC_CLASS (0) +#define DEFAULT_HOP_LIMIT (63) +#define DEFAULT_FLOW_LABEL (0) +#define DEFAULT_SERVICE_LEVEL (0) +#define DEFAULT_STATIC_RATE (0) +#define DEFAULT_SRC_PATH_BITS (0) /* This is a convinence macro. *