
Adding the ofi plugin to allow opening a conduit over ethernet/fabric (a usage sketch follows the file list below).

	modified:   ../orte/mca/rml/base/rml_base_frame.c
	modified:   ../orte/mca/rml/base/rml_base_stubs.c
	deleted:    ../orte/mca/rml/ofi/.opal_ignore
	modified:   ../orte/mca/rml/ofi/Makefile.am
	modified:   ../orte/mca/rml/ofi/rml_ofi.h
	modified:   ../orte/mca/rml/ofi/rml_ofi_component.c
	modified:   ../orte/mca/rml/ofi/rml_ofi_send.c
	modified:   ../orte/test/system/ofi_conduit_stress.c
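
Below is a minimal usage sketch of how a caller might open such a conduit, modelled on the ofi_conduit_stress.c test further down in this diff. The header paths, the open_ofi_sockets_conduit() helper name, and the orte_rml.open_conduit member are assumptions inferred from the orte_rml_API_open_conduit() stub shown below; they are not taken verbatim from this commit.

/* Hypothetical usage sketch (not part of this commit): select the ofi RML
 * component and the "sockets" provider via an attribute list, then open a
 * conduit over it, as the stress test below does. */
#include "opal/class/opal_list.h"     /* opal_list_t, OBJ_NEW */
#include "orte/util/attr.h"           /* orte_set_attribute, ORTE_ATTR_GLOBAL */
#include "orte/mca/rml/rml.h"         /* orte_rml, orte_rml_conduit_t */

static orte_rml_conduit_t open_ofi_sockets_conduit(void)
{
    opal_list_t *conduit_attr = OBJ_NEW(opal_list_t);

    /* restrict the selection to the ofi component ... */
    orte_set_attribute(conduit_attr, ORTE_RML_INCLUDE_COMP_ATTRIB,
                       ORTE_ATTR_GLOBAL, "ofi", OPAL_STRING);
    /* ... and ask for the sockets (ethernet) provider */
    orte_set_attribute(conduit_attr, ORTE_RML_PROVIDER_ATTRIB,
                       ORTE_ATTR_GLOBAL, "sockets", OPAL_STRING);

    /* assumed public entry point backed by orte_rml_API_open_conduit() */
    return orte_rml.open_conduit(conduit_attr);
}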

	Removed stale include directive
	modified:   ../orte/mca/rml/ofi/Makefile.am

The ofi plugin supports multiple providers and identifies them
by ofi_prov_id; renamed the previous conduit_id to ofi_prov_id (a sketch of walking the provider table follows the file list below).
	modified:   ../orte/mca/rml/base/base.h
	modified:   ../orte/mca/rml/ofi/rml_ofi.h
	modified:   ../orte/mca/rml/ofi/rml_ofi_component.c
	modified:   ../orte/mca/rml/ofi/rml_ofi_request.h
	modified:   ../orte/mca/rml/ofi/rml_ofi_send.c
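
For illustration, a minimal sketch of walking the renamed provider table. It assumes only the orte_rml_ofi global and the ofi_prov[] / ofi_prov_open_num fields visible in the rml_ofi.h hunks below; the dump_open_providers() name is invented here (the component's own print_provider_list_info() plays a similar role).

/* Hypothetical debug helper: enumerate the OFI providers that have been
 * opened, using the fields shown in rml_ofi.h below. */
#include <stdint.h>
#include "orte/mca/rml/ofi/rml_ofi.h"   /* orte_rml_ofi, ofi_prov[], ofi_prov_open_num */
#include "opal/util/output.h"           /* opal_output */

static void dump_open_providers(void)
{
    for (uint8_t id = 0; id < orte_rml_ofi.ofi_prov_open_num; id++) {
        struct fi_info *info = orte_rml_ofi.ofi_prov[id].fabric_info;
        opal_output(0, "ofi_prov_id %u -> provider %s",
                    (unsigned) id,
                    (NULL != info) ? info->fabric_attr->prov_name : "unknown");
    }
}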

Fixed merge issues and addressed minor pull-request comments
	modified:   ../orte/mca/rml/base/base.h
	modified:   ../orte/mca/rml/base/rml_base_frame.c
	modified:   ../orte/mca/rml/ofi/rml_ofi.h
	modified:   ../orte/mca/rml/ofi/rml_ofi_component.c

Removed trailing space
	modified:   ../orte/mca/rml/ofi/rml_ofi_component.c

Cleaned up test - ofi_conduit_stress.c
	modified:   ../orte/test/system/ofi_conduit_stress.c

Cleaned up printing of the provider info during initialisation
	modified:   ../orte/mca/rml/ofi/rml_ofi.h
	modified:   ../orte/mca/rml/ofi/rml_ofi_component.c

Signed-off-by: Anandhi S Jayakumar <anandhi.s.jayakumar@intel.com>

Fixing warnings
	modified:   ../orte/mca/rml/ofi/rml_ofi.h
	modified:   ../orte/mca/rml/ofi/rml_ofi_component.c
	modified:   ../orte/mca/rml/ofi/rml_ofi_send.c

Signed-off-by: Anandhi S Jayakumar <anandhi.s.jayakumar@intel.com>

Minor cleanup
	modified:   ../orte/mca/rml/ofi/rml_ofi_component.c
	modified:   ../orte/mca/rml/ofi/rml_ofi_send.c

Signed-off-by: Anandhi S Jayakumar <anandhi.s.jayakumar@intel.com>

More cleanup
	modified:   ../orte/mca/rml/ofi/rml_ofi_component.c

Signed-off-by: Anandhi S Jayakumar <anandhi.s.jayakumar@intel.com>

Sending only the ethernet address in get_contact_info; the rest will be sent through the modex (see the sketch below)
	modified:   ../orte/mca/rml/ofi/rml_ofi.h
	modified:   ../orte/mca/rml/ofi/rml_ofi_component.c

Signed-off-by: Anandhi S Jayakumar <anandhi.s.jayakumar@intel.com>
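
As an aside, a minimal sketch of turning an FI_SOCKADDR_IN endpoint into a printable contact string, using the same ntohs()/inet_ntoa() calls that appear in the send path below; the "ofi-socket://" prefix and the helper name are invented for illustration and are not the format this commit emits.

/* Hypothetical illustration only: render a sockets-provider endpoint
 * (struct sockaddr_in) as a human-readable contact string. */
#define _GNU_SOURCE              /* asprintf */
#include <stdio.h>
#include <arpa/inet.h>           /* inet_ntoa, ntohs */
#include <netinet/in.h>          /* struct sockaddr_in */

static char *sockaddr_in_to_contact(const struct sockaddr_in *ep_sockaddr)
{
    char *contact = NULL;
    if (0 > asprintf(&contact, "ofi-socket://%s:%u",
                     inet_ntoa(ep_sockaddr->sin_addr),
                     (unsigned) ntohs(ep_sockaddr->sin_port))) {
        return NULL;            /* allocation failed */
    }
    return contact;             /* caller frees */
}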

Adding error logging on failures
	modified:   ../orte/mca/rml/ofi/rml_ofi_component.c

Signed-off-by: Anandhi S Jayakumar <anandhi.s.jayakumar@intel.com>

Handling the OPAL_MODEX_SEND/RECV generically for all ofi providers (see the sketch below).
	modified:   ../orte/mca/rml/ofi/rml_ofi.h
	modified:   ../orte/mca/rml/ofi/rml_ofi_component.c
	modified:   ../orte/mca/rml/ofi/rml_ofi_send.c

Signed-off-by: Anandhi S Jayakumar <anandhi.s.jayakumar@intel.com>
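
A minimal sketch of the generic lookup this enables, following the key construction and the OPAL_MODEX_RECV_STRING call visible in the send_msg() hunk below; the lookup_peer_ep() wrapper name is hypothetical and error handling is elided.

/* Hypothetical wrapper around the pattern used in send_msg() below: the modex
 * key is "<provider name><ofi_prov_id>" (e.g. "sockets0"), so every OFI
 * provider's endpoint can be published and looked up generically. */
#define _GNU_SOURCE                      /* asprintf */
#include <stdio.h>
#include <stdlib.h>                      /* free */
#include "orte/mca/rml/ofi/rml_ofi.h"    /* orte_rml_ofi, ofi_prov[] */
#include "opal/mca/pmix/pmix.h"          /* OPAL_MODEX_RECV_STRING */

static int lookup_peer_ep(uint8_t ofi_prov_id, orte_process_name_t *peer,
                          char **dest_ep_name, size_t *dest_ep_namelen)
{
    int ret = OPAL_ERROR;
    char *pmix_key = NULL;

    asprintf(&pmix_key, "%s%d",
             orte_rml_ofi.ofi_prov[ofi_prov_id].fabric_info->fabric_attr->prov_name,
             ofi_prov_id);
    OPAL_MODEX_RECV_STRING(ret, pmix_key, peer,
                           (char **) dest_ep_name, dest_ep_namelen);
    free(pmix_key);
    return ret;                          /* OPAL_SUCCESS if the peer published a key */
}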

Adding .opal_ignore/.opal_unignore to limit the ofi build to a restricted set of people
	new file:   ../orte/mca/rml/ofi/.opal_ignore
	new file:   ../orte/mca/rml/ofi/.opal_unignore

Signed-off-by: Anandhi S Jayakumar <anandhi.s.jayakumar@intel.com>

Removing the error logging for now
	modified:   ../orte/mca/rml/ofi/rml_ofi_component.c
This commit is contained in:
Anandhi S Jayakumar 2016-10-21 13:14:25 -07:00
Parent: 8cc3f288c9
Commit: 94593ca20b
8 changed files with 754 additions and 634 deletions

@ -145,8 +145,6 @@ typedef struct {
opal_object_t super;
opal_event_t ev;
orte_rml_send_t send;
/* conduit_id */
orte_rml_conduit_t conduit_id;
} orte_rml_send_request_t;
OBJ_CLASS_DECLARATION(orte_rml_send_request_t);

@ -74,11 +74,8 @@ orte_rml_conduit_t orte_rml_API_open_conduit(opal_list_t *attributes)
"%s rml:base:open_conduit Component %s provided a conduit",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
active->component->base.mca_component_name);
/* retain this answer */
if (NULL != ourmod) {
free(ourmod);
}
ourmod = mod;
break;
}
}
}
@ -140,6 +137,9 @@ char* orte_rml_API_get_contact_info(void)
} else {
tmp = NULL;
}
opal_output_verbose(10,orte_rml_base_framework.framework_output,
"%s rml:base:get_contact_info() returning -> %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),tmp);
return tmp;
}

orte/mca/rml/ofi/.opal_unignore (new file)

@ -0,0 +1,2 @@
anandhis
rhc

@ -26,12 +26,15 @@
#include "rml_ofi_request.h"
/** the maximum open OFI ofi_prov - assuming system will have no more than 20 transports*/
#define MAX_OFI_PROVIDERS 40
#define RML_OFI_PROV_ID_INVALID 0xFF
/** RML/OFI key values **/
/* (char*) ofi socket address (type IN) of the node process is running on */
#define OPAL_RML_OFI_FI_SOCKADDR_IN "rml.ofi.fisockaddrin"
#define OPAL_RML_OFI_FI_SOCKADDR_IN "rml.ofi.fisockaddrin"
/* (char*) ofi socket address (type PSM) of the node process is running on */
#define OPAL_RML_OFI_FI_ADDR_PSMX "rml.ofi.fiaddrpsmx"
#define OPAL_RML_OFI_FI_ADDR_PSMX "rml.ofi.fiaddrpsmx"
// MULTI_BUF_SIZE_FACTOR defines how large the multi recv buffer will be.
// In order to use FI_MULTI_RECV feature efficiently, we need to have a
@ -40,6 +43,8 @@
#define MULTI_BUF_SIZE_FACTOR 128
#define MIN_MULTI_BUF_SIZE (1024 * 1024)
#define OFIADDR "ofiaddr"
#define CLOSE_FID(fd) \
do { \
int _ret = 0; \
@ -72,8 +77,8 @@ and also the corresponding fi_info
**/
typedef struct {
/** OFI conduit ID **/
uint8_t conduit_id;
/** ofi provider ID **/
uint8_t ofi_prov_id;
/** fi_info for this transport */
struct fi_info *fabric_info;
@ -116,37 +121,36 @@ typedef struct {
struct fi_context rx_ctx1;
/* module associated with this conduit_id returned to rml
from open_conduit call */
struct orte_rml_ofi_module_t *ofi_module;
} ofi_transport_conduit_t;
} ofi_transport_ofi_prov_t;
struct orte_rml_ofi_module_t {
orte_rml_base_module_t api;
/** current ofi transport id the component is using, this will be initialised
** in the open_conduit() call **/
** in the open_ofi_prov() call **/
int cur_transport_id;
/** Fabric info structure of all supported transports in system **/
struct fi_info *fi_info_list;
/** OFI ep and corr fi_info for all the transports (conduit) **/
ofi_transport_conduit_t ofi_conduits[MAX_CONDUIT];
/** OFI ep and corr fi_info for all the transports (ofi_providers) **/
ofi_transport_ofi_prov_t ofi_prov[MAX_OFI_PROVIDERS];
size_t min_ofi_recv_buf_sz;
/** "Any source" address */
fi_addr_t any_addr;
/** number of conduits currently opened **/
uint8_t conduit_open_num;
/** number of ofi providers currently opened **/
uint8_t ofi_prov_open_num;
/** Unique message id for every message that is fragmented to be sent over OFI **/
uint32_t cur_msgid;
/* hashtable stores the peer addresses */
opal_hash_table_t peers;
opal_list_t recv_msg_queue_list;
opal_list_t queued_routing_messages;
opal_event_t *timer_event;
@ -154,8 +158,15 @@ typedef struct {
} ;
typedef struct orte_rml_ofi_module_t orte_rml_ofi_module_t;
typedef struct {
opal_object_t super;
void* ofi_ep;
size_t ofi_ep_len;
} orte_rml_ofi_peer_t;
OBJ_CLASS_DECLARATION(orte_rml_ofi_peer_t);
ORTE_MODULE_DECLSPEC extern orte_rml_component_t mca_rml_ofi_component;
extern orte_rml_ofi_module_t orte_rml_ofi;
int orte_rml_ofi_send_buffer_nb(struct orte_rml_base_module_t *mod,
orte_process_name_t* peer,
@ -172,8 +183,11 @@ int orte_rml_ofi_send_nb(struct orte_rml_base_module_t *mod,
void* cbdata);
/****************** INTERNAL OFI Functions*************/
void free_conduit_resources( int conduit_id);
void free_ofi_prov_resources( int ofi_prov_id);
void print_provider_list_info (struct fi_info *fi );
void print_provider_info (struct fi_info *cur_fi );
int cq_progress_handler(int sd, short flags, void *cbdata);
int get_ofi_prov_id( opal_list_t *attributes);
/** Send callback */
int orte_rml_ofi_send_callback(struct fi_cq_data_entry *wc,
@ -184,7 +198,7 @@ int orte_rml_ofi_error_callback(struct fi_cq_err_entry *error,
orte_rml_ofi_request_t*);
/* OFI Recv handler */
int orte_rml_ofi_recv_handler(struct fi_cq_data_entry *wc, uint8_t conduit_id);
int orte_rml_ofi_recv_handler(struct fi_cq_data_entry *wc, uint8_t ofi_prov_id);
END_C_DECLS

Diff for this file not shown because of its large size.

@ -79,9 +79,9 @@ typedef struct {
orte_rml_send_t *send;
/** OFI conduit_id the request will use - this is
* the reference to element into the orte_rml_ofi.ofi_conduits[] **/
uint8_t conduit_id;
/** OFI provider_id the request will use - this is
* the reference to element into the orte_rml_ofi.ofi_prov[] **/
uint8_t ofi_prov_id;
/** OFI Request type */
orte_rml_ofi_request_type_t type;
@ -124,4 +124,14 @@ typedef struct {
} ofi_recv_msg_queue_t;
OBJ_CLASS_DECLARATION( ofi_recv_msg_queue_t);
/* define an object for transferring send requests to the event lib */
typedef struct {
opal_object_t super;
opal_event_t ev;
orte_rml_send_t send;
/* ofi provider id */
int ofi_prov_id;
} ofi_send_request_t;
OBJ_CLASS_DECLARATION(ofi_send_request_t);
#endif

@ -40,6 +40,14 @@ OBJ_CLASS_INSTANCE(orte_rml_ofi_request_t,
ofi_req_cons, ofi_req_des);
static void ofi_send_req_cons(ofi_send_request_t *ptr)
{
OBJ_CONSTRUCT(&ptr->send, orte_rml_send_t);
}
OBJ_CLASS_INSTANCE(ofi_send_request_t,
opal_object_t,
ofi_send_req_cons, NULL);
OBJ_CLASS_INSTANCE(orte_rml_ofi_send_pkt_t,
opal_list_item_t,
NULL, NULL);
@ -48,6 +56,7 @@ OBJ_CLASS_INSTANCE(orte_rml_ofi_recv_pkt_t,
opal_list_item_t,
NULL, NULL);
static void ofi_recv_msg_queue_cons(ofi_recv_msg_queue_t *ptr)
{
ptr->msgid = 0;
@ -73,7 +82,7 @@ int orte_rml_ofi_send_callback(struct fi_cq_data_entry *wc,
orte_rml_ofi_request_t* ofi_req)
{
orte_rml_ofi_send_pkt_t *ofi_send_pkt, *next;
opal_output_verbose(1, orte_rml_base_framework.framework_output,
opal_output_verbose(10, orte_rml_base_framework.framework_output,
"%s orte_rml_ofi_send_callback called, completion count = %d, msgid = %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ofi_req->completion_count, ofi_req->hdr.msgid);
assert(ofi_req->completion_count > 0);
@ -81,7 +90,7 @@ int orte_rml_ofi_send_callback(struct fi_cq_data_entry *wc,
if ( 0 == ofi_req->completion_count ) {
// call the callback fn of the sender
ofi_req->send->status = ORTE_SUCCESS;
opal_output_verbose(1, orte_rml_base_framework.framework_output,
opal_output_verbose(10, orte_rml_base_framework.framework_output,
"%s calling ORTE_RML_SEND_COMPLETE macro for msgid = %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ofi_req->hdr.msgid);
ORTE_RML_SEND_COMPLETE(ofi_req->send);
@ -112,7 +121,7 @@ int orte_rml_ofi_send_callback(struct fi_cq_data_entry *wc,
int orte_rml_ofi_error_callback(struct fi_cq_err_entry *error,
orte_rml_ofi_request_t* ofi_req)
{
opal_output_verbose(1, orte_rml_base_framework.framework_output,
opal_output_verbose(10, orte_rml_base_framework.framework_output,
"%s orte_rml_ofi_error_callback called ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME) );
switch(error->err) {
@ -128,7 +137,7 @@ int orte_rml_ofi_error_callback(struct fi_cq_err_entry *error,
/* [Desc] This is called from the progress fn when a recv completion
** is received in the cq
** wc [in] : the completion queue data entry */
int orte_rml_ofi_recv_handler(struct fi_cq_data_entry *wc, uint8_t conduit_id)
int orte_rml_ofi_recv_handler(struct fi_cq_data_entry *wc, uint8_t ofi_prov_id)
{
orte_rml_ofi_msg_header_t msg_hdr;
uint32_t msglen, datalen = 0;
@ -137,22 +146,22 @@ int orte_rml_ofi_recv_handler(struct fi_cq_data_entry *wc, uint8_t conduit_id)
orte_rml_ofi_recv_pkt_t *ofi_recv_pkt, *new_pkt, *next;
bool msg_in_queue = false;
opal_output_verbose(1, orte_rml_base_framework.framework_output,
opal_output_verbose(10, orte_rml_base_framework.framework_output,
"%s orte_rml_ofi_recv_handler called ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME) );
/*copy the header and data from buffer and pass it on
** since this is the conduit recv buffer don't want it to be released as
** since this is the ofi_prov recv buffer don't want it to be released as
** considering re-using it, so for now copying to newly allocated *data
** the *data will be released by orte_rml_base functions */
memcpy(&msg_hdr,wc->buf,sizeof(orte_rml_ofi_msg_header_t));
msglen = wc->len - sizeof(orte_rml_ofi_msg_header_t);
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s Received packet -> msg id = %d wc->len = %d, msglen = %d",
opal_output_verbose(10, orte_rml_base_framework.framework_output,
"%s Received packet -> msg id = %d wc->len = %lu, msglen = %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), msg_hdr.msgid, wc->len, msglen );
data = (char *)malloc(msglen);
memcpy(data,(wc->buf+sizeof(orte_rml_ofi_msg_header_t)),msglen);
opal_output_verbose(10, orte_rml_base_framework.framework_output,
memcpy(data,((char *)wc->buf+sizeof(orte_rml_ofi_msg_header_t)),msglen);
opal_output_verbose(15, orte_rml_base_framework.framework_output,
"%s header info of received packet -> cur_pkt_num = %d, tot_pkts = %d ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), msg_hdr.cur_pkt_num, msg_hdr.tot_pkts );
/* To accomodate message bigger than recv buffer size,
@ -186,25 +195,25 @@ int orte_rml_ofi_recv_handler(struct fi_cq_data_entry *wc, uint8_t conduit_id)
msg_in_queue = true;
opal_output_verbose(10, orte_rml_base_framework.framework_output,
"%s msgid %d, tot_pkts=%d, opal_list_get_size()=%d,total pkt_recd=%d",
"%s msgid %d, tot_pkts=%d, opal_list_get_size()=%lu,total pkt_recd=%d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), recv_msg_queue->msgid, recv_msg_queue->tot_pkts,
opal_list_get_size(&recv_msg_queue->pkt_list), recv_msg_queue->pkt_recd );
if( recv_msg_queue->tot_pkts == (recv_msg_queue->pkt_recd +1) ) {
/* all packets received for this message - post message to rml and remove this from queue */
opal_output_verbose(10, orte_rml_base_framework.framework_output,
"%s All packets recd for msgid %d, tot_pkts=%d, opal_list_get_size()=%d,total pkt_recd=%d",
"%s All packets recd for msgid %d, tot_pkts=%d, opal_list_get_size()=%lu,total pkt_recd=%d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), recv_msg_queue->msgid, recv_msg_queue->tot_pkts,
opal_list_get_size(&recv_msg_queue->pkt_list), recv_msg_queue->pkt_recd );
totdata = NULL;
datalen = 0;
OPAL_LIST_FOREACH(ofi_recv_pkt, &recv_msg_queue->pkt_list, orte_rml_ofi_recv_pkt_t) {
opal_output_verbose(10, orte_rml_base_framework.framework_output,
"%s Adding data for packet %d, pktlength = %d, cumulative datalen so far = %d",
"%s Adding data for packet %d, pktlength = %lu, cumulative datalen so far = %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ofi_recv_pkt->cur_pkt_num, ofi_recv_pkt->pkt_size, datalen );
if (0 == datalen) {
totdata = (char *)malloc(ofi_recv_pkt->pkt_size);
if( totdata == NULL) {
opal_output_verbose(10, orte_rml_base_framework.framework_output,
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s Error: malloc failed for msgid %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),recv_msg_queue->msgid );
return 1; //[TODO: error-handling needs to be implemented
@ -216,7 +225,7 @@ int orte_rml_ofi_recv_handler(struct fi_cq_data_entry *wc, uint8_t conduit_id)
if (NULL != totdata ) {
memcpy((totdata+datalen),ofi_recv_pkt->data,ofi_recv_pkt->pkt_size);
} else {
opal_output_verbose(10, orte_rml_base_framework.framework_output,
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s Error: realloc failed for msgid %d, from sender jobid=%d, sender vpid=%d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), recv_msg_queue->msgid, recv_msg_queue->sender.jobid,
recv_msg_queue->sender.vpid);
@ -229,13 +238,13 @@ int orte_rml_ofi_recv_handler(struct fi_cq_data_entry *wc, uint8_t conduit_id)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ofi_recv_pkt->cur_pkt_num,datalen);
}
opal_output_verbose(10, orte_rml_base_framework.framework_output,
"%s Adding leftover data recd, datalen = %d, new_pkt->pkt_size = %d",
"%s Adding leftover data recd, datalen = %d, new_pkt->pkt_size = %lu",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), datalen, new_pkt->pkt_size);
//add the last packet
totdata =realloc(totdata,datalen+new_pkt->pkt_size);
if( NULL != totdata ) {
opal_output_verbose(10, orte_rml_base_framework.framework_output,
"%s Realloc completed for leftover data recd, datalen = %d, new->pkt->pkt_size = %d",
"%s Realloc completed for leftover data recd, datalen = %d, new->pkt->pkt_size = %lu",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), datalen, new_pkt->pkt_size);
nextpkt = totdata+datalen;
opal_output_verbose(10, orte_rml_base_framework.framework_output,
@ -243,7 +252,7 @@ int orte_rml_ofi_recv_handler(struct fi_cq_data_entry *wc, uint8_t conduit_id)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), totdata, nextpkt);
memcpy(nextpkt,new_pkt->data,new_pkt->pkt_size);
opal_output_verbose(10, orte_rml_base_framework.framework_output,
"%s memcpy completed for leftover data recd, datalen = %d, new->pkt->pkt_size = %d",
"%s memcpy completed for leftover data recd, datalen = %d, new->pkt->pkt_size = %lu",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), datalen, new_pkt->pkt_size);
datalen += new_pkt->pkt_size;
opal_output_verbose(10, orte_rml_base_framework.framework_output,
@ -276,7 +285,7 @@ int orte_rml_ofi_recv_handler(struct fi_cq_data_entry *wc, uint8_t conduit_id)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME) );
OBJ_RELEASE(recv_msg_queue);
} else {
opal_output_verbose(10, orte_rml_base_framework.framework_output,
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s Error: realloc failed for msgid %d, from sender jobid=%d, sender vpid=%d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), recv_msg_queue->msgid, recv_msg_queue->sender.jobid,
recv_msg_queue->sender.vpid);
@ -326,20 +335,21 @@ int orte_rml_ofi_recv_handler(struct fi_cq_data_entry *wc, uint8_t conduit_id)
static void send_msg(int fd, short args, void *cbdata)
{
orte_rml_send_request_t *req = (orte_rml_send_request_t*)cbdata;
ofi_send_request_t *req = (ofi_send_request_t*)cbdata;
orte_process_name_t *peer = &(req->send.dst);
orte_rml_tag_t tag = req->send.tag;
char *dest_ep_name;
char *dest_ep_name, *pmix_key;
size_t dest_ep_namelen = 0;
int ret = OPAL_ERROR;
uint32_t total_packets;
fi_addr_t dest_fi_addr;
orte_rml_send_t *snd;
orte_rml_ofi_request_t* ofi_send_req = OBJ_NEW( orte_rml_ofi_request_t );
uint8_t conduit_id = req->conduit_id;
uint8_t ofi_prov_id = req->ofi_prov_id;
orte_rml_ofi_send_pkt_t* ofi_msg_pkt;
size_t datalen_per_pkt, hdrsize, data_in_pkt; // the length of data in per packet excluding the header size
orte_rml_ofi_peer_t* pr;
uint64_t ui64;
snd = OBJ_NEW(orte_rml_send_t);
snd->dst = *peer;
@ -364,40 +374,49 @@ static void send_msg(int fd, short args, void *cbdata)
/* get the peer address by doing modex_receive */
opal_output_verbose(10, orte_rml_base_framework.framework_output,
"%s calling OPAL_MODEX_RECV_STRING ", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME) );
switch ( orte_rml_ofi.ofi_conduits[conduit_id].fabric_info->addr_format)
{
case FI_SOCKADDR_IN :
OPAL_MODEX_RECV_STRING(ret, OPAL_RML_OFI_FI_SOCKADDR_IN, peer , (char **) &dest_ep_name, &dest_ep_namelen);
/*print the sockaddr - port and s_addr */
struct sockaddr_in* ep_sockaddr = (struct sockaddr_in*) dest_ep_name;
opal_output_verbose(10,orte_rml_base_framework.framework_output,
"%s obtained for peer %s port = 0x%printinx, InternetAddr = %s ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),ORTE_NAME_PRINT(peer),ntohs(ep_sockaddr->sin_port),
inet_ntoa(ep_sockaddr->sin_addr));
break;
case FI_ADDR_PSMX :
OPAL_MODEX_RECV_STRING(ret, OPAL_RML_OFI_FI_ADDR_PSMX, peer , (char **) &dest_ep_name, &dest_ep_namelen);
break;
default:
/* we shouldn't be getting here as only above are supported and address sent
* to PMIX (OPAL_MODEX_SEND) in orte_component_init() */
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s Error: Unhandled address format type in ofi_send_msg", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
snd->status = ORTE_ERR_ADDRESSEE_UNKNOWN;
ORTE_RML_SEND_COMPLETE(snd);
return;
// if dest is same as me then instead of doing lookup just populate the dest_ep_name
if (peer->jobid == ORTE_PROC_MY_NAME->jobid && peer->vpid == ORTE_PROC_MY_NAME->vpid) {
dest_ep_namelen = orte_rml_ofi.ofi_prov[ofi_prov_id].epnamelen;
dest_ep_name = (char *)calloc(dest_ep_namelen,sizeof(char));
memcpy( dest_ep_name, orte_rml_ofi.ofi_prov[ofi_prov_id].ep_name,dest_ep_namelen);
opal_output_verbose(10, orte_rml_base_framework.framework_output,
"%s rml:ofi: send and dest are same so proceeding with cur provider ep_name ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
ret = OPAL_SUCCESS;
} else {
if (ORTE_PROC_IS_APP ) {
asprintf(&pmix_key,"%s%d",orte_rml_ofi.ofi_prov[ofi_prov_id].fabric_info->fabric_attr->prov_name,ofi_prov_id);
opal_output_verbose(10, orte_rml_base_framework.framework_output,
"%s calling OPAL_MODEX_RECV_STRING peer - %s, key - %s ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(peer),pmix_key );
OPAL_MODEX_RECV_STRING(ret, pmix_key, peer , (char **) &dest_ep_name, &dest_ep_namelen);
opal_output_verbose(10, orte_rml_base_framework.framework_output, "Returned from MODEX_RECV");
free(pmix_key);
} else {
memcpy(&ui64, (char*)peer, sizeof(uint64_t));
if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&orte_rml_ofi.peers,
ui64, (void**)&pr) || NULL == pr) {
opal_output_verbose(2, orte_rml_base_framework.framework_output,
"%s rml:ofi: Send failed to get peer OFI contact info ",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
return;
}
dest_ep_name = pr->ofi_ep;
dest_ep_namelen = pr->ofi_ep_len;
ret = OPAL_SUCCESS;
}
}
opal_output_verbose(50, orte_rml_base_framework.framework_output,
"%s Return value from OPAL_MODEX_RECV_STRING - %d, length returned - %d",
"%s Return value from OPAL_MODEX_RECV_STRING - %d, length returned - %lu",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ret, dest_ep_namelen);
if ( OPAL_SUCCESS == ret) {
opal_output_verbose(10, orte_rml_base_framework.framework_output,
"%s OPAL_MODEX_RECV succeded, %s peer ep name obtained. length=%d",
"%s OPAL_MODEX_RECV succeded, %s peer ep name obtained. length=%lu",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(peer), dest_ep_namelen);
ret = fi_av_insert(orte_rml_ofi.ofi_conduits[conduit_id].av, dest_ep_name,1,&dest_fi_addr,0,NULL);
ret = fi_av_insert(orte_rml_ofi.ofi_prov[ofi_prov_id].av, dest_ep_name,1,&dest_fi_addr,0,NULL);
if( ret != 1) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s fi_av_insert failed in send_msg() returned %d",
@ -456,7 +475,7 @@ static void send_msg(int fd, short args, void *cbdata)
ofi_send_req->data_blob = (char *)malloc(ofi_send_req->length);
int iovlen=0;
for (int i=0; i < ofi_send_req->send->count; i++) {
memcpy((ofi_send_req->data_blob + iovlen ),
memcpy(((char *)ofi_send_req->data_blob + iovlen ),
ofi_send_req->send->iov[i].iov_base,
ofi_send_req->send->iov[i].iov_len);
iovlen += ofi_send_req->send->iov[i].iov_len;
@ -471,8 +490,8 @@ static void send_msg(int fd, short args, void *cbdata)
}
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s Completed copying all data into ofi_send_req->data_blob, total data - %d bytes",
opal_output_verbose(15, orte_rml_base_framework.framework_output,
"%s Completed copying all data into ofi_send_req->data_blob, total data - %lu bytes",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ofi_send_req->length );
/* Each packet will have header information, so the data length in each packet is datalen_per_packet.
@ -486,8 +505,8 @@ static void send_msg(int fd, short args, void *cbdata)
}
ofi_send_req->hdr.tot_pkts = total_packets;
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s datalen_per_pkt = %d, ofi_send_req->length= %d, total packets = %d",
opal_output_verbose(15, orte_rml_base_framework.framework_output,
"%s datalen_per_pkt = %lu, ofi_send_req->length= %d, total packets = %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), datalen_per_pkt, ofi_send_req->length, total_packets );
/* in a loop send create and send the packets */
@ -498,25 +517,22 @@ static void send_msg(int fd, short args, void *cbdata)
data_in_pkt = ((ofi_send_req->length - sent_data) >= datalen_per_pkt) ?
datalen_per_pkt : (ofi_send_req->length - sent_data);
ofi_msg_pkt->pkt_size = hdrsize + data_in_pkt;
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s Packet %d -> data_in_pkt= %d, header_size= %d, pkt_size=%d",
opal_output_verbose(15, orte_rml_base_framework.framework_output,
"%s Packet %lu -> data_in_pkt= %lu, header_size= %lu, pkt_size=%lu",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), pkt_num,data_in_pkt,hdrsize,ofi_msg_pkt->pkt_size );
/* copy the header and data for this pkt */
ofi_msg_pkt->data = malloc( ofi_msg_pkt->pkt_size);
memcpy(ofi_msg_pkt->data, &ofi_send_req->hdr, hdrsize );
memcpy( (ofi_msg_pkt->data + hdrsize ),
memcpy( ( (char *)ofi_msg_pkt->data + hdrsize ),
(ofi_send_req->data_blob + sent_data),
data_in_pkt);
opal_output_verbose(1, orte_rml_base_framework.framework_output,
opal_output_verbose(15, orte_rml_base_framework.framework_output,
"%s Copying header, data into packets completed",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME) );
/* add it to list */
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s Before adding packet %d to list. List addr -> 0x%x, ofi_msg_pkt->super is 0x%x",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),pkt_num,&(ofi_send_req->pkt_list), &ofi_msg_pkt->super );
opal_list_append(&(ofi_send_req->pkt_list), &ofi_msg_pkt->super);
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s adding packet %d to list done successful",
opal_output_verbose(15, orte_rml_base_framework.framework_output,
"%s adding packet %lu to list done successful",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),pkt_num );
sent_data += data_in_pkt;
}
@ -533,21 +549,21 @@ static void send_msg(int fd, short args, void *cbdata)
/* debug purpose - copying the header from packet to verify if it is correct */
struct orte_rml_ofi_msg_header_t *cur_hdr;
cur_hdr = (struct orte_rml_ofi_msg_header_t* ) ofi_msg_pkt->data;
opal_output_verbose(1, orte_rml_base_framework.framework_output,
opal_output_verbose(10, orte_rml_base_framework.framework_output,
"%s Sending Pkt[%d] of total %d pkts for msgid:%d to peer %s with tag %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), cur_hdr->cur_pkt_num, ofi_send_req->completion_count,
cur_hdr->msgid, ORTE_NAME_PRINT(peer), tag);
/* end debug*/
RML_OFI_RETRY_UNTIL_DONE(fi_send(orte_rml_ofi.ofi_conduits[conduit_id].ep,
RML_OFI_RETRY_UNTIL_DONE(fi_send(orte_rml_ofi.ofi_prov[ofi_prov_id].ep,
ofi_msg_pkt->data,
ofi_msg_pkt->pkt_size,
fi_mr_desc(orte_rml_ofi.ofi_conduits[conduit_id].mr_multi_recv),
fi_mr_desc(orte_rml_ofi.ofi_prov[ofi_prov_id].mr_multi_recv),
dest_fi_addr,
(void *)&ofi_send_req->ctx));
}
opal_output_verbose(1, orte_rml_base_framework.framework_output,
opal_output_verbose(10, orte_rml_base_framework.framework_output,
"%s End of send_msg_transport. fi_send completed to peer %s with tag %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(peer), tag);
@ -556,7 +572,7 @@ static void send_msg(int fd, short args, void *cbdata)
OBJ_RELEASE(req);
}
int orte_rml_ofi_send_nb(void* mod,
int orte_rml_ofi_send_nb(struct orte_rml_base_module_t* mod,
orte_process_name_t* peer,
struct iovec* iov,
int count,
@ -564,19 +580,19 @@ int orte_rml_ofi_send_nb(void* mod,
orte_rml_callback_fn_t cbfunc,
void* cbdata)
{
orte_rml_send_request_t *req;
ofi_send_request_t *req;
orte_rml_ofi_module_t *ofi_mod = (orte_rml_ofi_module_t*)mod;
int conduit_id = ofi_mod->cur_transport_id;
int ofi_prov_id = ofi_mod->cur_transport_id;
opal_output_verbose(1, orte_rml_base_framework.framework_output,
opal_output_verbose(10, orte_rml_base_framework.framework_output,
"%s rml_ofi_send_transport to peer %s at tag %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(peer), tag);
if( (0 > conduit_id) || ( conduit_id >= orte_rml_ofi.conduit_open_num ) ) {
/* Invalid conduit ID provided */
if( (0 > ofi_prov_id) || ( ofi_prov_id >= orte_rml_ofi.ofi_prov_open_num ) ) {
/* Invalid ofi_prov ID provided */
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
return ORTE_ERR_BAD_PARAM;
}
@ -594,8 +610,8 @@ int orte_rml_ofi_send_nb(void* mod,
/* get ourselves into an event to protect against
* race conditions and threads
*/
req = OBJ_NEW(orte_rml_send_request_t);
req->conduit_id = conduit_id;
req = OBJ_NEW(ofi_send_request_t);
req->ofi_prov_id = ofi_prov_id;
req->send.dst = *peer;
req->send.iov = iov;
req->send.count = count;
@ -612,25 +628,25 @@ int orte_rml_ofi_send_nb(void* mod,
}
int orte_rml_ofi_send_buffer_nb(void* mod,
int orte_rml_ofi_send_buffer_nb(struct orte_rml_base_module_t *mod,
orte_process_name_t* peer,
struct opal_buffer_t* buffer,
orte_rml_tag_t tag,
orte_rml_buffer_callback_fn_t cbfunc,
void* cbdata)
{
orte_rml_send_request_t *req;
ofi_send_request_t *req;
orte_rml_ofi_module_t *ofi_mod = (orte_rml_ofi_module_t*)mod;
int conduit_id = ofi_mod->cur_transport_id;
int ofi_prov_id = ofi_mod->cur_transport_id;
opal_output_verbose(1, orte_rml_base_framework.framework_output,
opal_output_verbose(10, orte_rml_base_framework.framework_output,
"%s rml_ofi_send_buffer_transport to peer %s at tag %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(peer), tag);
if( (0 > conduit_id) || ( conduit_id >= orte_rml_ofi.conduit_open_num ) ) {
/* Invalid conduit ID provided */
if( (0 > ofi_prov_id) || ( ofi_prov_id >= orte_rml_ofi.ofi_prov_open_num ) ) {
/* Invalid ofi_prov ID provided */
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
return ORTE_ERR_BAD_PARAM;
}
@ -648,8 +664,8 @@ int orte_rml_ofi_send_buffer_nb(void* mod,
/* get ourselves into an event to protect against
* race conditions and threads
*/
req = OBJ_NEW(orte_rml_send_request_t);
req->conduit_id = conduit_id;
req = OBJ_NEW(ofi_send_request_t);
req->ofi_prov_id = ofi_prov_id;
req->send.dst = *peer;
req->send.buffer = buffer;
req->send.tag = tag;

@ -35,56 +35,6 @@ static void send_callback(int status, orte_process_name_t *peer,
msg_active = false;
}
//debug routine to print the opal_value_t returned by query interface
void print_transports_query()
{
opal_value_t *providers=NULL;
char* prov_name = NULL;
int ret;
int32_t *protocol_ptr, protocol;
int8_t conduit_id;
int8_t *prov_num=&conduit_id;
protocol_ptr = &protocol;
opal_output(0, "\n Current conduits loaded in rml-ofi ==>");
/*opal_output(0,"\n print_transports_query() Begin- %s:%d",__FILE__,__LINE__);
opal_output(0,"\n calling the orte_rml_ofi_query_transports() ");*/
if( ORTE_SUCCESS == orte_rml.query_transports(&providers)) {
//opal_output(0,"\n query_transports() completed, printing details\n");
while (providers) {
//get the first opal_list_t;
opal_list_t temp;
opal_list_t *prov = &temp;
ret = opal_value_unload(providers,(void **)&prov,OPAL_PTR);
if (ret == OPAL_SUCCESS) {
//opal_output(0,"\n %s:%d opal_value_unload() succeeded, opal_list* prov = %x",__FILE__,__LINE__,prov);
if( orte_get_attribute( prov, ORTE_CONDUIT_ID, (void **)&prov_num,OPAL_UINT8)) {
opal_output(0," Provider conduit_id : %d",*prov_num);
}
if( orte_get_attribute( prov, ORTE_PROTOCOL, (void **)&protocol_ptr,OPAL_UINT32)) {
opal_output(0," Protocol : %d",*protocol_ptr);
}
if( orte_get_attribute( prov, ORTE_PROV_NAME, (void **)&prov_name ,OPAL_STRING)) {
opal_output(0," Provider name : %s",prov_name);
} else {
opal_output(0," Error in getting Provider name");
}
} else {
opal_output(0," %s:%d opal_value_unload() failed, opal_list* prov = %x",__FILE__,__LINE__,prov);
}
providers = (opal_value_t *)providers->super.opal_list_next;
// opal_output_verbose(1,orte_rml_base_framework.framework_output,"\n %s:%d -
// Moving on to next provider provders=%x",__FILE__,__LINE__,providers);
}
} else {
opal_output(0,"\n query_transports() returned Error ");
}
//opal_output(0,"\n End of print_transports_query() from ofi_query_test.c \n");
//need to free all the providers here
}
int
main(int argc, char *argv[]){
@ -99,18 +49,18 @@ main(int argc, char *argv[]){
int conduit_id = 0; //use the first available conduit
struct timeval start, end;
opal_list_t *conduit_attr;
/*
* Init
*/
orte_init(&argc, &argv, ORTE_PROC_NON_MPI);
print_transports_query();
conduit_attr = OBJ_NEW(opal_list_t);
if( ORTE_SUCCESS ==
( orte_set_attribute( conduit_attr, ORTE_RML_OFI_PROV_NAME_ATTRIB, ORTE_ATTR_GLOBAL,"sockets",OPAL_STRING))) {
if( ORTE_SUCCESS ==
if( ORTE_SUCCESS ==
( orte_set_attribute( conduit_attr, ORTE_RML_PROVIDER_ATTRIB, ORTE_ATTR_GLOBAL,"sockets",OPAL_STRING))) {
if( ORTE_SUCCESS ==
( orte_set_attribute( conduit_attr, ORTE_RML_INCLUDE_COMP_ATTRIB, ORTE_ATTR_GLOBAL,"ofi",OPAL_STRING))) {
opal_output(0, "%s calling open_conduit with ORTE_RML_INCLUDE_COMP_ATTRIB and ORTE_RML_OFI_PROV_NAME_ATTRIB",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));