1
1

Corrected several issues in the ib path. Also updated calls to mpool to adhere to interface changes.

This commit was SVN r5956.
Этот коммит содержится в:
Galen Shipman 2005-06-06 20:26:22 +00:00
родитель 24bb5c2f5d
Коммит ec7aa4fba9
18 изменённых файлов: 547 добавлений и 1167 удалений

Просмотреть файл

@ -110,6 +110,7 @@
#define MCA_BMI_H
#include "include/types.h"
#include "class/ompi_free_list.h"
/*
* BMI types
@ -159,7 +160,7 @@ typedef struct mca_bmi_base_segment_t mca_bmi_base_segment_t;
*/
struct mca_bmi_base_descriptor_t {
ompi_list_item_t super;
ompi_free_list_item_t super;
mca_bmi_base_segment_t *des_src;
size_t des_src_cnt;
mca_bmi_base_segment_t *des_dst;
@ -178,6 +179,12 @@ OBJ_CLASS_DECLARATION(mca_bmi_base_descriptor_t);
#define MCA_BMI_DES_MAX_SEGMENTS 16
/*
* BMI base header, stores the tag at a minimum
*/
struct mca_bmi_base_header_t{
mca_bmi_base_tag_t tag; /* tag used to look up the registered receive callback for this fragment */
}; typedef struct mca_bmi_base_header_t mca_bmi_base_header_t;
/*
* BMI component interface functions and datatype.
@ -326,6 +333,16 @@ typedef void (*mca_bmi_base_module_recv_cb_fn_t)(
);
/* holds the recv call back function to be called by the bmi on
* a receive.
*/
struct mca_bmi_base_registration_t {
mca_bmi_base_module_recv_cb_fn_t cbfunc; /* callback invoked by the bmi when a fragment is received */
void* cbdata; /* opaque user data passed back to cbfunc on each invocation */
};
typedef struct mca_bmi_base_registration_t mca_bmi_base_registration_t;
/**
* Register a callback function that is called on receipt
* of a fragment.

Просмотреть файл

@ -1,2 +1,2 @@
twoodall
gshipman
gshipman

Просмотреть файл

@ -23,22 +23,20 @@ libmca_bmi_ib_la_SOURCES = \
bmi_ib.h \
bmi_ib_addr.h \
bmi_ib_component.c \
bmi_ib_endpoint.c \
bmi_ib_endpoint.h \
bmi_ib_frag.c \
bmi_ib_frag.h \
bmi_ib_memory.c \
bmi_ib_peer.c \
bmi_ib_peer.h \
bmi_ib_priv.c \
bmi_ib_priv.h \
bmi_ib_proc.c \
bmi_ib_proc.h \
bmi_ib_recvfrag.c \
bmi_ib_recvfrag.h \
bmi_ib_vapi.h \
bmi_ib_sendfrag.c \
bmi_ib_sendfrag.h \
bmi_ib_vapi.h
bmi_ib_sendfrag.h \
bmi_ib_recvfrag.c \
bmi_ib_recvfrag.h
# Make the output library in this directory, and name it either
# mca_<type>_<name>.la (for DSO builds) or libmca_<type>_<name>.la

Просмотреть файл

@ -35,7 +35,7 @@ mca_bmi_ib_module_t mca_bmi_ib_module = {
0, /* exclusivity */
0, /* latency */
0, /* bandwidth */
MCA_PTL_PUT, /* bmi flags */
0, /* TODO this should be PUT bmi flags */
mca_bmi_ib_add_procs,
mca_bmi_ib_del_procs,
mca_bmi_ib_register,
@ -43,7 +43,8 @@ mca_bmi_ib_module_t mca_bmi_ib_module = {
/* we need alloc free, pack */
mca_bmi_ib_alloc,
mca_bmi_ib_free,
mca_bmi_ib_pack,
mca_bmi_ib_prepare_src,
mca_bmi_ib_prepare_dst,
mca_bmi_ib_send,
mca_bmi_ib_put,
NULL /* get */
@ -88,7 +89,7 @@ int mca_bmi_ib_add_procs(
return OMPI_ERR_OUT_OF_RESOURCE;
}
ib_peer->peer_bmi = ib_bmi;
ib_peer->endpoint_bmi = ib_bmi;
rc = mca_bmi_ib_proc_insert(ib_proc, ib_peer);
if(rc != OMPI_SUCCESS) {
OBJ_RELEASE(ib_peer);
@ -134,36 +135,56 @@ int mca_bmi_ib_register(
* @param bmi (IN) BMI module
* @param size (IN) Request segment size.
*/
extern mca_bmi_base_descriptor_t* mca_bmi_ib_alloc(
mca_bmi_base_descriptor_t* mca_bmi_ib_alloc(
struct mca_bmi_base_module_t* bmi,
size_t size)
{
mca_bmi_ib_frag_t* frag;
int rc;
if(size <= mca_bmi_ib_component.first_fragment_size) {
MCA_BMI_IB_FRAG_ALLOC1(frag,rc);
} else {
/* if(size <= mca_bmi_ib_component.first_fragment_size) { */
/* MCA_BMI_IB_FRAG_ALLOC1(frag,rc); */
/* } else { */
}
/* } */
MCA_BMI_IB_FRAG_ALLOC1(bmi, frag,rc);
frag->segment.seg_len = size;
return (mca_bmi_base_descriptor_t*)frag;
}
extern int mca_bmi_ib_free(
int mca_bmi_ib_free(
struct mca_bmi_base_module_t* bmi,
mca_bmi_base_descriptor_t* des)
{
mca_bmi_ib_frag_t* frag = (mca_bmi_ib_frag_t*)des;
MCA_BMI_IB_FRAG_RETURN1(frag);
MCA_BMI_IB_FRAG_RETURN1(bmi, frag);
}
/**
* Pack data and return a descriptor that can be
* used for send/put.
*
* @param bmi (IN) BMI module
* @param peer (IN) BMI peer addressing
*/
mca_bmi_base_descriptor_t* mca_bmi_ib_prepare_src(
struct mca_bmi_base_module_t* bmi,
struct mca_bmi_base_endpoint_t* peer,
struct ompi_convertor_t* convertor,
size_t reserve,
size_t* size
)
{
/* TODO: pack data via the convertor and return a registered send
 * descriptor.  The previous stub returned OMPI_SUCCESS (an int) from a
 * pointer-returning function - an implicit int-to-pointer conversion.
 * Return NULL explicitly so callers can detect the unimplemented path. */
return NULL;
}
/**
* Pack data
*
* @param bmi (IN) BMI module
* @param peer (IN) BMI peer addressing
*/
struct mca_bmi_base_descriptor_t* mca_bmi_ib_pack(
mca_bmi_base_descriptor_t* mca_bmi_ib_prepare_dst(
struct mca_bmi_base_module_t* bmi,
struct mca_bmi_base_endpoint_t* peer,
struct ompi_convertor_t* convertor,
@ -175,36 +196,9 @@ struct mca_bmi_base_descriptor_t* mca_bmi_ib_pack(
int mca_bmi_ib_finalize(struct mca_bmi_base_module_t* bmi)
{
/* Stub */
D_PRINT("Stub\n");
return OMPI_SUCCESS;
}
int mca_bmi_ib_request_init( struct mca_bmi_base_module_t* bmi,
struct mca_bmi_base_send_request_t* request)
{
mca_bmi_ib_module_t* ib_bmi = (mca_bmi_ib_module_t*)bmi;
mca_bmi_ib_send_frag_t* sendfrag;
ompi_list_item_t* item;
int rc;
OMPI_FREE_LIST_GET(&ib_bmi->send_free, item, rc);
if(NULL == (sendfrag = (mca_bmi_ib_send_frag_t*)item)) {
return rc;
}
((mca_bmi_ib_send_request_t*) request)->req_frag = sendfrag;
return OMPI_SUCCESS;
}
void mca_bmi_ib_request_fini( struct mca_bmi_base_module_t* bmi,
struct mca_bmi_base_send_request_t* request)
{
mca_bmi_ib_module_t* ib_bmi = (mca_bmi_ib_module_t*)bmi;
mca_bmi_ib_send_request_t* sendreq = (mca_bmi_ib_send_request_t*)request;
OMPI_FREE_LIST_RETURN(&ib_bmi->send_free, (ompi_list_item_t*)sendreq->req_frag);
}
/*
* Initiate a send. If this is the first fragment, use the fragment
* descriptor allocated with the send requests, otherwise obtain
@ -214,268 +208,34 @@ void mca_bmi_ib_request_fini( struct mca_bmi_base_module_t* bmi,
int mca_bmi_ib_send(
struct mca_bmi_base_module_t* bmi,
struct mca_bmi_base_endpoint_t* bmi_peer,
struct mca_bmi_base_endpoint_t* endpoint,
struct mca_bmi_base_descriptor_t* descriptor,
mca_bmi_base_tag_t tag)
{
mca_bmi_ib_frag_t* frag = (mca_bmi_ib_frag_t*)descriptor;
int rc;
mca_bmi_ib_module_t* ib_bmi = (mca_bmi_ib_module_t*)bmi;
mca_bmi_ib_frag_t* frag = (mca_bmi_ib_frag_t*)descriptor;
frag->tag = tag;
frag->hdr->tag = tag;
frag->type = MCA_BMI_IB_FRAG_SEND;
int rc = OMPI_SUCCESS;
frag->rc = rc;
ompi_convertor_t *convertor;
int rc, freeAfter;
unsigned int iov_count, max_data;
struct iovec iov;
/* first fragment (eager send) and first fragment of long
* protocol can use the convertor initialized on the request,
* remaining fragments must copy/reinit the convertor as the
* transfer could be in parallel.
*/
if( offset <= mca_bmi_ib_module.super.bmi_first_frag_size ) {
convertor = &sendreq->req_send.req_convertor;
} else {
convertor = &sendfrag->frag_send.frag_base.frag_convertor;
ompi_convertor_copy(&sendreq->req_send.req_convertor, convertor);
ompi_convertor_init_for_send( convertor,
0,
sendreq->req_send.req_base.req_datatype,
sendreq->req_send.req_base.req_count,
sendreq->req_send.req_base.req_addr,
offset,
NULL );
}
/* if data is contigous, convertor will return an offset
* into users buffer - otherwise will return an allocated buffer
* that holds the packed data
*/
if((flags & MCA_PTL_FLAGS_ACK) == 0) {
iov.iov_base = &sendfrag->ib_buf.buf[sizeof(mca_bmi_base_match_header_t)];
} else {
iov.iov_base = &sendfrag->ib_buf.buf[sizeof(mca_bmi_base_rendezvous_header_t)];
}
iov.iov_len = size;
iov_count = 1;
max_data = size;
if((rc = ompi_convertor_pack(convertor,&iov, &iov_count, &max_data, &freeAfter)) < 0) {
ompi_output(0, "Unable to pack data");
return rc;
}
/* adjust size to reflect actual number of bytes packed by convertor */
size = iov.iov_len;
sendfrag->frag_send.frag_base.frag_addr = iov.iov_base;
sendfrag->frag_send.frag_base.frag_size = iov.iov_len;
} else {
sendfrag->frag_send.frag_base.frag_addr = NULL;
sendfrag->frag_send.frag_base.frag_size = 0;
}
/* fragment state */
sendfrag->frag_send.frag_base.frag_owner = &bmi_peer->peer_bmi->super;
sendfrag->frag_send.frag_request = sendreq;
sendfrag->frag_send.frag_base.frag_peer = bmi_peer;
sendfrag->frag_progressed = 0;
/* Initialize header */
hdr = (mca_bmi_base_header_t *) &sendfrag->ib_buf.buf[0];
hdr->hdr_common.hdr_flags = flags;
hdr->hdr_match.hdr_contextid = sendreq->req_send.req_base.req_comm->c_contextid;
hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
hdr->hdr_match.hdr_dst = sendreq->req_send.req_base.req_peer;
hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag;
hdr->hdr_match.hdr_msg_length = sendreq->req_send.req_bytes_packed;
hdr->hdr_match.hdr_msg_seq = sendreq->req_send.req_base.req_sequence;
if((flags & MCA_PTL_FLAGS_ACK) == 0) {
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_MATCH;
hdr_length = sizeof(mca_bmi_base_match_header_t);
} else {
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_MATCH;
hdr->hdr_rndv.hdr_frag_length = sendfrag->frag_send.frag_base.frag_size;
hdr->hdr_rndv.hdr_src_ptr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
hdr->hdr_rndv.hdr_src_ptr.pval = sendfrag;
hdr_length = sizeof(mca_bmi_base_rendezvous_header_t);
}
/* Update the offset after actual fragment size is determined,
* and before attempting to send the fragment */
sendreq->req_offset += size;
IB_SET_SEND_DESC_LEN((&sendfrag->ib_buf), (hdr_length + size));
if(OMPI_SUCCESS != (rc = mca_bmi_ib_peer_send(bmi_peer, sendfrag))) {
return rc;
}
/* if this is the entire message - signal request is complete */
if(sendreq->req_send.req_bytes_packed == size) {
ompi_request_complete( &(sendreq->req_send.req_base.req_ompi) );
}
return OMPI_SUCCESS;
rc = mca_bmi_ib_endpoint_send(endpoint, frag);
frag->rc = rc;
return rc;
}
/*
* RDMA local buffer to remote buffer address.
*/
int mca_bmi_ib_put( struct mca_bmi_base_module_t* bmi,
struct mca_bmi_base_endpoint_t* bmi_peer,
struct mca_bmi_base_send_request_t* req, size_t offset,
size_t size, int flags)
int mca_bmi_ib_put( mca_bmi_base_module_t* bmi,
mca_bmi_base_endpoint_t* bmi_peer,
mca_bmi_base_descriptor_t* descriptor)
{
/* RDMA put is not yet supported by the IB bmi; callers must fall back
 * to send-based protocols until this is implemented. */
return OMPI_ERR_NOT_IMPLEMENTED;
}
/*
* On a match send an ack to the peer.
*/
static void mca_bmi_ib_ack(
mca_bmi_ib_module_t *ib_bmi,
mca_bmi_ib_send_frag_t *send_frag,
mca_bmi_ib_recv_frag_t *recv_frag)
{
mca_bmi_base_header_t *hdr;
mca_bmi_base_recv_request_t *request;
mca_bmi_ib_endpoint_t *ib_peer;
ib_buffer_t *ib_buf;
int recv_len;
int len_to_reg, len_added = 0;
void *addr_to_reg, *ack_buf;
/* Header starts at beginning of registered
* buffer space */
hdr = (mca_bmi_base_header_t *)
&send_frag->ib_buf.buf[0];
request = recv_frag->super.frag_request;
/* Amount of data we have already received */
recv_len =
recv_frag->super.frag_base.frag_header.hdr_rndv.hdr_frag_length;
hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_ACK;
hdr->hdr_common.hdr_flags = 0;
/* Remote side send descriptor */
hdr->hdr_ack.hdr_src_ptr =
recv_frag->super.frag_base.frag_header.hdr_rndv.hdr_src_ptr;
/* Matched request from recv side */
hdr->hdr_ack.hdr_dst_match.lval = 0;
hdr->hdr_ack.hdr_dst_match.pval = request;
hdr->hdr_ack.hdr_dst_addr.lval = 0;
addr_to_reg = (void*)((char*)request->req_recv.req_base.req_addr + recv_len);
hdr->hdr_ack.hdr_dst_addr.pval = addr_to_reg;
len_to_reg = request->req_recv.req_bytes_packed - recv_len;
hdr->hdr_ack.hdr_dst_size = len_to_reg;
A_PRINT("Dest addr : %p, RDMA Len : %d",
hdr->hdr_ack.hdr_dst_addr.pval,
hdr->hdr_ack.hdr_dst_size);
ack_buf = (void*) ((char*) (&send_frag->ib_buf.buf[0]) +
sizeof(mca_bmi_base_ack_header_t));
/* Prepare ACK packet with IB specific stuff */
mca_bmi_ib_prepare_ack(ib_bmi, addr_to_reg, len_to_reg,
ack_buf, &len_added);
/* Send it right away! */
ib_peer = (mca_bmi_ib_endpoint_t *)
recv_frag->super.frag_base.frag_peer;
ib_buf = &send_frag->ib_buf;
IB_SET_SEND_DESC_LEN(ib_buf,
(sizeof(mca_bmi_base_ack_header_t) + len_added));
mca_bmi_ib_post_send(ib_bmi, ib_peer, &send_frag->ib_buf, send_frag);
/* fragment state */
send_frag->frag_send.frag_base.frag_owner = &ib_bmi->super;
send_frag->frag_send.frag_base.frag_peer = recv_frag->super.frag_base.frag_peer;
send_frag->frag_send.frag_base.frag_addr = NULL;
send_frag->frag_send.frag_base.frag_size = 0;
}
/*
* A posted receive has been matched - if required send an
* ack back to the peer and process the fragment. Copy the
* data to user buffer
*/
void mca_bmi_ib_matched(
mca_bmi_base_module_t* bmi,
mca_bmi_base_recv_frag_t* frag)
{
mca_bmi_ib_module_t* ib_bmi = (mca_bmi_ib_module_t*)bmi;
mca_bmi_base_recv_request_t *request;
mca_bmi_base_header_t *header;
mca_bmi_ib_recv_frag_t *recv_frag;
header = &frag->frag_base.frag_header;
request = frag->frag_request;
recv_frag = (mca_bmi_ib_recv_frag_t*) frag;
D_PRINT("Matched frag\n");
if (header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK) {
mca_bmi_ib_send_frag_t *send_frag;
send_frag = mca_bmi_ib_alloc_send_frag(ib_bmi, NULL);
if(NULL == send_frag) {
ompi_output(0, "Cannot get send descriptor");
} else {
mca_bmi_ib_ack(ib_bmi, send_frag, recv_frag);
}
}
/* Process the fragment */
/* IN TCP case, IO_VEC is first allocated.
* then recv the data, and copy if needed,
* But in ELAN cases, we save the data into an
* unex buffer if the recv descriptor is not posted
* (for too long) (TODO).
* We then need to copy from
* unex_buffer to application buffer */
if ((header->hdr_common.hdr_type & MCA_PTL_HDR_TYPE_MATCH) &&
(header->hdr_match.hdr_msg_length > 0)) {
struct iovec iov;
ompi_proc_t *proc;
unsigned int iov_count, max_data;
int freeAfter;
iov.iov_base = frag->frag_base.frag_addr;
iov.iov_len = frag->frag_base.frag_size;
proc = ompi_comm_peer_lookup(request->req_recv.req_base.req_comm,
request->req_recv.req_base.req_ompi.req_status.MPI_SOURCE);
ompi_convertor_copy(proc->proc_convertor, &frag->frag_base.frag_convertor);
ompi_convertor_init_for_recv( &frag->frag_base.frag_convertor,
0,
request->req_recv.req_base.req_datatype,
request->req_recv.req_base.req_count,
request->req_recv.req_base.req_addr,
0, /* fragment offset */
NULL );
ompi_convertor_unpack(&frag->frag_base.frag_convertor, &iov, &iov_count, &max_data, &freeAfter);
}
mca_bmi_ib_recv_frag_done(header, frag, request);
}

Просмотреть файл

@ -1,3 +1,4 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
@ -32,31 +33,36 @@
#include "mca/pml/pml.h"
#include "mca/bmi/bmi.h"
#include "util/output.h"
#include "mca/mpool/mpool.h"
/* InfiniBand VAPI includes */
#include "mca/bmi/bmi.h"
#include "bmi_ib_vapi.h"
#include "bmi_ib_addr.h"
#include "bmi_ib_proc.h"
#include "bmi_ib_peer.h"
#include "bmi_ib_endpoint.h"
#include "bmi_ib_priv.h"
#include "bmi_ib_frag.h"
/* Other IB bmi includes */
#include "bmi_ib_sendreq.h"
#include "bmi_ib_recvfrag.h"
#include "bmi_ib_sendfrag.h"
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
#if 1
#define D_PRINT(fmt, args...) { \
ompi_output(0, "[%s:%d:%s] " fmt, __FILE__, __LINE__, __func__, \
##args); \
}
#else
#define D_PRINT(fmt, args...)
#endif
/**
* Infiniband (IB) BMI component.
*/
struct mca_bmi_ib_registration_t {
mca_bmi_base_module_recv_cb_fn_t cbfunc;
void *cbdata;
}
struct mca_bmi_ib_component_t {
mca_bmi_base_component_1_0_0_t super; /**< base BMI component */
@ -75,15 +81,6 @@ struct mca_bmi_ib_component_t {
int ib_free_list_inc;
/**< number of elements to alloc when growing free lists */
ompi_free_list_t ib_send_requests;
/**< free list of ib send requests -- sendreq + IB */
ompi_free_list_t ib_send_frags;
/**< free list of ib send fragments */
ompi_free_list_t ib_recv_frags;
/**< free list of ib recv fragments */
ompi_list_t ib_procs;
/**< list of ib proc structures */
@ -98,12 +95,20 @@ struct mca_bmi_ib_component_t {
int ib_mem_registry_hints_log_size;
/**< log2 size of hints hash array used by memory registry */
};
typedef struct mca_bmi_ib_component_t mca_bmi_ib_component_t;
struct mca_bmi_ib_recv_frag_t;
char* ib_mpool_name;
/**< name of ib memory pool */
}; typedef struct mca_bmi_ib_component_t mca_bmi_ib_component_t;
extern mca_bmi_ib_component_t mca_bmi_ib_component;
typedef mca_bmi_base_registration_t mca_bmi_ib_registration_t;
/**
* IB PTL Interface
*/
@ -121,24 +126,15 @@ struct mca_bmi_ib_module_t {
/**< Async event handler used to detect weird/unknown events */
mca_bmi_ib_mem_registry_t mem_registry; /**< registry of memory regions */
ompi_free_list_t ib_frags1; /**< free list of buffer descriptors */
ompi_free_list_t send_free; /**< free list of buffer descriptors */
ompi_free_list_t recv_free; /**< free list of buffer descriptors */
ompi_list_t repost; /**< list of buffers to repost */
};
typedef struct mca_bmi_ib_module_t mca_bmi_ib_module_t;
extern mca_bmi_ib_module_tmca_bmi_ib_module;
/**
* IB FIN header
*/
typedef struct mca_bmi_ib_fin_header_t mca_bmi_ib_fin_header_t;
struct mca_bmi_ib_fin_header_t {
mca_bmi_base_frag_header_t frag_hdr;
ompi_ptr_t mr_addr;
uint64_t mr_size;
};
mca_mpool_base_module_t* ib_pool;
/**< ib memory pool */
}; typedef struct mca_bmi_ib_module_t mca_bmi_ib_module_t;
extern mca_bmi_ib_module_t mca_bmi_ib_module;
/**
* Register IB component parameters with the MCA framework
@ -169,20 +165,12 @@ extern mca_bmi_base_module_t** mca_bmi_ib_component_init(
bool have_hidden_threads
);
/**
* IB component control.
*/
extern int mca_bmi_ib_component_control(
int param,
void* value,
size_t size
);
/**
* IB component progress.
*/
extern int mca_bmi_ib_component_progress(
mca_bmi_tstamp_t tstamp
void
);
@ -236,53 +224,6 @@ extern int mca_bmi_ib_del_procs(
struct mca_bmi_base_endpoint_t** peers
);
/**
* PML->BMI Initialize a send request for TCP cache.
*
* @param bmi (IN) BMI instance
* @param request (IN) Pointer to allocated request.
*
**/
extern int mca_bmi_ib_request_init(
struct mca_bmi_base_module_t* bmi,
struct mca_bmi_base_send_request_t*
);
/**
* PML->BMI Cleanup a send request that is being removed from the cache.
*
* @param bmi (IN) BMI instance
* @param request (IN) Pointer to allocated request.
*
**/
extern void mca_bmi_ib_request_fini(
struct mca_bmi_base_module_t* bmi,
struct mca_bmi_base_send_request_t*
);
/**
* PML->BMI Return a send request to the BMI modules free list.
*
* @param bmi (IN) BMI instance
* @param request (IN) Pointer to allocated request.
*
*/
extern void mca_bmi_ib_request_return(
struct mca_bmi_base_module_t* bmi,
struct mca_bmi_base_send_request_t*
);
/**
* PML->BMI Notification that a receive fragment has been matched.
*
* @param bmi (IN) BMI instance
* @param recv_frag (IN) Receive fragment
*
*/
extern void mca_bmi_ib_matched(
struct mca_bmi_base_module_t* bmi,
struct mca_bmi_base_recv_frag_t* frag
);
/**
* PML->BMI Initiate a send of the specified size.
@ -297,10 +238,8 @@ extern void mca_bmi_ib_matched(
extern int mca_bmi_ib_send(
struct mca_bmi_base_module_t* bmi,
struct mca_bmi_base_endpoint_t* bmi_peer,
struct mca_bmi_base_send_request_t*,
size_t offset,
size_t size,
int flags
struct mca_bmi_base_descriptor_t* descriptor,
mca_bmi_base_tag_t tag
);
/**
@ -316,12 +255,15 @@ extern int mca_bmi_ib_send(
extern int mca_bmi_ib_put(
struct mca_bmi_base_module_t* bmi,
struct mca_bmi_base_endpoint_t* bmi_peer,
struct mca_bmi_base_send_request_t*,
size_t offset,
size_t size,
int flags
struct mca_bmi_base_descriptor_t* decriptor
);
extern int mca_bmi_ib_register(
struct mca_bmi_base_module_t* bmi,
mca_bmi_base_tag_t tag,
mca_bmi_base_module_recv_cb_fn_t cbfunc,
void* cbdata);
/**
* Return a recv fragment to the modules free list.
*
@ -331,11 +273,62 @@ extern int mca_bmi_ib_put(
*/
extern void mca_bmi_ib_recv_frag_return(
struct mca_bmi_base_module_t* bmi,
struct mca_bmi_ib_recv_frag_t* frag
struct mca_bmi_ib_frag_t* frag
);
/**
* Allocate a segment.
*
* @param bmi (IN) BMI module
* @param size (IN) Request segment size.
*/
extern mca_bmi_base_descriptor_t* mca_bmi_ib_alloc(
struct mca_bmi_base_module_t* bmi,
size_t size);
/**
* Return a segment allocated by this BMI.
*
* @param bmi (IN) BMI module
* @param segment (IN) Allocated segment.
*/
extern int mca_bmi_ib_free(
struct mca_bmi_base_module_t* bmi,
mca_bmi_base_descriptor_t* des);
/**
* Pack data and return a descriptor that can be
* used for send/put.
*
* @param bmi (IN) BMI module
* @param peer (IN) BMI peer addressing
*/
mca_bmi_base_descriptor_t* mca_bmi_ib_prepare_src(
struct mca_bmi_base_module_t* bmi,
struct mca_bmi_base_endpoint_t* peer,
struct ompi_convertor_t* convertor,
size_t reserve,
size_t* size
);
/**
* Pack data and return a descriptor that can be
* used for send/put.
*
* @param bmi (IN) BMI module
* @param peer (IN) BMI peer addressing
*/
extern mca_bmi_base_descriptor_t* mca_bmi_ib_prepare_dst(
struct mca_bmi_base_module_t* bmi,
struct mca_bmi_base_endpoint_t* peer,
struct ompi_convertor_t* convertor,
size_t reserve,
size_t* size);
/**
* Return a send fragment to the modules free list.
*
* @param bmi (IN) BMI instance
@ -344,7 +337,7 @@ extern void mca_bmi_ib_recv_frag_return(
*/
extern void mca_bmi_ib_send_frag_return(
struct mca_bmi_base_module_t* bmi,
struct mca_bmi_ib_send_frag_t*
struct mca_bmi_ib_frag_t*
);
#if defined(c_plusplus) || defined(__cplusplus)

Просмотреть файл

@ -3,8 +3,6 @@
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004 The Ohio State University.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
@ -16,9 +14,7 @@
* $HEADER$
*/
/* #include <hh_common.h> */
/* Open MPI includes */
#include "ompi_config.h"
#include "include/constants.h"
#include "event/event.h"
@ -27,13 +23,16 @@
#include "util/output.h"
#include "mca/pml/pml.h"
#include "mca/bmi/bmi.h"
#include "mca/base/mca_base_param.h"
#include "mca/base/mca_base_module_exchange.h"
#include "mca/errmgr/errmgr.h"
/* IB bmi includes */
#include "mca/common/vapi/vapi_mem_reg.h"
#include "mca/mpool/base/base.h"
#include "bmi_ib.h"
#include "bmi_ib_frag.h"
#include "bmi_ib_sendfrag.h"
#include "bmi_ib_recvfrag.h"
mca_bmi_ib_component_t mca_bmi_ib_component = {
{
@ -63,7 +62,6 @@ mca_bmi_ib_component_t mca_bmi_ib_component = {
},
mca_bmi_ib_component_init,
mca_bmi_ib_component_control,
mca_bmi_ib_component_progress,
}
};
@ -74,8 +72,8 @@ mca_bmi_ib_component_t mca_bmi_ib_component = {
*/
static inline char* mca_bmi_ib_param_register_string(
const char* param_name,
const char* default_value)
const char* param_name,
const char* default_value)
{
char *param_value;
int id = mca_base_param_register_string("bmi","ib",param_name,NULL,default_value);
@ -100,22 +98,15 @@ static inline int mca_bmi_ib_param_register_int(
int mca_bmi_ib_component_open(void)
{
/* register component parameters */
mca_bmi_ib_module.super.bmi_exclusivity =
mca_bmi_ib_param_register_int ("exclusivity", 0);
mca_bmi_ib_module.super.bmi_first_frag_size =
mca_bmi_ib_param_register_int ("first_frag_size",
(MCA_BMI_IB_FIRST_FRAG_SIZE
- sizeof(mca_bmi_base_header_t)));
mca_bmi_ib_module.super.bmi_min_frag_size =
mca_bmi_ib_param_register_int ("min_frag_size",
(MCA_BMI_IB_FIRST_FRAG_SIZE
- sizeof(mca_bmi_base_header_t)));
mca_bmi_ib_module.super.bmi_max_frag_size =
mca_bmi_ib_param_register_int ("max_frag_size", 2<<30);
/* initialize state */
mca_bmi_ib_component.ib_num_bmis=0;
mca_bmi_ib_component.ib_bmis=NULL;
/* initialize objects */
OBJ_CONSTRUCT(&mca_bmi_ib_component.ib_procs, ompi_list_t);
/* OBJ_CONSTRUCT (&mca_bmi_ib_component.ib_recv_frags, ompi_free_list_t); */
/* register IB component parameters */
mca_bmi_ib_component.ib_free_list_num =
@ -126,12 +117,22 @@ int mca_bmi_ib_component_open(void)
mca_bmi_ib_param_register_int ("free_list_inc", 32);
mca_bmi_ib_component.ib_mem_registry_hints_log_size =
mca_bmi_ib_param_register_int ("hints_log_size", 8);
/* initialize global state */
mca_bmi_ib_component.ib_num_bmis=0;
mca_bmi_ib_component.ib_bmis=NULL;
OBJ_CONSTRUCT(&mca_bmi_ib_component.ib_procs, ompi_list_t);
OBJ_CONSTRUCT (&mca_bmi_ib_component.ib_recv_frags, ompi_free_list_t);
mca_bmi_ib_component.ib_mpool_name =
mca_bmi_ib_param_register_string("mpool", "ib");
mca_bmi_ib_module.super.bmi_exclusivity =
mca_bmi_ib_param_register_int ("exclusivity", 0);
mca_bmi_ib_module.super.bmi_eager_limit =
mca_bmi_ib_param_register_int ("first_frag_size",
(MCA_BMI_IB_FIRST_FRAG_SIZE
- sizeof(mca_bmi_ib_header_t)));
mca_bmi_ib_module.super.bmi_min_frag_size =
mca_bmi_ib_param_register_int ("min_frag_size",
(MCA_BMI_IB_FIRST_FRAG_SIZE
- sizeof(mca_bmi_ib_header_t)));
mca_bmi_ib_module.super.bmi_max_frag_size =
mca_bmi_ib_param_register_int ("max_frag_size", 2<<30);
return OMPI_SUCCESS;
}
@ -142,8 +143,6 @@ int mca_bmi_ib_component_open(void)
int mca_bmi_ib_component_close(void)
{
D_PRINT("");
/* Stub */
return OMPI_SUCCESS;
}
@ -154,6 +153,7 @@ int mca_bmi_ib_component_close(void)
* (2) setup IB listen socket for incoming connection attempts
* (3) register BMI parameters with the MCA
*/
mca_bmi_base_module_t** mca_bmi_ib_component_init(int *num_bmi_modules,
bool enable_progress_threads,
bool enable_mpi_threads)
@ -161,23 +161,27 @@ mca_bmi_base_module_t** mca_bmi_ib_component_init(int *num_bmi_modules,
VAPI_ret_t vapi_ret;
VAPI_hca_id_t* hca_ids;
mca_bmi_base_module_t** bmis;
int i, ret;
int i, ret, length;
mca_common_vapi_hca_pd_t hca_pd;
/* initialization */
*num_bmi_modules = 0;
/* query the list of available hcas */
/* Determine the number of hca's available on the host */
vapi_ret=EVAPI_list_hcas(0, &(mca_bmi_ib_component.ib_num_bmis), NULL);
if( VAPI_EAGAIN != vapi_ret || 0 == mca_bmi_ib_component.ib_num_bmis ) {
ompi_output(0,"Warning: no IB HCAs found\n");
ompi_output(0,"No hca's found on this host \n");
return NULL;
}
/* Allocate space for the hca's */
hca_ids = (VAPI_hca_id_t*) malloc(mca_bmi_ib_component.ib_num_bmis * sizeof(VAPI_hca_id_t));
if(NULL == hca_ids) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return NULL;
}
/* obtain a list of the hca's on this host */
vapi_ret=EVAPI_list_hcas(mca_bmi_ib_component.ib_num_bmis, &mca_bmi_ib_component.ib_num_bmis, hca_ids);
if( VAPI_OK != vapi_ret ) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
@ -186,7 +190,7 @@ mca_bmi_base_module_t** mca_bmi_ib_component_init(int *num_bmi_modules,
/* Allocate space for bmi modules */
mca_bmi_ib_component.ib_bmis = (mca_bmi_ib_module_t*) malloc(sizeof(mca_bmi_ib_module_t) *
mca_bmi_ib_component.ib_num_bmis);
mca_bmi_ib_component.ib_num_bmis);
if(NULL == mca_bmi_ib_component.ib_bmis) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return NULL;
@ -198,45 +202,64 @@ mca_bmi_base_module_t** mca_bmi_ib_component_init(int *num_bmi_modules,
return NULL;
}
/* Initialize pool of receive fragments */
ompi_free_list_init (&(mca_bmi_ib_component.ib_recv_frags),
sizeof (mca_bmi_ib_recv_frag_t),
OBJ_CLASS (mca_bmi_ib_recv_frag_t),
mca_bmi_ib_component.ib_free_list_num,
mca_bmi_ib_component.ib_free_list_max,
mca_bmi_ib_component.ib_free_list_inc, NULL);
/* Initialize each module */
for(i = 0; i < mca_bmi_ib_component.ib_num_bmis; i++) {
mca_bmi_ib_module_t* ib_bmi = &mca_bmi_ib_component.ib_bmis[i];
/* Initialize the modules function pointers */
memcpy(ib_bmi, &mca_bmi_ib_module, sizeof(mca_bmi_ib_module));
/* Initialize module state */
OBJ_CONSTRUCT(&ib_bmi->send_free, ompi_free_list_t);
OBJ_CONSTRUCT(&ib_bmi->recv_free, ompi_free_list_t);
OBJ_CONSTRUCT(&ib_bmi->repost, ompi_list_t);
ompi_free_list_init(&ib_bmi->send_free,
sizeof(mca_bmi_ib_send_frag_t),
OBJ_CLASS(mca_bmi_ib_send_frag_t),
mca_bmi_ib_component.ib_free_list_num,
mca_bmi_ib_component.ib_free_list_max,
mca_bmi_ib_component.ib_free_list_inc,
NULL);
memcpy(ib_bmi->hca_id, hca_ids[i], sizeof(ib_bmi->hca_id));
if(mca_bmi_ib_module_init(ib_bmi) != OMPI_SUCCESS) {
free(hca_ids);
return NULL;
}
hca_pd.hca = ib_bmi->nic;
hca_pd.pd_tag = ib_bmi->ptag;
/* initialize the memory pool using the hca */
ib_bmi->ib_pool =
mca_mpool_base_module_create(mca_bmi_ib_component.ib_mpool_name, &hca_pd);
/* Initialize pool of send fragments */
length = sizeof(mca_bmi_ib_frag_t) +
sizeof(mca_bmi_ib_header_t) +
ib_bmi->super.bmi_eager_limit;
ompi_free_list_init(&ib_bmi->send_free,
length,
OBJ_CLASS(mca_bmi_ib_send_frag_t),
mca_bmi_ib_component.ib_free_list_num,
mca_bmi_ib_component.ib_free_list_max,
mca_bmi_ib_component.ib_free_list_inc,
ib_bmi->ib_pool);
/* Initialize pool of receive fragments */
ompi_free_list_init (&ib_bmi->recv_free,
length,
OBJ_CLASS (mca_bmi_ib_recv_frag_t),
mca_bmi_ib_component.ib_free_list_num,
mca_bmi_ib_component.ib_free_list_max,
mca_bmi_ib_component.ib_free_list_inc, ib_bmi->ib_pool);
/* Initialize the send descriptors */
if(mca_bmi_ib_send_frag_register(ib_bmi) != OMPI_SUCCESS) {
free(hca_ids);
return NULL;
}
/* This is now done by the memory pool passed to free_list_init.. Initialize the send descriptors */
/* if(mca_bmi_ib_send_frag_register(ib_bmi) != OMPI_SUCCESS) { */
/* free(hca_ids); */
/* return NULL; */
/* } */
bmis[i] = &ib_bmi->super;
}
@ -248,15 +271,6 @@ mca_bmi_base_module_t** mca_bmi_ib_component_init(int *num_bmi_modules,
return bmis;
}
/*
* IB component control
*/
int mca_bmi_ib_component_control(int param, void* value, size_t size)
{
return OMPI_SUCCESS;
}
/*
* IB component progress.
@ -300,43 +314,86 @@ int mca_bmi_ib_component_control(int param, void* value, size_t size)
}
int mca_bmi_ib_component_progress(mca_bmi_tstamp_t tstamp)
int mca_bmi_ib_component_progress()
{
int i;
int count = 0;
mca_bmi_ib_frag_t* frag;
/* Poll for completions */
for(i = 0; i < mca_bmi_ib_component.ib_num_bmis; i++) {
mca_bmi_ib_module_t* ib_bmi = &mca_bmi_ib_component.ib_bmis[i];
int comp_type = IB_COMP_NOTHING;
void* comp_addr;
MCA_BMI_IB_DRAIN_NETWORK(ib_bmi->nic, ib_bmi->cq_hndl, &comp_type, &comp_addr);
VAPI_ret_t ret;
VAPI_wc_desc_t comp;
ret = VAPI_poll_cq(ib_bmi->nic, ib_bmi->cq_hndl, &comp);
if(VAPI_OK == ret) {
if(comp.status != VAPI_SUCCESS) {
ompi_output(0, "Got error : %s, Vendor code : %d Frag : %p",
VAPI_wc_status_sym(comp.status),
comp.vendor_err_syndrome, comp.id);
comp_type = IB_COMP_ERROR;
comp_addr = NULL;
} else {
if(VAPI_CQE_SQ_SEND_DATA == comp.opcode) {
comp_type = IB_COMP_SEND;
comp_addr = (void*) (unsigned long) comp.id;
} else if(VAPI_CQE_RQ_SEND_DATA == comp.opcode) {
comp_type = IB_COMP_RECV;
comp_addr = (void*) (unsigned long) comp.id;
} else if(VAPI_CQE_SQ_RDMA_WRITE == comp.opcode) {
comp_type = IB_COMP_RDMA_W;
comp_addr = (void*) (unsigned long) comp.id;
} else {
ompi_output(0, "VAPI_poll_cq: returned unknown opcode : %d\n",
comp.opcode);
comp_type = IB_COMP_ERROR;
comp_addr = NULL;
}
}
} else {
/* No completions from the network */
comp_type = IB_COMP_NOTHING;
comp_addr = NULL;
}
/* Handle n/w completions */
switch(comp_type) {
case IB_COMP_SEND :
/* Process a completed send */
mca_bmi_ib_send_frag_send_complete(ib_bmi, (mca_bmi_ib_send_frag_t*)comp_addr);
mca_bmi_ib_sendfrag_complete(ib_bmi, (mca_bmi_ib_frag_t*)comp_addr);
count++;
break;
case IB_COMP_RECV :
ompi_output(0, "%s:%d ib recv under redesign\n", __FILE__, __LINE__);
frag = (mca_bmi_ib_frag_t*) comp_addr;
frag->segment.seg_len = comp.byte_len-sizeof(mca_bmi_ib_header_t);
/* advance the segment address past the header and subtract from the length..*/
ib_bmi->ib_reg[frag->hdr->tag].cbfunc(&ib_bmi->super, frag->hdr->tag, &frag->base, ib_bmi->ib_reg[frag->hdr->tag].cbdata);
/* Process incoming receives */
mca_bmi_ib_process_recv(ib_bmi, comp_addr);
/* Re post recv buffers */
if(ompi_list_get_size(&ib_bmi->repost) <= 1) {
ompi_list_append(&ib_bmi->repost, (ompi_list_item_t*)comp_addr);
} else {
ompi_list_item_t* item;
while(NULL != (item = ompi_list_remove_first(&ib_bmi->repost))) {
mca_bmi_ib_buffer_repost(ib_bmi->nic, item);
}
mca_bmi_ib_buffer_repost(ib_bmi->nic, comp_addr);
}
count++;
/* mca_bmi_ib_process_recv(ib_bmi, comp_addr); */
/* /\* Re post recv buffers *\/ */
/* if(ompi_list_get_size(&ib_bmi->repost) <= 1) { */
/* ompi_list_append(&ib_bmi->repost, (ompi_list_item_t*)comp_addr); */
/* } else { */
/* ompi_list_item_t* item; */
/* while(NULL != (item = ompi_list_remove_first(&ib_bmi->repost))) { */
/* mca_bmi_ib_buffer_repost(ib_bmi->nic, item); */
/* } */
/* mca_bmi_ib_buffer_repost(ib_bmi->nic, comp_addr); */
/* } */
/* count++; */
break;
case IB_COMP_RDMA_W :
@ -354,4 +411,3 @@ int mca_bmi_ib_component_progress(mca_bmi_tstamp_t tstamp)
}
return count;
}

Просмотреть файл

@ -16,15 +16,14 @@
* $HEADER$
*/
#ifndef MCA_BMI_IB_PEER_H
#define MCA_BMI_IB_PEER_H
#ifndef MCA_BMI_IB_ENDPOINT_H
#define MCA_BMI_IB_ENDPOINT_H
#include "class/ompi_list.h"
#include "event/event.h"
#include "mca/pml/pml.h"
#include "mca/bmi/bmi.h"
#include "bmi_ib_recvfrag.h"
#include "bmi_ib_sendfrag.h"
#include "bmi_ib_frag.h"
#include "bmi_ib_priv.h"
#if defined(c_plusplus) || defined(__cplusplus)
@ -33,7 +32,7 @@ extern "C" {
OBJ_CLASS_DECLARATION(mca_bmi_ib_endpoint_t);
/**
* State of IB peer connection.
* State of IB endpoint connection.
*/
typedef enum {
@ -41,7 +40,7 @@ typedef enum {
* has started the process of connection */
MCA_BMI_IB_CONNECTING,
/* Waiting for ack from peer */
/* Waiting for ack from endpoint */
MCA_BMI_IB_CONNECT_ACK,
/* Connected ... both sender & receiver have
@ -58,38 +57,38 @@ typedef enum {
} mca_bmi_ib_endpoint_state_t;
/**
* An abstraction that represents a connection to a peer process.
* An abstraction that represents a connection to a endpoint process.
* An instance of mca_bmi_base_endpoint_t is associated w/ each process
* and BMI pair at startup. However, connections to the peer
* and BMI pair at startup. However, connections to the endpoint
* are established dynamically on an as-needed basis:
*/
struct mca_bmi_base_endpoint_t {
ompi_list_item_t super;
struct mca_bmi_ib_module_t* peer_bmi;
struct mca_bmi_ib_module_t* endpoint_bmi;
/**< BMI instance that created this connection */
struct mca_bmi_ib_proc_t* peer_proc;
/**< proc structure corresponding to peer */
struct mca_bmi_ib_proc_t* endpoint_proc;
/**< proc structure corresponding to endpoint */
mca_bmi_ib_endpoint_state_t peer_state;
mca_bmi_ib_endpoint_state_t endpoint_state;
/**< current state of the connection */
size_t peer_retries;
size_t endpoint_retries;
/**< number of connection retries attempted */
double peer_tstamp;
double endpoint_tstamp;
/**< timestamp of when the first connection was attempted */
ompi_mutex_t peer_send_lock;
/**< lock for concurrent access to peer state */
ompi_mutex_t endpoint_send_lock;
/**< lock for concurrent access to endpoint state */
ompi_mutex_t peer_recv_lock;
/**< lock for concurrent access to peer state */
ompi_mutex_t endpoint_recv_lock;
/**< lock for concurrent access to endpoint state */
ompi_list_t pending_send_frags;
/**< list of pending send frags for this peer */
/**< list of pending send frags for this endpoint */
VAPI_qp_num_t rem_qp_num;
/* Remote side QP number */
@ -102,34 +101,34 @@ struct mca_bmi_base_endpoint_t {
VAPI_qp_prop_t lcl_qp_prop;
/* Local QP properties */
ib_buffer_t *lcl_recv;
/* Remote resources associated with this connection */
};
typedef struct mca_bmi_base_endpoint_t mca_bmi_base_endpoint_t;
typedef struct mca_bmi_base_endpoint_t mca_bmi_ib_endpoint_t;
typedef mca_bmi_base_endpoint_t mca_bmi_ib_endpoint_t;
int mca_bmi_ib_peer_send(mca_bmi_base_endpoint_t*, mca_bmi_ib_send_frag_t*);
int mca_bmi_ib_peer_connect(mca_bmi_base_endpoint_t*);
int mca_bmi_ib_endpoint_send(struct mca_bmi_base_endpoint_t* endpoint, struct mca_bmi_ib_frag_t* frag);
int mca_bmi_ib_endpoint_connect(mca_bmi_base_endpoint_t*);
void mca_bmi_ib_post_recv(void);
void mca_bmi_ib_progress_send_frags(mca_bmi_ib_endpoint_t*);
#define DUMP_PEER(peer_ptr) { \
#define DUMP_ENDPOINT(endpoint_ptr) { \
ompi_output(0, "[%s:%d] ", __FILE__, __LINE__); \
ompi_output(0, "Dumping peer %d state", \
peer->peer_proc->proc_guid.vpid); \
ompi_output(0, "Dumping endpoint %d state", \
endpoint->endpoint_proc->proc_guid.vpid); \
ompi_output(0, "Local QP hndl : %d", \
peer_ptr->peer_conn->lres->qp_hndl); \
endpoint_ptr->endpoint_conn->lres->qp_hndl); \
ompi_output(0, "Local QP num : %d", \
peer_ptr->peer_conn->lres->qp_prop.qp_num); \
endpoint_ptr->endpoint_conn->lres->qp_prop.qp_num); \
ompi_output(0, "Remote QP num : %d", \
peer_ptr->peer_conn->rres->qp_num); \
endpoint_ptr->endpoint_conn->rres->qp_num); \
ompi_output(0, "Remote LID : %d", \
peer_ptr->peer_conn->rres->lid); \
endpoint_ptr->endpoint_conn->rres->lid); \
}
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif

Просмотреть файл

@ -16,19 +16,24 @@
* $HEADER$
*/
#ifndef MCA_BMI_IB_SEND_FRAG_H
#define MCA_BMI_IB_SEND_FRAG_H
#ifndef MCA_BMI_IB_FRAG_H
#define MCA_BMI_IB_FRAG_H
#include "ompi_config.h"
#include "mca/bmi/base/bmi_base_sendreq.h"
#include "mca/bmi/base/bmi_base_sendfrag.h"
#include "bmi_ib_priv.h"
#include "bmi_ib.h"
#include <vapi.h>
#include <mtl_common.h>
#include <vapi_common.h>
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
OBJ_CLASS_DECLARATION(mca_bmi_ib_send_frag_t);
OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_bmi_ib_frag_t);
typedef mca_bmi_base_header_t mca_bmi_ib_header_t;
typedef enum {
MCA_BMI_IB_FRAG_SEND,
@ -37,6 +42,7 @@ typedef enum {
MCA_BMI_IB_FRAG_ACK
} mca_bmi_ib_frag_type_t;
/**
* IB send fragment derived type.
*/
@ -45,67 +51,44 @@ struct mca_bmi_ib_frag_t {
mca_bmi_base_segment_t segment;
struct mca_bmi_base_endpoint_t *endpoint;
mca_bmi_ib_frag_type_t type;
mca_bmi_base_tag_t tag;
size_t size;
int rc;
bool frag_ack_pending;
union{
VAPI_rr_desc_t rr_desc;
VAPI_sr_desc_t sr_desc;
};
VAPI_sg_lst_entry_t sg_entry;
VAPI_mr_hndl_t mem_hndl;
VAPI_ret_t ret;
mca_bmi_ib_header_t *hdr;
};
typedef struct mca_bmi_ib_frag_t mca_bmi_ib_frag_t;
typedef struct mca_bmi_ib_frag_t mca_bmi_ib_send_frag_t;
typedef struct mca_bmi_ib_frag_t mca_bmi_ib_recv_frag_t;
OBJ_CLASS_DECLARATION(mca_bmi_ib_frag_t);
OBJ_CLASS_DECLARATION(mca_bmi_ib_send_frag_t);
OBJ_CLASS_DECLARATION(mca_bmi_ib_recv_frag_t);
/*
* Allocate an IB send descriptor
*
*/
#define MCA_BMI_IB_FRAG_ALLOC1(frag, rc) \
#define MCA_BMI_IB_FRAG_ALLOC1(bmi, frag, rc) \
{ \
\
ompi_list_item_t *item; \
OMPI_FREE_LIST_WAIT(&mca_bmi_ib_module.ib_frags1, item, rc); \
OMPI_FREE_LIST_WAIT(&((mca_bmi_ib_module_t*)bmi)->send_free, item, rc); \
frag = (mca_bmi_ib_frag_t*) item; \
}
#define MCA_BMI_IB_FRAG_RETURN1(frag) \
#define MCA_BMI_IB_FRAG_RETURN1(bmi, frag) \
{ \
OMPI_FREE_LIST_RETURN(&mca_bmi_ib_module.ib_frags1, &frag->super); \
}
int mca_bmi_ib_send_frag_register(mca_bmi_ib_module_t *ib_bmi)
{
int i, rc, num_send_frags;
ompi_list_item_t *item;
ompi_free_list_t *flist = &ib_bmi->ib_frags1;
ib_buffer_t *ib_buf_ptr;
mca_bmi_ib_frag_t *ib_frag;
num_send_frags = ompi_list_get_size(&(flist->super));
item = ompi_list_get_first(&((flist)->super));
/* Register the buffers */
for(i = 0; i < num_send_frags;
item = ompi_list_get_next(item), i++) {
ib_send_frag = (mca_bmi_ib_send_frag_t *) item;
ib_send_frag->frag_progressed = 0;
ib_buf_ptr = (ib_buffer_t *) &ib_send_frag->ib_buf;
rc = mca_bmi_ib_register_mem(ib_bmi->nic, ib_bmi->ptag,
(void*) ib_buf_ptr->buf,
MCA_BMI_IB_FIRST_FRAG_SIZE,
&ib_buf_ptr->hndl);
if(rc != OMPI_SUCCESS) {
return OMPI_ERROR;
}
IB_PREPARE_SEND_DESC(ib_buf_ptr, 0,
MCA_BMI_IB_FIRST_FRAG_SIZE, ib_buf_ptr);
}
return OMPI_SUCCESS;
OMPI_FREE_LIST_RETURN(&((mca_bmi_ib_module_t*)bmi)->send_free, &frag->base.super); \
}
@ -113,14 +96,6 @@ int mca_bmi_ib_send_frag_register(mca_bmi_ib_module_t *ib_bmi)
struct mca_bmi_ib_module_t;
mca_bmi_ib_send_frag_t* mca_bmi_ib_alloc_send_frag(
struct mca_bmi_ib_module_t* ib_bmi,
mca_bmi_base_send_request_t* request);
int mca_bmi_ib_send_frag_register(struct mca_bmi_ib_module_t *bmi);
void mca_bmi_ib_send_frag_send_complete(struct mca_bmi_ib_module_t *bmi, mca_bmi_ib_send_frag_t*);
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif

Просмотреть файл

@ -245,8 +245,8 @@ int mca_bmi_ib_module_init(mca_bmi_ib_module_t *ib_bmi)
}
/* initialize memory region registry */
OBJ_CONSTRUCT(&ib_bmi->mem_registry, mca_bmi_ib_mem_registry_t);
mca_bmi_ib_mem_registry_init(&ib_bmi->mem_registry, ib_bmi);
/* OBJ_CONSTRUCT(&ib_bmi->mem_registry, mca_bmi_ib_mem_registry_t); */
/* mca_bmi_ib_mem_registry_init(&ib_bmi->mem_registry, ib_bmi); */
return OMPI_SUCCESS;
}
@ -344,131 +344,132 @@ int mca_bmi_ib_qp_init(VAPI_hca_hndl_t nic,
return OMPI_SUCCESS;
}
int mca_bmi_ib_register_mem(VAPI_hca_hndl_t nic, VAPI_pd_hndl_t ptag,
void* buf, int len, vapi_memhandle_t* memhandle)
{
VAPI_ret_t ret;
VAPI_mrw_t mr_in, mr_out;
vapi_memhandle_t mem_handle;
mr_in.acl = VAPI_EN_LOCAL_WRITE | VAPI_EN_REMOTE_WRITE;
mr_in.l_key = 0;
mr_in.r_key = 0;
mr_in.pd_hndl = ptag;
mr_in.size = len;
mr_in.start = (VAPI_virt_addr_t) (MT_virt_addr_t) buf;
mr_in.type = VAPI_MR;
/* int mca_bmi_ib_register_mem(VAPI_hca_hndl_t nic, VAPI_pd_hndl_t ptag, */
/* void* buf, int len, vapi_memhandle_t* memhandle) */
/* { */
/* VAPI_ret_t ret; */
/* VAPI_mrw_t mr_in, mr_out; */
/* vapi_memhandle_t mem_handle; */
ret = VAPI_register_mr(nic, &mr_in, &mem_handle.hndl, &mr_out);
if(VAPI_OK != ret) {
MCA_BMI_IB_VAPI_RET(ret, "VAPI_register_mr");
return OMPI_ERROR;
}
/* mr_in.acl = VAPI_EN_LOCAL_WRITE | VAPI_EN_REMOTE_WRITE; */
/* mr_in.l_key = 0; */
/* mr_in.r_key = 0; */
/* mr_in.pd_hndl = ptag; */
/* mr_in.size = len; */
/* mr_in.start = (VAPI_virt_addr_t) (MT_virt_addr_t) buf; */
/* mr_in.type = VAPI_MR; */
mem_handle.lkey = mr_out.l_key;
mem_handle.rkey = mr_out.r_key;
/* ret = VAPI_register_mr(nic, &mr_in, &mem_handle.hndl, &mr_out); */
/* if(VAPI_OK != ret) { */
/* MCA_BMI_IB_VAPI_RET(ret, "VAPI_register_mr"); */
/* return OMPI_ERROR; */
/* } */
memhandle->lkey = mem_handle.lkey;
memhandle->rkey = mem_handle.rkey;
/* mem_handle.lkey = mr_out.l_key; */
/* mem_handle.rkey = mr_out.r_key; */
/* D_PRINT("addr = %p, lkey = %d\n", buf, memhandle->lkey); */
/* memhandle->lkey = mem_handle.lkey; */
/* memhandle->rkey = mem_handle.rkey; */
memhandle->hndl = mem_handle.hndl;
/* /\* D_PRINT("addr = %p, lkey = %d\n", buf, memhandle->lkey); *\/ */
return OMPI_SUCCESS;
}
/* memhandle->hndl = mem_handle.hndl; */
/* return OMPI_SUCCESS; */
/* } */
int mca_bmi_ib_post_send(mca_bmi_ib_module_t *ib_bmi,
mca_bmi_ib_endpoint_t *peer,
ib_buffer_t *ib_buf, void* addr)
{
VAPI_ret_t ret;
int msg_len = ib_buf->desc.sg_entry.len;
/* int mca_bmi_ib_post_send(mca_bmi_ib_module_t *ib_bmi, */
/* mca_bmi_ib_endpoint_t *peer, */
/* ib_buffer_t *ib_buf, void* addr) */
/* { */
/* VAPI_ret_t ret; */
/* int msg_len = ib_buf->desc.sg_entry.len; */
IB_PREPARE_SEND_DESC(ib_buf, (peer->rem_qp_num),
msg_len, addr);
/* IB_PREPARE_SEND_DESC(ib_buf, (peer->rem_qp_num), */
/* msg_len, addr); */
/* TODO - get this from NIC properties */
if(msg_len < 128) { /* query this information from VAPI_query_qp(property max_inline_data_sq) */
ret = EVAPI_post_inline_sr(ib_bmi->nic,
peer->lcl_qp_hndl,
&ib_buf->desc.sr);
} else {
ret = VAPI_post_sr(ib_bmi->nic,
peer->lcl_qp_hndl,
&ib_buf->desc.sr);
}
/* /\* TODO - get this from NIC properties *\/ */
/* if(msg_len < 128) { /\* query this information from VAPI_query_qp(property max_inline_data_sq) *\/ */
/* ret = EVAPI_post_inline_sr(ib_bmi->nic, */
/* peer->lcl_qp_hndl, */
/* &ib_buf->desc.sr); */
/* } else { */
/* ret = VAPI_post_sr(ib_bmi->nic, */
/* peer->lcl_qp_hndl, */
/* &ib_buf->desc.sr); */
/* } */
if(VAPI_OK != ret) {
MCA_BMI_IB_VAPI_RET(ret, "VAPI_post_sr");
return OMPI_ERROR;
}
return OMPI_SUCCESS;
}
/* if(VAPI_OK != ret) { */
/* MCA_BMI_IB_VAPI_RET(ret, "VAPI_post_sr"); */
/* return OMPI_ERROR; */
/* } */
/* return OMPI_SUCCESS; */
/* } */
void mca_bmi_ib_buffer_repost(VAPI_hca_hndl_t nic, void* addr)
{
VAPI_ret_t ret;
ib_buffer_t *ib_buf = (ib_buffer_t*)addr;
/* void mca_bmi_ib_buffer_repost(VAPI_hca_hndl_t nic, void* addr) */
/* { */
/* VAPI_ret_t ret; */
/* ib_buffer_t *ib_buf = (ib_buffer_t*)addr; */
IB_PREPARE_RECV_DESC(ib_buf);
/* IB_PREPARE_RECV_DESC(ib_buf); */
ret = VAPI_post_rr(nic, ib_buf->qp_hndl, &(ib_buf->desc.rr));
/* ret = VAPI_post_rr(nic, ib_buf->qp_hndl, &(ib_buf->desc.rr)); */
if(VAPI_OK != ret) {
MCA_BMI_IB_VAPI_RET(ret, "VAPI_post_rr");
ompi_output(0, "Error in buffer reposting");
}
}
/* if(VAPI_OK != ret) { */
/* MCA_BMI_IB_VAPI_RET(ret, "VAPI_post_rr"); */
/* ompi_output(0, "Error in buffer reposting"); */
/* } */
/* } */
void mca_bmi_ib_prepare_ack(mca_bmi_ib_module_t *ib_bmi,
void* addr_to_reg, int len_to_reg,
void* ack_buf, int* len_added)
{
mca_bmi_ib_mem_registry_info_t *info =
mca_bmi_ib_register_mem_with_registry(ib_bmi,
addr_to_reg, (size_t)len_to_reg);
/* void mca_bmi_ib_prepare_ack(mca_bmi_ib_module_t *ib_bmi, */
/* void* addr_to_reg, int len_to_reg, */
/* void* ack_buf, int* len_added) */
/* { */
/* mca_bmi_ib_mem_registry_info_t *info = */
/* mca_bmi_ib_register_mem_with_registry(ib_bmi, */
/* addr_to_reg, (size_t)len_to_reg); */
if(NULL == info) {
ompi_output(0, "Error in registering");
}
/* if(NULL == info) { */
/* ompi_output(0, "Error in registering"); */
/* } */
A_PRINT("Sending Remote key : %d", info->reply.r_key);
/* A_PRINT("Sending Remote key : %d", info->reply.r_key); */
memcpy(ack_buf,(void*) &(info->reply.r_key), sizeof(VAPI_rkey_t));
/* memcpy(ack_buf,(void*) &(info->reply.r_key), sizeof(VAPI_rkey_t)); */
*len_added = sizeof(VAPI_rkey_t);
}
/* *len_added = sizeof(VAPI_rkey_t); */
/* } */
int mca_bmi_ib_rdma_write(mca_bmi_ib_module_t *ib_bmi,
mca_bmi_ib_endpoint_t *peer, ib_buffer_t *ib_buf,
void* send_buf, size_t send_len, void* remote_buf,
VAPI_rkey_t remote_key, void* id_buf)
{
VAPI_ret_t ret;
/* int mca_bmi_ib_rdma_write(mca_bmi_ib_module_t *ib_bmi, */
/* mca_bmi_ib_endpoint_t *peer, ib_buffer_t *ib_buf, */
/* void* send_buf, size_t send_len, void* remote_buf, */
/* VAPI_rkey_t remote_key, void* id_buf) */
/* { */
/* VAPI_ret_t ret; */
mca_bmi_ib_mem_registry_info_t *info =
mca_bmi_ib_register_mem_with_registry(ib_bmi,
send_buf, send_len);
/* mca_bmi_ib_mem_registry_info_t *info = */
/* mca_bmi_ib_register_mem_with_registry(ib_bmi, */
/* send_buf, send_len); */
if (NULL == info) {
return OMPI_ERROR;
}
/* if (NULL == info) { */
/* return OMPI_ERROR; */
/* } */
/* Prepare descriptor */
IB_PREPARE_RDMA_W_DESC(ib_buf, (peer->rem_qp_num),
send_len, send_buf, (info->reply.l_key), remote_key,
id_buf, remote_buf);
/* /\* Prepare descriptor *\/ */
/* IB_PREPARE_RDMA_W_DESC(ib_buf, (peer->rem_qp_num), */
/* send_len, send_buf, (info->reply.l_key), remote_key, */
/* id_buf, remote_buf); */
ret = VAPI_post_sr(ib_bmi->nic,
peer->lcl_qp_hndl,
&ib_buf->desc.sr);
if(ret != VAPI_OK) {
MCA_BMI_IB_VAPI_RET(ret, "VAPI_post_sr");
return OMPI_ERROR;
}
/* ret = VAPI_post_sr(ib_bmi->nic, */
/* peer->lcl_qp_hndl, */
/* &ib_buf->desc.sr); */
/* if(ret != VAPI_OK) { */
/* MCA_BMI_IB_VAPI_RET(ret, "VAPI_post_sr"); */
/* return OMPI_ERROR; */
/* } */
return OMPI_SUCCESS;
}
/* return OMPI_SUCCESS; */
/* } */

Просмотреть файл

@ -72,22 +72,23 @@ struct vapi_descriptor_t {
typedef struct vapi_descriptor_t vapi_descriptor_t;
struct ib_buffer_t {
ompi_list_item_t super;
vapi_descriptor_t desc;
/* Descriptor of the buffer */
vapi_memhandle_t hndl;
/* Buffer handle */
/* struct ib_buffer_t { */
/* ompi_list_item_t super; */
/* vapi_descriptor_t desc; */
/* /\* Descriptor of the buffer *\/ */
char buf[MCA_BMI_IB_FIRST_FRAG_SIZE];
/* Buffer space */
/* vapi_memhandle_t hndl; */
/* /\* Buffer handle *\/ */
VAPI_qp_hndl_t qp_hndl;
/* Queue pair used for this IB buffer */
};
/* char buf[MCA_BMI_IB_FIRST_FRAG_SIZE]; */
/* /\* Buffer space *\/ */
typedef struct ib_buffer_t ib_buffer_t;
/* VAPI_qp_hndl_t qp_hndl; */
/* /\* Queue pair used for this IB buffer *\/ */
/* }; */
/* typedef struct ib_buffer_t ib_buffer_t; */
#define DUMP_IB_STATE(ib_bmi) { \
@ -177,28 +178,28 @@ int mca_bmi_ib_register_mem(
int len,
vapi_memhandle_t* memhandle);
int mca_bmi_ib_post_send(
struct mca_bmi_ib_module_t *ib_module,
struct mca_bmi_base_endpoint_t *peer,
ib_buffer_t *ib_buf, void*);
/* int mca_bmi_ib_post_send( */
/* struct mca_bmi_ib_module_t *ib_module, */
/* struct mca_bmi_base_endpoint_t *peer, */
/* ib_buffer_t *ib_buf, void*); */
void mca_bmi_ib_buffer_repost(
VAPI_hca_hndl_t nic,
void* addr);
/* void mca_bmi_ib_buffer_repost( */
/* VAPI_hca_hndl_t nic, */
/* void* addr); */
void mca_bmi_ib_prepare_ack(
struct mca_bmi_ib_module_t *ib_module,
void* addr_to_reg, int len_to_reg,
void* ack_buf, int* len_added);
/* void mca_bmi_ib_prepare_ack( */
/* struct mca_bmi_ib_module_t *ib_module, */
/* void* addr_to_reg, int len_to_reg, */
/* void* ack_buf, int* len_added); */
int mca_bmi_ib_rdma_write(
struct mca_bmi_ib_module_t *ib_module,
struct mca_bmi_base_endpoint_t *peer,
ib_buffer_t *ib_buf,
void* send_buf,
size_t send_len,
void* remote_buf,
VAPI_rkey_t remote_key, void*);
/* int mca_bmi_ib_rdma_write( */
/* struct mca_bmi_ib_module_t *ib_module, */
/* struct mca_bmi_base_endpoint_t *peer, */
/* ib_buffer_t *ib_buf, */
/* void* send_buf, */
/* size_t send_len, */
/* void* remote_buf, */
/* VAPI_rkey_t remote_key, void*); */
int mca_bmi_ib_create_qp(VAPI_hca_hndl_t nic,
VAPI_pd_hndl_t ptag,

Просмотреть файл

@ -36,8 +36,8 @@ void mca_bmi_ib_proc_construct(mca_bmi_ib_proc_t* proc)
{
proc->proc_ompi = 0;
proc->proc_addr_count = 0;
proc->proc_peers = 0;
proc->proc_peer_count = 0;
proc->proc_endpoints = 0;
proc->proc_endpoint_count = 0;
OBJ_CONSTRUCT(&proc->proc_lock, ompi_mutex_t);
/* add to list of all proc instance */
OMPI_THREAD_LOCK(&mca_bmi_ib_component.ib_lock);
@ -57,8 +57,8 @@ void mca_bmi_ib_proc_destruct(mca_bmi_ib_proc_t* proc)
OMPI_THREAD_UNLOCK(&mca_bmi_ib_component.ib_lock);
/* release resources */
if(NULL != proc->proc_peers) {
free(proc->proc_peers);
if(NULL != proc->proc_endpoints) {
free(proc->proc_endpoints);
}
}
@ -119,7 +119,7 @@ mca_bmi_ib_proc_t* mca_bmi_ib_proc_create(ompi_proc_t* ompi_proc)
module_proc = OBJ_NEW(mca_bmi_ib_proc_t);
/* Initialize number of peer */
module_proc->proc_peer_count = 0;
module_proc->proc_endpoint_count = 0;
module_proc->proc_ompi = ompi_proc;
@ -135,12 +135,12 @@ mca_bmi_ib_proc_t* mca_bmi_ib_proc_create(ompi_proc_t* ompi_proc)
/* XXX: Right now, there can be only 1 peer associated
* with a proc. Needs a little bit change in
* mca_bmi_ib_proc_t to allow on demand increasing of
* number of peers for this proc */
* number of endpoints for this proc */
module_proc->proc_peers = (mca_bmi_base_endpoint_t**)
module_proc->proc_endpoints = (mca_bmi_base_endpoint_t**)
malloc(module_proc->proc_addr_count * sizeof(mca_bmi_base_endpoint_t*));
if(NULL == module_proc->proc_peers) {
if(NULL == module_proc->proc_endpoints) {
OBJ_RELEASE(module_proc);
return NULL;
}
@ -154,11 +154,11 @@ mca_bmi_ib_proc_t* mca_bmi_ib_proc_create(ompi_proc_t* ompi_proc)
* it an address.
*/
int mca_bmi_ib_proc_insert(mca_bmi_ib_proc_t* module_proc,
mca_bmi_base_endpoint_t* module_peer)
mca_bmi_base_endpoint_t* module_endpoint)
{
/* insert into peer array */
module_peer->peer_proc = module_proc;
module_proc->proc_peers[module_proc->proc_peer_count++] = module_peer;
/* insert into endpoint array */
module_endpoint->endpoint_proc = module_proc;
module_proc->proc_endpoints[module_proc->proc_endpoint_count++] = module_endpoint;
return OMPI_SUCCESS;
}

Просмотреть файл

@ -25,7 +25,7 @@
#include "bmi_ib.h"
#include "bmi_ib_vapi.h"
#include "bmi_ib_addr.h"
#include "bmi_ib_peer.h"
#include "bmi_ib_endpoint.h"
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
@ -49,13 +49,13 @@ struct mca_bmi_ib_proc_t {
/**< globally unique identifier for the process */
size_t proc_addr_count;
/**< number of addresses published by peer */
/**< number of addresses published by endpoint */
struct mca_bmi_base_endpoint_t **proc_peers;
/**< array of peers that have been created to access this proc */
struct mca_bmi_base_endpoint_t **proc_endpoints;
/**< array of endpoints that have been created to access this proc */
size_t proc_peer_count;
/**< number of peers */
size_t proc_endpoint_count;
/**< number of endpoints */
ompi_mutex_t proc_lock;
/**< lock to protect against concurrent access to proc state */

Просмотреть файл

@ -17,183 +17,14 @@
*/
#include "ompi_config.h"
#include "mca/pml/base/pml_base_sendreq.h"
#include "bmi_ib.h"
#include "bmi_ib_peer.h"
#include "bmi_ib_recvfrag.h"
#include "bmi_ib_sendfrag.h"
#include "bmi_ib_memory.h"
#include "bmi_ib_recvfrag.h"
static void mca_bmi_ib_recv_frag_construct(mca_bmi_ib_recv_frag_t* frag);
static void mca_bmi_ib_recv_frag_destruct(mca_bmi_ib_recv_frag_t* frag);
OBJ_CLASS_INSTANCE(mca_bmi_ib_recv_frag_t,
mca_bmi_base_recv_frag_t,
mca_bmi_ib_recv_frag_construct,
mca_bmi_ib_recv_frag_destruct);
/*
* IB fragment constructor
*/
static void mca_bmi_ib_recv_frag_construct(mca_bmi_ib_recv_frag_t* frag)
void mca_bmi_ib_process_recv(mca_bmi_ib_module_t *ib_bmi, mca_bmi_ib_recv_frag_t* frag)
{
}
/*
* IB fragment destructor
*/
static void mca_bmi_ib_recv_frag_destruct(mca_bmi_ib_recv_frag_t* frag)
{
}
/*
 * Completion hook for a matched receive fragment: notify the owning
 * module's PML progress hook that the fragment's bytes were delivered,
 * then recycle the fragment onto the component's receive free list.
 *
 * @param header   fragment header (unused in this body; kept for the
 *                 caller's callback signature)
 * @param frag     the received fragment being completed
 * @param request  the posted receive request it was matched against
 */
void
mca_bmi_ib_recv_frag_done (
mca_bmi_base_header_t *header,
mca_bmi_base_recv_frag_t* frag,
mca_bmi_base_recv_request_t *request)
{
D_PRINT("");
/* frag_size is passed for both the bytes-delivered and bytes-total
 * arguments of bmi_recv_progress */
frag->frag_base.frag_owner->bmi_recv_progress (
frag->frag_base.frag_owner,
request,
frag->frag_base.frag_size,
frag->frag_base.frag_size);
/* Return recv frag to free list */
OMPI_FREE_LIST_RETURN(&mca_bmi_ib_component.ib_recv_frags,
(ompi_list_item_t*)frag);
}
/*
 * Handle an incoming data-carrying fragment (MATCH or RNDV header).
 * Allocates a receive fragment, copies the header, attempts to match a
 * preposted receive, and — when unmatched — buffers the payload in the
 * fragment's unexpected-message buffer.
 *
 * BUG FIX: the switch on hdr_type previously had no default case, yet
 * the caller (mca_bmi_ib_process_recv) also routes MCA_BMI_HDR_TYPE_FRAG
 * here; any unhandled type left hdr_length uninitialized (undefined
 * behavior).  Fail loudly and return the fragment instead.
 *
 * @param ib_bmi  the IB module that owns the completed receive
 * @param hdr     BMI header at the start of the received buffer
 */
static void mca_bmi_ib_data_frag(
    mca_bmi_ib_module_t *ib_bmi,
    mca_bmi_base_header_t *hdr)
{
    bool matched;
    int rc;
    ompi_list_item_t *item;
    mca_bmi_ib_recv_frag_t *recv_frag;
    size_t hdr_length;

    OMPI_FREE_LIST_WAIT (&mca_bmi_ib_component.ib_recv_frags, item, rc);

    recv_frag = (mca_bmi_ib_recv_frag_t *) item;
    recv_frag->super.frag_base.frag_owner = &ib_bmi->super;
    recv_frag->super.frag_base.frag_peer = NULL;
    recv_frag->super.frag_request = NULL;
    recv_frag->super.frag_is_buffered = false;

    /* Copy the header, mca_bmi_base_match() */
    recv_frag->super.frag_base.frag_header = *hdr;

    switch(hdr->hdr_common.hdr_type) {
    case MCA_BMI_HDR_TYPE_MATCH:
        hdr_length = sizeof(mca_bmi_base_match_header_t);
        recv_frag->super.frag_base.frag_size = hdr->hdr_match.hdr_msg_length;
        break;
    case MCA_BMI_HDR_TYPE_RNDV:
        hdr_length = sizeof(mca_bmi_base_rendezvous_header_t);
        recv_frag->super.frag_base.frag_size = hdr->hdr_rndv.hdr_frag_length;
        break;
    default:
        /* previously fell through with hdr_length uninitialized */
        ompi_output(0, "mca_bmi_ib_data_frag: unexpected header type %d",
            hdr->hdr_common.hdr_type);
        OMPI_FREE_LIST_RETURN(&mca_bmi_ib_component.ib_recv_frags, item);
        return;
    }

    /* Payload starts immediately after the (type-specific) header */
    recv_frag->super.frag_base.frag_addr = (char *) hdr + hdr_length;

    /* match against preposted requests */
    matched = ib_bmi->super.bmi_match(
        recv_frag->super.frag_base.frag_owner,
        &recv_frag->super,
        &recv_frag->super.frag_base.frag_header.hdr_match);

    if (!matched) {
        /* no receive posted yet: stash the payload so the IB buffer can
         * be reposted.
         * NOTE(review): unex_buf is a fixed MCA_BMI_IB_UNEX_BUF_SIZE
         * buffer — confirm frag_size can never exceed it. */
        memcpy (recv_frag->unex_buf, (char *) hdr + hdr_length, recv_frag->super.frag_base.frag_size);
        recv_frag->super.frag_is_buffered = true;
        recv_frag->super.frag_base.frag_addr = recv_frag->unex_buf;
    }
}
/*
 * Handle an incoming ACK/control fragment.  The ACK echoes back the
 * sender-side fragment pointer (hdr_src_ptr); from it we recover the
 * originating send request, record the peer's match handle and
 * destination address/size, copy the remote RDMA key that rides just
 * after the ACK header into the request, and finally progress/release
 * the originating send fragment.
 */
static void mca_bmi_ib_ctrl_frag(
mca_bmi_ib_module_t *ib_bmi,
mca_bmi_base_header_t *header)
{
mca_bmi_ib_send_frag_t *send_frag;
mca_bmi_base_send_request_t *req;
void *data_ptr;
/* the ACK carries our own fragment pointer back to us */
send_frag = (mca_bmi_ib_send_frag_t *)
header->hdr_ack.hdr_src_ptr.pval;
req = (mca_bmi_base_send_request_t *)
send_frag->frag_send.frag_request;
req->req_peer_match = header->hdr_ack.hdr_dst_match;
req->req_peer_addr = header->hdr_ack.hdr_dst_addr;
req->req_peer_size = header->hdr_ack.hdr_dst_size;
/* Locate data in the ACK buffer */
data_ptr = (void*)
((char*) header + sizeof(mca_bmi_base_ack_header_t));
/* Copy over data to request buffer */
memcpy(&((mca_bmi_ib_send_request_t *) req)->req_key,
data_ptr, sizeof(VAPI_rkey_t));
/* Progress & release fragments */
mca_bmi_ib_send_frag_send_complete(ib_bmi, send_frag);
}
static void mca_bmi_ib_last_frag(mca_bmi_ib_module_t *ib_bmi,
mca_bmi_base_header_t *hdr)
{
mca_bmi_ib_fin_header_t *fin_hdr = (mca_bmi_ib_fin_header_t *)hdr;
mca_bmi_base_recv_request_t *request;
request = (mca_bmi_base_recv_request_t*) hdr->hdr_frag.hdr_dst_ptr.pval;
/* deregister memory if this is the last fragment */
if ((request->req_bytes_received + hdr->hdr_frag.hdr_frag_length) >=
request->req_recv.req_bytes_packed) {
mca_bmi_ib_deregister_mem_with_registry(ib_bmi,
fin_hdr->mr_addr.pval, (size_t)fin_hdr->mr_size);
}
ib_bmi->super.bmi_recv_progress (
&ib_bmi->super,
request,
hdr->hdr_frag.hdr_frag_length,
hdr->hdr_frag.hdr_frag_length);
ompi_output(0, "%s:%d ib mca_bmi_ib_process_recv got back tag %d", __FILE__, __LINE__, frag->hdr->tag);
}
/*
 * Process incoming receive fragments
 *
 * Demultiplex a completed IB receive buffer on its BMI header type:
 * data-carrying fragments (MATCH/RNDV/FRAG) go to the matching path,
 * ACKs to the control path, and FIN to last-fragment handling.
 *
 * @param ib_bmi  owning IB module
 * @param addr    the completed ib_buffer_t (passed as void* by the
 *                completion-queue poller)
 */
void mca_bmi_ib_process_recv(mca_bmi_ib_module_t *ib_bmi, void* addr)
{
ib_buffer_t *ib_buf;
mca_bmi_base_header_t *header;
/* the BMI header sits at the start of the IB buffer's payload */
ib_buf = (ib_buffer_t *) addr;
header = (mca_bmi_base_header_t *) &ib_buf->buf[0];
switch(header->hdr_common.hdr_type) {
case MCA_BMI_HDR_TYPE_MATCH :
case MCA_BMI_HDR_TYPE_RNDV :
case MCA_BMI_HDR_TYPE_FRAG :
mca_bmi_ib_data_frag(ib_bmi, header);
break;
case MCA_BMI_HDR_TYPE_ACK :
mca_bmi_ib_ctrl_frag(ib_bmi, header);
break;
case MCA_BMI_HDR_TYPE_FIN :
A_PRINT("Fin");
mca_bmi_ib_last_frag(ib_bmi, header);
break;
default :
ompi_output(0, "Unknown fragment type");
break;
}
}

Просмотреть файл

@ -20,34 +20,7 @@
#define MCA_BMI_IB_RECV_FRAG_H
#include "mca/bmi/bmi.h"
#include "mca/bmi/base/bmi_base_recvfrag.h"
#define MCA_BMI_IB_UNEX_BUF_SIZE (4096)
void mca_bmi_ib_process_recv(mca_bmi_ib_module_t*, mca_bmi_ib_recv_frag_t*);
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
OBJ_CLASS_DECLARATION(mca_bmi_ib_recv_frag_t);
/**
* IB received fragment derived type.
*/
struct mca_bmi_ib_recv_frag_t {
mca_bmi_base_recv_frag_t super;
/**< base receive fragment descriptor */
char unex_buf[MCA_BMI_IB_UNEX_BUF_SIZE];
/**< Unexpected buffer */
};
typedef struct mca_bmi_ib_recv_frag_t mca_bmi_ib_recv_frag_t;
struct mca_bmi_ib_module_t;
void mca_bmi_ib_recv_frag_done (mca_bmi_base_header_t*,
mca_bmi_base_recv_frag_t*, mca_bmi_base_recv_request_t*);
void mca_bmi_ib_process_recv(struct mca_bmi_ib_module_t* , void*);
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
#endif

Просмотреть файл

@ -18,142 +18,11 @@
#include "ompi_config.h"
#include "include/types.h"
#include "datatype/datatype.h"
#include "mca/pml/base/pml_base_sendreq.h"
#include "bmi_ib.h"
#include "bmi_ib_peer.h"
#include "bmi_ib_proc.h"
#include "bmi_ib_sendfrag.h"
#include "bmi_ib_priv.h"
#include "bmi_ib_memory.h"
static void mca_bmi_ib_send_frag_construct(mca_bmi_ib_send_frag_t* frag);
static void mca_bmi_ib_send_frag_destruct(mca_bmi_ib_send_frag_t* frag);
OBJ_CLASS_INSTANCE(mca_bmi_ib_send_frag_t,
mca_bmi_base_send_frag_t,
mca_bmi_ib_send_frag_construct,
mca_bmi_ib_send_frag_destruct);
/*
* Placeholders for send fragment constructor/destructors.
*/
/*
 * Send fragment constructor: reset per-fragment bookkeeping so a
 * freshly constructed fragment starts with no progress recorded and
 * no ACK outstanding.
 */
static void mca_bmi_ib_send_frag_construct(mca_bmi_ib_send_frag_t* frag)
{
    frag->frag_ack_pending = 0;
    frag->frag_progressed = 0;
}
/* Send fragment destructor: intentionally empty — per-buffer IB
 * resources are managed elsewhere (registration happens in
 * mca_bmi_ib_send_frag_register). */
static void mca_bmi_ib_send_frag_destruct(mca_bmi_ib_send_frag_t* frag)
{
}
/*
 * Allocate a IB send descriptor
 *
 * Pop a send fragment from the module's free list; when the list is
 * empty, busy-wait by driving component progress until a completion
 * returns a fragment.
 *
 * FIX: removed the unused local `tstamp` that was declared (and
 * initialized) on every iteration of the wait loop.
 *
 * @param ib_bmi   module whose send_free list is drawn from
 * @param request  originating send request (currently unused here;
 *                 kept for interface compatibility with callers)
 * @return a send fragment (never NULL — the loop spins until one
 *         becomes available)
 */
mca_bmi_ib_send_frag_t* mca_bmi_ib_alloc_send_frag(
    mca_bmi_ib_module_t* ib_bmi,
    mca_bmi_base_send_request_t* request)
{
    ompi_free_list_t *flist = &ib_bmi->send_free;
    ompi_list_item_t *item;

    item = ompi_list_remove_first(&((flist)->super));
    while(NULL == item) {
        /* free list exhausted: progress outstanding completions so a
         * fragment can be returned, then retry */
        D_PRINT("Gone one NULL descriptor ... trying again");
        mca_bmi_ib_component_progress(0);
        item = ompi_list_remove_first (&((flist)->super));
    }
    return (mca_bmi_ib_send_frag_t *) item;
}
/*
 * Walk the module's send free list in place (items are inspected, not
 * removed), register each fragment's payload buffer with the HCA, and
 * pre-build its send descriptor.
 *
 * @return OMPI_SUCCESS, or OMPI_ERROR on the first failed registration
 *         (NOTE(review): buffers registered before the failure are not
 *         unregistered — confirm the caller tears the module down).
 */
int mca_bmi_ib_send_frag_register(mca_bmi_ib_module_t *ib_bmi)
{
int i, rc, num_send_frags;
ompi_list_item_t *item;
ompi_free_list_t *flist = &ib_bmi->send_free;
ib_buffer_t *ib_buf_ptr;
mca_bmi_ib_send_frag_t *ib_send_frag;
num_send_frags = ompi_list_get_size(&(flist->super));
item = ompi_list_get_first(&((flist)->super));
/* Register the buffers */
for(i = 0; i < num_send_frags;
item = ompi_list_get_next(item), i++) {
ib_send_frag = (mca_bmi_ib_send_frag_t *) item;
ib_send_frag->frag_progressed = 0;
ib_buf_ptr = (ib_buffer_t *) &ib_send_frag->ib_buf;
/* make the payload buffer visible to the HCA */
rc = mca_bmi_ib_register_mem(ib_bmi->nic, ib_bmi->ptag,
(void*) ib_buf_ptr->buf,
MCA_BMI_IB_FIRST_FRAG_SIZE,
&ib_buf_ptr->hndl);
if(rc != OMPI_SUCCESS) {
return OMPI_ERROR;
}
/* pre-build the send work request for this buffer */
IB_PREPARE_SEND_DESC(ib_buf_ptr, 0,
MCA_BMI_IB_FIRST_FRAG_SIZE, ib_buf_ptr);
}
return OMPI_SUCCESS;
}
/*
 * Process send completions
 *
 * Invoked when the HCA reports completion of a send work request.
 * Dispatches on the BMI header type that was sent: MATCH fragments
 * progress the request (and are recycled unless cached), ACK fragments
 * are simply recycled, FIN fragments progress the request and are
 * recycled.
 *
 * NOTE(review): no default case — a completion with any other header
 * type is silently dropped and its fragment never returned to the free
 * list; confirm only MATCH/ACK/FIN fragments are sent through here.
 */
void mca_bmi_ib_send_frag_send_complete(mca_bmi_ib_module_t *ib_bmi, mca_bmi_ib_send_frag_t* sendfrag)
{
mca_bmi_base_header_t *hdr;
mca_bmi_base_send_request_t* req = sendfrag->frag_send.frag_request;
hdr = (mca_bmi_base_header_t *) sendfrag->ib_buf.buf;
switch(hdr->hdr_common.hdr_type) {
case MCA_BMI_HDR_TYPE_MATCH:
/* progress only when safe: either no ACK was requested, or the
 * peer has already matched the request */
if (0 == (hdr->hdr_common.hdr_flags & MCA_BMI_FLAGS_ACK)
|| mca_bmi_base_send_request_matched(req)) {
ib_bmi->super.bmi_send_progress(&ib_bmi->super,
sendfrag->frag_send.frag_request,
hdr->hdr_rndv.hdr_frag_length);
/* cached fragments stay attached to the request for reuse */
if(req->req_cached == false) {
OMPI_FREE_LIST_RETURN(&ib_bmi->send_free,
((ompi_list_item_t *) sendfrag));
}
}
break;
case MCA_BMI_HDR_TYPE_ACK:
OMPI_FREE_LIST_RETURN(&ib_bmi->send_free,
((ompi_list_item_t *) sendfrag));
break;
case MCA_BMI_HDR_TYPE_FIN:
ib_bmi->super.bmi_send_progress(&ib_bmi->super,
sendfrag->frag_send.frag_request,
hdr->hdr_frag.hdr_frag_length);
OMPI_FREE_LIST_RETURN(&ib_bmi->send_free,
((ompi_list_item_t *) sendfrag));
break;
}
}
/*
 * Signal completion of a send fragment to the upper layer by invoking
 * the descriptor's registered completion callback.
 *
 * FIX: the original passed &frag->endpoint — the address of the
 * endpoint pointer field (i.e. a mca_bmi_base_endpoint_t**) — where
 * the callback takes the endpoint pointer itself; frag->endpoint is
 * already declared `struct mca_bmi_base_endpoint_t *` in
 * mca_bmi_ib_frag_t, so pass it directly.
 */
void mca_bmi_ib_sendfrag_complete( mca_bmi_ib_module_t * ib_bmi, mca_bmi_ib_send_frag_t* frag)
{
    frag->base.des_cbfunc(&ib_bmi->super, frag->endpoint, &frag->base, frag->rc);
}

Просмотреть файл

@ -20,105 +20,10 @@
#define MCA_BMI_IB_SEND_FRAG_H
#include "ompi_config.h"
#include "mca/bmi/base/bmi_base_sendreq.h"
#include "mca/bmi/base/bmi_base_sendfrag.h"
#include "bmi_ib_frag.h"
#include "bmi_ib.h"
void mca_bmi_ib_sendfrag_complete( mca_bmi_ib_module_t* ib_bmi, mca_bmi_ib_send_frag_t* frag);
#include "bmi_ib_priv.h"
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
OBJ_CLASS_DECLARATION(mca_bmi_ib_send_frag_t);
/* Classification of an IB fragment's role in the transfer. */
typedef enum {
MCA_BMI_IB_FRAG_SEND, /* ordinary two-sided send fragment */
MCA_BMI_IB_FRAG_PUT, /* presumably RDMA write — confirm against put path */
MCA_BMI_IB_FRAG_GET, /* presumably RDMA read — confirm against get path */
MCA_BMI_IB_FRAG_ACK /* control/acknowledgement fragment */
} mca_bmi_ib_frag_type_t;
/**
 * IB send fragment derived type.
 *
 * Extends the generic BMI descriptor with a single data segment, the
 * owning endpoint, and IB-specific bookkeeping.
 */
struct mca_bmi_ib_frag_t {
mca_bmi_base_descriptor_t base; /* generic descriptor; must be first so the frag can be cast to/from it */
mca_bmi_base_segment_t segment; /* the single data segment this frag carries */
struct mca_bmi_base_endpoint_t *endpoint; /* connection this frag is sent on / received from */
mca_bmi_ib_frag_type_t type; /* send / put / get / ack */
mca_bmi_base_tag_t tag; /* receive-callback dispatch tag */
size_t size; /* payload size in bytes */
int rc; /* completion status passed to des_cbfunc */
bool frag_ack_pending; /* true while an ACK for this frag is outstanding */
};
typedef struct mca_bmi_ib_frag_t mca_bmi_ib_frag_t;
/*
* Allocate an IB send descriptor
*
*/
#define MCA_BMI_IB_FRAG_ALLOC1(frag, rc) \
{
ompi_list_item_t *item;
OMPI_FREE_LIST_WAIT(&mca_bmi_ib_module.ib_frags1, item, rc);
frag = (mca_bmi_ib_frag_t*) item;
}
int mca_bmi_ib_send_frag_register(mca_bmi_ib_module_t *ib_bmi)
{
int i, rc, num_send_frags;
ompi_list_item_t *item;
ompi_free_list_t *flist = &ib_bmi->ib_frags1;
ib_buffer_t *ib_buf_ptr;
mca_bmi_ib_frag_t *ib_frag;
num_send_frags = ompi_list_get_size(&(flist->super));
item = ompi_list_get_first(&((flist)->super));
/* Register the buffers */
for(i = 0; i < num_send_frags;
item = ompi_list_get_next(item), i++) {
ib_send_frag = (mca_bmi_ib_send_frag_t *) item;
ib_send_frag->frag_progressed = 0;
ib_buf_ptr = (ib_buffer_t *) &ib_send_frag->ib_buf;
rc = mca_bmi_ib_register_mem(ib_bmi->nic, ib_bmi->ptag,
(void*) ib_buf_ptr->buf,
MCA_BMI_IB_FIRST_FRAG_SIZE,
&ib_buf_ptr->hndl);
if(rc != OMPI_SUCCESS) {
return OMPI_ERROR;
}
IB_PREPARE_SEND_DESC(ib_buf_ptr, 0,
MCA_BMI_IB_FIRST_FRAG_SIZE, ib_buf_ptr);
}
return OMPI_SUCCESS;
}
struct mca_bmi_ib_module_t;
mca_bmi_ib_send_frag_t* mca_bmi_ib_alloc_send_frag(
struct mca_bmi_ib_module_t* ib_bmi,
mca_bmi_base_send_request_t* request);
int mca_bmi_ib_send_frag_register(struct mca_bmi_ib_module_t *bmi);
void mca_bmi_ib_send_frag_send_complete(struct mca_bmi_ib_module_t *bmi, mca_bmi_ib_send_frag_t*);
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
#endif

Просмотреть файл

@ -86,6 +86,12 @@ AC_DEFUN([MCA_CONFIGURE_STUB],[
# some versions of Mellanox (v3.1), we need to expliitly link in
# the thread libraries. #$%#@$%@%#$!!!
# Many vapi.h's have horrid semantics and don't obey ISOC99
# standards. So we have to turn off flags like -pedantic. Sigh.
CFLAGS="`echo $CFLAGS | sed 's/-pedantic//g'`"
mca_ptl_ib_try_find_libvapi() {
func1=[$]1
func2=[$]2

Просмотреть файл

@ -60,11 +60,7 @@ extern mca_bmi_sm_module_resource_t mca_bmi_sm_module_resource;
#define DONE (char)1
#endif
struct mca_bmi_sm_registration_t {
mca_bmi_base_module_recv_cb_fn_t cbfunc;
void *cbdata;
};
typedef struct mca_bmi_sm_registration_t mca_bmi_sm_registration_t;
typedef mca_bmi_base_registration_t mca_bmi_sm_registration_t;
/**