This commit was SVN r6018.
This commit is contained in:
Tim Woodall 2005-06-09 20:16:33 +00:00
parent 95cd062784
commit 3e7ffb6399
17 changed files with 429 additions and 108 deletions

View file

@ -398,7 +398,7 @@ typedef int (*mca_bmi_base_module_free_fn_t)(
*/
typedef struct mca_bmi_base_descriptor_t* (*mca_bmi_base_module_prepare_fn_t)(
struct mca_bmi_base_module_t* bmi,
struct mca_bmi_base_endpoint_t* peer,
struct mca_bmi_base_endpoint_t* endpoint,
struct ompi_convertor_t* convertor,
size_t reserve,
size_t* size

View file

@ -202,7 +202,6 @@ int mca_bmi_ib_free(
*/
mca_bmi_base_descriptor_t* mca_bmi_ib_prepare_src(
struct mca_bmi_base_module_t* bmi,
struct mca_bmi_base_endpoint_t* peer,
struct ompi_convertor_t* convertor,
size_t reserve,
size_t* size
@ -323,7 +322,6 @@ mca_bmi_base_descriptor_t* mca_bmi_ib_prepare_src(
*/
mca_bmi_base_descriptor_t* mca_bmi_ib_prepare_dst(
struct mca_bmi_base_module_t* bmi,
struct mca_bmi_base_endpoint_t* peer,
struct ompi_convertor_t* convertor,
size_t reserve,
size_t* size)

View file

@ -772,11 +772,10 @@ extern int mca_bmi_sm_free(
* Pack data
*
* @param bmi (IN) BMI module
* @param peer (IN) BMI peer addressing
*/
struct mca_bmi_base_descriptor_t* mca_bmi_sm_prepare_src(
struct mca_bmi_base_module_t* bmi,
struct mca_bmi_base_endpoint_t* peer,
struct mca_bmi_base_endpoint_t* endpoint,
struct ompi_convertor_t* convertor,
size_t reserve,
size_t* size)

View file

@ -296,7 +296,7 @@ extern int mca_bmi_sm_free(
*/
struct mca_bmi_base_descriptor_t* mca_bmi_sm_prepare_src(
struct mca_bmi_base_module_t* bmi,
struct mca_bmi_base_endpoint_t* peer,
struct mca_bmi_base_endpoint_t* endpoint,
struct ompi_convertor_t* convertor,
size_t reserve,
size_t* size

View file

@ -31,6 +31,8 @@ libmca_pml_ob1_la_SOURCES = \
pml_ob1_proc.c \
pml_ob1_proc.h \
pml_ob1_progress.c \
pml_ob1_rdmafrag.c \
pml_ob1_rdmafrag.h \
pml_ob1_recvfrag.c \
pml_ob1_recvfrag.h \
pml_ob1_recvreq.c \

View file

@ -20,6 +20,7 @@
#ifndef MCA_PML_OB1_H
#define MCA_PML_OB1_H
#include "ompi_config.h"
#include "threads/thread.h"
#include "threads/condition.h"
#include "class/ompi_free_list.h"
@ -67,13 +68,15 @@ struct mca_pml_ob1_t {
/* free lists */
ompi_free_list_t send_requests;
ompi_free_list_t recv_requests;
ompi_free_list_t rdma_frags;
ompi_free_list_t recv_frags;
ompi_free_list_t buffers;
ompi_free_list_t fragments;
/* list of pending send requests */
/* list of pending operations */
ompi_list_t acks_pending;
ompi_list_t send_pending;
ompi_list_t recv_pending;
ompi_list_t acks_pending;
ompi_list_t rdma_pending;
};
typedef struct mca_pml_ob1_t mca_pml_ob1_t;

View file

@ -28,6 +28,7 @@
#include "pml_ob1_hdr.h"
#include "pml_ob1_sendreq.h"
#include "pml_ob1_recvreq.h"
#include "pml_ob1_rdmafrag.h"
#include "pml_ob1_recvfrag.h"
@ -83,7 +84,8 @@ int mca_pml_ob1_component_open(void)
OBJ_CONSTRUCT(&mca_pml_ob1.recv_requests, ompi_free_list_t);
/* fragments */
OBJ_CONSTRUCT(&mca_pml_ob1.fragments, ompi_free_list_t);
OBJ_CONSTRUCT(&mca_pml_ob1.rdma_frags, ompi_free_list_t);
OBJ_CONSTRUCT(&mca_pml_ob1.recv_frags, ompi_free_list_t);
OBJ_CONSTRUCT(&mca_pml_ob1.buffers, ompi_free_list_t);
/* pending operations */
@ -152,6 +154,9 @@ int mca_pml_ob1_component_close(void)
OBJ_DESTRUCT(&mca_pml_ob1.recv_pending);
OBJ_DESTRUCT(&mca_pml_ob1.send_requests);
OBJ_DESTRUCT(&mca_pml_ob1.recv_requests);
OBJ_DESTRUCT(&mca_pml_ob1.rdma_frags);
OBJ_DESTRUCT(&mca_pml_ob1.recv_frags);
OBJ_DESTRUCT(&mca_pml_ob1.buffers);
OBJ_DESTRUCT(&mca_pml_ob1.lock);
return OMPI_SUCCESS;
}
@ -184,11 +189,20 @@ mca_pml_base_module_t* mca_pml_ob1_component_init(int* priority,
mca_pml_ob1.free_list_inc,
NULL);
/* recv fragments */
/* fragments */
ompi_free_list_init(
&mca_pml_ob1.fragments,
sizeof(mca_pml_ob1_fragment_t),
OBJ_CLASS(mca_pml_ob1_fragment_t),
&mca_pml_ob1.rdma_frags,
sizeof(mca_pml_ob1_rdma_frag_t),
OBJ_CLASS(mca_pml_ob1_rdma_frag_t),
mca_pml_ob1.free_list_num,
mca_pml_ob1.free_list_max,
mca_pml_ob1.free_list_inc,
NULL);
ompi_free_list_init(
&mca_pml_ob1.recv_frags,
sizeof(mca_pml_ob1_recv_frag_t),
OBJ_CLASS(mca_pml_ob1_recv_frag_t),
mca_pml_ob1.free_list_num,
mca_pml_ob1.free_list_max,
mca_pml_ob1.free_list_inc,

View file

@ -155,6 +155,24 @@ static inline mca_pml_ob1_endpoint_t* mca_pml_ob1_ep_array_get_next(mca_pml_ob1_
return endpoint;
}
/**
* Locate an element in the array
*
* @param array (IN)
* @param index (IN)
*/
static inline mca_pml_ob1_endpoint_t* mca_pml_ob1_ep_array_find(
mca_pml_ob1_ep_array_t* array, struct mca_bmi_base_module_t* bmi)
{
size_t i=0;
for(i=0; i<array->arr_size; i++) {
if(array->arr_endpoints[i].bmi == bmi) {
return &array->arr_endpoints[i];
}
}
return NULL;
}
/**
* Allocate a descriptor
*/

View file

@ -216,12 +216,25 @@ struct mca_pml_ob1_rdma_hdr_t {
mca_pml_ob1_common_hdr_t hdr_common; /**< common attributes */
ompi_ptr_t hdr_src; /**< source request/descriptor */
ompi_ptr_t hdr_dst; /**< receive request/descriptor */
uint64_t hdr_offset; /**< current offset into user buffer */
uint64_t hdr_rdma_offset; /**< current offset into user buffer */
uint32_t hdr_seg_cnt; /**< number of segments for rdma */
mca_bmi_base_segment_t hdr_segs[1]; /**< list of segments for rdma */
};
typedef struct mca_pml_ob1_rdma_hdr_t mca_pml_ob1_rdma_hdr_t;
/**
* Header used to complete an RDMA operation.
*/
struct mca_pml_ob1_fin_hdr_t {
mca_pml_ob1_common_hdr_t hdr_common; /**< common attributes */
ompi_ptr_t hdr_src; /**< source request/descriptor */
ompi_ptr_t hdr_dst; /**< receive request/descriptor */
uint64_t hdr_rdma_offset; /**< data offset */
uint64_t hdr_rdma_length; /**< number of segments for rdma */
};
typedef struct mca_pml_ob1_fin_hdr_t mca_pml_ob1_fin_hdr_t;
/**
* Union of defined hdr types.
*/
@ -232,6 +245,7 @@ union mca_pml_ob1_hdr_t {
mca_pml_ob1_frag_hdr_t hdr_frag;
mca_pml_ob1_ack_hdr_t hdr_ack;
mca_pml_ob1_rdma_hdr_t hdr_rdma;
mca_pml_ob1_fin_hdr_t hdr_fin;
};
typedef union mca_pml_ob1_hdr_t mca_pml_ob1_hdr_t;
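A note on the layout of mca_pml_ob1_rdma_hdr_t above: hdr_segs[1] is the usual C89-style variable-length trailer, so a header that describes hdr_seg_cnt segments is built with (hdr_seg_cnt - 1) additional segment slots appended after the fixed part (the memcpy of dst->des_dst into hdr_segs in the recvreq hunk further down relies on this). A minimal sketch of the sizing arithmetic; the helper name is hypothetical and not part of this commit:

#include "mca/bmi/bmi.h"
#include "pml_ob1_hdr.h"

/* hypothetical helper: bytes occupied by an rdma header that carries cnt
 * segment descriptors (assumes cnt >= 1; sizeof() already accounts for
 * the first hdr_segs[] entry) */
static inline size_t mca_pml_ob1_rdma_hdr_bytes(uint32_t cnt)
{
    return sizeof(mca_pml_ob1_rdma_hdr_t)
         + (cnt - 1) * sizeof(mca_bmi_base_segment_t);
}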

src/mca/pml/ob1/pml_ob1_rdmafrag.c (9 lines) Normal file
View file

@ -0,0 +1,9 @@
#include "pml_ob1.h"
#include "pml_ob1_rdmafrag.h"
OBJ_CLASS_INSTANCE(
mca_pml_ob1_rdma_frag_t,
ompi_list_item_t,
NULL,
NULL);

src/mca/pml/ob1/pml_ob1_rdmafrag.h (66 lines) Normal file
View file

@ -0,0 +1,66 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* @file
*/
#ifndef MCA_PML_OB1_RDMAFRAG_H
#define MCA_PML_OB1_RDMAFRAG_H
#include "mca/bmi/bmi.h"
#include "pml_ob1_hdr.h"
typedef enum {
MCA_PML_OB1_RMDA_INIT,
MCA_PML_OB1_RDMA_PREPARE,
MCA_PML_OB1_RDMA_PUT,
MCA_PML_OB1_RDMA_FIN
} mca_pml_ob1_rdma_state_t;
struct mca_pml_ob1_rdma_frag_t {
ompi_list_item_t super;
mca_bmi_base_module_t* rdma_bmi;
mca_pml_ob1_hdr_t rdma_hdr;
mca_pml_ob1_rdma_state_t rdma_state;
size_t rdma_length;
mca_bmi_base_segment_t rdma_segs[MCA_BMI_DES_MAX_SEGMENTS];
struct mca_pml_ob1_send_request_t* rdma_req;
};
typedef struct mca_pml_ob1_rdma_frag_t mca_pml_ob1_rdma_frag_t;
OBJ_CLASS_DECLARATION(mca_pml_ob1_rdma_frag_t);
#define MCA_PML_OB1_RDMA_FRAG_ALLOC(frag,rc) \
do { \
ompi_list_item_t* item; \
OMPI_FREE_LIST_WAIT(&mca_pml_ob1.rdma_frags, item, rc); \
frag = (mca_pml_ob1_rdma_frag_t*)item; \
} while(0)
#define MCA_PML_OB1_RDMA_FRAG_RETURN(frag) \
do { \
/* return fragment */ \
OMPI_FREE_LIST_RETURN(&mca_pml_ob1.rdma_frags, \
(ompi_list_item_t*)frag); \
} while(0)
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
#endif
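For orientation, the send side added later in this commit (pml_ob1_sendreq.c) drives a fragment through the states declared above in a fixed order. A condensed sketch of that lifecycle using only the macros and fields from this header; the sequencing is paraphrased from the later hunks rather than copied from them:

mca_pml_ob1_rdma_frag_t* frag;
int rc;

MCA_PML_OB1_RDMA_FRAG_ALLOC(frag, rc);         /* drawn from mca_pml_ob1.rdma_frags          */
frag->rdma_state = MCA_PML_OB1_RDMA_PREPARE;   /* source buffer pinned via bmi_prepare_src   */
frag->rdma_state = MCA_PML_OB1_RDMA_PUT;       /* bmi_put() issued against the peer segments */
frag->rdma_state = MCA_PML_OB1_RDMA_FIN;       /* FIN control message queued to the peer     */
MCA_PML_OB1_RDMA_FRAG_RETURN(frag);            /* released from the FIN completion callback  */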

View file

@ -42,7 +42,7 @@ OBJ_CLASS_INSTANCE(
);
OBJ_CLASS_INSTANCE(
mca_pml_ob1_fragment_t,
mca_pml_ob1_recv_frag_t,
ompi_list_item_t,
NULL,
NULL
@ -99,6 +99,11 @@ void mca_pml_ob1_recv_frag_callback(
}
case MCA_PML_OB1_HDR_TYPE_FIN:
{
mca_bmi_base_descriptor_t* dst = (mca_bmi_base_descriptor_t*)
hdr->hdr_fin.hdr_dst.pval;
mca_pml_ob1_recv_request_t* recvreq = (mca_pml_ob1_recv_request_t*)dst->des_cbdata;
mca_pml_ob1_recv_request_progress(recvreq,bmi,segments,des->des_src_cnt);
bmi->bmi_free(bmi,dst);
break;
}
default:
@ -478,13 +483,13 @@ int mca_pml_ob1_recv_frag_match(
} else {
/* if no match found, place on unexpected queue */
mca_pml_ob1_fragment_t* frag;
MCA_PML_OB1_FRAG_ALLOC(frag, rc);
mca_pml_ob1_recv_frag_t* frag;
MCA_PML_OB1_RECV_FRAG_ALLOC(frag, rc);
if(OMPI_SUCCESS != rc) {
OMPI_THREAD_UNLOCK(&pml_comm->matching_lock);
return rc;
}
MCA_PML_OB1_FRAG_INIT(frag,bmi,hdr,segments,num_segments);
MCA_PML_OB1_RECV_FRAG_INIT(frag,bmi,hdr,segments,num_segments);
ompi_list_append( &proc->unexpected_frags, (ompi_list_item_t *)frag );
}
@ -503,13 +508,13 @@ int mca_pml_ob1_recv_frag_match(
* This message comes after the next expected, so it
* is ahead of sequence. Save it for later.
*/
mca_pml_ob1_fragment_t* frag;
MCA_PML_OB1_FRAG_ALLOC(frag, rc);
mca_pml_ob1_recv_frag_t* frag;
MCA_PML_OB1_RECV_FRAG_ALLOC(frag, rc);
if(OMPI_SUCCESS != rc) {
OMPI_THREAD_UNLOCK(&pml_comm->matching_lock);
return rc;
}
MCA_PML_OB1_FRAG_INIT(frag,bmi,hdr,segments,num_segments);
MCA_PML_OB1_RECV_FRAG_INIT(frag,bmi,hdr,segments,num_segments);
ompi_list_append(&proc->frags_cant_match, (ompi_list_item_t *)frag);
}
@ -526,10 +531,10 @@ int mca_pml_ob1_recv_frag_match(
if(additional_match) {
ompi_list_item_t* item;
while(NULL != (item = ompi_list_remove_first(&additional_matches))) {
mca_pml_ob1_fragment_t* frag = (mca_pml_ob1_fragment_t*)item;
mca_pml_ob1_recv_frag_t* frag = (mca_pml_ob1_recv_frag_t*)item;
MCA_PML_OB1_RECV_REQUEST_MATCHED(frag->request, hdr);
mca_pml_ob1_recv_request_progress(frag->request,frag->bmi,frag->segments,frag->num_segments);
MCA_PML_OB1_FRAG_RETURN(frag);
MCA_PML_OB1_RECV_FRAG_RETURN(frag);
}
}
return OMPI_SUCCESS;
@ -559,7 +564,7 @@ static bool mca_pml_ob1_check_cantmatch_for_match(
/* local parameters */
int match_found;
uint16_t next_msg_seq_expected, frag_seq;
mca_pml_ob1_fragment_t *frag;
mca_pml_ob1_recv_frag_t *frag;
mca_pml_ob1_recv_request_t *match = NULL;
bool match_made = false;
@ -580,11 +585,11 @@ static bool mca_pml_ob1_check_cantmatch_for_match(
/* search the list for a fragment from the send with sequence
* number next_msg_seq_expected
*/
for(frag = (mca_pml_ob1_fragment_t *)
for(frag = (mca_pml_ob1_recv_frag_t *)
ompi_list_get_first(&proc->frags_cant_match);
frag != (mca_pml_ob1_fragment_t *)
frag != (mca_pml_ob1_recv_frag_t *)
ompi_list_get_end(&proc->frags_cant_match);
frag = (mca_pml_ob1_fragment_t *)
frag = (mca_pml_ob1_recv_frag_t *)
ompi_list_get_next(frag))
{
/*

View file

@ -32,7 +32,7 @@ typedef struct mca_pml_ob1_buffer_t mca_pml_ob1_buffer_t;
OBJ_CLASS_DECLARATION(mca_pml_ob1_buffer_t);
struct mca_pml_ob1_fragment_t {
struct mca_pml_ob1_recv_frag_t {
ompi_list_item_t super;
mca_bmi_base_module_t* bmi;
mca_pml_ob1_hdr_t hdr;
@ -41,26 +41,26 @@ struct mca_pml_ob1_fragment_t {
mca_bmi_base_segment_t segments[MCA_BMI_DES_MAX_SEGMENTS];
mca_pml_ob1_buffer_t* buffers[MCA_BMI_DES_MAX_SEGMENTS];
};
typedef struct mca_pml_ob1_fragment_t mca_pml_ob1_fragment_t;
typedef struct mca_pml_ob1_recv_frag_t mca_pml_ob1_recv_frag_t;
OBJ_CLASS_DECLARATION(mca_pml_ob1_fragment_t);
OBJ_CLASS_DECLARATION(mca_pml_ob1_recv_frag_t);
#define MCA_PML_OB1_FRAG_ALLOC(frag,rc) \
#define MCA_PML_OB1_RECV_FRAG_ALLOC(frag,rc) \
do { \
ompi_list_item_t* item; \
OMPI_FREE_LIST_WAIT(&mca_pml_ob1.fragments, item, rc); \
frag = (mca_pml_ob1_fragment_t*)item; \
OMPI_FREE_LIST_WAIT(&mca_pml_ob1.recv_frags, item, rc); \
frag = (mca_pml_ob1_recv_frag_t*)item; \
} while(0)
#define MCA_PML_OB1_FRAG_INIT(frag,bmi,hdr,segs,cnt) \
#define MCA_PML_OB1_RECV_FRAG_INIT(frag,bmi,hdr,segs,cnt) \
do { \
size_t i; \
mca_bmi_base_segment_t* segments = frag->segments; \
mca_pml_ob1_buffer_t** buffers = frag->buffers; \
\
/* init fragment */ \
/* init recv_frag */ \
frag->bmi = bmi; \
frag->hdr = *(mca_pml_ob1_hdr_t*)hdr; \
frag->num_segments = cnt; \
@ -82,7 +82,7 @@ do { \
} while(0)
#define MCA_PML_OB1_FRAG_RETURN(frag) \
#define MCA_PML_OB1_RECV_FRAG_RETURN(frag) \
do { \
size_t i; \
\
@ -93,14 +93,14 @@ do { \
} \
frag->num_segments = 0; \
\
/* return fragment */ \
OMPI_FREE_LIST_RETURN(&mca_pml_ob1.fragments, \
/* return recv_frag */ \
OMPI_FREE_LIST_RETURN(&mca_pml_ob1.recv_frags, \
(ompi_list_item_t*)frag); \
} while(0)
/**
* Callback from BMI on receipt of a fragment.
* Callback from BMI on receipt of a recv_frag.
*/
OMPI_DECLSPEC void mca_pml_ob1_recv_frag_callback(
@ -111,11 +111,11 @@ OMPI_DECLSPEC void mca_pml_ob1_recv_frag_callback(
);
/**
* Match incoming fragments against posted receives.
* Match incoming recv_frags against posted receives.
* Supports out of order delivery.
*
* @param frag_header (IN) Header of received fragment.
* @param frag_desc (IN) Received fragment descriptor.
* @param frag_header (IN) Header of received recv_frag.
* @param frag_desc (IN) Received recv_frag descriptor.
* @param match_made (OUT) Flag indicating wether a match was made.
* @param additional_matches (OUT) List of additional matches
* @return OMPI_SUCCESS or error status on failure.

View file

@ -24,7 +24,7 @@
#include "pml_ob1_sendreq.h"
static mca_pml_ob1_fragment_t* mca_pml_ob1_recv_request_match_specific_proc(
static mca_pml_ob1_recv_frag_t* mca_pml_ob1_recv_request_match_specific_proc(
mca_pml_ob1_recv_request_t* request, mca_pml_ob1_comm_proc_t* proc);
@ -121,7 +121,7 @@ static void mca_pml_ob1_recv_request_ack(
mca_pml_ob1_proc_t* proc = recvreq->req_proc;
mca_pml_ob1_endpoint_t* ep = mca_pml_ob1_ep_array_get_next(&proc->bmi_eager);
mca_bmi_base_descriptor_t* des;
mca_pml_ob1_fragment_t* frag;
mca_pml_ob1_recv_frag_t* frag;
mca_pml_ob1_ack_hdr_t* ack;
bool schedule;
int rc;
@ -177,7 +177,7 @@ static void mca_pml_ob1_recv_request_ack(
/* queue request to retry later */
retry:
MCA_PML_OB1_FRAG_ALLOC(frag,rc);
MCA_PML_OB1_RECV_FRAG_ALLOC(frag,rc);
frag->bmi = NULL;
frag->hdr.hdr_rndv = *hdr;
frag->num_segments = 0;
@ -201,6 +201,7 @@ void mca_pml_ob1_recv_request_progress(
size_t bytes_delivered = 0;
size_t data_offset = 0;
mca_pml_ob1_hdr_t* hdr = (mca_pml_ob1_hdr_t*)segments->seg_addr.pval;
bool schedule;
switch(hdr->hdr_common.hdr_type) {
case MCA_PML_OB1_HDR_TYPE_MATCH:
@ -234,16 +235,22 @@ void mca_pml_ob1_recv_request_progress(
case MCA_PML_OB1_HDR_TYPE_FRAG:
bytes_received = hdr->hdr_frag.hdr_frag_length;
data_offset = hdr->hdr_frag.hdr_frag_offset;
MCA_PML_OB1_RECV_REQUEST_UNPACK(
recvreq,
segments,
num_segments,
sizeof(mca_pml_ob1_frag_hdr_t),
hdr->hdr_frag.hdr_frag_offset,
data_offset,
bytes_received,
bytes_delivered);
break;
case MCA_PML_OB1_HDR_TYPE_FIN:
bytes_delivered = bytes_received = hdr->hdr_fin.hdr_rdma_length;
break;
default:
break;
}
@ -260,8 +267,16 @@ void mca_pml_ob1_recv_request_progress(
if(ompi_request_waiting) {
ompi_condition_broadcast(&ompi_request_cond);
}
schedule = false;
} else if (recvreq->req_rdma_offset < recvreq->req_recv.req_bytes_packed) {
schedule = true;
}
OMPI_THREAD_UNLOCK(&ompi_request_lock);
/* schedule additional rdma operations */
if(schedule) {
mca_pml_ob1_recv_request_schedule(recvreq);
}
}
@ -344,7 +359,7 @@ void mca_pml_ob1_recv_request_schedule(mca_pml_ob1_recv_request_t* recvreq)
hdr->hdr_common.hdr_flags = 0;
hdr->hdr_src = recvreq->req_send;
hdr->hdr_dst.pval = dst;
hdr->hdr_offset = recvreq->req_rdma_offset;
hdr->hdr_rdma_offset = recvreq->req_rdma_offset;
hdr->hdr_seg_cnt = dst->des_dst_cnt;
memcpy(hdr->hdr_segs, dst->des_dst, dst->des_dst_cnt * sizeof(mca_bmi_base_segment_t));
@ -355,7 +370,7 @@ void mca_pml_ob1_recv_request_schedule(mca_pml_ob1_recv_request_t* recvreq)
/* send rdma request to peer */
rc = ep->bmi_send(ep->bmi, ep->bmi_endpoint, ctl, MCA_BMI_TAG_PML);
if(rc == OMPI_SUCCESS) {
bytes_remaining = recvreq->req_recv.req_bytes_packed - recvreq->req_rdma_offset;
bytes_remaining -= size;
} else {
ep->bmi_free(ep->bmi,ctl);
ep->bmi_free(ep->bmi,dst);
@ -380,7 +395,7 @@ void mca_pml_ob1_recv_request_match_specific(mca_pml_ob1_recv_request_t* request
{
mca_pml_ob1_comm_t* comm = request->req_recv.req_base.req_comm->c_pml_comm;
mca_pml_ob1_comm_proc_t* proc = comm->procs + request->req_recv.req_base.req_peer;
mca_pml_ob1_fragment_t* frag;
mca_pml_ob1_recv_frag_t* frag;
/* check for a specific match */
OMPI_THREAD_LOCK(&comm->matching_lock);
@ -395,7 +410,7 @@ void mca_pml_ob1_recv_request_match_specific(mca_pml_ob1_recv_request_t* request
mca_pml_ob1_recv_request_progress(request,frag->bmi,frag->segments,frag->num_segments);
if( !((MCA_PML_REQUEST_IPROBE == request->req_recv.req_base.req_type) ||
(MCA_PML_REQUEST_PROBE == request->req_recv.req_base.req_type)) ) {
MCA_PML_OB1_FRAG_RETURN(frag);
MCA_PML_OB1_RECV_FRAG_RETURN(frag);
}
return; /* match found */
}
@ -434,7 +449,7 @@ void mca_pml_ob1_recv_request_match_wild(mca_pml_ob1_recv_request_t* request)
request->req_recv.req_base.req_sequence = comm->recv_sequence++;
for (i = 0; i < proc_count; i++) {
mca_pml_ob1_fragment_t* frag;
mca_pml_ob1_recv_frag_t* frag;
/* continue if no frags to match */
if (ompi_list_get_size(&proc->unexpected_frags) == 0) {
@ -449,7 +464,7 @@ void mca_pml_ob1_recv_request_match_wild(mca_pml_ob1_recv_request_t* request)
mca_pml_ob1_recv_request_progress(request,frag->bmi,frag->segments,frag->num_segments);
if( !((MCA_PML_REQUEST_IPROBE == request->req_recv.req_base.req_type) ||
(MCA_PML_REQUEST_PROBE == request->req_recv.req_base.req_type)) ) {
MCA_PML_OB1_FRAG_RETURN(frag);
MCA_PML_OB1_RECV_FRAG_RETURN(frag);
}
return; /* match found */
}
@ -471,19 +486,19 @@ void mca_pml_ob1_recv_request_match_wild(mca_pml_ob1_recv_request_t* request)
* it places the request in the appropriate matched receive list.
*/
static mca_pml_ob1_fragment_t* mca_pml_ob1_recv_request_match_specific_proc(
static mca_pml_ob1_recv_frag_t* mca_pml_ob1_recv_request_match_specific_proc(
mca_pml_ob1_recv_request_t* request,
mca_pml_ob1_comm_proc_t* proc)
{
ompi_list_t* unexpected_frags = &proc->unexpected_frags;
mca_pml_ob1_fragment_t* frag;
mca_pml_ob1_recv_frag_t* frag;
mca_pml_ob1_match_hdr_t* hdr;
int tag = request->req_recv.req_base.req_tag;
if( OMPI_ANY_TAG == tag ) {
for (frag = (mca_pml_ob1_fragment_t*)ompi_list_get_first(unexpected_frags);
frag != (mca_pml_ob1_fragment_t*)ompi_list_get_end(unexpected_frags);
frag = (mca_pml_ob1_fragment_t*)ompi_list_get_next(frag)) {
for (frag = (mca_pml_ob1_recv_frag_t*)ompi_list_get_first(unexpected_frags);
frag != (mca_pml_ob1_recv_frag_t*)ompi_list_get_end(unexpected_frags);
frag = (mca_pml_ob1_recv_frag_t*)ompi_list_get_next(frag)) {
hdr = &(frag->hdr.hdr_match);
/* check first frag - we assume that process matching has been done already */
@ -492,9 +507,9 @@ static mca_pml_ob1_fragment_t* mca_pml_ob1_recv_request_match_specific_proc(
}
}
} else {
for (frag = (mca_pml_ob1_fragment_t*)ompi_list_get_first(unexpected_frags);
frag != (mca_pml_ob1_fragment_t*)ompi_list_get_end(unexpected_frags);
frag = (mca_pml_ob1_fragment_t*)ompi_list_get_next(frag)) {
for (frag = (mca_pml_ob1_recv_frag_t*)ompi_list_get_first(unexpected_frags);
frag != (mca_pml_ob1_recv_frag_t*)ompi_list_get_end(unexpected_frags);
frag = (mca_pml_ob1_recv_frag_t*)ompi_list_get_next(frag)) {
hdr = &(frag->hdr.hdr_match);
/* check first frag - we assume that process matching has been done already */

View file

@ -235,7 +235,6 @@ void mca_pml_ob1_recv_request_progress(
mca_bmi_base_segment_t* segments,
size_t num_segments);
/**
*
*/
@ -243,6 +242,14 @@ void mca_pml_ob1_recv_request_progress(
void mca_pml_ob1_recv_request_schedule(
mca_pml_ob1_recv_request_t* req);
/**
*
*/
void mca_pml_ob1_recv_request_fin(
mca_pml_ob1_recv_request_t* req,
size_t rdma_length);
#if defined(c_plusplus) || defined(__cplusplus)
}

View file

@ -21,10 +21,12 @@
#include "include/constants.h"
#include "mca/pml/pml.h"
#include "mca/bmi/bmi.h"
#include "mca/errmgr/errmgr.h"
#include "pml_ob1.h"
#include "pml_ob1_hdr.h"
#include "pml_ob1_proc.h"
#include "pml_ob1_sendreq.h"
#include "pml_ob1_rdmafrag.h"
#include "pml_ob1_recvreq.h"
#include "pml_ob1_endpoint.h"
@ -80,29 +82,20 @@ static void mca_pml_ob1_short_completion(
mca_pml_ob1_send_request_t* sendreq = (mca_pml_ob1_send_request_t*)descriptor->des_cbdata;
mca_pml_ob1_endpoint_t* bmi_ep = sendreq->req_endpoint;
/* check completion status */
if(OMPI_SUCCESS != status) {
/* TSW - FIX */
ompi_output(0, "%s:%d FATAL", __FILE__, __LINE__);
orte_errmgr.abort();
}
/* attempt to cache the descriptor */
MCA_PML_OB1_ENDPOINT_DES_RETURN(bmi_ep,descriptor);
/* signal request completion */
OMPI_THREAD_LOCK(&ompi_request_lock);
sendreq->req_bytes_delivered = sendreq->req_send.req_bytes_packed;
sendreq->req_send.req_base.req_pml_complete = true;
if (sendreq->req_send.req_base.req_ompi.req_complete == false) {
sendreq->req_send.req_base.req_ompi.req_status.MPI_SOURCE = sendreq->req_send.req_base.req_comm->c_my_rank;
sendreq->req_send.req_base.req_ompi.req_status.MPI_TAG = sendreq->req_send.req_base.req_tag;
sendreq->req_send.req_base.req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
sendreq->req_send.req_base.req_ompi.req_status._count = sendreq->req_send.req_bytes_packed;
sendreq->req_send.req_base.req_ompi.req_complete = true;
sendreq->req_state = MCA_PML_OB1_SR_COMPLETE;
if(ompi_request_waiting) {
ompi_condition_broadcast(&ompi_request_cond);
}
} else if(sendreq->req_send.req_base.req_free_called) {
MCA_PML_OB1_FREE((ompi_request_t**)&sendreq);
} else if (sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED) {
mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq);
sendreq->req_state = MCA_PML_OB1_SR_COMPLETE;
}
MCA_PML_OB1_SEND_REQUEST_COMPLETE(sendreq);
OMPI_THREAD_UNLOCK(&ompi_request_lock);
}
@ -122,7 +115,12 @@ static void mca_pml_ob1_send_completion(
mca_bmi_base_segment_t* segments = descriptor->des_src;
size_t i;
OMPI_THREAD_LOCK(&ompi_request_lock);
/* check completion status */
if(OMPI_SUCCESS != status) {
/* TSW - FIX */
ompi_output(0, "%s:%d FATAL", __FILE__, __LINE__);
orte_errmgr.abort();
}
/* count bytes of user data actually delivered */
for(i=0; i<descriptor->des_src_cnt; i++) {
@ -145,32 +143,17 @@ static void mca_pml_ob1_send_completion(
break;
}
/* return the descriptor */
bmi_ep->bmi_free(bmi_ep->bmi, descriptor);
/* check for request completion */
OMPI_THREAD_LOCK(&ompi_request_lock);
if (OMPI_THREAD_ADD32(&sendreq->req_pipeline_depth,-1) == 0 &&
sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed) {
sendreq->req_send.req_base.req_pml_complete = true;
if (sendreq->req_send.req_base.req_ompi.req_complete == false) {
sendreq->req_send.req_base.req_ompi.req_status.MPI_SOURCE = sendreq->req_send.req_base.req_comm->c_my_rank;
sendreq->req_send.req_base.req_ompi.req_status.MPI_TAG = sendreq->req_send.req_base.req_tag;
sendreq->req_send.req_base.req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
sendreq->req_send.req_base.req_ompi.req_status._count = sendreq->req_send.req_bytes_packed;
sendreq->req_send.req_base.req_ompi.req_complete = true;
sendreq->req_state = MCA_PML_OB1_SR_COMPLETE;
if(ompi_request_waiting) {
ompi_condition_broadcast(&ompi_request_cond);
}
} else if(sendreq->req_send.req_base.req_free_called) {
MCA_PML_OB1_FREE((ompi_request_t**)&sendreq);
} else if (sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED) {
mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq);
sendreq->req_state = MCA_PML_OB1_SR_COMPLETE;
}
MCA_PML_OB1_SEND_REQUEST_COMPLETE(sendreq);
}
OMPI_THREAD_UNLOCK(&ompi_request_lock);
/* return the descriptor */
bmi_ep->bmi_free(bmi_ep->bmi, descriptor);
/* advance pending requests */
while(NULL != sendreq) {
switch(sendreq->req_state) {
@ -435,7 +418,7 @@ int mca_pml_ob1_send_request_schedule(mca_pml_ob1_send_request_t* sendreq)
/* initiate send - note that this may complete before the call returns */
rc = ep->bmi_send(ep->bmi, ep->bmi_endpoint, des, MCA_BMI_TAG_PML);
if(rc == OMPI_SUCCESS) {
bytes_remaining = sendreq->req_rdma_offset - sendreq->req_send_offset;
bytes_remaining -= size;
} else {
sendreq->req_send_offset -= size;
OMPI_THREAD_ADD32(&sendreq->req_pipeline_depth,-1);
@ -453,15 +436,176 @@ int mca_pml_ob1_send_request_schedule(mca_pml_ob1_send_request_t* sendreq)
/**
*
* Return resources used by the RDMA
*/
int mca_pml_ob1_send_request_put(
static void mca_pml_ob1_fin_completion(
mca_bmi_base_module_t* bmi,
struct mca_bmi_base_endpoint_t* ep,
struct mca_bmi_base_descriptor_t* des,
int status)
{
mca_pml_ob1_rdma_frag_t* frag = (mca_pml_ob1_rdma_frag_t*)des->des_cbdata;
MCA_PML_OB1_RDMA_FRAG_RETURN(frag);
bmi->bmi_free(bmi,des);
}
/**
* An RDMA put operation has completed:
* (1) Update request status and if required set completed
* (2) Send FIN control message to the destination
*/
static void mca_pml_ob1_put_completion(
mca_bmi_base_module_t* bmi,
struct mca_bmi_base_endpoint_t* ep,
struct mca_bmi_base_descriptor_t* des,
int status)
{
mca_pml_ob1_rdma_frag_t* frag = (mca_pml_ob1_rdma_frag_t*)des->des_cbdata;
mca_pml_ob1_send_request_t* sendreq = frag->rdma_req;
mca_bmi_base_descriptor_t* fin;
mca_pml_ob1_fin_hdr_t* hdr;
int rc;
/* check completion status */
if(OMPI_SUCCESS != status) {
/* TSW - FIX */
ORTE_ERROR_LOG(status);
orte_errmgr.abort();
}
/* check for request completion */
OMPI_THREAD_LOCK(&ompi_request_lock);
sendreq->req_bytes_delivered += frag->rdma_length;
if(sendreq->req_bytes_delivered >= sendreq->req_send.req_bytes_packed) {
MCA_PML_OB1_SEND_REQUEST_COMPLETE(sendreq);
}
OMPI_THREAD_UNLOCK(&ompi_request_lock);
/* allocate descriptor for fin control message - note that
* the rdma descriptor cannot be reused as it points directly
* at the user buffer
*/
frag->rdma_state = MCA_PML_OB1_RDMA_FIN;
fin = bmi->bmi_alloc(bmi,sizeof(mca_pml_ob1_fin_hdr_t));
if(NULL == fin) {
OMPI_THREAD_LOCK(&mca_pml_ob1.lock);
ompi_list_append(&mca_pml_ob1.rdma_pending, (ompi_list_item_t*)frag);
OMPI_THREAD_LOCK(&mca_pml_ob1.lock);
goto cleanup;
}
fin->des_cbfunc = mca_pml_ob1_fin_completion;
fin->des_cbdata = frag;
/* fill in header */
hdr = (mca_pml_ob1_fin_hdr_t*)fin->des_src->seg_addr.pval;
hdr->hdr_common.hdr_flags = 0;
hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_FIN;
hdr->hdr_src = frag->rdma_hdr.hdr_rdma.hdr_src;
hdr->hdr_dst = frag->rdma_hdr.hdr_rdma.hdr_dst;
hdr->hdr_rdma_offset = frag->rdma_hdr.hdr_rdma.hdr_rdma_offset;
hdr->hdr_rdma_length = frag->rdma_length;
/* queue request */
rc = bmi->bmi_send(
bmi,
ep,
fin,
MCA_BMI_TAG_PML);
if(OMPI_SUCCESS != rc) {
bmi->bmi_free(bmi, fin);
if(rc == OMPI_ERR_OUT_OF_RESOURCE) {
OMPI_THREAD_LOCK(&mca_pml_ob1.lock);
ompi_list_append(&mca_pml_ob1.rdma_pending, (ompi_list_item_t*)frag);
OMPI_THREAD_LOCK(&mca_pml_ob1.lock);
} else {
/* TSW - FIX */
ORTE_ERROR_LOG(rc);
orte_errmgr.abort();
}
}
cleanup:
/* return rdma descriptor - do this after queuing the fin message - as
* release rdma resources (unpin memory) can take some time.
*/
des->des_dst = NULL;
des->des_src_cnt = 0;
bmi->bmi_free(bmi, des);
}
/**
* Receiver has scheduled an RDMA operation:
* (1) Allocate an RDMA fragment to maintain the state of the operation
* (2) Call BMI prepare_src to pin/prepare source buffers
* (3) Queue the RDMA put
*/
void mca_pml_ob1_send_request_put(
mca_pml_ob1_send_request_t* sendreq,
mca_bmi_base_module_t* bmi,
mca_pml_ob1_rdma_hdr_t* hdr)
{
return OMPI_ERR_NOT_IMPLEMENTED;
mca_pml_ob1_proc_t* proc = sendreq->req_proc;
mca_pml_ob1_endpoint_t* ep = mca_pml_ob1_ep_array_find(&proc->bmi_rdma,bmi);
mca_bmi_base_descriptor_t* des;
mca_pml_ob1_rdma_frag_t* frag;
size_t offset = hdr->hdr_rdma_offset;
size_t i, size = 0;
int rc;
MCA_PML_OB1_RDMA_FRAG_ALLOC(frag, rc);
if(NULL == frag) {
/* TSW - FIX */
ORTE_ERROR_LOG(rc);
orte_errmgr.abort();
}
/* setup fragment */
for(i=0; i<hdr->hdr_seg_cnt; i++) {
size += hdr->hdr_segs[i].seg_len;
frag->rdma_segs[i] = hdr->hdr_segs[i];
}
frag->rdma_hdr.hdr_rdma = *hdr;
frag->rdma_state = MCA_PML_OB1_RDMA_PREPARE;
/* setup descriptor */
ompi_convertor_set_position(&sendreq->req_send.req_convertor, &offset);
des = bmi->bmi_prepare_src(
bmi,
ep->bmi_endpoint,
&sendreq->req_send.req_convertor,
0,
&size);
if(NULL == des) {
OMPI_THREAD_LOCK(&mca_pml_ob1.lock);
ompi_list_append(&mca_pml_ob1.rdma_pending, (ompi_list_item_t*)frag);
OMPI_THREAD_UNLOCK(&mca_pml_ob1.lock);
}
frag->rdma_state = MCA_PML_OB1_RDMA_PUT;
frag->rdma_length = size;
des->des_dst = frag->rdma_segs;
des->des_dst_cnt = hdr->hdr_seg_cnt;
des->des_cbfunc = mca_pml_ob1_put_completion;
des->des_cbdata = frag;
/* queue put */
if(OMPI_SUCCESS != (rc = bmi->bmi_put(bmi, ep->bmi_endpoint, des))) {
if(rc == OMPI_ERR_OUT_OF_RESOURCE) {
OMPI_THREAD_LOCK(&mca_pml_ob1.lock);
ompi_list_append(&mca_pml_ob1.rdma_pending, (ompi_list_item_t*)frag);
OMPI_THREAD_UNLOCK(&mca_pml_ob1.lock);
} else {
/* TSW - FIX */
ORTE_ERROR_LOG(rc);
orte_errmgr.abort();
}
}
}

View file

@ -128,6 +128,33 @@ OBJ_CLASS_DECLARATION(mca_pml_ob1_send_request_t);
rc = mca_pml_ob1_send_request_start(sendreq, endpoint); \
}
/*
* Complete a send request
*/
#define MCA_PML_OB1_SEND_REQUEST_COMPLETE(sendreq) \
{ \
(sendreq)->req_send.req_base.req_pml_complete = true; \
if ((sendreq)->req_send.req_base.req_ompi.req_complete == false) { \
(sendreq)->req_send.req_base.req_ompi.req_status.MPI_SOURCE = \
(sendreq)->req_send.req_base.req_comm->c_my_rank; \
(sendreq)->req_send.req_base.req_ompi.req_status.MPI_TAG = \
(sendreq)->req_send.req_base.req_tag; \
(sendreq)->req_send.req_base.req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS; \
(sendreq)->req_send.req_base.req_ompi.req_status._count = \
(sendreq)->req_send.req_bytes_packed; \
(sendreq)->req_send.req_base.req_ompi.req_complete = true; \
(sendreq)->req_state = MCA_PML_OB1_SR_COMPLETE; \
if(ompi_request_waiting) { \
ompi_condition_broadcast(&ompi_request_cond); \
} \
} else if(sendreq->req_send.req_base.req_free_called) { \
MCA_PML_OB1_FREE((ompi_request_t**)&sendreq); \
} else if (sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED) { \
mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq); \
sendreq->req_state = MCA_PML_OB1_SR_COMPLETE; \
} \
}
/*
* Advance a request
*/
@ -159,10 +186,10 @@ int mca_pml_ob1_send_request_schedule(
*
*/
int mca_pml_ob1_send_request_put(
mca_pml_ob1_send_request_t* sendreq,
mca_bmi_base_module_t* bmi,
mca_pml_ob1_rdma_hdr_t* hdr);
void mca_pml_ob1_send_request_put(
mca_pml_ob1_send_request_t* sendreq,
mca_bmi_base_module_t* bmi,
mca_pml_ob1_rdma_hdr_t* hdr);
#if defined(c_plusplus) || defined(__cplusplus)