initial changes for rdma protocol
This commit was SVN r5996.
Этот коммит содержится в:
родитель
ba7673a83f
Коммит
fc9a84ec21
@ -234,7 +234,7 @@ int mca_pml_ob1_add_procs(ompi_proc_t** procs, size_t nprocs)
|
||||
mca_pml_ob1_endpoint_t* endpoint;
|
||||
size_t size;
|
||||
|
||||
/* this ob1 can be used */
|
||||
/* this bmi can be used */
|
||||
bmi_inuse++;
|
||||
|
||||
/* initialize each proc */
|
||||
@ -249,17 +249,18 @@ int mca_pml_ob1_add_procs(ompi_proc_t** procs, size_t nprocs)
|
||||
}
|
||||
|
||||
/* preallocate space in array for max number of ob1s */
|
||||
mca_pml_ob1_ep_array_reserve(&proc_pml->bmi_first, mca_pml_ob1.num_bmi_modules);
|
||||
mca_pml_ob1_ep_array_reserve(&proc_pml->bmi_next, mca_pml_ob1.num_bmi_modules);
|
||||
mca_pml_ob1_ep_array_reserve(&proc_pml->bmi_eager, mca_pml_ob1.num_bmi_modules);
|
||||
mca_pml_ob1_ep_array_reserve(&proc_pml->bmi_send, mca_pml_ob1.num_bmi_modules);
|
||||
mca_pml_ob1_ep_array_reserve(&proc_pml->bmi_rdma, mca_pml_ob1.num_bmi_modules);
|
||||
proc_pml->proc_ompi = proc;
|
||||
proc->proc_pml = proc_pml;
|
||||
}
|
||||
|
||||
/* dont allow an additional PTL with a lower exclusivity ranking */
|
||||
size = mca_pml_ob1_ep_array_get_size(&proc_pml->bmi_next);
|
||||
size = mca_pml_ob1_ep_array_get_size(&proc_pml->bmi_send);
|
||||
if(size > 0) {
|
||||
endpoint = mca_pml_ob1_ep_array_get_index(&proc_pml->bmi_next, size-1);
|
||||
/* skip this ob1 if the exclusivity is less than the previous */
|
||||
endpoint = mca_pml_ob1_ep_array_get_index(&proc_pml->bmi_send, size-1);
|
||||
/* skip this bmi if the exclusivity is less than the previous */
|
||||
if(endpoint->bmi->bmi_exclusivity > bmi->bmi_exclusivity) {
|
||||
if(bmi_endpoints[p] != NULL) {
|
||||
bmi->bmi_del_procs(bmi, 1, &proc, &bmi_endpoints[p]);
|
||||
@ -269,7 +270,7 @@ int mca_pml_ob1_add_procs(ompi_proc_t** procs, size_t nprocs)
|
||||
}
|
||||
|
||||
/* cache the endpoint on the proc */
|
||||
endpoint = mca_pml_ob1_ep_array_insert(&proc_pml->bmi_next);
|
||||
endpoint = mca_pml_ob1_ep_array_insert(&proc_pml->bmi_send);
|
||||
endpoint->bmi = bmi;
|
||||
endpoint->bmi_eager_limit = bmi->bmi_eager_limit;
|
||||
endpoint->bmi_min_frag_size = bmi->bmi_min_frag_size;
|
||||
@ -321,10 +322,10 @@ int mca_pml_ob1_add_procs(ompi_proc_t** procs, size_t nprocs)
|
||||
* note that we need to do this here, as we may already have ob1s configured
|
||||
* (2) determine the highest priority ranking for latency
|
||||
*/
|
||||
n_size = mca_pml_ob1_ep_array_get_size(&proc_pml->bmi_next);
|
||||
n_size = mca_pml_ob1_ep_array_get_size(&proc_pml->bmi_send);
|
||||
for(n_index = 0; n_index < n_size; n_index++) {
|
||||
mca_pml_ob1_endpoint_t* endpoint =
|
||||
mca_pml_ob1_ep_array_get_index(&proc_pml->bmi_next, n_index);
|
||||
mca_pml_ob1_ep_array_get_index(&proc_pml->bmi_send, n_index);
|
||||
mca_bmi_base_module_t* ob1 = endpoint->bmi;
|
||||
total_bandwidth += endpoint->bmi->bmi_bandwidth;
|
||||
if(ob1->bmi_latency > latency)
|
||||
@ -338,7 +339,7 @@ int mca_pml_ob1_add_procs(ompi_proc_t** procs, size_t nprocs)
|
||||
|
||||
for(n_index = 0; n_index < n_size; n_index++) {
|
||||
mca_pml_ob1_endpoint_t* endpoint =
|
||||
mca_pml_ob1_ep_array_get_index(&proc_pml->bmi_next, n_index);
|
||||
mca_pml_ob1_ep_array_get_index(&proc_pml->bmi_send, n_index);
|
||||
mca_bmi_base_module_t *ob1 = endpoint->bmi;
|
||||
double weight;
|
||||
|
||||
@ -354,9 +355,16 @@ int mca_pml_ob1_add_procs(ompi_proc_t** procs, size_t nprocs)
|
||||
*/
|
||||
if(ob1->bmi_latency == latency) {
|
||||
mca_pml_ob1_endpoint_t* ep_new =
|
||||
mca_pml_ob1_ep_array_insert(&proc_pml->bmi_first);
|
||||
mca_pml_ob1_ep_array_insert(&proc_pml->bmi_eager);
|
||||
*ep_new = *endpoint;
|
||||
}
|
||||
|
||||
/* check flags - is rdma preferred */
|
||||
if(endpoint->bmi->bmi_flags & MCA_BMI_FLAGS_RDMA &&
|
||||
proc->proc_arch == ompi_proc_local_proc->proc_arch) {
|
||||
mca_pml_ob1_endpoint_t* rdma_ep = mca_pml_ob1_ep_array_insert(&proc_pml->bmi_rdma);
|
||||
*rdma_ep = *endpoint;
|
||||
}
|
||||
}
|
||||
}
|
||||
return OMPI_SUCCESS;
|
||||
@ -378,9 +386,9 @@ int mca_pml_ob1_del_procs(ompi_proc_t** procs, size_t nprocs)
|
||||
size_t n_index, n_size;
|
||||
|
||||
/* notify each ob1 that the proc is going away */
|
||||
f_size = mca_pml_ob1_ep_array_get_size(&proc_pml->bmi_first);
|
||||
f_size = mca_pml_ob1_ep_array_get_size(&proc_pml->bmi_eager);
|
||||
for(f_index = 0; f_index < f_size; f_index++) {
|
||||
mca_pml_ob1_endpoint_t* endpoint = mca_pml_ob1_ep_array_get_index(&proc_pml->bmi_first, f_index);
|
||||
mca_pml_ob1_endpoint_t* endpoint = mca_pml_ob1_ep_array_get_index(&proc_pml->bmi_eager, f_index);
|
||||
mca_bmi_base_module_t* ob1 = endpoint->bmi;
|
||||
|
||||
rc = ob1->bmi_del_procs(ob1,1,&proc,&endpoint->bmi_endpoint);
|
||||
@ -391,9 +399,9 @@ int mca_pml_ob1_del_procs(ompi_proc_t** procs, size_t nprocs)
|
||||
/* remove this from next array so that we dont call it twice w/
|
||||
* the same address pointer
|
||||
*/
|
||||
n_size = mca_pml_ob1_ep_array_get_size(&proc_pml->bmi_first);
|
||||
n_size = mca_pml_ob1_ep_array_get_size(&proc_pml->bmi_eager);
|
||||
for(n_index = 0; n_index < n_size; n_index++) {
|
||||
mca_pml_ob1_endpoint_t* endpoint = mca_pml_ob1_ep_array_get_index(&proc_pml->bmi_next, n_index);
|
||||
mca_pml_ob1_endpoint_t* endpoint = mca_pml_ob1_ep_array_get_index(&proc_pml->bmi_send, n_index);
|
||||
if(endpoint->bmi == ob1) {
|
||||
memset(endpoint, 0, sizeof(mca_pml_ob1_endpoint_t));
|
||||
break;
|
||||
@ -402,9 +410,9 @@ int mca_pml_ob1_del_procs(ompi_proc_t** procs, size_t nprocs)
|
||||
}
|
||||
|
||||
/* notify each ob1 that was not in the array of ob1s for first fragments */
|
||||
n_size = mca_pml_ob1_ep_array_get_size(&proc_pml->bmi_next);
|
||||
n_size = mca_pml_ob1_ep_array_get_size(&proc_pml->bmi_send);
|
||||
for(n_index = 0; n_index < n_size; n_index++) {
|
||||
mca_pml_ob1_endpoint_t* endpoint = mca_pml_ob1_ep_array_get_index(&proc_pml->bmi_first, n_index);
|
||||
mca_pml_ob1_endpoint_t* endpoint = mca_pml_ob1_ep_array_get_index(&proc_pml->bmi_eager, n_index);
|
||||
mca_bmi_base_module_t* ob1 = endpoint->bmi;
|
||||
if (ob1 != 0) {
|
||||
rc = ob1->bmi_del_procs(ob1,1,&proc,&endpoint->bmi_endpoint);
|
||||
|
@ -52,10 +52,12 @@ struct mca_pml_ob1_t {
|
||||
size_t num_bmi_progress;
|
||||
|
||||
int priority;
|
||||
int free_list_num; /* initial size of free list */
|
||||
int free_list_max; /* maximum size of free list */
|
||||
int free_list_inc; /* number of elements to grow free list */
|
||||
size_t eager_limit;
|
||||
int free_list_num; /* initial size of free list */
|
||||
int free_list_max; /* maximum size of free list */
|
||||
int free_list_inc; /* number of elements to grow free list */
|
||||
size_t eager_limit; /* maximum eager limit size - overrides bmi setting */
|
||||
size_t rdma_offset; /* offset at which we attempt to initiate rdma */
|
||||
size_t rdma_threshold; /* message size at which rdma is attempted */
|
||||
size_t send_pipeline_depth;
|
||||
size_t recv_pipeline_depth;
|
||||
|
||||
|
@ -33,8 +33,8 @@
|
||||
#define MCA_PML_OB1_HDR_TYPE_ACK 3
|
||||
#define MCA_PML_OB1_HDR_TYPE_NACK 4
|
||||
#define MCA_PML_OB1_HDR_TYPE_FRAG 5
|
||||
#define MCA_PML_OB1_HDR_TYPE_SEG 6
|
||||
#define MCA_PML_OB1_HDR_TYPE_GET 7
|
||||
#define MCA_PML_OB1_HDR_TYPE_GET 6
|
||||
#define MCA_PML_OB1_HDR_TYPE_PUT 7
|
||||
#define MCA_PML_OB1_HDR_TYPE_FIN 8
|
||||
#define MCA_PML_OB1_HDR_TYPE_MAX 9
|
||||
|
||||
@ -192,7 +192,7 @@ struct mca_pml_ob1_ack_hdr_t {
|
||||
mca_pml_ob1_common_hdr_t hdr_common; /**< common attributes */
|
||||
ompi_ptr_t hdr_src_req; /**< source request */
|
||||
ompi_ptr_t hdr_dst_req; /**< matched receive request */
|
||||
mca_bmi_base_segment_t hdr_seg; /**< segment */
|
||||
uint64_t hdr_rdma_offset; /**< starting point rdma protocol */
|
||||
};
|
||||
typedef struct mca_pml_ob1_ack_hdr_t mca_pml_ob1_ack_hdr_t;
|
||||
|
||||
@ -208,6 +208,20 @@ typedef struct mca_pml_ob1_ack_hdr_t mca_pml_ob1_ack_hdr_t;
|
||||
(h).hdr_dst_size = hton64((h).hdr_dst_size); \
|
||||
} while (0)
|
||||
|
||||
/**
 * Header used to initiate an RDMA operation.
 *
 * Layout, in declaration order: the common PML header, the request
 * handles of the sending and the matched receiving side, the current
 * offset into the user buffer, the number of segment descriptors, and
 * the segment descriptors themselves.
 *
 * NOTE(review): segments is declared with a single element while
 * hdr_num_segments carries a count; presumably further segments are laid
 * out directly behind the struct when more than one is described --
 * confirm how the header is sized where it is allocated.
 */
|
||||
|
||||
struct mca_pml_ob1_rdma_hdr_t {
|
||||
mca_pml_ob1_common_hdr_t hdr_common; /**< common attributes */
|
||||
ompi_ptr_t hdr_src_req; /**< source request */
|
||||
ompi_ptr_t hdr_dst_req; /**< matched receive request */
|
||||
uint64_t hdr_offset; /**< current offset into user buffer */
|
||||
uint32_t hdr_num_segments; /**< number of segments for rdma */
|
||||
/* Must remain the final member: it is the only array in the header. */
mca_bmi_base_segment_t segments[1]; /**< list of segments for rdma */
|
||||
};
|
||||
typedef struct mca_pml_ob1_rdma_hdr_t mca_pml_ob1_rdma_hdr_t;
|
||||
|
||||
/**
|
||||
* Union of defined hdr types.
|
||||
*/
|
||||
@ -217,6 +231,7 @@ union mca_pml_ob1_hdr_t {
|
||||
mca_pml_ob1_rendezvous_hdr_t hdr_rndv;
|
||||
mca_pml_ob1_frag_hdr_t hdr_frag;
|
||||
mca_pml_ob1_ack_hdr_t hdr_ack;
|
||||
mca_pml_ob1_rdma_hdr_t hdr_rdma;
|
||||
};
|
||||
typedef union mca_pml_ob1_hdr_t mca_pml_ob1_hdr_t;
|
||||
|
||||
|
@ -26,16 +26,18 @@ static void mca_pml_ob1_proc_construct(mca_pml_ob1_proc_t* proc)
|
||||
proc->proc_ompi = NULL;
|
||||
proc->proc_sequence = 0;
|
||||
OBJ_CONSTRUCT(&proc->proc_lock, ompi_mutex_t);
|
||||
OBJ_CONSTRUCT(&proc->bmi_first, mca_pml_ob1_ep_array_t);
|
||||
OBJ_CONSTRUCT(&proc->bmi_next, mca_pml_ob1_ep_array_t);
|
||||
OBJ_CONSTRUCT(&proc->bmi_eager, mca_pml_ob1_ep_array_t);
|
||||
OBJ_CONSTRUCT(&proc->bmi_send, mca_pml_ob1_ep_array_t);
|
||||
OBJ_CONSTRUCT(&proc->bmi_rdma, mca_pml_ob1_ep_array_t);
|
||||
}
|
||||
|
||||
|
||||
static void mca_pml_ob1_proc_destruct(mca_pml_ob1_proc_t* proc)
|
||||
{
|
||||
OBJ_DESTRUCT(&proc->proc_lock);
|
||||
OBJ_DESTRUCT(&proc->bmi_first);
|
||||
OBJ_DESTRUCT(&proc->bmi_next);
|
||||
OBJ_DESTRUCT(&proc->bmi_eager);
|
||||
OBJ_DESTRUCT(&proc->bmi_send);
|
||||
OBJ_DESTRUCT(&proc->bmi_rdma);
|
||||
}
|
||||
|
||||
|
||||
|
@ -38,8 +38,9 @@ struct mca_pml_proc_t {
|
||||
ompi_mutex_t proc_lock; /**< lock to protect against concurrent access */
|
||||
int proc_flags; /**< prefered method of accessing this peer */
|
||||
volatile uint32_t proc_sequence; /**< sequence number for send */
|
||||
mca_pml_ob1_ep_array_t bmi_first; /**< array of endpoints to use for first fragments */
|
||||
mca_pml_ob1_ep_array_t bmi_next; /**< array of endpoints to use for remaining fragments */
|
||||
mca_pml_ob1_ep_array_t bmi_eager; /**< array of endpoints to use for first fragments */
|
||||
mca_pml_ob1_ep_array_t bmi_send; /**< array of endpoints to use for remaining fragments */
|
||||
mca_pml_ob1_ep_array_t bmi_rdma; /**< array of endpoints that support (prefer) rdma */
|
||||
};
|
||||
typedef struct mca_pml_proc_t mca_pml_ob1_proc_t;
|
||||
|
||||
@ -86,8 +87,8 @@ static inline struct mca_bmi_base_endpoint_t* mca_pml_ob1_proc_lookup_remote_end
|
||||
struct mca_bmi_base_module_t* bmi)
|
||||
{
|
||||
mca_pml_ob1_proc_t* proc = comm->c_pml_procs[rank];
|
||||
size_t i, size = mca_pml_ob1_ep_array_get_size(&proc->bmi_first);
|
||||
mca_pml_ob1_endpoint_t* endpoint = proc->bmi_first.arr_endpoints;
|
||||
size_t i, size = mca_pml_ob1_ep_array_get_size(&proc->bmi_eager);
|
||||
mca_pml_ob1_endpoint_t* endpoint = proc->bmi_eager.arr_endpoints;
|
||||
for(i = 0; i < size; i++) {
|
||||
if(endpoint->bmi == bmi) {
|
||||
return endpoint->bmi_endpoint;
|
||||
|
@ -79,6 +79,7 @@ void mca_pml_ob1_recv_frag_callback(
|
||||
hdr->hdr_ack.hdr_src_req.pval;
|
||||
sendreq->req_state = MCA_PML_OB1_SR_SEND;
|
||||
sendreq->req_recv = hdr->hdr_ack.hdr_dst_req;
|
||||
sendreq->req_rdma_offset = hdr->hdr_ack.hdr_rdma_offset;
|
||||
mca_pml_ob1_send_request_schedule(sendreq);
|
||||
break;
|
||||
}
|
||||
|
@ -118,10 +118,8 @@ static void mca_pml_ob1_recv_request_ack(
|
||||
mca_pml_ob1_recv_request_t* recvreq,
|
||||
mca_pml_ob1_rendezvous_hdr_t* hdr)
|
||||
{
|
||||
mca_pml_ob1_proc_t* proc = mca_pml_ob1_proc_lookup_remote(
|
||||
recvreq->req_recv.req_base.req_comm,
|
||||
hdr->hdr_match.hdr_src);
|
||||
mca_pml_ob1_endpoint_t* ep = mca_pml_ob1_ep_array_get_next(&proc->bmi_first);
|
||||
mca_pml_ob1_proc_t* proc = recvreq->req_proc;
|
||||
mca_pml_ob1_endpoint_t* ep = mca_pml_ob1_ep_array_get_next(&proc->bmi_eager);
|
||||
mca_bmi_base_descriptor_t* des;
|
||||
mca_pml_ob1_fragment_t* frag;
|
||||
mca_pml_ob1_ack_hdr_t* ack;
|
||||
@ -136,6 +134,25 @@ static void mca_pml_ob1_recv_request_ack(
|
||||
/* fill out header */
|
||||
ack = (mca_pml_ob1_ack_hdr_t*)des->des_src->seg_addr.pval;
|
||||
ack->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_ACK;
|
||||
|
||||
/* use the rdma protocol for this request if:
|
||||
* - size is larger than the rdma threshold
|
||||
* - rdma devices are available
|
||||
*/
|
||||
if(recvreq->req_recv.req_bytes_packed > mca_pml_ob1.rdma_threshold &&
|
||||
mca_pml_ob1_ep_array_get_size(&proc->bmi_rdma)) {
|
||||
|
||||
/* use convertor to figure out the rdma offset for this request */
|
||||
recvreq->req_rdma_offset = mca_pml_ob1.rdma_offset;
|
||||
ompi_convertor_set_position(
|
||||
&recvreq->req_convertor, /* convertor */
|
||||
&recvreq->req_rdma_offset);
|
||||
ack->hdr_rdma_offset = recvreq->req_rdma_offset;
|
||||
} else {
|
||||
recvreq->req_rdma_offset = recvreq->req_recv.req_bytes_packed;
|
||||
ack->hdr_rdma_offset = recvreq->req_recv.req_bytes_packed;
|
||||
}
|
||||
|
||||
ack->hdr_common.hdr_flags = 0;
|
||||
ack->hdr_src_req = hdr->hdr_src_req;
|
||||
ack->hdr_dst_req.pval = recvreq;
|
||||
@ -149,6 +166,7 @@ static void mca_pml_ob1_recv_request_ack(
|
||||
ep->bmi_free(ep->bmi,des);
|
||||
goto retry;
|
||||
}
|
||||
|
||||
return;
|
||||
|
||||
/* queue request to retry later */
|
||||
@ -175,6 +193,7 @@ void mca_pml_ob1_recv_request_progress(
|
||||
{
|
||||
size_t bytes_received = 0;
|
||||
size_t bytes_delivered = 0;
|
||||
size_t data_offset = 0;
|
||||
mca_pml_ob1_hdr_t* hdr = (mca_pml_ob1_hdr_t*)segments->seg_addr.pval;
|
||||
|
||||
switch(hdr->hdr_common.hdr_type) {
|
||||
@ -186,7 +205,7 @@ void mca_pml_ob1_recv_request_progress(
|
||||
segments,
|
||||
num_segments,
|
||||
sizeof(mca_pml_ob1_match_hdr_t),
|
||||
0,
|
||||
data_offset,
|
||||
bytes_received,
|
||||
bytes_delivered);
|
||||
break;
|
||||
@ -200,7 +219,7 @@ void mca_pml_ob1_recv_request_progress(
|
||||
segments,
|
||||
num_segments,
|
||||
sizeof(mca_pml_ob1_rendezvous_hdr_t),
|
||||
0,
|
||||
data_offset,
|
||||
bytes_received,
|
||||
bytes_delivered);
|
||||
break;
|
||||
@ -213,7 +232,7 @@ void mca_pml_ob1_recv_request_progress(
|
||||
segments,
|
||||
num_segments,
|
||||
sizeof(mca_pml_ob1_frag_hdr_t),
|
||||
hdr->hdr_frag.frag_offset,
|
||||
hdr->hdr_frag.hdr_frag_offset,
|
||||
bytes_received,
|
||||
bytes_delivered);
|
||||
break;
|
||||
|
@ -31,6 +31,7 @@ struct mca_pml_ob1_recv_request_t {
|
||||
mca_pml_base_recv_request_t req_recv;
|
||||
size_t req_bytes_received;
|
||||
size_t req_bytes_delivered;
|
||||
size_t req_rdma_offset;
|
||||
|
||||
ompi_convertor_t req_convertor;
|
||||
/* note that we allocate additional space for the recv
|
||||
@ -39,6 +40,7 @@ struct mca_pml_ob1_recv_request_t {
|
||||
* the last element of this struct.
|
||||
*/
|
||||
mca_bmi_base_descriptor_t *req_pipeline[1];
|
||||
struct mca_pml_proc_t *req_proc;
|
||||
};
|
||||
typedef struct mca_pml_ob1_recv_request_t mca_pml_ob1_recv_request_t;
|
||||
|
||||
@ -170,11 +172,14 @@ do {
|
||||
ompi_comm_peer_lookup( \
|
||||
(request)->req_recv.req_base.req_comm, (hdr)->hdr_src); \
|
||||
\
|
||||
(request)->req_proc = proc->proc_pml; \
|
||||
ompi_convertor_copy_and_prepare_for_recv( proc->proc_convertor, \
|
||||
(request)->req_recv.req_base.req_datatype, \
|
||||
(request)->req_recv.req_base.req_count, \
|
||||
(request)->req_recv.req_base.req_addr, \
|
||||
&(request)->req_convertor ); \
|
||||
} else { \
|
||||
(request)->req_proc = NULL; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
@ -209,6 +214,9 @@ do {
|
||||
iov_count++; \
|
||||
} \
|
||||
} \
|
||||
ompi_convertor_set_position( \
|
||||
&(request->req_convertor), \
|
||||
&data_offset); \
|
||||
ompi_convertor_unpack( \
|
||||
&(request)->req_convertor, \
|
||||
iov, \
|
||||
|
@ -68,7 +68,47 @@ OBJ_CLASS_INSTANCE(
|
||||
mca_pml_ob1_send_request_destruct);
|
||||
|
||||
/**
 * Completion of a short message - nothing left to schedule.
 *
 * Descriptor completion callback: the send request is recovered from
 * descriptor->des_cbdata, the descriptor is handed back through
 * MCA_PML_OB1_ENDPOINT_DES_RETURN, and the request is then marked
 * complete while holding ompi_request_lock.
 *
 * @param bmi         BMI module the descriptor was sent on (not used here).
 * @param ep          BMI endpoint of the peer (not used here).
 * @param descriptor  Completed descriptor; its des_cbdata must point to
 *                    the owning mca_pml_ob1_send_request_t.
 * @param status      Completion status reported by the BMI.
 *
 * NOTE(review): status is never read; MPI_ERROR is set to OMPI_SUCCESS
 * unconditionally -- confirm that failed sends are reported elsewhere.
 */
|
||||
|
||||
static void mca_pml_ob1_short_completion(
|
||||
mca_bmi_base_module_t* bmi,
|
||||
struct mca_bmi_base_endpoint_t* ep,
|
||||
struct mca_bmi_base_descriptor_t* descriptor,
|
||||
int status)
|
||||
{
|
||||
mca_pml_ob1_send_request_t* sendreq = (mca_pml_ob1_send_request_t*)descriptor->des_cbdata;
|
||||
mca_pml_ob1_endpoint_t* bmi_ep = sendreq->req_endpoint;
|
||||
|
||||
/* attempt to cache the descriptor */
|
||||
/* descriptor is not referenced again after this point */
MCA_PML_OB1_ENDPOINT_DES_RETURN(bmi_ep,descriptor);
|
||||
|
||||
/* signal request completion */
|
||||
OMPI_THREAD_LOCK(&ompi_request_lock);
|
||||
/* a short message is a single fragment, so the whole packed size counts as delivered */
sendreq->req_bytes_delivered = sendreq->req_send.req_bytes_packed;
|
||||
sendreq->req_send.req_base.req_pml_complete = true;
|
||||
/* case 1: request not yet flagged complete - fill in the status and wake any waiter */
if (sendreq->req_send.req_base.req_ompi.req_complete == false) {
|
||||
sendreq->req_send.req_base.req_ompi.req_status.MPI_SOURCE = sendreq->req_send.req_base.req_comm->c_my_rank;
|
||||
sendreq->req_send.req_base.req_ompi.req_status.MPI_TAG = sendreq->req_send.req_base.req_tag;
|
||||
sendreq->req_send.req_base.req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
|
||||
sendreq->req_send.req_base.req_ompi.req_status._count = sendreq->req_send.req_bytes_packed;
|
||||
sendreq->req_send.req_base.req_ompi.req_complete = true;
|
||||
sendreq->req_state = MCA_PML_OB1_SR_COMPLETE;
|
||||
/* only broadcast when ompi_request_waiting is non-zero */
if(ompi_request_waiting) {
|
||||
ompi_condition_broadcast(&ompi_request_cond);
|
||||
}
|
||||
/* case 2: already complete and req_free_called is set - release the request here */
} else if(sendreq->req_send.req_base.req_free_called) {
|
||||
MCA_PML_OB1_FREE((ompi_request_t**)&sendreq);
|
||||
/* case 3: already complete, buffered send - finish the bsend bookkeeping */
} else if (sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED) {
|
||||
mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq);
|
||||
sendreq->req_state = MCA_PML_OB1_SR_COMPLETE;
|
||||
}
|
||||
OMPI_THREAD_UNLOCK(&ompi_request_lock);
|
||||
}
|
||||
|
||||
/**
|
||||
* Completion of a long or synchronous message - may need to schedule
|
||||
* additional fragments.
|
||||
*/
|
||||
|
||||
static void mca_pml_ob1_send_completion(
|
||||
@ -79,14 +119,38 @@ static void mca_pml_ob1_send_completion(
|
||||
{
|
||||
mca_pml_ob1_send_request_t* sendreq = (mca_pml_ob1_send_request_t*)descriptor->des_cbdata;
|
||||
mca_pml_ob1_endpoint_t* bmi_ep = sendreq->req_endpoint;
|
||||
mca_bmi_base_segment_t* segments = descriptor->des_src;
|
||||
size_t i;
|
||||
|
||||
/* for now - return the descriptor - may cache these at some point */
|
||||
MCA_PML_OB1_ENDPOINT_DES_RETURN(bmi_ep,descriptor);
|
||||
OMPI_THREAD_LOCK(&ompi_request_lock);
|
||||
|
||||
/* count bytes of user data actually delivered */
|
||||
for(i=0; i<descriptor->des_src_cnt; i++) {
|
||||
sendreq->req_bytes_delivered += segments[i].seg_len;
|
||||
}
|
||||
|
||||
/* adjust for message header */
|
||||
switch(((mca_pml_ob1_common_hdr_t*)segments->seg_addr.pval)->hdr_type) {
|
||||
case MCA_PML_OB1_HDR_TYPE_MATCH:
|
||||
sendreq->req_bytes_delivered -= sizeof(mca_pml_ob1_match_hdr_t);
|
||||
break;
|
||||
case MCA_PML_OB1_HDR_TYPE_RNDV:
|
||||
sendreq->req_bytes_delivered -= sizeof(mca_pml_ob1_rendezvous_hdr_t);
|
||||
break;
|
||||
case MCA_PML_OB1_HDR_TYPE_FRAG:
|
||||
sendreq->req_bytes_delivered -= sizeof(mca_pml_ob1_frag_hdr_t);
|
||||
break;
|
||||
default:
|
||||
ompi_output(0, "mca_pml_ob1_send_completion: invalid header type\n");
|
||||
break;
|
||||
}
|
||||
|
||||
/* return the descriptor */
|
||||
bmi_ep->bmi_free(bmi_ep->bmi, descriptor);
|
||||
|
||||
/* check for request completion */
|
||||
OMPI_THREAD_LOCK(&ompi_request_lock);
|
||||
if (OMPI_THREAD_ADD32(&sendreq->req_pending,-1) == 0 &&
|
||||
sendreq->req_offset == sendreq->req_send.req_bytes_packed) {
|
||||
if (OMPI_THREAD_ADD32(&sendreq->req_send_pending,-1) == 0 &&
|
||||
sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed) {
|
||||
sendreq->req_send.req_base.req_pml_complete = true;
|
||||
if (sendreq->req_send.req_base.req_ompi.req_complete == false) {
|
||||
sendreq->req_send.req_base.req_ompi.req_status.MPI_SOURCE = sendreq->req_send.req_base.req_comm->c_my_rank;
|
||||
@ -123,24 +187,12 @@ static void mca_pml_ob1_send_completion(
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* BMI can send directly from user allocated memory.
|
||||
*/
|
||||
|
||||
int mca_pml_ob1_send_request_start_user(
|
||||
mca_pml_ob1_send_request_t* sendreq,
|
||||
mca_pml_ob1_endpoint_t* endpoint)
|
||||
{
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* BMI requires "specially" allocated memory. Request a segment that
|
||||
* is used for initial hdr and any eager data.
|
||||
*/
|
||||
|
||||
int mca_pml_ob1_send_request_start_copy(
|
||||
int mca_pml_ob1_send_request_start(
|
||||
mca_pml_ob1_send_request_t* sendreq,
|
||||
mca_pml_ob1_endpoint_t* endpoint)
|
||||
{
|
||||
@ -151,38 +203,32 @@ int mca_pml_ob1_send_request_start_copy(
|
||||
int rc;
|
||||
|
||||
/* shortcut for zero byte */
|
||||
if(size == 0) {
|
||||
if(size == 0 && sendreq->req_send.req_send_mode != MCA_PML_BASE_SEND_SYNCHRONOUS) {
|
||||
|
||||
/* allocate a descriptor */
|
||||
MCA_PML_OB1_ENDPOINT_DES_ALLOC(endpoint, descriptor);
|
||||
if(NULL == descriptor) {
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
descriptor->des_cbfunc = mca_pml_ob1_send_completion;
|
||||
segment = descriptor->des_src;
|
||||
segment->seg_len = sizeof(mca_pml_ob1_match_hdr_t);
|
||||
|
||||
/* build hdr */
|
||||
hdr = (mca_pml_ob1_hdr_t*)segment->seg_addr.pval;
|
||||
hdr->hdr_common.hdr_flags = 0;
|
||||
hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_MATCH;
|
||||
hdr->hdr_match.hdr_contextid = sendreq->req_send.req_base.req_comm->c_contextid;
|
||||
hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
|
||||
hdr->hdr_match.hdr_dst = sendreq->req_send.req_base.req_peer;
|
||||
hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag;
|
||||
hdr->hdr_match.hdr_msg_length = sendreq->req_send.req_bytes_packed;
|
||||
hdr->hdr_match.hdr_msg_length = 0;
|
||||
hdr->hdr_match.hdr_msg_seq = sendreq->req_send.req_base.req_sequence;
|
||||
|
||||
/* if an acknowledgment is not required - can get by w/ shorter hdr */
|
||||
if (sendreq->req_send.req_send_mode != MCA_PML_BASE_SEND_SYNCHRONOUS) {
|
||||
hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_MATCH;
|
||||
segment->seg_len = sizeof(mca_pml_ob1_match_hdr_t);
|
||||
ompi_request_complete((ompi_request_t*)sendreq);
|
||||
} else {
|
||||
hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_RNDV;
|
||||
hdr->hdr_rndv.hdr_frag_length = 0;
|
||||
hdr->hdr_rndv.hdr_src_req.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
|
||||
hdr->hdr_rndv.hdr_src_req.pval = sendreq;
|
||||
segment->seg_len = sizeof(mca_pml_ob1_rendezvous_hdr_t);
|
||||
}
|
||||
/* short message */
|
||||
descriptor->des_cbfunc = mca_pml_ob1_short_completion;
|
||||
|
||||
/* request is complete at mpi level */
|
||||
ompi_request_complete((ompi_request_t*)sendreq);
|
||||
|
||||
} else {
|
||||
|
||||
@ -199,28 +245,16 @@ int mca_pml_ob1_send_request_start_copy(
|
||||
ack = true;
|
||||
}
|
||||
|
||||
/* allocate space for hdr + first fragment */
|
||||
MCA_PML_OB1_ENDPOINT_DES_ALLOC(endpoint, descriptor);
|
||||
if(NULL == descriptor) {
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
descriptor->des_cbfunc = mca_pml_ob1_send_completion;
|
||||
segment = descriptor->des_src;
|
||||
|
||||
/* build hdr */
|
||||
hdr = (mca_pml_ob1_hdr_t*)segment->seg_addr.pval;
|
||||
hdr->hdr_common.hdr_flags = 0;
|
||||
hdr->hdr_match.hdr_contextid = sendreq->req_send.req_base.req_comm->c_contextid;
|
||||
hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
|
||||
hdr->hdr_match.hdr_dst = sendreq->req_send.req_base.req_peer;
|
||||
hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag;
|
||||
hdr->hdr_match.hdr_msg_length = sendreq->req_send.req_bytes_packed;
|
||||
hdr->hdr_match.hdr_msg_seq = sendreq->req_send.req_base.req_sequence;
|
||||
|
||||
/* if an acknowledgment is not required - can get by w/ shorter hdr */
|
||||
if (ack == false) {
|
||||
int32_t free_after;
|
||||
hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_MATCH;
|
||||
|
||||
/* allocate descriptor */
|
||||
MCA_PML_OB1_ENDPOINT_DES_ALLOC(endpoint, descriptor);
|
||||
if(NULL == descriptor) {
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
segment = descriptor->des_src;
|
||||
|
||||
/* pack the data into the supplied buffer */
|
||||
iov.iov_base = (unsigned char*)segment->seg_addr.pval + sizeof(mca_pml_ob1_match_hdr_t);
|
||||
@ -236,21 +270,39 @@ int mca_pml_ob1_send_request_start_copy(
|
||||
endpoint->bmi_free(endpoint->bmi, descriptor);
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* update length w/ number of bytes actually packed */
|
||||
segment->seg_len = sizeof(mca_pml_ob1_match_hdr_t) + max_data;
|
||||
|
||||
sendreq->req_offset = max_data;
|
||||
if(sendreq->req_offset == sendreq->req_send.req_bytes_packed) {
|
||||
ompi_request_complete((ompi_request_t*)sendreq);
|
||||
}
|
||||
/* build match header */
|
||||
hdr = (mca_pml_ob1_hdr_t*)segment->seg_addr.pval;
|
||||
hdr->hdr_common.hdr_flags = 0;
|
||||
hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_MATCH;
|
||||
hdr->hdr_match.hdr_contextid = sendreq->req_send.req_base.req_comm->c_contextid;
|
||||
hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
|
||||
hdr->hdr_match.hdr_dst = sendreq->req_send.req_base.req_peer;
|
||||
hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag;
|
||||
hdr->hdr_match.hdr_msg_length = sendreq->req_send.req_bytes_packed;
|
||||
hdr->hdr_match.hdr_msg_seq = sendreq->req_send.req_base.req_sequence;
|
||||
|
||||
/* update lengths */
|
||||
segment->seg_len = sizeof(mca_pml_ob1_match_hdr_t) + max_data;
|
||||
sendreq->req_send_offset = max_data;
|
||||
sendreq->req_rdma_offset = max_data;
|
||||
|
||||
/* short message */
|
||||
descriptor->des_cbfunc = mca_pml_ob1_short_completion;
|
||||
|
||||
/* request is complete at mpi level */
|
||||
ompi_request_complete((ompi_request_t*)sendreq);
|
||||
|
||||
/* rendezvous header is required */
|
||||
} else {
|
||||
int32_t free_after;
|
||||
hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_RNDV;
|
||||
hdr->hdr_rndv.hdr_src_req.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
|
||||
hdr->hdr_rndv.hdr_src_req.pval = sendreq;
|
||||
|
||||
/* allocate space for hdr + first fragment */
|
||||
descriptor = endpoint->bmi_alloc(endpoint->bmi, size);
|
||||
if(NULL == descriptor) {
|
||||
return OMPI_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
segment = descriptor->des_src;
|
||||
|
||||
/* pack the data into the supplied buffer */
|
||||
iov.iov_base = (unsigned char*)segment->seg_addr.pval + sizeof(mca_pml_ob1_rendezvous_hdr_t);
|
||||
@ -267,13 +319,30 @@ int mca_pml_ob1_send_request_start_copy(
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* build hdr */
|
||||
hdr = (mca_pml_ob1_hdr_t*)segment->seg_addr.pval;
|
||||
hdr->hdr_common.hdr_flags = 0;
|
||||
hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_RNDV;
|
||||
hdr->hdr_match.hdr_contextid = sendreq->req_send.req_base.req_comm->c_contextid;
|
||||
hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
|
||||
hdr->hdr_match.hdr_dst = sendreq->req_send.req_base.req_peer;
|
||||
hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag;
|
||||
hdr->hdr_match.hdr_msg_length = sendreq->req_send.req_bytes_packed;
|
||||
hdr->hdr_match.hdr_msg_seq = sendreq->req_send.req_base.req_sequence;
|
||||
hdr->hdr_rndv.hdr_src_req.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
|
||||
hdr->hdr_rndv.hdr_src_req.pval = sendreq;
|
||||
hdr->hdr_rndv.hdr_frag_length = max_data;
|
||||
|
||||
/* update lengths with number of bytes actually packed */
|
||||
segment->seg_len = sizeof(mca_pml_ob1_rendezvous_hdr_t) + max_data;
|
||||
sendreq->req_offset = max_data;
|
||||
sendreq->req_send_offset = max_data;
|
||||
|
||||
/* long message */
|
||||
descriptor->des_cbfunc = mca_pml_ob1_send_completion;
|
||||
}
|
||||
}
|
||||
descriptor->des_cbdata = sendreq;
|
||||
OMPI_THREAD_ADD32(&sendreq->req_pending,1);
|
||||
OMPI_THREAD_ADD32(&sendreq->req_send_pending,1);
|
||||
|
||||
/* send */
|
||||
rc = endpoint->bmi_send(
|
||||
@ -302,12 +371,12 @@ int mca_pml_ob1_send_request_schedule(mca_pml_ob1_send_request_t* sendreq)
|
||||
*/
|
||||
if(OMPI_THREAD_ADD32(&sendreq->req_lock,1) == 1) {
|
||||
mca_pml_ob1_proc_t* proc = sendreq->req_proc;
|
||||
size_t num_bmi_avail = mca_pml_ob1_ep_array_get_size(&proc->bmi_next);
|
||||
size_t num_bmi_avail = mca_pml_ob1_ep_array_get_size(&proc->bmi_send);
|
||||
do {
|
||||
/* allocate remaining bytes to PTLs */
|
||||
size_t bytes_remaining = sendreq->req_send.req_bytes_packed - sendreq->req_offset;
|
||||
while(bytes_remaining > 0 && sendreq->req_pending < mca_pml_ob1.send_pipeline_depth) {
|
||||
mca_pml_ob1_endpoint_t* ep = mca_pml_ob1_ep_array_get_next(&proc->bmi_next);
|
||||
size_t bytes_remaining = sendreq->req_rdma_offset - sendreq->req_send_offset;
|
||||
while(bytes_remaining > 0 && sendreq->req_send_pending < mca_pml_ob1.send_pipeline_depth) {
|
||||
mca_pml_ob1_endpoint_t* ep = mca_pml_ob1_ep_array_get_next(&proc->bmi_send);
|
||||
mca_pml_ob1_frag_hdr_t* hdr;
|
||||
mca_bmi_base_descriptor_t* des;
|
||||
int rc;
|
||||
@ -358,21 +427,21 @@ int mca_pml_ob1_send_request_schedule(mca_pml_ob1_send_request_t* sendreq)
|
||||
hdr->hdr_common.hdr_flags = 0;
|
||||
hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_FRAG;
|
||||
hdr->hdr_frag_length = size;
|
||||
hdr->hdr_frag_offset = sendreq->req_offset;
|
||||
hdr->hdr_frag_offset = sendreq->req_send_offset;
|
||||
hdr->hdr_src_req.pval = sendreq;
|
||||
hdr->hdr_dst_req = sendreq->req_recv;
|
||||
|
||||
/* update state */
|
||||
sendreq->req_offset += size;
|
||||
OMPI_THREAD_ADD32(&sendreq->req_pending,1);
|
||||
sendreq->req_send_offset += size;
|
||||
OMPI_THREAD_ADD32(&sendreq->req_send_pending,1);
|
||||
|
||||
/* initiate send - note that this may complete before the call returns */
|
||||
rc = ep->bmi_send(ep->bmi, ep->bmi_endpoint, des, MCA_BMI_TAG_PML);
|
||||
if(rc == OMPI_SUCCESS) {
|
||||
bytes_remaining = sendreq->req_send.req_bytes_packed - sendreq->req_offset;
|
||||
bytes_remaining = sendreq->req_rdma_offset - sendreq->req_send_offset;
|
||||
} else {
|
||||
sendreq->req_offset -= size;
|
||||
OMPI_THREAD_ADD32(&sendreq->req_pending,-1);
|
||||
sendreq->req_send_offset -= size;
|
||||
OMPI_THREAD_ADD32(&sendreq->req_send_pending,-1);
|
||||
OMPI_THREAD_LOCK(&mca_pml_ob1.lock);
|
||||
ompi_list_append(&mca_pml_ob1.send_pending, (ompi_list_item_t*)sendreq);
|
||||
OMPI_THREAD_UNLOCK(&mca_pml_ob1.lock);
|
||||
|
@ -46,8 +46,10 @@ struct mca_pml_ob1_send_request_t {
|
||||
mca_pml_ob1_send_request_state_t req_state;
|
||||
ompi_ptr_t req_recv;
|
||||
int32_t req_lock;
|
||||
size_t req_pending;
|
||||
size_t req_offset;
|
||||
size_t req_bytes_delivered;
|
||||
size_t req_send_pending;
|
||||
size_t req_send_offset;
|
||||
size_t req_rdma_offset;
|
||||
};
|
||||
typedef struct mca_pml_ob1_send_request_t mca_pml_ob1_send_request_t;
|
||||
|
||||
@ -108,10 +110,11 @@ OBJ_CLASS_DECLARATION(mca_pml_ob1_send_request_t);
|
||||
mca_pml_ob1_proc_t* proc = sendreq->req_proc; \
|
||||
\
|
||||
/* select next endpoint */ \
|
||||
endpoint = mca_pml_ob1_ep_array_get_next(&proc->bmi_first); \
|
||||
endpoint = mca_pml_ob1_ep_array_get_next(&proc->bmi_eager); \
|
||||
sendreq->req_lock = 0; \
|
||||
sendreq->req_offset = 0; \
|
||||
sendreq->req_pending = 0; \
|
||||
sendreq->req_bytes_delivered = 0; \
|
||||
sendreq->req_send_offset = 0; \
|
||||
sendreq->req_send_pending = 0; \
|
||||
sendreq->req_state = MCA_PML_OB1_SR_START; \
|
||||
sendreq->req_send.req_base.req_ompi.req_complete = false; \
|
||||
sendreq->req_send.req_base.req_ompi.req_state = OMPI_REQUEST_ACTIVE; \
|
||||
@ -123,11 +126,7 @@ OBJ_CLASS_DECLARATION(mca_pml_ob1_send_request_t);
|
||||
mca_pml_base_bsend_request_start(&sendreq->req_send.req_base.req_ompi); \
|
||||
} \
|
||||
\
|
||||
if(NULL != endpoint->bmi_alloc) { \
|
||||
rc = mca_pml_ob1_send_request_start_copy(sendreq, endpoint); \
|
||||
} else { \
|
||||
rc = mca_pml_ob1_send_request_start_user(sendreq, endpoint); \
|
||||
} \
|
||||
rc = mca_pml_ob1_send_request_start(sendreq, endpoint); \
|
||||
}
|
||||
|
||||
/*
|
||||
@ -143,23 +142,11 @@ OBJ_CLASS_DECLARATION(mca_pml_ob1_send_request_t);
|
||||
&mca_pml_ob1.send_requests, (ompi_list_item_t*)sendreq); \
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* BMI doesn't require pre-pinned or "specially" allocated memory.
|
||||
* Can try to directly send from the user's buffer if contiguous.
|
||||
*
|
||||
*/
|
||||
|
||||
int mca_pml_ob1_send_request_start_user(
|
||||
mca_pml_ob1_send_request_t* sendreq,
|
||||
mca_pml_ob1_endpoint_t* endpoint);
|
||||
|
||||
|
||||
/**
|
||||
* BMI requires "specially" allocated memory. Request a segment that
|
||||
* is used for initial hdr and any eager data.
|
||||
*/
|
||||
|
||||
int mca_pml_ob1_send_request_start_copy(
|
||||
int mca_pml_ob1_send_request_start(
|
||||
mca_pml_ob1_send_request_t* sendreq,
|
||||
mca_pml_ob1_endpoint_t* endpoint);
|
||||
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user