diff --git a/orte/mca/rml/ofi/rml_ofi_component.c b/orte/mca/rml/ofi/rml_ofi_component.c index 1de1b56eca..58f72c0ac5 100644 --- a/orte/mca/rml/ofi/rml_ofi_component.c +++ b/orte/mca/rml/ofi/rml_ofi_component.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015 Intel, Inc. All rights reserved. + * Copyright (c) 2015-2016 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -715,13 +715,31 @@ static int rml_ofi_component_init(void) * for daemons the set/get_contact_info is used to exchange this information */ if (ORTE_PROC_IS_APP) { asprintf(&pmix_key,"%s%d",orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->fabric_attr->prov_name,cur_ofi_prov); - opal_output_verbose(25, orte_rml_base_framework.framework_output, - "%s calling OPAL_MODEX_SEND_STRING key - %s ", + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s calling OPAL_MODEX_SEND_STRING for key - %s ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), pmix_key ); OPAL_MODEX_SEND_STRING( ret, OPAL_PMIX_GLOBAL, pmix_key, orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name, orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen); + /*print debug information on opal_modex_string */ + switch ( orte_rml_ofi.ofi_prov[cur_ofi_prov].fabric_info->addr_format) + { + case FI_SOCKADDR_IN : + opal_output_verbose(1,orte_rml_base_framework.framework_output, + "%s:%d In FI_SOCKADDR_IN. 
",__FILE__,__LINE__); + /* Address is of type sockaddr_in (IPv4) */ + opal_output_verbose(1,orte_rml_base_framework.framework_output, + "%s sending Opal modex string for ofi prov_id %d, epnamelen = %d ", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),cur_ofi_prov,orte_rml_ofi.ofi_prov[cur_ofi_prov].epnamelen); + /*[debug] - print the sockaddr - port and s_addr */ + struct sockaddr_in* ep_sockaddr = (struct sockaddr_in*)orte_rml_ofi.ofi_prov[cur_ofi_prov].ep_name; + opal_output_verbose(1,orte_rml_base_framework.framework_output, + "%s port = 0x%x, InternetAddr = 0x%s ", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ntohs(ep_sockaddr->sin_port),inet_ntoa(ep_sockaddr->sin_addr)); + break; + } + /* end of printing opal_modex_string and port, IP */ free(pmix_key); if (ORTE_SUCCESS != ret) { opal_output_verbose(1, orte_rml_base_framework.framework_output, @@ -940,6 +958,7 @@ static orte_rml_base_module_t* open_conduit(opal_list_t *attributes) char **comps; int i; orte_attribute_t *attr; + opal_list_t provider; opal_output_verbose(20,orte_rml_base_framework.framework_output, "%s - Entering rml_ofi_open_conduit()", @@ -960,6 +979,7 @@ static orte_rml_base_module_t* open_conduit(opal_list_t *attributes) return NULL; } + /* someone may require this specific component, so look for "ofi" */ if (orte_get_attribute(attributes, ORTE_RML_INCLUDE_COMP_ATTRIB, (void**)&comp_attrib, OPAL_STRING) && NULL != comp_attrib) { @@ -988,6 +1008,27 @@ static orte_rml_base_module_t* open_conduit(opal_list_t *attributes) } } + /*[Debug] to check for daemon commn over ofi-ethernet, enable the default conduit ORTE_MGMT_CONDUIT over ofi */ + if (orte_get_attribute(attributes, ORTE_RML_TRANSPORT_TYPE, (void**)&comp_attrib, OPAL_STRING) && + NULL != comp_attrib) { + opal_output_verbose(20,orte_rml_base_framework.framework_output, + "%s - Forcibly returning ofi socket provider for ethernet transport request", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),comp_attrib); + comps = opal_argv_split(comp_attrib, ','); + for (i=0; 
NULL != comps[i]; i++) { + if (0 == strcmp(comps[i], "ethernet")) { + /* we are a candidate, */ + opal_argv_free(comps); + OBJ_CONSTRUCT(&provider, opal_list_t); + orte_set_attribute(&provider, ORTE_RML_PROVIDER_ATTRIB, + ORTE_ATTR_LOCAL, "sockets", OPAL_STRING); + return make_module(get_ofi_prov_id(&provider)); + } + } + opal_argv_free(comps); + } + /*[Debug] + /* Alternatively, check the attributes to see if we qualify - we only handle * "pt2pt" */ OPAL_LIST_FOREACH(attr, attributes, orte_attribute_t) { @@ -1204,15 +1245,15 @@ void convert_to_sockaddr( char *ofiuri, struct sockaddr_in* ep_sockaddr) sin_port = tmp + 1; *tmp = '\0'; - opal_output_verbose(10,orte_rml_base_framework.framework_output, + opal_output_verbose(1,orte_rml_base_framework.framework_output, "%s OFI convert_to_sockaddr uri strings got -> family = %s, InternetAddr = %s, port = %s ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),sin_fly,sin_addr, sin_port); ep_sockaddr->sin_family = atoi( sin_fly ); port = atoi( sin_port); ep_sockaddr->sin_port = htons(port); res = inet_aton(sin_addr,(struct in_addr *)&ep_sockaddr->sin_addr); - opal_output_verbose(10,orte_rml_base_framework.framework_output, - "%s OFI convert_to_sockaddr() port = 0x%x, InternetAddr = %s ", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ntohs(ep_sockaddr->sin_port), + opal_output_verbose(1,orte_rml_base_framework.framework_output, + "%s OFI convert_to_sockaddr() port = 0x%x decimal-%d, InternetAddr = %s ", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ntohs(ep_sockaddr->sin_port),ntohs(ep_sockaddr->sin_port), inet_ntoa(ep_sockaddr->sin_addr)); } diff --git a/orte/mca/rml/ofi/rml_ofi_send.c b/orte/mca/rml/ofi/rml_ofi_send.c index c833dc5ecf..874038b4f5 100644 --- a/orte/mca/rml/ofi/rml_ofi_send.c +++ b/orte/mca/rml/ofi/rml_ofi_send.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015 Intel, Inc. All rights reserved + * Copyright (c) 2015-2016 Intel, Inc. All rights reserved. 
* $COPYRIGHT$
  *
  * Additional copyrights may follow
@@ -72,6 +72,65 @@ OBJ_CLASS_INSTANCE(ofi_recv_msg_queue_t,
                    opal_list_item_t,
                    ofi_recv_msg_queue_cons, ofi_recv_msg_queue_des);
 
+/* Deferred send-to-self completion: send_msg() fills one in and activates `ev`; send_self_exe() consumes and releases it. */
+typedef struct {
+    opal_object_t object;      /* base object; the class is instantiated below with parent opal_object_t */
+    opal_event_t ev;           /* event that send_msg() sets up to run send_self_exe() */
+    orte_rml_tag_t tag;        /* RML tag handed back to the send callback */
+    struct iovec* iov;         /* iovec array of an iovec send; left NULL for a buffer send */
+    int count;                 /* number of entries in iov; only read on the iovec path */
+    opal_buffer_t *buffer;     /* buffer of a buffer send; only consulted when iov is NULL */
+    union {
+        orte_rml_callback_fn_t iov;               /* completion callback for an iovec send */
+        orte_rml_buffer_callback_fn_t buffer;     /* completion callback for a buffer send */
+    } cbfunc;                  /* one member is meaningful at a time, matching iov vs. buffer */
+    void *cbdata;              /* opaque caller data passed through to the callback */
+} orte_self_send_xfer_t;
+static void xfer_cons(orte_self_send_xfer_t *xfer)  /* constructor: NULLs the pointer fields; tag and count are left unset */
+{
+    xfer->iov = NULL;
+    xfer->cbfunc.iov = NULL;
+    xfer->buffer = NULL;
+    xfer->cbfunc.buffer = NULL;   /* cbfunc is a union, so this writes the same storage as cbfunc.iov above */
+    xfer->cbdata = NULL;
+}
+OBJ_CLASS_INSTANCE(orte_self_send_xfer_t,
+                   opal_object_t,
+                   xfer_cons, NULL);
+/* Event handler for a send-to-self: reports ORTE_SUCCESS to the sender's callback, then releases the xfer.
+ * fd and args are unused; data is the orte_self_send_xfer_t set up by send_msg(). Aborts if neither iov nor buffer is set. */
+static void send_self_exe(int fd, short args, void* data)
+{
+    orte_self_send_xfer_t *xfer = (orte_self_send_xfer_t*)data;
+
+    opal_output_verbose(1, orte_rml_base_framework.framework_output,
+                        "%s rml_send_to_self ofi callback executing for tag %d",
+                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), xfer->tag);
+
+    /* execute the send callback function - note that
+     * send-to-self always returns a SUCCESS status
+     */
+    if (NULL != xfer->iov) {
+        if (NULL != xfer->cbfunc.iov) {
+            /* non-blocking iovec send: a NULL callback is simply skipped */
+            xfer->cbfunc.iov(ORTE_SUCCESS, ORTE_PROC_MY_NAME, xfer->iov, xfer->count,
+                             xfer->tag, xfer->cbdata);
+        }
+    } else if (NULL != xfer->buffer) {
+        if (NULL != xfer->cbfunc.buffer) {
+            /* non-blocking buffer send: a NULL callback is simply skipped */
+            xfer->cbfunc.buffer(ORTE_SUCCESS, ORTE_PROC_MY_NAME, xfer->buffer,
+                                xfer->tag, xfer->cbdata);
+        }
+    } else {
+        /* should never happen: the xfer carries neither an iovec nor a buffer */
+        abort();
+    }
+
+    /* release the xfer that send_msg() created with OBJ_NEW */
+    OBJ_RELEASE(xfer);
+}
+
 /** Send callback */
 /* [Desc] This is called from the progress fn when a send completion
 ** is received in the cq
@@ -170,8 +229,8 @@ int orte_rml_ofi_recv_handler(struct fi_cq_data_entry *wc, uint8_t ofi_prov_id)
             /* Since OFI is point-to-point, no need to check if the intended destination is me
              send to RML */
             opal_output_verbose(10, 
orte_rml_base_framework.framework_output, - "%s Posting Recv for msgid %d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), msg_hdr.msgid ); + "%s Posting Recv for msgid %d, from peer - %s , Tag = %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), msg_hdr.msgid, ORTE_NAME_PRINT(&msg_hdr.origin),msg_hdr.tag ); ORTE_RML_POST_MESSAGE(&msg_hdr.origin, msg_hdr.tag, msg_hdr.seq_num,data,msglen); } else { msg_in_queue = false; @@ -344,12 +403,17 @@ static void send_msg(int fd, short args, void *cbdata) uint32_t total_packets; fi_addr_t dest_fi_addr; orte_rml_send_t *snd; + orte_rml_recv_t *rcv; + orte_self_send_xfer_t *xfer; orte_rml_ofi_request_t* ofi_send_req = OBJ_NEW( orte_rml_ofi_request_t ); uint8_t ofi_prov_id = req->ofi_prov_id; orte_rml_ofi_send_pkt_t* ofi_msg_pkt; size_t datalen_per_pkt, hdrsize, data_in_pkt; // the length of data in per packet excluding the header size orte_rml_ofi_peer_t* pr; uint64_t ui64; + struct sockaddr_in* ep_sockaddr; + int i, bytes; + char *ptr; snd = OBJ_NEW(orte_rml_send_t); snd->dst = *peer; @@ -375,43 +439,132 @@ static void send_msg(int fd, short args, void *cbdata) opal_output_verbose(10, orte_rml_base_framework.framework_output, "%s calling OPAL_MODEX_RECV_STRING ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME) ); // if dest is same as me then instead of doing lookup just populate the dest_ep_name - if (peer->jobid == ORTE_PROC_MY_NAME->jobid && peer->vpid == ORTE_PROC_MY_NAME->vpid) { + /*if (!ORTE_PROC_IS_APP && peer->jobid == ORTE_PROC_MY_NAME->jobid && peer->vpid == ORTE_PROC_MY_NAME->vpid) { dest_ep_namelen = orte_rml_ofi.ofi_prov[ofi_prov_id].epnamelen; dest_ep_name = (char *)calloc(dest_ep_namelen,sizeof(char)); memcpy( dest_ep_name, orte_rml_ofi.ofi_prov[ofi_prov_id].ep_name,dest_ep_namelen); - opal_output_verbose(10, orte_rml_base_framework.framework_output, + opal_output_verbose(1, orte_rml_base_framework.framework_output, "%s rml:ofi: send and dest are same so proceeding with cur provider ep_name ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); ret = 
OPAL_SUCCESS; - } else { - if (ORTE_PROC_IS_APP ) { + } else {*/ + if (ORTE_PROC_IS_APP ) { asprintf(&pmix_key,"%s%d",orte_rml_ofi.ofi_prov[ofi_prov_id].fabric_info->fabric_attr->prov_name,ofi_prov_id); - opal_output_verbose(10, orte_rml_base_framework.framework_output, - "%s calling OPAL_MODEX_RECV_STRING peer - %s, key - %s ", + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s calling OPAL_MODEX_RECV_STRING for ORTE_PROC_APP peer - %s, key - %s ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(peer),pmix_key ); OPAL_MODEX_RECV_STRING(ret, pmix_key, peer , (char **) &dest_ep_name, &dest_ep_namelen); opal_output_verbose(10, orte_rml_base_framework.framework_output, "Returned from MODEX_RECV"); - free(pmix_key); - } else { - memcpy(&ui64, (char*)peer, sizeof(uint64_t)); - if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&orte_rml_ofi.peers, - ui64, (void**)&pr) || NULL == pr) { - opal_output_verbose(2, orte_rml_base_framework.framework_output, - "%s rml:ofi: Send failed to get peer OFI contact info ", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - return; - } - dest_ep_name = pr->ofi_ep; - dest_ep_namelen = pr->ofi_ep_len; - ret = OPAL_SUCCESS; - } - } - opal_output_verbose(50, orte_rml_base_framework.framework_output, + opal_output_verbose(50, orte_rml_base_framework.framework_output, "%s Return value from OPAL_MODEX_RECV_STRING - %d, length returned - %lu", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ret, dest_ep_namelen); + free(pmix_key); + } else { + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s calling OPAL_MODEX_RECV_STRING for DAEMON peer %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(peer)); + if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, peer, ORTE_PROC_MY_NAME)) { + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s rml_ofi_send_to_self at tag %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), tag); + /* send to self is a tad tricky - we really don't want + * to track the 
send callback function throughout the recv + * process and execute it upon receipt as this would provide + * very different timing from a non-self message. Specifically, + * if we just retain a pointer to the incoming data + * and then execute the send callback prior to the receive, + * then the caller will think we are done with the data and + * can release it. So we have to copy the data in order to + * execute the send callback prior to receiving the message. + * + * In truth, this really is a better mimic of the non-self + * message behavior. If we actually pushed the message out + * on the wire and had it loop back, then we would receive + * a new block of data anyway. + */ + /* setup the send callback */ + xfer = OBJ_NEW(orte_self_send_xfer_t); + if (NULL != req->send.iov) { + xfer->iov = req->send.iov; + xfer->count = req->send.count; + xfer->cbfunc.iov = req->send.cbfunc.iov; + } else { + xfer->buffer = req->send.buffer; + xfer->cbfunc.buffer = req->send.cbfunc.buffer; + } + xfer->tag = tag; + xfer->cbdata = req->send.cbdata; + /* setup the event for the send callback */ + opal_event_set(orte_event_base, &xfer->ev, -1, OPAL_EV_WRITE, send_self_exe, xfer); + opal_event_set_priority(&xfer->ev, ORTE_MSG_PRI); + opal_event_active(&xfer->ev, OPAL_EV_WRITE, 1); - + /* copy the message for the recv */ + rcv = OBJ_NEW(orte_rml_recv_t); + rcv->sender = *peer; + rcv->tag = tag; + if (NULL != req->send.iov) { + /* get the total number of bytes in the iovec array */ + bytes = 0; + for (i = 0 ; i < req->send.count ; ++i) { + bytes += req->send.iov[i].iov_len; + } + /* get the required memory allocation */ + if (0 < bytes) { + rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(bytes); + rcv->iov.iov_len = bytes; + /* transfer the bytes */ + ptr = (char*)rcv->iov.iov_base; + for (i = 0 ; i < req->send.count ; ++i) { + memcpy(ptr, req->send.iov[i].iov_base, req->send.iov[i].iov_len); + ptr += req->send.iov[i].iov_len; + } + } + } else if (0 < req->send.buffer->bytes_used) { + 
rcv->iov.iov_base = (IOVBASE_TYPE*)malloc(req->send.buffer->bytes_used); + memcpy(rcv->iov.iov_base, req->send.buffer->base_ptr, req->send.buffer->bytes_used); + rcv->iov.iov_len = req->send.buffer->bytes_used; + } + /* post the message for receipt - since the send callback was posted + * first and has the same priority, it will execute first + */ + ORTE_RML_ACTIVATE_MESSAGE(rcv); + OBJ_RELEASE(req); + return; + } else { + memcpy(&ui64, (char*)peer, sizeof(uint64_t)); + if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&orte_rml_ofi.peers, + ui64, (void**)&pr) || NULL == pr) { + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s rml:ofi: Send failed to get peer OFI contact info ", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + return; + } + opal_output_verbose(1, orte_rml_base_framework.framework_output, + "%s rml:ofi: OFI peer contact info got from hash table", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + dest_ep_name = pr->ofi_ep; + dest_ep_namelen = pr->ofi_ep_len; + ret = OPAL_SUCCESS; + } + } if ( OPAL_SUCCESS == ret) { + //Anandhi added for debug purpose + switch ( orte_rml_ofi.ofi_prov[ofi_prov_id].fabric_info->addr_format) + { + case FI_SOCKADDR_IN : + /* Address is of type sockaddr_in (IPv4) */ + /*[debug] - print the sockaddr - port and s_addr */ + ep_sockaddr = (struct sockaddr_in*)dest_ep_name; + opal_output_verbose(1,orte_rml_base_framework.framework_output, + "%s peer %s epnamelen is %d, port = %d (or) 0x%x, InternetAddr = 0x%s ", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ORTE_NAME_PRINT(peer), + orte_rml_ofi.ofi_prov[ofi_prov_id].epnamelen,ntohs(ep_sockaddr->sin_port), + ntohs(ep_sockaddr->sin_port),inet_ntoa(ep_sockaddr->sin_addr)); + /*[end debug]*/ + break; + } + //Anandhi end debug opal_output_verbose(10, orte_rml_base_framework.framework_output, "%s OPAL_MODEX_RECV succeded, %s peer ep name obtained. 
length=%lu", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), @@ -423,11 +576,10 @@ static void send_msg(int fd, short args, void *cbdata) ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ret ); /* call the send-callback fn with error and return, also return failure status */ snd->status = ORTE_ERR_ADDRESSEE_UNKNOWN; - ORTE_RML_SEND_COMPLETE(snd); - //OBJ_RELEASE( ofi_send_req); - return; + + ORTE_RML_SEND_COMPLETE(snd); + return; } - } else { opal_output_verbose(1, orte_rml_base_framework.framework_output, @@ -567,8 +719,6 @@ static void send_msg(int fd, short args, void *cbdata) "%s End of send_msg_transport. fi_send completed to peer %s with tag %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(peer), tag); - - free(dest_ep_name); OBJ_RELEASE(req); }