changes to PTL interface

- moved pml_base_send_request_t allocation to the pml
- provide the capability to cache requests on a per-ptl basis
- provide hooks for the ptl to init/cleanup ptl-specific data cached on the request
- renamed the request/fragment "super" fields
- added a ptl_send interface, which is called by the pml for the first fragment; the ptl_put interface is called for subsequent fragments of the same request

This commit was SVN r1745.

This commit is contained in:
parent 419924fc31
commit b577174a4a
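Taken together, the interface now looks roughly like this. The sketch below condenses the mca/ptl/ptl.h changes that appear later in this diff; the typedef and field names are the real ones from the header, while the struct name mca_ptl_t_sketch is only a placeholder and ptl_put's signature is simplified to share the send typedef.

/* Condensed sketch of the revised PML->PTL interface (see the mca/ptl/ptl.h hunks below). */
struct mca_ptl_t;
struct mca_ptl_base_peer_t;
struct mca_pml_base_send_request_t;

/* Called the first time a PML-allocated request is used by this PTL;
 * the request is then cached per PTL and re-used without re-initialization. */
typedef void (*mca_ptl_base_request_init_fn_t)(
    struct mca_ptl_t* ptl, struct mca_pml_base_send_request_t* request);

/* Called when the PML evicts a request from the PTL's cache, so the PTL
 * can release anything it attached to the descriptor. */
typedef void (*mca_ptl_base_request_fini_fn_t)(
    struct mca_ptl_t* ptl, struct mca_pml_base_send_request_t* request);

/* ptl_send starts the first (eager) fragment of a request; ptl_put is used
 * for the fragments scheduled after the peer acknowledges the match. */
typedef int (*mca_ptl_base_send_fn_t)(
    struct mca_ptl_t* ptl, struct mca_ptl_base_peer_t* ptl_base_peer,
    struct mca_pml_base_send_request_t* request,
    size_t offset, size_t size, int flags);

struct mca_ptl_t_sketch {
    size_t ptl_cache_size;                            /* max requests cached for this PTL */
    size_t ptl_cache_bytes;                           /* extra bytes the PML allocates per request */
    mca_ptl_base_send_fn_t ptl_send;                  /* first fragment */
    mca_ptl_base_send_fn_t ptl_put;                   /* subsequent fragments */
    mca_ptl_base_request_init_fn_t ptl_request_init;
    mca_ptl_base_request_fini_fn_t ptl_request_fini;
};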
@@ -48,7 +48,9 @@ int ompi_free_list_init(
    flist->fl_num_allocated = 0;
    flist->fl_num_per_alloc = num_elements_per_alloc;
    flist->fl_mpool = mpool;
    return ompi_free_list_grow(flist, num_elements_to_alloc);
    if(num_elements_to_alloc)
        return ompi_free_list_grow(flist, num_elements_to_alloc);
    return OMPI_SUCCESS;
}

@@ -61,8 +61,29 @@ int ompi_free_list_grow(ompi_free_list_t* flist, size_t num_elements);
}

#define OMPI_FREE_LIST_WAIT(fl, item, rc) \
{ \
    if(ompi_using_threads()) { \
        ompi_mutex_lock(&((fl)->fl_lock)); \
        item = ompi_list_remove_first(&((fl)->super)); \
        if(NULL == item) { \
            ompi_free_list_grow((fl), (fl)->fl_num_per_alloc); \
            item = ompi_list_remove_first(&((fl)->super)); \
        } \
        ompi_mutex_unlock(&((fl)->fl_lock)); \
    } else { \
        item = ompi_list_remove_first(&((fl)->super)); \
        if(NULL == item) { \
            ompi_free_list_grow((fl), (fl)->fl_num_per_alloc); \
            item = ompi_list_remove_first(&((fl)->super)); \
        } \
    } \
    rc = (NULL == item) ? OMPI_ERR_TEMP_OUT_OF_RESOURCE : OMPI_SUCCESS; \
}

#define OMPI_FREE_LIST_RETURN(fl, item) \
    THREAD_SCOPED_LOCK(&((fl)->fl_lock), ompi_list_append(&((fl)->super), (item)));
    THREAD_SCOPED_LOCK(&((fl)->fl_lock), ompi_list_prepend(&((fl)->super), (item)));

#endif
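The new OMPI_FREE_LIST_WAIT grows the list instead of failing when it is empty, and OMPI_FREE_LIST_RETURN now prepends the returned item so recently used descriptors are re-used first. A minimal usage sketch (the function name and include path are illustrative, not part of this commit):

#include "class/ompi_free_list.h"   /* assumed path; adjust to the tree layout */

static int example_alloc_and_return(ompi_free_list_t* fl)
{
    ompi_list_item_t* item;
    int rc;

    /* Removes the first item, growing the list by fl_num_per_alloc if it is empty. */
    OMPI_FREE_LIST_WAIT(fl, item, rc);
    if (OMPI_SUCCESS != rc)
        return rc;   /* OMPI_ERR_TEMP_OUT_OF_RESOURCE if the grow failed */

    /* ... use item as a request/fragment descriptor ... */

    OMPI_FREE_LIST_RETURN(fl, item);   /* prepends: LIFO re-use of warm descriptors */
    return OMPI_SUCCESS;
}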
@@ -109,8 +109,10 @@ static int ompi_process_name_compare(ompi_process_name_t* n1, ompi_process_name_
struct mca_oob_1_0_0_t* mca_oob_tcp_init(bool *allow_multi_user_threads,
                                         bool *have_hidden_threads)
{
#if 0
    /* initialize data structures */
    ompi_rb_tree_init(&mca_oob_tcp_module.tcp_peer_tree, (ompi_rb_tree_comp_fn_t)ompi_process_name_compare);
#endif
    /* return &mca_oob_tcp; */
    return NULL;
}

@@ -219,8 +219,8 @@ int mca_pml_base_bsend_request_init(ompi_request_t* request, bool persistent)
    mca_pml_bsend_count++;

    /* set flag indicating mpi layer is done */
    sendreq->super.req_persistent = persistent;
    sendreq->super.req_mpi_done = true;
    sendreq->req_base.req_persistent = persistent;
    sendreq->req_base.req_mpi_done = true;
    OMPI_THREAD_UNLOCK(&mca_pml_bsend_mutex);
    return OMPI_SUCCESS;
}

@@ -22,7 +22,7 @@ ompi_class_t mca_pml_base_recv_request_t_class = {
static void mca_pml_base_recv_request_construct(mca_pml_base_recv_request_t* request)
{
    /* no need to reinit for every recv -- never changes */
    request->super.req_type = MCA_PML_REQUEST_RECV;
    request->req_base.req_type = MCA_PML_REQUEST_RECV;
}

@@ -15,10 +15,10 @@ extern ompi_class_t mca_pml_base_recv_request_t_class;
 * Base type for receive requests.
 */
struct mca_pml_base_recv_request_t {
    mca_pml_base_request_t super; /**< base request */
    size_t req_bytes_packed; /**< size of message being received */
    size_t req_bytes_received; /**< number of bytes received from network */
    size_t req_bytes_delivered; /**< number of bytes delivered to user */
    mca_pml_base_request_t req_base; /**< base request */
    size_t req_bytes_packed; /**< size of message being received */
    size_t req_bytes_received; /**< number of bytes received from network */
    size_t req_bytes_delivered; /**< number of bytes delivered to user */
};
typedef struct mca_pml_base_recv_request_t mca_pml_base_recv_request_t;

@@ -35,32 +35,32 @@ typedef struct mca_pml_base_recv_request_t mca_pml_base_recv_request_t;
 * @param comm (IN) Communicator.
 * @param persistent (IN) Is this a persistent request.
 */
#define MCA_PML_BASE_RECV_REQUEST_INIT( \
    request, \
    addr, \
    count, \
    datatype, \
    src, \
    tag, \
    comm, \
    persistent) \
{ \
    OMPI_REQUEST_INIT(&(request)->super.super); \
    (request)->req_bytes_packed = 0; \
    (request)->req_bytes_received = 0; \
    (request)->req_bytes_delivered = 0; \
    (request)->super.req_sequence = 0; \
    (request)->super.req_addr = addr; \
    (request)->super.req_count = count; \
    (request)->super.req_datatype = datatype; \
    (request)->super.req_peer = src; \
    (request)->super.req_tag = tag; \
    (request)->super.req_comm = comm; \
    (request)->super.req_proc = NULL; \
    (request)->super.req_persistent = persistent; \
    (request)->super.req_mpi_done = false; \
    (request)->super.req_pml_done = false; \
    (request)->super.req_free_called = false; \
#define MCA_PML_BASE_RECV_REQUEST_INIT( \
    request, \
    addr, \
    count, \
    datatype, \
    src, \
    tag, \
    comm, \
    persistent) \
{ \
    OMPI_REQUEST_INIT(&(request)->req_base.req_ompi); \
    (request)->req_bytes_packed = 0; \
    (request)->req_bytes_received = 0; \
    (request)->req_bytes_delivered = 0; \
    (request)->req_base.req_sequence = 0; \
    (request)->req_base.req_addr = addr; \
    (request)->req_base.req_count = count; \
    (request)->req_base.req_datatype = datatype; \
    (request)->req_base.req_peer = src; \
    (request)->req_base.req_tag = tag; \
    (request)->req_base.req_comm = comm; \
    (request)->req_base.req_proc = NULL; \
    (request)->req_base.req_persistent = persistent; \
    (request)->req_base.req_mpi_done = false; \
    (request)->req_base.req_pml_done = false; \
    (request)->req_base.req_free_called = false; \
}

#endif
@@ -7,7 +7,7 @@

static void mca_pml_base_request_construct(mca_pml_base_request_t* req)
{
    req->super.req_type = OMPI_REQUEST_PML;
    req->req_ompi.req_type = OMPI_REQUEST_PML;
}

static void mca_pml_base_request_destruct(mca_pml_base_request_t* req)

@@ -31,17 +31,17 @@ typedef enum {
 * Base type for PML P2P requests
 */
struct mca_pml_base_request_t {
    ompi_request_t super; /**< base request */
    ompi_request_t req_ompi; /**< base request */
    void *req_addr; /**< pointer to application buffer */
    size_t req_count; /**< count of user datatype elements */
    int32_t req_peer; /**< peer process - rank w/in this communicator */
    int32_t req_tag; /**< user defined tag */
    ompi_communicator_t *req_comm; /**< communicator pointer */
    ompi_proc_t* req_proc; /**< peer process */
    ompi_communicator_t *req_comm; /**< communicator pointer */
    ompi_proc_t* req_proc; /**< peer process */
    mca_ptl_sequence_t req_sequence; /**< sequence number for MPI pt-2-pt ordering */
    ompi_datatype_t *req_datatype; /**< pointer to data type */
    ompi_datatype_t *req_datatype; /**< pointer to data type */
    mca_pml_base_request_type_t req_type; /**< MPI request type - used for test */
    ompi_status_public_t req_status; /**< completion status */
    ompi_status_public_t req_status; /**< completion status */
    bool req_persistent; /**< flag indicating if this is a persistent request */
    volatile bool req_mpi_done; /**< flag indicating if MPI is done with this request */
    volatile bool req_pml_done; /**< flag indicating if the pt-2-pt layer is done with this request */

@@ -14,7 +14,6 @@

typedef struct opened_module_t {
    ompi_list_item_t super;

    mca_pml_base_module_t *om_module;
} opened_module_t;

@@ -19,7 +19,7 @@ ompi_class_t mca_pml_base_send_request_t_class = {
static void mca_pml_base_send_request_construct(mca_pml_base_send_request_t* request)
{
    /* no need to reinit for every send -- never changes */
    request->super.req_type = MCA_PML_REQUEST_SEND;
    request->req_base.req_type = MCA_PML_REQUEST_SEND;
    OBJ_CONSTRUCT(&request->req_convertor, ompi_convertor_t);
}

@@ -18,17 +18,17 @@ extern ompi_class_t mca_pml_base_send_request_t_class;
 * Base type for send requests
 */
struct mca_pml_base_send_request_t {
    mca_pml_base_request_t super; /**< base request type - common data structure for use by wait/test */
    size_t req_offset; /**< number of bytes that have already been assigned to a fragment */
    size_t req_bytes_packed; /**< packed size of a message given the datatype and count */
    size_t req_bytes_sent; /**< number of bytes that have been sent */
    mca_pml_base_send_mode_t req_send_mode; /**< type of send */
    struct mca_ptl_t* req_owner; /**< PTL that allocated this descriptor */
    struct mca_ptl_base_peer_t* req_peer; /**< PTL peer instance that will be used for first fragment */
    ompi_ptr_t req_peer_match; /**< matched receive at peer */
    ompi_ptr_t req_peer_addr;
    size_t req_peer_size;
    ompi_convertor_t req_convertor; /**< convertor that describes this datatype */
    mca_pml_base_request_t req_base; /**< base request type - common data structure for use by wait/test */
    size_t req_offset; /**< number of bytes that have already been assigned to a fragment */
    size_t req_bytes_packed; /**< packed size of a message given the datatype and count */
    size_t req_bytes_sent; /**< number of bytes that have been sent */
    mca_pml_base_send_mode_t req_send_mode; /**< type of send */
    struct mca_ptl_t* req_ptl; /**< PTL that is selected for first fragment */
    struct mca_ptl_base_peer_t* req_peer; /**< PTL peer instance that will be used for first fragment */
    ompi_ptr_t req_peer_match; /**< matched receive at peer */
    ompi_ptr_t req_peer_addr; /**< peer's remote buffer address */
    size_t req_peer_size; /**< size of peer's remote buffer */
    ompi_convertor_t req_convertor; /**< convertor that describes this datatype */
};
typedef struct mca_pml_base_send_request_t mca_pml_base_send_request_t;
@@ -47,52 +47,54 @@ typedef struct mca_pml_base_send_request_t mca_pml_base_send_request_t;
 * @param mode (IN) Send mode (STANDARD,BUFFERED,SYNCHRONOUS,READY)
 * @param persistent (IN) Is request persistent.
 */
#define MCA_PML_BASE_SEND_REQUEST_INIT( \
    request, \
    addr, \
    count, \
    datatype, \
    peer, \
    tag, \
    comm, \
    mode,\
    persistent) \
{ \
    OMPI_REQUEST_INIT(&(request)->super.super); \
    request->req_offset = 0; \
    request->req_bytes_sent = 0; \
    request->req_send_mode = mode; \
    request->req_peer_match.lval = 0; \
    request->req_peer_addr.lval = 0; \
    request->req_peer_size = 0; \
    request->super.req_addr = addr; \
    request->super.req_count = count; \
    request->super.req_datatype = datatype; \
    request->super.req_peer = peer; \
    request->super.req_tag = tag; \
    request->super.req_comm = comm; \
    request->super.req_proc = ompi_comm_peer_lookup(comm,peer); \
    request->super.req_persistent = persistent; \
    request->super.req_mpi_done = false; \
    request->super.req_pml_done = false; \
    request->super.req_free_called = false; \
    \
    /* initialize datatype convertor for this request */ \
    if(count > 0) { \
        int packed_size; \
        ompi_convertor_copy(request->super.req_proc->proc_convertor, &request->req_convertor); \
        ompi_convertor_init_for_send( \
            &request->req_convertor, \
            0, \
            request->super.req_datatype, \
            request->super.req_count, \
            request->super.req_addr, \
            0); \
        ompi_convertor_get_packed_size(&request->req_convertor, &packed_size); \
        request->req_bytes_packed = packed_size; \
    } else { \
        request->req_bytes_packed = 0; \
    } \
#define MCA_PML_BASE_SEND_REQUEST_INIT( \
    request, \
    addr, \
    count, \
    datatype, \
    peer, \
    tag, \
    comm, \
    mode, \
    persistent) \
{ \
    OMPI_REQUEST_INIT(&(request)->req_base.req_ompi); \
    request->req_offset = 0; \
    request->req_bytes_sent = 0; \
    request->req_send_mode = mode; \
    request->req_peer_match.lval = 0; \
    request->req_peer_addr.lval = 0; \
    request->req_peer_size = 0; \
    request->req_base.req_addr = addr; \
    request->req_base.req_count = count; \
    request->req_base.req_datatype = datatype; \
    request->req_base.req_peer = peer; \
    request->req_base.req_tag = tag; \
    request->req_base.req_comm = comm; \
    request->req_base.req_proc = ompi_comm_peer_lookup(comm,peer); \
    request->req_base.req_persistent = persistent; \
    request->req_base.req_mpi_done = false; \
    request->req_base.req_pml_done = false; \
    request->req_base.req_free_called = false; \
    \
    /* initialize datatype convertor for this request */ \
    if(count > 0) { \
        int packed_size; \
        ompi_convertor_copy( \
            request->req_base.req_proc->proc_convertor, \
            &request->req_convertor); \
        ompi_convertor_init_for_send( \
            &request->req_convertor, \
            0, \
            request->req_base.req_datatype, \
            request->req_base.req_count, \
            request->req_base.req_addr, \
            0); \
        ompi_convertor_get_packed_size(&request->req_convertor, &packed_size); \
        request->req_bytes_packed = packed_size; \
    } else { \
        request->req_bytes_packed = 0; \
    } \
}
@@ -15,6 +15,8 @@ libmca_pml_teg_la_SOURCES = \
    pml_teg_irecv.c \
    pml_teg_isend.c \
    pml_teg_module.c \
    pml_teg_ptl.c \
    pml_teg_ptl.h \
    pml_teg_proc.c \
    pml_teg_proc.h \
    pml_teg_progress.c \

@@ -20,6 +20,7 @@ extern ompi_class_t mca_pml_teg_ptl_array_t_class;
struct mca_ptl_proc_t {
    int ptl_weight; /**< PTL weight for scheduling */
    struct mca_ptl_base_peer_t* ptl_peer; /**< PTL addressing info */
    struct mca_pml_base_ptl_t* ptl_base; /**< PML specific PTL info */
    mca_ptl_t *ptl; /**< PTL implementation */
};
typedef struct mca_ptl_proc_t mca_ptl_proc_t;

@@ -13,6 +13,7 @@
#include "mca/ptl/base/ptl_base_sendfrag.h"
#include "pml_teg.h"
#include "pml_teg_proc.h"
#include "pml_teg_ptl.h"
#include "pml_teg_recvreq.h"
#include "pml_teg_sendreq.h"

@@ -84,6 +85,7 @@ int mca_pml_teg_add_ptls(ompi_list_t *ptls)
    /* build an array of ptls and ptl modules */
    mca_ptl_base_selected_module_t* selected_ptl;
    size_t num_ptls = ompi_list_get_size(ptls);
    size_t cache_bytes = 0;
    mca_pml_teg.teg_num_ptls = 0;
    mca_pml_teg.teg_num_ptl_modules = 0;
    mca_pml_teg.teg_ptls = malloc(sizeof(mca_ptl_t*) * num_ptls);
@@ -108,8 +110,24 @@ int mca_pml_teg_add_ptls(ompi_list_t *ptls)
        ptl->ptl_match = mca_ptl_base_recv_frag_match;
        ptl->ptl_send_progress = mca_pml_teg_send_request_progress;
        ptl->ptl_recv_progress = mca_pml_teg_recv_request_progress;
        ptl->ptl_stack = ptl;
        ptl->ptl_base = NULL;

        /* find maximum required size for cache */
        if(ptl->ptl_cache_bytes > cache_bytes)
            cache_bytes = ptl->ptl_cache_bytes;
    }

    /* setup send fragments based on largest required send request */
    ompi_free_list_init(
        &mca_pml_teg.teg_send_requests,
        sizeof(mca_pml_base_send_request_t) + cache_bytes,
        OBJ_CLASS(mca_pml_base_send_request_t),
        mca_pml_teg.teg_free_list_num,
        mca_pml_teg.teg_free_list_max,
        mca_pml_teg.teg_free_list_inc,
        NULL);

    /* sort ptl list by exclusivity */
    qsort(mca_pml_teg.teg_ptls, mca_pml_teg.teg_num_ptls, sizeof(struct mca_ptl_t*), ptl_exclusivity_compare);
    return OMPI_SUCCESS;
@@ -199,11 +217,6 @@ int mca_pml_teg_add_procs(ompi_proc_t** procs, size_t nprocs)
            proc_ptl->ptl = ptl;
            proc_ptl->ptl_peer = ptl_peers[p];
            proc_ptl->ptl_weight = 0;

            /* if this ptl supports exclusive access then don't allow
             * subsequent ptls to register
             */

            proc_pml->proc_ptl_flags |= ptl->ptl_flags;
        }
    }
@@ -245,12 +258,25 @@ int mca_pml_teg_add_procs(ompi_proc_t** procs, size_t nprocs)
            struct mca_ptl_proc_t* proc_ptl = mca_ptl_array_get_index(&proc_pml->proc_ptl_next, n_index);
            struct mca_ptl_t *ptl = proc_ptl->ptl;
            double weight;

            /* compute weighting factor for this ptl */
            if(ptl->ptl_bandwidth)
                weight = proc_ptl->ptl->ptl_bandwidth / total_bandwidth;
            else
                weight = 1.0 / n_size;
            proc_ptl->ptl_weight = (int)(weight * 100);

            /*
             * save/create ptl extension for use by pml
             */
            proc_ptl->ptl_base = ptl->ptl_base;
            if(NULL == proc_ptl->ptl_base && ptl->ptl_cache_bytes > 0) {
                mca_pml_base_ptl_t* ptl_base = OBJ_NEW(mca_pml_base_ptl_t);
                ptl_base->ptl = ptl;
                ptl_base->ptl_cache_size = ptl->ptl_cache_size;
                proc_ptl->ptl_base = ptl->ptl_base = ptl_base;
            }

            /* check to see if this ptl is already in the array of ptls used for first
             * fragments - if not add it.
             */
@@ -270,6 +296,7 @@ int mca_pml_teg_add_procs(ompi_proc_t** procs, size_t nprocs)
                *proc_new = *proc_ptl;
            }
        }

        }
    }
    return OMPI_SUCCESS;

@@ -42,7 +42,8 @@ struct mca_pml_teg_t {
    int teg_free_list_inc; /* number of elements to grow free list */
    int teg_poll_iterations; /* number of iterations to poll for completion */

    /* free list of recv requests */
    /* free list of requests */
    ompi_free_list_t teg_send_requests;
    ompi_free_list_t teg_recv_requests;

#if MCA_PML_TEG_STATISTICS
@@ -56,9 +57,9 @@ struct mca_pml_teg_t {
#endif

    /* request completion */
    ompi_mutex_t teg_request_lock;
    ompi_condition_t teg_request_cond;
    volatile int teg_request_waiting;
    ompi_mutex_t teg_request_lock;
    ompi_condition_t teg_request_cond;
    volatile int teg_request_waiting;
    mca_pml_base_request_t teg_request_null;
};
typedef struct mca_pml_teg_t mca_pml_teg_t;
@@ -258,7 +259,7 @@ extern int mca_pml_teg_free(
    if(pml_request->req_free_called) { \
        MCA_PML_TEG_FREE(request); \
    } else { \
        pml_request->super.req_state = OMPI_REQUEST_INACTIVE; \
        pml_request->req_ompi.req_state = OMPI_REQUEST_INACTIVE; \
    } \
} else { \
    MCA_PML_TEG_FREE(request); \
@@ -277,11 +278,10 @@ extern int mca_pml_teg_free(
case MCA_PML_REQUEST_SEND: \
    { \
        mca_pml_base_send_request_t* sendreq = (mca_pml_base_send_request_t*)pml_request; \
        mca_ptl_t* ptl = sendreq->req_owner; \
        if(sendreq->req_send_mode == MCA_PML_BASE_SEND_BUFFERED) { \
            mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq); \
        } \
        ptl->ptl_request_return(ptl, sendreq); \
        MCA_PML_TEG_SEND_REQUEST_RETURN(sendreq); \
        break; \
    } \
case MCA_PML_REQUEST_RECV: \
@@ -1,4 +1,5 @@
#include "pml_teg.h"
#include "pml_teg_sendreq.h"
#include "mca/pml/base/pml_base_request.h"

@@ -11,8 +11,8 @@ int mca_pml_teg_iprobe(
    int rc;

    mca_pml_base_recv_request_t recvreq;
    recvreq.super.super.req_type = OMPI_REQUEST_PML;
    recvreq.super.req_type = MCA_PML_REQUEST_IPROBE;
    recvreq.req_base.req_ompi.req_type = OMPI_REQUEST_PML;
    recvreq.req_base.req_type = MCA_PML_REQUEST_IPROBE;
    MCA_PML_BASE_RECV_REQUEST_INIT(
        &recvreq,
        NULL,
@@ -27,8 +27,8 @@ int mca_pml_teg_iprobe(
        OBJ_DESTRUCT(&recvreq);
        return rc;
    }
    if((*matched = recvreq.super.req_mpi_done) == true && (NULL != status)) {
        *status = recvreq.super.req_status;
    if((*matched = recvreq.req_base.req_mpi_done) == true && (NULL != status)) {
        *status = recvreq.req_base.req_status;
    }
    return OMPI_SUCCESS;
}
@@ -42,8 +42,8 @@ int mca_pml_teg_probe(
{
    int rc;
    mca_pml_base_recv_request_t recvreq;
    recvreq.super.super.req_type = OMPI_REQUEST_PML;
    recvreq.super.req_type = MCA_PML_REQUEST_PROBE;
    recvreq.req_base.req_ompi.req_type = OMPI_REQUEST_PML;
    recvreq.req_base.req_type = MCA_PML_REQUEST_PROBE;
    MCA_PML_BASE_RECV_REQUEST_INIT(
        &recvreq,
        NULL,
@@ -59,25 +59,25 @@ int mca_pml_teg_probe(
        return rc;
    }

    if(recvreq.super.req_mpi_done == false) {
    if(recvreq.req_base.req_mpi_done == false) {
        /* give up and sleep until completion */
        if(ompi_using_threads()) {
            ompi_mutex_lock(&mca_pml_teg.teg_request_lock);
            mca_pml_teg.teg_request_waiting++;
            while(recvreq.super.req_mpi_done == false)
            while(recvreq.req_base.req_mpi_done == false)
                ompi_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock);
            mca_pml_teg.teg_request_waiting--;
            ompi_mutex_unlock(&mca_pml_teg.teg_request_lock);
        } else {
            mca_pml_teg.teg_request_waiting++;
            while(recvreq.super.req_mpi_done == false)
            while(recvreq.req_base.req_mpi_done == false)
                ompi_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock);
            mca_pml_teg.teg_request_waiting--;
        }
    }

    if(NULL != status) {
        *status = recvreq.super.req_status;
        *status = recvreq.req_base.req_status;
    }
    return OMPI_SUCCESS;
}

@@ -101,18 +101,18 @@ int mca_pml_teg_recv(
        return rc;
    }

    if(recvreq->super.req_mpi_done == false) {
    if(recvreq->req_base.req_mpi_done == false) {
        /* give up and sleep until completion */
        if(ompi_using_threads()) {
            ompi_mutex_lock(&mca_pml_teg.teg_request_lock);
            mca_pml_teg.teg_request_waiting++;
            while(recvreq->super.req_mpi_done == false)
            while(recvreq->req_base.req_mpi_done == false)
                ompi_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock);
            mca_pml_teg.teg_request_waiting--;
            ompi_mutex_unlock(&mca_pml_teg.teg_request_lock);
        } else {
            mca_pml_teg.teg_request_waiting++;
            while(recvreq->super.req_mpi_done == false)
            while(recvreq->req_base.req_mpi_done == false)
                ompi_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock);
            mca_pml_teg.teg_request_waiting--;
        }

@@ -113,18 +113,18 @@ int mca_pml_teg_send(
        return rc;
    }

    if(sendreq->super.req_mpi_done == false) {
    if(sendreq->req_base.req_mpi_done == false) {
        /* give up and sleep until completion */
        if(ompi_using_threads()) {
            ompi_mutex_lock(&mca_pml_teg.teg_request_lock);
            mca_pml_teg.teg_request_waiting++;
            while(sendreq->super.req_mpi_done == false)
            while(sendreq->req_base.req_mpi_done == false)
                ompi_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock);
            mca_pml_teg.teg_request_waiting--;
            ompi_mutex_unlock(&mca_pml_teg.teg_request_lock);
        } else {
            mca_pml_teg.teg_request_waiting++;
            while(sendreq->super.req_mpi_done == false)
            while(sendreq->req_base.req_mpi_done == false)
                ompi_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock);
            mca_pml_teg.teg_request_waiting--;
        }

@@ -62,6 +62,7 @@ int mca_pml_teg_module_open(void)
{
    mca_pml_base_request_t* teg_null = &mca_pml_teg.teg_request_null;
    OBJ_CONSTRUCT(&mca_pml_teg.teg_lock, ompi_mutex_t);
    OBJ_CONSTRUCT(&mca_pml_teg.teg_send_requests, ompi_free_list_t);
    OBJ_CONSTRUCT(&mca_pml_teg.teg_recv_requests, ompi_free_list_t);
    OBJ_CONSTRUCT(&mca_pml_teg.teg_procs, ompi_list_t);

@@ -124,6 +125,7 @@ int mca_pml_teg_module_close(void)
        free(mca_pml_teg.teg_ptl_modules);
    if(NULL != mca_pml_teg.teg_ptls)
        free(mca_pml_teg.teg_ptls);
    OBJ_DESTRUCT(&mca_pml_teg.teg_send_requests);
    OBJ_DESTRUCT(&mca_pml_teg.teg_recv_requests);
    OBJ_DESTRUCT(&mca_pml_teg.teg_procs);
    OBJ_DESTRUCT(&mca_pml_teg.teg_lock);

@@ -19,8 +19,8 @@
 */
struct mca_pml_proc_t {
    ompi_list_item_t super;
    ompi_proc_t *proc_ompi; /**< back-pointer to ompi_proc_t */
    ompi_mutex_t proc_lock; /**< lock to protect against concurrent access */
    ompi_proc_t *proc_ompi; /**< back-pointer to ompi_proc_t */
    ompi_mutex_t proc_lock; /**< lock to protect against concurrent access */
    mca_ptl_array_t proc_ptl_first; /**< array of ptls to use for first fragments */
    mca_ptl_array_t proc_ptl_next; /**< array of ptls to use for remaining fragments */
    uint32_t proc_ptl_flags; /**< aggregate ptl flags */
src/mca/pml/teg/src/pml_teg_ptl.c (new file, 23 lines)
@@ -0,0 +1,23 @@
#include "pml_teg_ptl.h"

static void mca_pml_base_ptl_construct(mca_pml_base_ptl_t* ptl)
{
    OBJ_CONSTRUCT(&ptl->ptl_cache, ompi_list_t);
    OBJ_CONSTRUCT(&ptl->ptl_cache_lock, ompi_mutex_t);
    ptl->ptl = NULL;
}

static void mca_pml_base_ptl_destruct(mca_pml_base_ptl_t* ptl)
{
    OBJ_DESTRUCT(&ptl->ptl_cache);
    OBJ_DESTRUCT(&ptl->ptl_cache_lock);
}

OBJ_CLASS_INSTANCE(
    mca_pml_base_ptl_t,
    ompi_list_t,
    mca_pml_base_ptl_construct,
    mca_pml_base_ptl_destruct
);

src/mca/pml/teg/src/pml_teg_ptl.h (new file, 18 lines)
@@ -0,0 +1,18 @@
#ifndef _MCA_PML_BASE_PTL_
#define _MCA_PML_BASE_PTL_

#include "mca/ptl/ptl.h"

struct mca_pml_base_ptl_t {
    ompi_list_t ptl_cache; /**< cache of send requests */
    size_t ptl_cache_size; /**< maximum size of cache */
    ompi_mutex_t ptl_cache_lock; /**< lock for queue access */
    struct mca_ptl_t* ptl; /**< back pointer to ptl */
};
typedef struct mca_pml_base_ptl_t mca_pml_base_ptl_t;

OBJ_CLASS_DECLARATION(mca_pml_base_ptl_t);

#endif
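This mca_pml_base_ptl_t instance is what the MCA_PML_TEG_SEND_REQUEST_ALLOC and MCA_PML_TEG_SEND_REQUEST_RETURN macros (in the pml_teg_sendreq.h hunk further down) operate on. Stripped of the macro plumbing, the allocation path is roughly the following; the helper name cache_alloc_sketch is hypothetical:

static mca_pml_base_send_request_t* cache_alloc_sketch(mca_pml_base_ptl_t* ptl_base)
{
    mca_pml_base_send_request_t* sendreq;
    ompi_list_item_t* item;
    int rc;

    /* Fast path: re-use a request already initialized for this PTL. */
    OMPI_THREAD_LOCK(&ptl_base->ptl_cache_lock);
    sendreq = (mca_pml_base_send_request_t*)ompi_list_remove_first(&ptl_base->ptl_cache);
    OMPI_THREAD_UNLOCK(&ptl_base->ptl_cache_lock);
    if (NULL != sendreq)
        return sendreq;   /* ptl_request_init already ran on this descriptor */

    /* Slow path: take a request from the PML's global free list and let the
     * PTL initialize its per-request data once. */
    OMPI_FREE_LIST_WAIT(&mca_pml_teg.teg_send_requests, item, rc);
    if (OMPI_SUCCESS != rc)
        return NULL;
    sendreq = (mca_pml_base_send_request_t*)item;
    sendreq->req_ptl = ptl_base->ptl;
    ptl_base->ptl->ptl_request_init(ptl_base->ptl, sendreq);
    return sendreq;
}

On return, the request goes back onto ptl_cache unless the cache already holds ptl_cache_size entries, in which case ptl_request_fini is called and the request is handed back to the global free list.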
@@ -14,19 +14,20 @@ static mca_ptl_base_recv_frag_t* mca_pml_teg_recv_request_match_specific_proc(
void mca_pml_teg_recv_request_progress(
    struct mca_ptl_t* ptl,
    mca_pml_base_recv_request_t* req,
    mca_ptl_base_recv_frag_t* frag)
    size_t bytes_received,
    size_t bytes_delivered)
{
    OMPI_THREAD_LOCK(&mca_pml_teg.teg_request_lock);
    req->req_bytes_delivered += frag->super.frag_size;
    req->req_bytes_received += frag->super.frag_header.hdr_frag.hdr_frag_length;
    req->req_bytes_received += bytes_received;
    req->req_bytes_delivered += bytes_delivered;
    if (req->req_bytes_received >= req->req_bytes_packed) {
        /* initialize request status */
        req->super.req_status.MPI_SOURCE = req->super.req_peer;
        req->super.req_status.MPI_TAG = req->super.req_tag;
        req->super.req_status.MPI_ERROR = OMPI_SUCCESS;
        req->super.req_status._count = req->req_bytes_delivered;
        req->super.req_pml_done = true;
        req->super.req_mpi_done = true;
        req->req_base.req_status.MPI_SOURCE = req->req_base.req_peer;
        req->req_base.req_status.MPI_TAG = req->req_base.req_tag;
        req->req_base.req_status.MPI_ERROR = OMPI_SUCCESS;
        req->req_base.req_status._count = req->req_bytes_delivered;
        req->req_base.req_pml_done = true;
        req->req_base.req_mpi_done = true;
        if(mca_pml_teg.teg_request_waiting) {
#if MCA_PML_TEG_STATISTICS
            mca_pml_teg.teg_condition_broadcasts++;
@@ -46,20 +47,20 @@ void mca_pml_teg_recv_request_progress(

void mca_pml_teg_recv_request_match_specific(mca_pml_base_recv_request_t* request)
{
    ompi_communicator_t *comm = request->super.req_comm;
    ompi_communicator_t *comm = request->req_base.req_comm;
    mca_pml_ptl_comm_t* pml_comm = comm->c_pml_comm;
    int req_peer = request->super.req_peer;
    int req_peer = request->req_base.req_peer;
    mca_ptl_base_recv_frag_t* frag;

    /* check for a specific match */
    OMPI_THREAD_LOCK(&pml_comm->c_matching_lock);

    /* assign sequence number */
    request->super.req_sequence = pml_comm->c_recv_seq++;
    request->req_base.req_sequence = pml_comm->c_recv_seq++;

    if (ompi_list_get_size(&pml_comm->c_unexpected_frags[req_peer]) > 0 &&
        (frag = mca_pml_teg_recv_request_match_specific_proc(request, req_peer)) != NULL) {
        mca_ptl_t* ptl = frag->super.frag_owner;
        mca_ptl_t* ptl = frag->frag_base.frag_owner;
        OMPI_THREAD_UNLOCK(&pml_comm->c_matching_lock);
        ptl->ptl_matched(ptl, frag);
        return; /* match found */
@@ -68,7 +69,7 @@ void mca_pml_teg_recv_request_match_specific(mca_pml_base_recv_request_t* reques
    /* We didn't find any matches. Record this irecv so we can match
     * it when the message comes in.
     */
    if(request->super.req_type != MCA_PML_REQUEST_IPROBE)
    if(request->req_base.req_type != MCA_PML_REQUEST_IPROBE)
        ompi_list_append(pml_comm->c_specific_receives+req_peer, (ompi_list_item_t*)request);
    OMPI_THREAD_UNLOCK(&pml_comm->c_matching_lock);
}
@@ -81,7 +82,7 @@ void mca_pml_teg_recv_request_match_specific(mca_pml_base_recv_request_t* reques

void mca_pml_teg_recv_request_match_wild(mca_pml_base_recv_request_t* request)
{
    ompi_communicator_t *comm = request->super.req_comm;
    ompi_communicator_t *comm = request->req_base.req_comm;
    mca_pml_ptl_comm_t* pml_comm = comm->c_pml_comm;
    int proc_count = comm->c_remote_group->grp_proc_count;
    int proc;
@@ -95,7 +96,7 @@ void mca_pml_teg_recv_request_match_wild(mca_pml_base_recv_request_t* request)
    OMPI_THREAD_LOCK(&pml_comm->c_matching_lock);

    /* assign sequence number */
    request->super.req_sequence = pml_comm->c_recv_seq++;
    request->req_base.req_sequence = pml_comm->c_recv_seq++;

    for (proc = 0; proc < proc_count; proc++) {
        mca_ptl_base_recv_frag_t* frag;
@@ -106,7 +107,7 @@ void mca_pml_teg_recv_request_match_wild(mca_pml_base_recv_request_t* request)

        /* loop over messages from the current proc */
        if ((frag = mca_pml_teg_recv_request_match_specific_proc(request, proc)) != NULL) {
            mca_ptl_t* ptl = frag->super.frag_owner;
            mca_ptl_t* ptl = frag->frag_base.frag_owner;
            OMPI_THREAD_UNLOCK(&pml_comm->c_matching_lock);
            ptl->ptl_matched(ptl, frag);
            return; /* match found */
@@ -117,7 +118,7 @@ void mca_pml_teg_recv_request_match_wild(mca_pml_base_recv_request_t* request)
     * it when the message comes in.
     */

    if(request->super.req_type != MCA_PML_REQUEST_IPROBE)
    if(request->req_base.req_type != MCA_PML_REQUEST_IPROBE)
        ompi_list_append(&pml_comm->c_wild_receives, (ompi_list_item_t*)request);
    OMPI_THREAD_UNLOCK(&pml_comm->c_matching_lock);
}
@@ -131,15 +132,15 @@ void mca_pml_teg_recv_request_match_wild(mca_pml_base_recv_request_t* request)
static mca_ptl_base_recv_frag_t* mca_pml_teg_recv_request_match_specific_proc(
    mca_pml_base_recv_request_t* request, int proc)
{
    mca_pml_ptl_comm_t *pml_comm = request->super.req_comm->c_pml_comm;
    mca_pml_ptl_comm_t *pml_comm = request->req_base.req_comm->c_pml_comm;
    ompi_list_t* unexpected_frags = pml_comm->c_unexpected_frags+proc;
    mca_ptl_base_recv_frag_t* frag;
    int tag = request->super.req_tag;
    int tag = request->req_base.req_tag;

    for (frag = (mca_ptl_base_recv_frag_t*)ompi_list_get_first(unexpected_frags);
         frag != (mca_ptl_base_recv_frag_t*)ompi_list_get_end(unexpected_frags);
         frag = (mca_ptl_base_recv_frag_t*)ompi_list_get_next(frag)) {
        mca_ptl_base_match_header_t* header = &frag->super.frag_header.hdr_match;
        mca_ptl_base_match_header_t* header = &frag->frag_base.frag_header.hdr_match;

        /* check first frag - we assume that process matching has been done already */
        if (((tag == OMPI_ANY_TAG) || (tag == header->hdr_tag))) {
@@ -149,8 +150,8 @@ static mca_ptl_base_recv_frag_t* mca_pml_teg_recv_request_match_specific_proc(
            }
            ompi_list_remove_item(unexpected_frags, (ompi_list_item_t*)frag);
            request->req_bytes_packed = header->hdr_msg_length;
            request->super.req_tag = header->hdr_tag;
            request->super.req_peer = header->hdr_src;
            request->req_base.req_tag = header->hdr_tag;
            request->req_base.req_peer = header->hdr_src;
            frag->frag_request = request;
            return frag;
        }

@@ -58,8 +58,8 @@ void mca_pml_teg_recv_request_match_specific(mca_pml_base_recv_request_t* reques
 */
static inline int mca_pml_teg_recv_request_start(mca_pml_base_recv_request_t* request)
{
    request->super.super.req_state = OMPI_REQUEST_ACTIVE;
    if(request->super.req_peer == OMPI_ANY_SOURCE) {
    request->req_base.req_ompi.req_state = OMPI_REQUEST_ACTIVE;
    if(request->req_base.req_peer == OMPI_ANY_SOURCE) {
        mca_pml_teg_recv_request_match_wild(request);
    } else {
        mca_pml_teg_recv_request_match_specific(request);
@@ -71,14 +71,16 @@ static inline int mca_pml_teg_recv_request_start(mca_pml_base_recv_request_t* re
 * Update status of a recv request based on the completion status of
 * the receive fragment.
 *
 * @param ptl (IN) The PTL pointer.
 * @param request (IN) Receive request.
 * @param frag (IN) Receive fragment.
 * @param ptl (IN) The PTL pointer.
 * @param request (IN) Receive request.
 * @param bytes_received (IN) Bytes received from peer.
 * @param bytes_delivered (IN) Bytes delivered to application.
 */
void mca_pml_teg_recv_request_progress(
    struct mca_ptl_t* ptl,
    mca_pml_base_recv_request_t* request,
    mca_ptl_base_recv_frag_t* frag
    size_t bytes_received,
    size_t bytes_delivered
);

#endif
@@ -20,7 +20,7 @@

void mca_pml_teg_send_request_schedule(mca_pml_base_send_request_t* req)
{
    ompi_proc_t *proc = ompi_comm_peer_lookup(req->super.req_comm, req->super.req_peer);
    ompi_proc_t *proc = ompi_comm_peer_lookup(req->req_base.req_comm, req->req_base.req_peer);
    mca_pml_proc_t* proc_pml = proc->proc_pml;

    /* allocate remaining bytes to PTLs */
@@ -60,7 +60,7 @@ void mca_pml_teg_send_request_schedule(mca_pml_base_send_request_t* req)
    /* unable to complete send - signal request failed */
    if(bytes_remaining > 0) {
        OMPI_THREAD_LOCK(&mca_pml_teg.teg_request_lock);
        req->super.req_mpi_done = true;
        req->req_base.req_mpi_done = true;
        /* FIX - set status correctly */
        if(mca_pml_teg.teg_request_waiting)
            ompi_condition_broadcast(&mca_pml_teg.teg_request_cond);
@@ -82,24 +82,24 @@ void mca_pml_teg_send_request_schedule(mca_pml_base_send_request_t* req)
void mca_pml_teg_send_request_progress(
    struct mca_ptl_t* ptl,
    mca_pml_base_send_request_t* req,
    mca_ptl_base_send_frag_t* frag)
    size_t bytes_sent)
{
    bool first_frag;
    OMPI_THREAD_LOCK(&mca_pml_teg.teg_request_lock);
    first_frag = (req->req_bytes_sent == 0 && req->req_bytes_packed > 0);
    req->req_bytes_sent += frag->super.frag_size;
    req->req_bytes_sent += bytes_sent;
    if (req->req_bytes_sent >= req->req_bytes_packed) {
        req->super.req_pml_done = true;
        if (req->super.req_mpi_done == false) {
            req->super.req_status.MPI_SOURCE = req->super.req_comm->c_my_rank;
            req->super.req_status.MPI_TAG = req->super.req_tag;
            req->super.req_status.MPI_ERROR = OMPI_SUCCESS;
            req->super.req_status._count = req->req_bytes_sent;
            req->super.req_mpi_done = true;
        req->req_base.req_pml_done = true;
        if (req->req_base.req_mpi_done == false) {
            req->req_base.req_status.MPI_SOURCE = req->req_base.req_comm->c_my_rank;
            req->req_base.req_status.MPI_TAG = req->req_base.req_tag;
            req->req_base.req_status.MPI_ERROR = OMPI_SUCCESS;
            req->req_base.req_status._count = req->req_bytes_sent;
            req->req_base.req_mpi_done = true;
            if(mca_pml_teg.teg_request_waiting) {
                ompi_condition_broadcast(&mca_pml_teg.teg_request_cond);
            }
        } else if (req->super.req_free_called) {
        } else if (req->req_base.req_free_called) {
            MCA_PML_TEG_FREE((ompi_request_t**)&req);
        }
    OMPI_THREAD_UNLOCK(&mca_pml_teg.teg_request_lock);

@@ -7,48 +7,97 @@
#ifndef OMPI_PML_TEG_SEND_REQUEST_H
#define OMPI_PML_TEG_SEND_REQUEST_H

#include "pml_teg_proc.h"
#include "mca/ptl/ptl.h"
#include "mca/pml/base/pml_base_sendreq.h"
#include "mca/ptl/base/ptl_base_sendfrag.h"
#include "mca/ptl/base/ptl_base_comm.h"
#include "pml_teg_proc.h"
#include "pml_teg_ptl.h"

#define MCA_PML_TEG_SEND_REQUEST_ALLOC( \
    comm, \
    dst, \
    sendreq, \
    rc) \
{ \
    mca_pml_proc_t *proc = mca_pml_teg_proc_lookup_remote(comm,dst); \
    mca_ptl_proc_t* ptl_proc; \
    mca_ptl_t* ptl; \
    \
    THREAD_SCOPED_LOCK(&proc->proc_lock, \
        (ptl_proc = mca_ptl_array_get_next(&proc->proc_ptl_first))); \
    ptl = ptl_proc->ptl; \
    rc = ptl->ptl_request_alloc(ptl,&sendreq); \
    if(NULL != sendreq) \
        sendreq->req_peer = ptl_proc->ptl_peer; \
#define MCA_PML_TEG_SEND_REQUEST_ALLOC( \
    comm, \
    dst, \
    sendreq, \
    rc) \
{ \
    mca_pml_proc_t *proc = mca_pml_teg_proc_lookup_remote(comm,dst); \
    mca_ptl_proc_t* ptl_proc; \
    mca_pml_base_ptl_t* ptl_base; \
    \
    THREAD_SCOPED_LOCK(&proc->proc_lock, \
        (ptl_proc = mca_ptl_array_get_next(&proc->proc_ptl_first))); \
    ptl_base = ptl_proc->ptl_base; \
    /* \
     * check to see if there is a cache of send requests associated with \
     * this ptl - if so try the allocation from there. \
     */ \
    if(NULL != ptl_base) { \
        OMPI_THREAD_LOCK(&ptl_base->ptl_cache_lock); \
        sendreq = (mca_pml_base_send_request_t*) \
            ompi_list_remove_first(&ptl_base->ptl_cache); \
        OMPI_THREAD_UNLOCK(&ptl_base->ptl_cache_lock); \
        if(NULL != sendreq) { \
            rc = OMPI_SUCCESS; \
        } else { \
            mca_ptl_t* ptl = ptl_base->ptl; \
            ompi_list_item_t* item; \
            OMPI_FREE_LIST_WAIT(&mca_pml_teg.teg_send_requests, item, rc); \
            sendreq = (mca_pml_base_send_request_t*)item; \
            sendreq->req_ptl = ptl; \
            sendreq->req_peer = ptl_proc->ptl_peer; \
            ptl->ptl_request_init(ptl, sendreq); \
        } \
    \
    /* otherwise - take the allocation from the global list */ \
    } else { \
        ompi_list_item_t* item; \
        OMPI_FREE_LIST_WAIT(&mca_pml_teg.teg_send_requests, item, rc); \
        sendreq = (mca_pml_base_send_request_t*)item; \
        sendreq->req_ptl = ptl_proc->ptl; \
        sendreq->req_peer = ptl_proc->ptl_peer; \
    } \
}

#define MCA_PML_TEG_SEND_REQUEST_RETURN(request) \
    request->req_owner->ptl_request_return(request->req_owner, request);

#define MCA_PML_TEG_SEND_REQUEST_RETURN(request) \
{ \
    mca_ptl_t* ptl = sendreq->req_ptl; \
    mca_pml_base_ptl_t* ptl_base = ptl->ptl_base; \
    /* \
     * If there is a cache associated with the ptl - first attempt \
     * to return the send descriptor to the cache. \
     */ \
    if(NULL != ptl->ptl_base) { \
        OMPI_THREAD_LOCK(&ptl_base->ptl_cache_lock); \
        if(ompi_list_get_size(&ptl_base->ptl_cache) >= ptl_base->ptl_cache_size) { \
            /* if cache limit is exceeded - return to global pool */ \
            ptl->ptl_request_fini(ptl, sendreq); \
            OMPI_FREE_LIST_RETURN( \
                &mca_pml_teg.teg_send_requests, (ompi_list_item_t*)sendreq); \
        } else { \
            ompi_list_prepend(&ptl_base->ptl_cache, (ompi_list_item_t*)sendreq); \
        } \
        OMPI_THREAD_UNLOCK(&ptl_base->ptl_cache_lock); \
    } else { \
        OMPI_FREE_LIST_RETURN( \
            &mca_pml_teg.teg_send_requests, (ompi_list_item_t*)request); \
    } \
}

static inline int mca_pml_teg_send_request_start(
    mca_pml_base_send_request_t* req)
{
    mca_ptl_t* ptl = req->req_owner;
    mca_ptl_t* ptl = req->req_ptl;
    size_t first_fragment_size = ptl->ptl_first_frag_size;
    size_t offset = req->req_offset;
    int flags, rc;

    /* initialize request state and message sequence number */
    req->super.super.req_state = OMPI_REQUEST_ACTIVE;
    req->super.req_sequence = mca_pml_ptl_comm_send_sequence(
        req->super.req_comm->c_pml_comm,
        req->super.req_peer);
    req->req_base.req_ompi.req_state = OMPI_REQUEST_ACTIVE;
    req->req_base.req_sequence = mca_pml_ptl_comm_send_sequence(
        req->req_base.req_comm->c_pml_comm,
        req->req_base.req_peer);

    /* start the first fragment */
    if(first_fragment_size == 0 || req->req_bytes_packed <= first_fragment_size) {
@@ -59,7 +108,7 @@ static inline int mca_pml_teg_send_request_start(
        /* require match for first fragment of a multi-fragment */
        flags = MCA_PTL_FLAGS_ACK_MATCHED;
    }
    rc = ptl->ptl_put(ptl, req->req_peer, req, offset, first_fragment_size, flags);
    rc = ptl->ptl_send(ptl, req->req_peer, req, offset, first_fragment_size, flags);
    if(rc != OMPI_SUCCESS)
        return rc;
    return OMPI_SUCCESS;
@@ -72,7 +121,7 @@ void mca_pml_teg_send_request_schedule(mca_pml_base_send_request_t* req);
void mca_pml_teg_send_request_progress(
    struct mca_ptl_t* ptl,
    mca_pml_base_send_request_t* send_request,
    mca_ptl_base_send_frag_t* send_frag
    size_t bytes_sent
);

#endif
@@ -18,7 +18,7 @@ int mca_pml_teg_start(size_t count, ompi_request_t** requests)
     * when the request completes - and create a new request.
     */

    switch(pml_request->super.req_state) {
    switch(pml_request->req_ompi.req_state) {
        case OMPI_REQUEST_INVALID:
            return OMPI_ERR_REQUEST;
        case OMPI_REQUEST_INACTIVE:

@@ -1,4 +1,5 @@
#include "pml_teg.h"
#include "pml_teg_sendreq.h"

int mca_pml_teg_test(

@@ -1,4 +1,5 @@
#include "pml_teg.h"
#include "pml_teg_sendreq.h"
#include "mca/ptl/base/ptl_base_comm.h"
#include "mca/pml/base/pml_base_request.h"

@@ -264,7 +264,7 @@ static mca_pml_base_recv_request_t *mca_ptl_base_check_wild_receives_for_match(
        wild_recv = (mca_pml_base_recv_request_t *)
            ((ompi_list_item_t *)wild_recv)->ompi_list_next) {

        recv_tag = wild_recv->super.req_tag;
        recv_tag = wild_recv->req_base.req_tag;
        if (
            /* exact tag match */
            (frag_tag == recv_tag) ||
@@ -332,7 +332,7 @@ static mca_pml_base_recv_request_t *mca_ptl_base_check_specific_receives_for_mat
        /*
         * Check for a match
         */
        recv_tag = specific_recv->super.req_tag;
        recv_tag = specific_recv->req_base.req_tag;
        if ( (frag_tag == recv_tag) ||
             ( (recv_tag == OMPI_ANY_TAG) && (0 <= frag_tag) ) ) {

@@ -391,8 +391,8 @@ static mca_pml_base_recv_request_t *mca_ptl_base_check_specific_and_wild_receive
    wild_recv = (mca_pml_base_recv_request_t *)
        ompi_list_get_first(&(pml_comm->c_wild_receives));

    specific_recv_seq = specific_recv->super.req_sequence;
    wild_recv_seq = wild_recv->super.req_sequence;
    specific_recv_seq = specific_recv->req_base.req_sequence;
    wild_recv_seq = wild_recv->req_base.req_sequence;

    while (true) {
        if (wild_recv_seq < specific_recv_seq) {
@@ -402,7 +402,7 @@ static mca_pml_base_recv_request_t *mca_ptl_base_check_specific_and_wild_receive
            /*
             * try and match
             */
            wild_recv_tag = wild_recv->super.req_tag;
            wild_recv_tag = wild_recv->req_base.req_tag;
            if ( (frag_tag == wild_recv_tag) ||
                 ( (wild_recv_tag == OMPI_ANY_TAG) && (0 <= frag_tag) ) ) {

@@ -441,13 +441,13 @@ static mca_pml_base_recv_request_t *mca_ptl_base_check_specific_and_wild_receive
             * Get the sequence number for this recv, and go
             * back to the top of the loop.
             */
            wild_recv_seq = wild_recv->super.req_sequence;
            wild_recv_seq = wild_recv->req_base.req_sequence;

        } else {
            /*
             * specific recv is earlier than the wild one.
             */
            specific_recv_tag=specific_recv->super.req_tag;
            specific_recv_tag=specific_recv->req_base.req_tag;
            if ( (frag_tag == specific_recv_tag) ||
                 ( (specific_recv_tag == OMPI_ANY_TAG) && (0<=frag_tag)) )
            {
@@ -486,7 +486,7 @@ static mca_pml_base_recv_request_t *mca_ptl_base_check_specific_and_wild_receive
             * Get the sequence number for this recv, and go
             * back to the top of the loop.
             */
            specific_recv_seq = specific_recv->super.req_sequence;
            specific_recv_seq = specific_recv->req_base.req_sequence;
        }
    }
}
@@ -545,7 +545,7 @@ static void mca_ptl_base_check_cantmatch_for_match(ompi_list_t *additional_match
            /*
             * If the message has the next expected seq from that proc...
             */
            frag_seq=frag_desc->super.frag_header.hdr_match.hdr_msg_seq;
            frag_seq=frag_desc->frag_base.frag_header.hdr_match.hdr_msg_seq;
            if (frag_seq == next_msg_seq_expected) {

                /* We're now expecting the next sequence number. */
@@ -564,7 +564,7 @@ static void mca_ptl_base_check_cantmatch_for_match(ompi_list_t *additional_match
                 * check to see if this frag matches a posted message
                 */
                matched_receive = mca_ptl_base_check_receives_for_match(
                    &frag_desc->super.frag_header.hdr_match, pml_comm);
                    &frag_desc->frag_base.frag_header.hdr_match, pml_comm);

                /* if match found, process data */
                if (matched_receive) {

@@ -18,7 +18,7 @@ extern ompi_class_t mca_ptl_base_recv_frag_t_class;
 * Base type for receive fragment descriptors.
 */
struct mca_ptl_base_recv_frag_t {
    mca_ptl_base_frag_t super; /**< base fragment descriptor */
    mca_ptl_base_frag_t frag_base; /**< base fragment descriptor */
    mca_pml_base_recv_request_t *frag_request; /**< matched posted receive */
    bool frag_is_buffered; /**< does fragment need to be unpacked into users buffer */
};
@@ -45,23 +45,27 @@ static inline bool mca_ptl_base_recv_frag_match(
    frag = (mca_ptl_base_recv_frag_t*)ompi_list_remove_first(&matched_frags);

    while(NULL != frag) {
        mca_ptl_t* ptl = frag->super.frag_owner;
        mca_ptl_t* ptl = frag->frag_base.frag_owner;
        mca_pml_base_recv_request_t *request = frag->frag_request;
        mca_ptl_base_match_header_t *header = &frag->super.frag_header.hdr_match;
        mca_ptl_base_match_header_t *header = &frag->frag_base.frag_header.hdr_match;

        /*
         * Initialize request status.
         */
        request->req_bytes_packed = header->hdr_msg_length;
        request->super.req_peer = header->hdr_src;
        request->super.req_tag = header->hdr_tag;
        request->req_base.req_peer = header->hdr_src;
        request->req_base.req_tag = header->hdr_tag;

        /*
         * If probe - signal request is complete - but don't notify PTL
         */
        if(request->super.req_type == MCA_PML_REQUEST_PROBE) {
        if(request->req_base.req_type == MCA_PML_REQUEST_PROBE) {

            ptl->ptl_recv_progress(ptl, request, frag);
            ptl->ptl_recv_progress(
                ptl,
                request,
                frag->frag_base.frag_header.hdr_frag.hdr_frag_length,
                frag->frag_base.frag_size);
            matched = mca_ptl_base_recv_frag_match( ptl, frag, header );

        } else {

@@ -16,7 +16,7 @@ extern ompi_class_t mca_ptl_base_send_frag_t_class;
 * Base type for send fragment descriptors
 */
struct mca_ptl_base_send_frag_t {
    mca_ptl_base_frag_t super; /**< base fragment descriptor */
    mca_ptl_base_frag_t frag_base; /**< base fragment descriptor */
    struct mca_pml_base_send_request_t *frag_request; /**< pointer to send request */
};
typedef struct mca_ptl_base_send_frag_t mca_ptl_base_send_frag_t;
@ -299,39 +299,46 @@ typedef int (*mca_ptl_base_del_procs_fn_t)(
|
||||
);
|
||||
|
||||
/**
|
||||
* PML->PTL Allocate a send request from the PTL modules free list.
|
||||
* PML->PTL Initialize a send request for use by the PTL.
|
||||
*
|
||||
* @param ptl (IN) PTL instance
|
||||
* @param request (OUT) Pointer to allocated request.
|
||||
* @return Status indicating if allocation was successful.
|
||||
*
|
||||
* To reduce latency (number of required allocations), a derived
|
||||
* type of mca_pml_base_send_request_t is obtained from the PTL that
|
||||
* is selected to send the first fragment. The derived type should contain
|
||||
* space for the base request structure, the PTL first fragment,
|
||||
* and any other required buffer space.
|
||||
* To reduce latency (number of required allocations), the PML allocates additional
|
||||
* space along w/ each request - that may be used by the PTL for additional control
|
||||
* information (e.g. first fragment descriptor).
|
||||
*
|
||||
* The init function is called the first time the request is ued by the PTL. On
|
||||
* completion of the request - the PML will cache the request for later use by the
|
||||
* same PTL. When the request is re-used from the cache, the init function is NOT
|
||||
* called for subsequent sends.
|
||||
*/
|
||||
typedef int (*mca_ptl_base_request_alloc_fn_t)(
|
||||
typedef void (*mca_ptl_base_request_init_fn_t)(
|
||||
struct mca_ptl_t* ptl,
|
||||
struct mca_pml_base_send_request_t** request
|
||||
struct mca_pml_base_send_request_t* request
|
||||
);
|
||||
|
||||
|
||||
/**
|
||||
* PML->PTL Return a send request to the PTL modules free list.
|
||||
* PML->PTL Cleanup any resources that may have been associated with the
|
||||
* request by the PTL.
|
||||
*
|
||||
* @param ptl (IN) PTL instance
|
||||
* @param request (IN) Pointer to allocated request.
|
||||
* @param request (OUT) Pointer to allocated request.
|
||||
*
|
||||
* Called when the request has been completed at both the MPI
|
||||
* and PML layers.
|
||||
* The fini function is called when the PML removes a request from the PTLs
|
||||
* cache (due to resource constraints) or reaching cache limit, prior to re-using
|
||||
* the request for another PTL. This provides the PTL the chance to cleanup/release
|
||||
* any resources cached on the send descriptor by the PTL.
|
||||
*/
|
||||
typedef void (*mca_ptl_base_request_return_fn_t)(
|
||||
|
||||
typedef void (*mca_ptl_base_request_fini_fn_t)(
|
||||
struct mca_ptl_t* ptl,
|
||||
struct mca_pml_base_send_request_t* request
|
||||
);
|
||||
|
||||
/**
|
||||
* PML->PTL Initiate a send/put to the peer.
|
||||
* PML->PTL Initiate a send to the peer.
|
||||
*
|
||||
* @param ptl (IN) PTL instance
|
||||
* @param ptl_base_peer (IN) PTL peer addressing
|
||||
@ -343,9 +350,37 @@ typedef void (*mca_ptl_base_request_return_fn_t)(
|
||||
* @param request (OUT) OMPI_SUCCESS if the PTL was able to queue one or more fragments
|
||||
*
|
||||
* The PML implements a rendevouz protocol, with up to the PTL defined threshold
|
||||
* bytes of the message sent in eager send mode. On receipt of an acknowledgment
|
||||
* from the peer, the PML will schedule the remaining fragments. If the PTL supports
|
||||
* RDMA functionality, these subsequent transfers may use RDMA put semantics.
|
||||
* bytes of the message sent in eager send mode.
|
||||
*
|
||||
* If the PTL is unable to fragment the requested size, possibly due to resource
|
||||
* constraints or datatype alighnment/offset, it should return the number of bytes
|
||||
* actually fragmented in the size parameter.
|
||||
*/
|
||||
typedef int (*mca_ptl_base_send_fn_t)(
|
||||
struct mca_ptl_t* ptl,
|
||||
struct mca_ptl_base_peer_t* ptl_base_peer,
|
||||
struct mca_pml_base_send_request_t* request,
|
||||
size_t offset,
|
||||
size_t size,
|
||||
int flags
|
||||
);
|
||||
|
||||
/**
|
||||
* PML->PTL Initiate a put to the peer.
|
||||
*
|
||||
* @param ptl (IN) PTL instance
|
||||
* @param ptl_base_peer (IN) PTL peer addressing
|
||||
* @param request (IN) Send request
|
||||
* @param offset Current offset into packed/contiguous buffer.
|
||||
* @param size (IN/OUT) Number of bytes PML is requesting PTL to deliver,
|
||||
* PTL returns number of bytes sucessfully fragmented
|
||||
* @param flags (IN) Flags that should be passed to the peer via the message header.
|
||||
* @param request (OUT) OMPI_SUCCESS if the PTL was able to queue one or more fragments
|
||||
*
|
||||
* The PML implements a rendevouz protocol, with up to the PTL defined threshold
|
||||
* bytes of the message sent in eager send mode (via mca_ptl_base_send_fn_t). On receipt
|
||||
* of an acknowledgment from the peer, the PML will schedule the remaining fragments. If
|
||||
* the PTL supports RDMA functionality, these subsequent transfers may use RDMA put semantics.
|
||||
*
|
||||
* If the PTL is unable to fragment the requested size, possibly due to resource
|
||||
* constraints or datatype alighnment/offset, it should return the number of bytes
|
||||
@ -360,6 +395,7 @@ typedef int (*mca_ptl_base_put_fn_t)(
|
||||
int flags
|
||||
);
|
||||
|
||||
|
||||
/**
|
||||
* PML->PTL Initiate a get from a peer.
|
||||
*
|
||||
@ -429,14 +465,16 @@ typedef void (*mca_ptl_base_matched_fn_t)(
|
||||
* PTL->PML Notification from the PTL to the PML that a fragment
|
||||
* has completed (e.g. been successfully delivered into users buffer)
|
||||
*
|
||||
* @param ptr(IN) PTL instance
|
||||
* @param recv_request (IN) Receive Request
|
||||
* @param recv_frag (IN) Receive Fragment
|
||||
* @param ptr(IN) PTL instance
|
||||
* @param recv_request (IN) Receive Request
|
||||
* @param bytes_received (IN) Number of bytes received from peer.
|
||||
* @param bytes_delivered (IN) Number of bytes delivered to application.
|
||||
*/
|
||||
typedef void (*mca_ptl_base_recv_progress_fn_t)(
|
||||
struct mca_ptl_t* ptl,
|
||||
struct mca_pml_base_recv_request_t* recv_request,
|
||||
struct mca_ptl_base_recv_frag_t* recv_frag
|
||||
size_t bytes_received,
|
||||
size_t bytes_delivered
|
||||
);
|
||||
|
||||
/**
|
||||
@ -445,35 +483,14 @@ typedef void (*mca_ptl_base_recv_progress_fn_t)(
|
||||
*
|
||||
* @param ptr(IN) PTL instance
|
||||
* @param send_request (IN) Send Request
|
||||
* @param send_frag (IN) Send Fragment
|
||||
* @param bytes_sent (IN) Number of bytes sent to peer.
|
||||
*/
|
||||
typedef void (*mca_ptl_base_send_progress_fn_t)(
|
||||
struct mca_ptl_t* ptl,
|
||||
struct mca_pml_base_send_request_t* send_request,
|
||||
struct mca_ptl_base_send_frag_t* send_frag
|
||||
size_t bytes_sent
|
||||
);
|
||||
|
||||
/**
 * PTL function list. It is mainly used for the stack-based approach of the
 * profiling PTL.
 */
struct mca_ptl_functions_t {
    /* PML->PTL function table */
    mca_ptl_base_add_procs_fn_t ptl_add_procs;
    mca_ptl_base_del_procs_fn_t ptl_del_procs;
    mca_ptl_base_finalize_fn_t ptl_finalize;
    mca_ptl_base_put_fn_t ptl_put;
    mca_ptl_base_get_fn_t ptl_get;
    mca_ptl_base_matched_fn_t ptl_matched;
    mca_ptl_base_request_alloc_fn_t ptl_request_alloc;
    mca_ptl_base_request_return_fn_t ptl_request_return;
    /* PTL->PML function table - filled in by PML at init */
    mca_ptl_base_match_fn_t ptl_match;
    mca_ptl_base_send_progress_fn_t ptl_send_progress;
    mca_ptl_base_recv_progress_fn_t ptl_recv_progress;
};
typedef struct mca_ptl_functions_t mca_ptl_functions_t;
/**
 * PTL instance interface functions and attributes.
 */
@ -481,6 +498,8 @@ struct mca_ptl_t {

    /* PTL common attributes */
    mca_ptl_base_module_t* ptl_module; /**< pointer back to the PTL module structure */
    size_t ptl_cache_size;             /**< maximum size of request cache for this PTL */
    size_t ptl_cache_bytes;            /**< number of bytes required by PTL for request cache */
    size_t ptl_first_frag_size;        /**< maximum size of first fragment -- eager send */
    size_t ptl_min_frag_size;          /**< threshold below which the PTL will not fragment */
    size_t ptl_max_frag_size;          /**< maximum fragment size supported by the PTL */
@ -493,11 +512,12 @@ struct mca_ptl_t {
    mca_ptl_base_add_procs_fn_t ptl_add_procs;
    mca_ptl_base_del_procs_fn_t ptl_del_procs;
    mca_ptl_base_finalize_fn_t ptl_finalize;
    mca_ptl_base_send_fn_t ptl_send;
    mca_ptl_base_put_fn_t ptl_put;
    mca_ptl_base_get_fn_t ptl_get;
    mca_ptl_base_matched_fn_t ptl_matched;
    mca_ptl_base_request_alloc_fn_t ptl_request_alloc;
    mca_ptl_base_request_return_fn_t ptl_request_return;
    mca_ptl_base_request_init_fn_t ptl_request_init;
    mca_ptl_base_request_fini_fn_t ptl_request_fini;

    /* PTL->PML function table - filled in by PML at init */
    mca_ptl_base_match_fn_t ptl_match;
@ -506,6 +526,9 @@ struct mca_ptl_t {

    /* Allow the cannibalization of the PTL */
    struct mca_ptl_t* ptl_stack;

    /* for use by PML only */
    struct mca_pml_base_ptl_t* ptl_base;
};
typedef struct mca_ptl_t mca_ptl_t;
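The cache attributes above pair with the request init/fini hooks: since the send request is now allocated by the PML, a PML can size each cached element as the base request plus the ptl_cache_bytes the PTL asked for, and let the PTL construct its per-request data in place. The fragment below is only a sketch of that contract; the function name and the bare malloc/free stand in for whatever allocator the PML actually uses.

static mca_pml_base_send_request_t* pml_cache_request_sketch(mca_ptl_t* ptl)
{
    /* base request plus the PTL's private bytes, laid out contiguously */
    size_t elem_size = sizeof(mca_pml_base_send_request_t) + ptl->ptl_cache_bytes;
    mca_pml_base_send_request_t* sendreq =
        (mca_pml_base_send_request_t*)malloc(elem_size);
    if(NULL == sendreq)
        return NULL;
    /* the PTL initializes whatever it keeps at (sendreq+1) */
    ptl->ptl_request_init(ptl, sendreq);
    return sendreq;
    /* ...and ptl->ptl_request_fini(ptl, sendreq) would be called before
     * the request is evicted from the cache and freed */
}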
@ -25,6 +25,8 @@

mca_ptl_t mca_ptl_self = {
    &mca_ptl_self_module.super,
    8, /* ptl_cache_size */
    sizeof(mca_ptl_base_recv_frag_t), /* ptl_cache_bytes */
    0, /* ptl_first_frag_size */
    0, /* ptl_min_frag_size */
    0, /* ptl_max_frag_size */
@ -36,10 +38,11 @@ mca_ptl_t mca_ptl_self = {
    mca_ptl_self_del_proc,
    mca_ptl_self_finalize,
    mca_ptl_self_send, /* send */
    mca_ptl_self_send, /* put */
    NULL, /* get */
    mca_ptl_self_matched, /* matched */
    mca_ptl_self_request_alloc,
    mca_ptl_self_request_return,
    mca_ptl_self_request_init,
    mca_ptl_self_request_fini,
    NULL, /* match */
    NULL,
    NULL
@ -73,23 +76,14 @@ int mca_ptl_self_finalize(struct mca_ptl_t* ptl)
    return OMPI_SUCCESS;
}

int mca_ptl_self_request_alloc(struct mca_ptl_t* ptl, struct mca_pml_base_send_request_t** request)
void mca_ptl_self_request_init(struct mca_ptl_t* ptl, mca_pml_base_send_request_t* request)
{
    int rc;
    mca_pml_base_send_request_t* sendreq;
    ompi_list_item_t* send_item;

    OMPI_FREE_LIST_GET( &mca_ptl_self_module.self_send_requests, send_item, rc );

    sendreq = (mca_pml_base_send_request_t*)send_item;
    sendreq->req_owner = ptl;
    *request = sendreq;
    return rc;
    OBJ_CONSTRUCT(request+1, mca_ptl_base_recv_frag_t);
}

void mca_ptl_self_request_return(struct mca_ptl_t* ptl, struct mca_pml_base_send_request_t* request)
void mca_ptl_self_request_fini(struct mca_ptl_t* ptl, mca_pml_base_send_request_t* request)
{
    OMPI_FREE_LIST_RETURN( &mca_ptl_self_module.self_send_requests, (ompi_list_item_t*)request);
    OBJ_DESTRUCT(request+1);
}
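The request-cache hook above relies on the layout declared for mca_ptl_self_send_request_t later in this change: ptl_cache_bytes is sizeof(mca_ptl_base_recv_frag_t), so the memory at (request+1) is the req_frag member that holds the first (and only) fragment. As a sketch, and assuming no padding between the two members, the construct call is equivalent to:

void mca_ptl_self_request_init(struct mca_ptl_t* ptl, mca_pml_base_send_request_t* request)
{
    mca_ptl_self_send_request_t* sreq = (mca_ptl_self_send_request_t*)request;
    /* build the cached first fragment in the bytes the PML reserved
     * immediately after the base send request */
    OBJ_CONSTRUCT(&sreq->req_frag, mca_ptl_base_recv_frag_t);
}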
/*
@ -108,25 +102,25 @@ int mca_ptl_self_send(
    int flags )
{
    mca_ptl_self_send_request_t* req = (mca_ptl_self_send_request_t*)request;
    mca_ptl_base_header_t* hdr = &(req->req_frag.super.frag_header);
    mca_ptl_base_header_t* hdr = &(req->req_frag.frag_base.frag_header);

    hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_MATCH;
    hdr->hdr_common.hdr_flags = flags;
    hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_match_header_t);
    hdr->hdr_frag.hdr_frag_offset = offset;
    hdr->hdr_frag.hdr_frag_seq = 0;
    hdr->hdr_match.hdr_contextid = request->super.req_comm->c_contextid;
    hdr->hdr_match.hdr_src = request->super.req_comm->c_my_rank;
    hdr->hdr_match.hdr_dst = request->super.req_peer;
    hdr->hdr_match.hdr_tag = request->super.req_tag;
    hdr->hdr_match.hdr_contextid = request->req_base.req_comm->c_contextid;
    hdr->hdr_match.hdr_src = request->req_base.req_comm->c_my_rank;
    hdr->hdr_match.hdr_dst = request->req_base.req_peer;
    hdr->hdr_match.hdr_tag = request->req_base.req_tag;
    hdr->hdr_match.hdr_msg_length = request->req_bytes_packed;
    hdr->hdr_match.hdr_msg_seq = request->super.req_sequence;
    hdr->hdr_match.hdr_msg_seq = request->req_base.req_sequence;
    hdr->hdr_frag.hdr_frag_length = request->req_bytes_packed;
    hdr->hdr_frag.hdr_frag_offset = 0 ;
    hdr->hdr_frag.hdr_src_ptr.pval = (void*)req;
    req->req_frag.super.frag_peer = ptl_base_peer;
    req->req_frag.super.frag_size = request->req_bytes_packed;
    req->req_frag.super.frag_owner = &mca_ptl_self;
    req->req_frag.frag_base.frag_peer = ptl_base_peer;
    req->req_frag.frag_base.frag_size = request->req_bytes_packed;
    req->req_frag.frag_base.frag_owner = &mca_ptl_self;
    req->req_frag.frag_request = NULL;
    req->req_frag.frag_is_buffered = 0;
    ptl->ptl_match( ptl, &(req->req_frag), &(hdr->hdr_match) );
@ -141,9 +135,9 @@ int mca_ptl_self_send(
void mca_ptl_self_matched( mca_ptl_t* ptl,
                           mca_ptl_base_recv_frag_t* frag)
{
    mca_ptl_self_send_request_t* sendreq = (mca_ptl_self_send_request_t*)(frag->super.frag_header.hdr_frag.hdr_src_ptr.pval);
    mca_ptl_self_send_request_t* sendreq = (mca_ptl_self_send_request_t*)
        frag->frag_base.frag_header.hdr_frag.hdr_src_ptr.pval;
    mca_pml_base_recv_request_t* recvreq = frag->frag_request;
    mca_ptl_base_send_frag_t sendfrag;

    /* Do we have the same datatype or not? If yes we can use an optimized version
     * for the copy function, if not we have to use a temporary buffer to pack/unpack
@ -152,10 +146,13 @@ void mca_ptl_self_matched( mca_ptl_t* ptl,
     * a contiguous buffer and the convertor on the send request initialized to point
     * into this buffer.
     */
    if( sendreq->super.super.req_datatype == recvreq->super.req_datatype &&
        sendreq->super.req_send_mode != MCA_PML_BASE_SEND_BUFFERED) {
        ompi_ddt_copy_content_same_ddt( recvreq->super.req_datatype, recvreq->super.req_count,
                                        recvreq->super.req_addr, sendreq->super.super.req_addr );
    if( sendreq->req_send.req_base.req_datatype == recvreq->req_base.req_datatype &&
        sendreq->req_send.req_send_mode != MCA_PML_BASE_SEND_BUFFERED) {
        ompi_ddt_copy_content_same_ddt(
            recvreq->req_base.req_datatype,
            recvreq->req_base.req_count,
            recvreq->req_base.req_addr,
            sendreq->req_send.req_base.req_addr );
    } else {
        ompi_convertor_t *pSendConvertor, *pRecvConvertor;
        struct iovec iov[1];
@ -166,10 +163,14 @@ void mca_ptl_self_matched( mca_ptl_t* ptl,
        length = 64 * 1024;
        buf = malloc( length * sizeof(char) );

        ompi_convertor_init_for_recv( &(frag->super.frag_convertor), 0, recvreq->super.req_datatype,
                                      recvreq->super.req_count, recvreq->super.req_addr, 0 );
        pSendConvertor = &(sendreq->super.req_convertor);
        pRecvConvertor = &(frag->super.frag_convertor);
        ompi_convertor_init_for_recv(
            &frag->frag_base.frag_convertor,
            0,
            recvreq->req_base.req_datatype,
            recvreq->req_base.req_count,
            recvreq->req_base.req_addr, 0 );
        pSendConvertor = &(sendreq->req_send.req_convertor);
        pRecvConvertor = &(frag->frag_base.frag_convertor);
        completed = 0;
        while( !completed ) {
            iov[0].iov_base = buf;
@ -180,15 +181,12 @@ void mca_ptl_self_matched( mca_ptl_t* ptl,
            completed |= ompi_convertor_unpack( pRecvConvertor, iov, iov_count );
            /*assert( freeAfter == 0 );*/
        }
        free( buf );
        free( buf );
    }
    /* Now let's progress the request */
    sendfrag.frag_request = &(sendreq->super);
    /*sendfrag.super.frag_header = ;*/
    sendfrag.super.frag_owner = &mca_ptl_self;
    sendfrag.super.frag_peer = NULL;
    sendfrag.super.frag_addr = NULL;
    sendfrag.super.frag_size = sendreq->super.req_bytes_packed;
    ptl->ptl_send_progress( ptl, &(sendreq->super), &(sendfrag) );
    ptl->ptl_recv_progress( ptl, recvreq, frag );
    ptl->ptl_send_progress( ptl, &sendreq->req_send, sendreq->req_send.req_bytes_packed );
    ptl->ptl_recv_progress( ptl,
                            recvreq,
                            frag->frag_base.frag_header.hdr_frag.hdr_frag_length,
                            frag->frag_base.frag_size );
}
@ -40,7 +40,7 @@ typedef struct mca_ptl_self_module_1_0_0_t mca_ptl_self_module_t;
 * base send request, and the base receive fragment which will be used to do the match.
 */
struct mca_ptl_self_send_request_t {
    mca_pml_base_send_request_t super;
    mca_pml_base_send_request_t req_send;
    mca_ptl_base_recv_frag_t req_frag; /* first fragment */
};
typedef struct mca_ptl_self_send_request_t mca_ptl_self_send_request_t;
@ -74,12 +74,12 @@ extern mca_ptl_t** mca_ptl_self_module_init(
    bool *have_hidden_threads
);

int mca_ptl_self_add_proc(struct mca_ptl_t* ptl, size_t nprocs, struct ompi_proc_t **ompi_proc, struct mca_ptl_base_peer_t** peer_ret, ompi_bitmap_t* reachable);
int mca_ptl_self_del_proc(struct mca_ptl_t* ptl, size_t nprocs, struct ompi_proc_t **proc, struct mca_ptl_base_peer_t** ptl_peer);
int mca_ptl_self_finalize(struct mca_ptl_t* ptl);
int mca_ptl_self_request_alloc(struct mca_ptl_t* ptl, struct mca_pml_base_send_request_t** request);
void mca_ptl_self_request_return(struct mca_ptl_t* ptl, struct mca_pml_base_send_request_t* request);
int mca_ptl_self_send( struct mca_ptl_t* ptl, struct mca_ptl_base_peer_t* ptl_base_peer, struct mca_pml_base_send_request_t* request,
int mca_ptl_self_add_proc(struct mca_ptl_t* ptl, size_t nprocs, struct ompi_proc_t **ompi_proc, struct mca_ptl_base_peer_t** peer_ret, ompi_bitmap_t* reachable);
int mca_ptl_self_del_proc(struct mca_ptl_t* ptl, size_t nprocs, struct ompi_proc_t **proc, struct mca_ptl_base_peer_t** ptl_peer);
int mca_ptl_self_finalize(struct mca_ptl_t* ptl);
void mca_ptl_self_request_init(struct mca_ptl_t* ptl, struct mca_pml_base_send_request_t* request);
void mca_ptl_self_request_fini(struct mca_ptl_t* ptl, struct mca_pml_base_send_request_t* request);
int mca_ptl_self_send( struct mca_ptl_t* ptl, struct mca_ptl_base_peer_t* ptl_base_peer, struct mca_pml_base_send_request_t* request,
                       size_t offset, size_t size, int flags );
void mca_ptl_self_matched( mca_ptl_t* ptl, mca_ptl_base_recv_frag_t* frag );
@ -24,6 +24,8 @@
mca_ptl_tcp_t mca_ptl_tcp = {
    {
    &mca_ptl_tcp_module.super,
    1,
    sizeof(mca_ptl_tcp_send_frag_t),
    0, /* ptl_exclusivity */
    0, /* ptl_latency */
    0, /* ptl_bandwidth */
@ -35,10 +37,11 @@ mca_ptl_tcp_t mca_ptl_tcp = {
    mca_ptl_tcp_del_procs,
    mca_ptl_tcp_finalize,
    mca_ptl_tcp_send,
    mca_ptl_tcp_send,
    NULL,
    mca_ptl_tcp_matched,
    mca_ptl_tcp_request_alloc,
    mca_ptl_tcp_request_return
    mca_ptl_tcp_request_init,
    mca_ptl_tcp_request_fini
    }
};

@ -108,31 +111,22 @@ int mca_ptl_tcp_finalize(struct mca_ptl_t* ptl)
    return OMPI_SUCCESS;
}

int mca_ptl_tcp_request_alloc(struct mca_ptl_t* ptl, struct mca_pml_base_send_request_t** request)
void mca_ptl_tcp_request_init(struct mca_ptl_t* ptl, struct mca_pml_base_send_request_t* request)
{
    int rc;
    mca_pml_base_send_request_t* sendreq;
    ompi_list_item_t* item;
    OMPI_FREE_LIST_GET(&mca_ptl_tcp_module.tcp_send_requests, item, rc);
    if(NULL != (sendreq = (mca_pml_base_send_request_t*)item))
        sendreq->req_owner = ptl;
    *request = sendreq;
    return rc;
    OBJ_CONSTRUCT(request+1, mca_ptl_tcp_send_frag_t);
}


void mca_ptl_tcp_request_return(struct mca_ptl_t* ptl, struct mca_pml_base_send_request_t* request)
void mca_ptl_tcp_request_fini(struct mca_ptl_t* ptl, struct mca_pml_base_send_request_t* request)
{
    /* OBJ_DESTRUCT(&request->req_convertor); */
    OMPI_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_send_requests, (ompi_list_item_t*)request);
    OBJ_DESTRUCT(request+1);
}


void mca_ptl_tcp_recv_frag_return(struct mca_ptl_t* ptl, struct mca_ptl_tcp_recv_frag_t* frag)
{
    if(frag->super.frag_is_buffered)
        free(frag->super.super.frag_addr);
    /* OBJ_DESTRUCT(&frag->super.super.frag_convertor); */
    if(frag->frag_recv.frag_is_buffered)
        free(frag->frag_recv.frag_base.frag_addr);
    OMPI_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_recv_frags, (ompi_list_item_t*)frag);
}

@ -149,8 +143,8 @@ void mca_ptl_tcp_send_frag_return(struct mca_ptl_t* ptl, struct mca_ptl_tcp_send
        return;
    }
    OMPI_THREAD_UNLOCK(&mca_ptl_tcp_module.tcp_lock);
    mca_ptl_tcp_send_frag_init_ack(frag, ptl, pending->super.super.frag_peer, pending);
    mca_ptl_tcp_peer_send(pending->super.super.frag_peer, frag);
    mca_ptl_tcp_send_frag_init_ack(frag, ptl, pending->frag_recv.frag_base.frag_peer, pending);
    mca_ptl_tcp_peer_send(pending->frag_recv.frag_base.frag_peer, frag);
    mca_ptl_tcp_recv_frag_return(ptl, pending);
    } else {
        OMPI_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_send_frags, (ompi_list_item_t*)frag);
@ -203,7 +197,7 @@ void mca_ptl_tcp_matched(
    mca_ptl_base_recv_frag_t* frag)
{
    /* send ack back to peer? */
    mca_ptl_base_header_t* header = &frag->super.frag_header;
    mca_ptl_base_header_t* header = &frag->frag_base.frag_header;
    if(header->hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) {
        int rc;
        mca_ptl_tcp_send_frag_t* ack;
@ -218,8 +212,8 @@ void mca_ptl_tcp_matched(
            ompi_list_append(&mca_ptl_tcp_module.tcp_pending_acks, (ompi_list_item_t*)frag);
            OMPI_THREAD_UNLOCK(&mca_ptl_tcp_module.tcp_lock);
        } else {
            mca_ptl_tcp_send_frag_init_ack(ack, ptl, recv_frag->super.super.frag_peer, recv_frag);
            mca_ptl_tcp_peer_send(ack->super.super.frag_peer, ack);
            mca_ptl_tcp_send_frag_init_ack(ack, ptl, recv_frag->frag_recv.frag_base.frag_peer, recv_frag);
            mca_ptl_tcp_peer_send(ack->frag_send.frag_base.frag_peer, ack);
        }
    }
@ -166,26 +166,25 @@ extern int mca_ptl_tcp_del_procs(
);

/**
 * PML->PTL Allocate a send request from the PTL module's free list.
 *
 * @param ptl (IN)       PTL instance
 * @param request (OUT)  Pointer to allocated request.
 * @return               Status indicating if allocation was successful.
 *
 */
extern int mca_ptl_tcp_request_alloc(
    struct mca_ptl_t* ptl,
    struct mca_pml_base_send_request_t**
);

/**
 * PML->PTL Return a send request to the PTL module's free list.
 * PML->PTL Initialize a send request for the TCP request cache.
 *
 * @param ptl (IN)       PTL instance
 * @param request (IN)   Pointer to allocated request.
 *
 */
extern void mca_ptl_tcp_request_return(
extern void mca_ptl_tcp_request_init(
    struct mca_ptl_t* ptl,
    struct mca_pml_base_send_request_t*
);

/**
 * PML->PTL Cleanup a send request that is being removed from the cache.
 *
 * @param ptl (IN)       PTL instance
 * @param request (IN)   Pointer to allocated request.
 *
 */
extern void mca_ptl_tcp_request_fini(
    struct mca_ptl_t* ptl,
    struct mca_pml_base_send_request_t*
);
@ -207,7 +206,7 @@ extern void mca_ptl_tcp_matched(
 *
 * @param ptl (IN)               PTL instance
 * @param ptl_base_peer (IN)     PTL peer addressing
 * @param send_request (IN/OUT)  Send request (allocated by PML via mca_ptl_base_request_alloc_fn_t)
 * @param send_request (IN/OUT)  Send request (initialized by PML via mca_ptl_base_request_init_fn_t)
 * @param size (IN)              Number of bytes PML is requesting PTL to deliver
 * @param flags (IN)             Flags that should be passed to the peer via the message header.
 * @return                       OMPI_SUCCESS if the PTL was able to queue one or more fragments
@ -54,12 +54,12 @@ static void mca_ptl_tcp_recv_frag_destruct(mca_ptl_tcp_recv_frag_t* frag)

void mca_ptl_tcp_recv_frag_init(mca_ptl_tcp_recv_frag_t* frag, mca_ptl_base_peer_t* peer)
{
    frag->super.super.frag_owner = &peer->peer_ptl->super;
    frag->super.super.frag_addr = NULL;
    frag->super.super.frag_size = 0;
    frag->super.super.frag_peer = peer;
    frag->super.frag_request = 0;
    frag->super.frag_is_buffered = false;
    frag->frag_recv.frag_base.frag_owner = &peer->peer_ptl->super;
    frag->frag_recv.frag_base.frag_addr = NULL;
    frag->frag_recv.frag_base.frag_size = 0;
    frag->frag_recv.frag_base.frag_peer = peer;
    frag->frag_recv.frag_request = 0;
    frag->frag_recv.frag_is_buffered = false;
    frag->frag_hdr_cnt = 0;
    frag->frag_msg_cnt = 0;
    frag->frag_ack_pending = false;
@ -78,7 +78,7 @@ bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t* frag, int sd)
    if(mca_ptl_tcp_recv_frag_header(frag, sd, sizeof(mca_ptl_base_header_t)) == false)
        return false;

    switch(frag->super.super.frag_header.hdr_common.hdr_type) {
    switch(frag->frag_recv.frag_base.frag_header.hdr_common.hdr_type) {
    case MCA_PTL_HDR_TYPE_MATCH:
        return mca_ptl_tcp_recv_frag_match(frag, sd);
    case MCA_PTL_HDR_TYPE_FRAG:
@ -88,7 +88,7 @@ bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t* frag, int sd)
        return mca_ptl_tcp_recv_frag_ack(frag, sd);
    default:
        ompi_output(0, "mca_ptl_tcp_recv_frag_handler: invalid message type: %08X",
            *(unsigned long*)&frag->super.super.frag_header);
            *(unsigned long*)&frag->frag_recv.frag_base.frag_header);
        return false;
    }
}
@ -100,11 +100,11 @@ bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t* frag, int sd)
static bool mca_ptl_tcp_recv_frag_header(mca_ptl_tcp_recv_frag_t* frag, int sd, size_t size)
{
    /* non-blocking read - continue if interrupted, otherwise wait until data available */
    unsigned char* ptr = (unsigned char*)&frag->super.super.frag_header;
    unsigned char* ptr = (unsigned char*)&frag->frag_recv.frag_base.frag_header;
    while(frag->frag_hdr_cnt < size) {
        int cnt = recv(sd, ptr + frag->frag_hdr_cnt, size - frag->frag_hdr_cnt, 0);
        if(cnt == 0) {
            mca_ptl_tcp_peer_close(frag->super.super.frag_peer);
            mca_ptl_tcp_peer_close(frag->frag_recv.frag_base.frag_peer);
            OMPI_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_recv_frags, (ompi_list_item_t*)frag);
            return false;
        }
@ -117,7 +117,7 @@ static bool mca_ptl_tcp_recv_frag_header(mca_ptl_tcp_recv_frag_t* frag, int sd,
            return false;
        default:
            ompi_output(0, "mca_ptl_tcp_recv_frag_header: recv() failed with errno=%d", errno);
            mca_ptl_tcp_peer_close(frag->super.super.frag_peer);
            mca_ptl_tcp_peer_close(frag->frag_recv.frag_base.frag_peer);
            OMPI_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_recv_frags, (ompi_list_item_t*)frag);
            return false;
        }
@ -139,11 +139,11 @@ static bool mca_ptl_tcp_recv_frag_ack(mca_ptl_tcp_recv_frag_t* frag, int sd)
{
    mca_ptl_tcp_send_frag_t* sendfrag;
    mca_pml_base_send_request_t* sendreq;
    sendfrag = (mca_ptl_tcp_send_frag_t*)frag->super.super.frag_header.hdr_ack.hdr_src_ptr.pval;
    sendreq = sendfrag->super.frag_request;
    sendreq->req_peer_match = frag->super.super.frag_header.hdr_ack.hdr_dst_match;
    sendfrag = (mca_ptl_tcp_send_frag_t*)frag->frag_recv.frag_base.frag_header.hdr_ack.hdr_src_ptr.pval;
    sendreq = sendfrag->frag_send.frag_request;
    sendreq->req_peer_match = frag->frag_recv.frag_base.frag_header.hdr_ack.hdr_dst_match;
    mca_ptl_tcp_send_frag_progress(sendfrag);
    mca_ptl_tcp_recv_frag_return(frag->super.super.frag_owner, frag);
    mca_ptl_tcp_recv_frag_return(frag->frag_recv.frag_base.frag_owner, frag);
    return true;
}

@ -155,35 +155,40 @@ static bool mca_ptl_tcp_recv_frag_ack(mca_ptl_tcp_recv_frag_t* frag, int sd)
static bool mca_ptl_tcp_recv_frag_match(mca_ptl_tcp_recv_frag_t* frag, int sd)
{
    /* first pass through - attempt a match */
    if(NULL == frag->super.frag_request && 0 == frag->frag_msg_cnt) {
    if(NULL == frag->frag_recv.frag_request && 0 == frag->frag_msg_cnt) {
        /* attempt to match a posted recv */
        if(mca_ptl_base_recv_frag_match( frag->super.super.frag_owner, &frag->super, &frag->super.super.frag_header.hdr_match)) {
        if (mca_ptl_base_recv_frag_match(
                frag->frag_recv.frag_base.frag_owner,
                &frag->frag_recv,
                &frag->frag_recv.frag_base.frag_header.hdr_match)) {
            mca_ptl_tcp_recv_frag_matched(frag);
        } else {
            /* match was not made - so allocate buffer for eager send */
            if(frag->super.super.frag_header.hdr_frag.hdr_frag_length > 0) {
                frag->super.super.frag_addr = malloc(frag->super.super.frag_header.hdr_frag.hdr_frag_length);
                frag->super.super.frag_size = frag->super.super.frag_header.hdr_frag.hdr_frag_length;
                frag->super.frag_is_buffered = true;
            if(frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length > 0) {
                frag->frag_recv.frag_base.frag_addr =
                    malloc(frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length);
                frag->frag_recv.frag_base.frag_size =
                    frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length;
                frag->frag_recv.frag_is_buffered = true;
            }
        }
    }

    /* receive fragment data */
    if(frag->frag_msg_cnt < frag->super.super.frag_size) {
    if(frag->frag_msg_cnt < frag->frag_recv.frag_base.frag_size) {
        if(mca_ptl_tcp_recv_frag_data(frag, sd) == false) {
            return false;
        }
    }

    /* discard any data that exceeds the posted receive */
    if(frag->frag_msg_cnt < frag->super.super.frag_header.hdr_frag.hdr_frag_length)
    if(frag->frag_msg_cnt < frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length)
        if(mca_ptl_tcp_recv_frag_discard(frag, sd) == false) {
            return false;
        }

    /* if fragment has already been matched - go ahead and process */
    if (NULL != frag->super.frag_request)
    if (NULL != frag->frag_recv.frag_request)
        mca_ptl_tcp_recv_frag_progress(frag);
    return true;
}
@ -197,17 +202,17 @@ static bool mca_ptl_tcp_recv_frag_frag(mca_ptl_tcp_recv_frag_t* frag, int sd)
{
    /* get request from header */
    if(frag->frag_msg_cnt == 0) {
        frag->super.frag_request = frag->super.super.frag_header.hdr_frag.hdr_dst_ptr.pval;
        frag->frag_recv.frag_request = frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_dst_ptr.pval;
        mca_ptl_tcp_recv_frag_matched(frag);
    }

    /* continue to receive user data */
    if(frag->frag_msg_cnt < frag->super.super.frag_size) {
    if(frag->frag_msg_cnt < frag->frag_recv.frag_base.frag_size) {
        if(mca_ptl_tcp_recv_frag_data(frag, sd) == false)
            return false;
    }

    if(frag->frag_msg_cnt < frag->super.super.frag_header.hdr_frag.hdr_frag_length)
    if(frag->frag_msg_cnt < frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length)
        if(mca_ptl_tcp_recv_frag_discard(frag, sd) == false)
            return false;

@ -224,11 +229,11 @@ static bool mca_ptl_tcp_recv_frag_frag(mca_ptl_tcp_recv_frag_t* frag, int sd)

static bool mca_ptl_tcp_recv_frag_data(mca_ptl_tcp_recv_frag_t* frag, int sd)
{
    while(frag->frag_msg_cnt < frag->super.super.frag_size) {
        int cnt = recv(sd, (unsigned char*)frag->super.super.frag_addr+frag->frag_msg_cnt,
            frag->super.super.frag_size-frag->frag_msg_cnt, 0);
    while(frag->frag_msg_cnt < frag->frag_recv.frag_base.frag_size) {
        int cnt = recv(sd, (unsigned char*)frag->frag_recv.frag_base.frag_addr+frag->frag_msg_cnt,
            frag->frag_recv.frag_base.frag_size-frag->frag_msg_cnt, 0);
        if(cnt == 0) {
            mca_ptl_tcp_peer_close(frag->super.super.frag_peer);
            mca_ptl_tcp_peer_close(frag->frag_recv.frag_base.frag_peer);
            OMPI_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_recv_frags, (ompi_list_item_t*)frag);
            return false;
        }
@ -240,7 +245,7 @@ static bool mca_ptl_tcp_recv_frag_data(mca_ptl_tcp_recv_frag_t* frag, int sd)
            return false;
        default:
            ompi_output(0, "mca_ptl_tcp_recv_frag_data: recv() failed with errno=%d", errno);
            mca_ptl_tcp_peer_close(frag->super.super.frag_peer);
            mca_ptl_tcp_peer_close(frag->frag_recv.frag_base.frag_peer);
            OMPI_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_recv_frags, (ompi_list_item_t*)frag);
            return false;
        }
@ -261,13 +266,13 @@ static bool mca_ptl_tcp_recv_frag_data(mca_ptl_tcp_recv_frag_t* frag, int sd)

static bool mca_ptl_tcp_recv_frag_discard(mca_ptl_tcp_recv_frag_t* frag, int sd)
{
    while(frag->frag_msg_cnt < frag->super.super.frag_header.hdr_frag.hdr_frag_length) {
        size_t count = frag->super.super.frag_header.hdr_frag.hdr_frag_length - frag->frag_msg_cnt;
    while(frag->frag_msg_cnt < frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length) {
        size_t count = frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length - frag->frag_msg_cnt;
        void *rbuf = malloc(count);
        int cnt = recv(sd, rbuf, count, 0);
        free(rbuf);
        if(cnt == 0) {
            mca_ptl_tcp_peer_close(frag->super.super.frag_peer);
            mca_ptl_tcp_peer_close(frag->frag_recv.frag_base.frag_peer);
            OMPI_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_recv_frags, (ompi_list_item_t*)frag);
            return false;
        }
@ -280,7 +285,7 @@ static bool mca_ptl_tcp_recv_frag_discard(mca_ptl_tcp_recv_frag_t* frag, int sd)
            return false;
        default:
            ompi_output(0, "mca_ptl_tcp_recv_frag_discard: recv() failed with errno=%d", errno);
            mca_ptl_tcp_peer_close(frag->super.super.frag_peer);
            mca_ptl_tcp_peer_close(frag->frag_recv.frag_base.frag_peer);
            OMPI_FREE_LIST_RETURN(&mca_ptl_tcp_module.tcp_recv_frags, (ompi_list_item_t*)frag);
            return false;
        }
@ -25,11 +25,11 @@ extern ompi_class_t mca_ptl_tcp_recv_frag_t_class;
 * TCP received fragment derived type.
 */
struct mca_ptl_tcp_recv_frag_t {
    mca_ptl_base_recv_frag_t super;     /**< base receive fragment descriptor */
    size_t frag_hdr_cnt;                /**< number of header bytes received */
    size_t frag_msg_cnt;                /**< number of message bytes received */
    bool frag_ack_pending;              /**< is an ack pending for this fragment */
    volatile int frag_progressed;       /**< flag used to atomically progress fragment */
    mca_ptl_base_recv_frag_t frag_recv; /**< base receive fragment descriptor */
    size_t frag_hdr_cnt;                /**< number of header bytes received */
    size_t frag_msg_cnt;                /**< number of message bytes received */
    bool frag_ack_pending;              /**< is an ack pending for this fragment */
    volatile int frag_progressed;       /**< flag used to atomically progress fragment */
};
typedef struct mca_ptl_tcp_recv_frag_t mca_ptl_tcp_recv_frag_t;

@ -50,8 +50,8 @@ bool mca_ptl_tcp_recv_frag_send_ack(mca_ptl_tcp_recv_frag_t* frag);

static inline void mca_ptl_tcp_recv_frag_matched(mca_ptl_tcp_recv_frag_t* frag)
{
    mca_pml_base_recv_request_t* request = frag->super.frag_request;
    mca_ptl_base_frag_header_t* header = &frag->super.super.frag_header.hdr_frag;
    mca_pml_base_recv_request_t* request = frag->frag_recv.frag_request;
    mca_ptl_base_frag_header_t* header = &frag->frag_recv.frag_base.frag_header.hdr_frag;

    /* if there is data associated with the fragment -- setup to receive */
    if(header->hdr_frag_length > 0) {
@ -59,14 +59,14 @@ static inline void mca_ptl_tcp_recv_frag_matched(mca_ptl_tcp_recv_frag_t* frag)
        /* initialize receive convertor */
        struct iovec iov;
        ompi_proc_t *proc =
            ompi_comm_peer_lookup(request->super.req_comm, request->super.req_peer);
        ompi_convertor_copy(proc->proc_convertor, &frag->super.super.frag_convertor);
            ompi_comm_peer_lookup(request->req_base.req_comm, request->req_base.req_peer);
        ompi_convertor_copy(proc->proc_convertor, &frag->frag_recv.frag_base.frag_convertor);
        ompi_convertor_init_for_recv(
            &frag->super.super.frag_convertor,         /* convertor */
            &frag->frag_recv.frag_base.frag_convertor, /* convertor */
            0,                                         /* flags */
            request->super.req_datatype,               /* datatype */
            request->super.req_count,                  /* count elements */
            request->super.req_addr,                   /* users buffer */
            request->req_base.req_datatype,            /* datatype */
            request->req_base.req_count,               /* count elements */
            request->req_base.req_addr,                /* users buffer */
            header->hdr_frag_offset);                  /* offset in bytes into packed buffer */

        /*
@ -75,17 +75,17 @@ static inline void mca_ptl_tcp_recv_frag_matched(mca_ptl_tcp_recv_frag_t* frag)
         */
        iov.iov_base = NULL;
        iov.iov_len = header->hdr_frag_length;
        ompi_convertor_unpack(&frag->super.super.frag_convertor, &iov, 1);
        ompi_convertor_unpack(&frag->frag_recv.frag_base.frag_convertor, &iov, 1);

        /* non-contiguous - allocate buffer for receive */
        if(NULL == iov.iov_base) {
            frag->super.super.frag_addr = malloc(header->hdr_frag_length);
            frag->super.super.frag_size = header->hdr_frag_length;
            frag->super.frag_is_buffered = true;
            frag->frag_recv.frag_base.frag_addr = malloc(header->hdr_frag_length);
            frag->frag_recv.frag_base.frag_size = header->hdr_frag_length;
            frag->frag_recv.frag_is_buffered = true;
        /* we now have correct offset into users buffer */
        } else {
            frag->super.super.frag_addr = iov.iov_base;
            frag->super.super.frag_size = iov.iov_len;
            frag->frag_recv.frag_base.frag_addr = iov.iov_base;
            frag->frag_recv.frag_base.frag_size = iov.iov_len;
        }
    }
}
@ -93,37 +93,42 @@ static inline void mca_ptl_tcp_recv_frag_matched(mca_ptl_tcp_recv_frag_t* frag)

static inline void mca_ptl_tcp_recv_frag_progress(mca_ptl_tcp_recv_frag_t* frag)
{
    if((frag)->frag_msg_cnt >= (frag)->super.super.frag_header.hdr_frag.hdr_frag_length) {
    if((frag)->frag_msg_cnt >= frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length) {
        /* make sure this only happens once for threaded case */
        if(fetchNset(&frag->frag_progressed, 1) == 0) {
            mca_pml_base_recv_request_t* request = (frag)->super.frag_request;
            if((frag)->super.frag_is_buffered) {
                mca_ptl_base_match_header_t* header = &(frag)->super.super.frag_header.hdr_match;
            mca_pml_base_recv_request_t* request = frag->frag_recv.frag_request;
            if(frag->frag_recv.frag_is_buffered) {
                mca_ptl_base_match_header_t* header = &(frag)->frag_recv.frag_base.frag_header.hdr_match;

                /*
                 * Initialize convertor and use it to unpack data
                 */
                struct iovec iov;
                ompi_proc_t *proc =
                    ompi_comm_peer_lookup(request->super.req_comm, request->super.req_peer);
                ompi_convertor_copy(proc->proc_convertor, &(frag)->super.super.frag_convertor);
                    ompi_comm_peer_lookup(request->req_base.req_comm, request->req_base.req_peer);
                ompi_convertor_copy(proc->proc_convertor, &frag->frag_recv.frag_base.frag_convertor);
                ompi_convertor_init_for_recv(
                    &(frag)->super.super.frag_convertor,        /* convertor */
                    &frag->frag_recv.frag_base.frag_convertor,  /* convertor */
                    0,                                          /* flags */
                    request->super.req_datatype,                /* datatype */
                    request->super.req_count,                   /* count elements */
                    request->super.req_addr,                    /* users buffer */
                    request->req_base.req_datatype,             /* datatype */
                    request->req_base.req_count,                /* count elements */
                    request->req_base.req_addr,                 /* users buffer */
                    header->hdr_frag.hdr_frag_offset);          /* offset in bytes into packed buffer */

                iov.iov_base = (frag)->super.super.frag_addr;
                iov.iov_len = (frag)->super.super.frag_size;
                ompi_convertor_unpack(&(frag)->super.super.frag_convertor, &iov, 1);
                iov.iov_base = frag->frag_recv.frag_base.frag_addr;
                iov.iov_len = frag->frag_recv.frag_base.frag_size;
                ompi_convertor_unpack(&frag->frag_recv.frag_base.frag_convertor, &iov, 1);
            }

            /* progress the request */
            (frag)->super.super.frag_owner->ptl_recv_progress((frag)->super.super.frag_owner, request, &(frag)->super);
            frag->frag_recv.frag_base.frag_owner->ptl_recv_progress(
                frag->frag_recv.frag_base.frag_owner,
                request,
                frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length,
                frag->frag_recv.frag_base.frag_size);

            if((frag)->frag_ack_pending == false) {
                mca_ptl_tcp_recv_frag_return((frag)->super.super.frag_owner, (frag));
                mca_ptl_tcp_recv_frag_return(frag->frag_recv.frag_base.frag_owner, (frag));
            }
        }
    }
@ -12,10 +12,10 @@
#include "ptl_tcp_proc.h"
#include "ptl_tcp_sendfrag.h"

#define frag_header super.super.frag_header
#define frag_owner super.super.frag_owner
#define frag_peer super.super.frag_peer
#define frag_convertor super.super.frag_convertor
#define frag_header frag_send.frag_base.frag_header
#define frag_owner frag_send.frag_base.frag_owner
#define frag_peer frag_send.frag_base.frag_peer
#define frag_convertor frag_send.frag_base.frag_convertor


static void mca_ptl_tcp_send_frag_construct(mca_ptl_tcp_send_frag_t* frag);
@ -70,12 +70,12 @@ int mca_ptl_tcp_send_frag_init(
        hdr->hdr_frag.hdr_src_ptr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
        hdr->hdr_frag.hdr_src_ptr.pval = sendfrag;
        hdr->hdr_frag.hdr_dst_ptr.lval = 0;
        hdr->hdr_match.hdr_contextid = sendreq->super.req_comm->c_contextid;
        hdr->hdr_match.hdr_src = sendreq->super.req_comm->c_my_rank;
        hdr->hdr_match.hdr_dst = sendreq->super.req_peer;
        hdr->hdr_match.hdr_tag = sendreq->super.req_tag;
        hdr->hdr_match.hdr_contextid = sendreq->req_base.req_comm->c_contextid;
        hdr->hdr_match.hdr_src = sendreq->req_base.req_comm->c_my_rank;
        hdr->hdr_match.hdr_dst = sendreq->req_base.req_peer;
        hdr->hdr_match.hdr_tag = sendreq->req_base.req_tag;
        hdr->hdr_match.hdr_msg_length = sendreq->req_bytes_packed;
        hdr->hdr_match.hdr_msg_seq = sendreq->super.req_sequence;
        hdr->hdr_match.hdr_msg_seq = sendreq->req_base.req_sequence;
    } else {
        hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_FRAG;
        hdr->hdr_common.hdr_flags = flags;
@ -105,9 +105,9 @@ int mca_ptl_tcp_send_frag_init(
        ompi_convertor_init_for_send(
            convertor,
            0,
            sendreq->super.req_datatype,
            sendreq->super.req_count,
            sendreq->super.req_addr,
            sendreq->req_base.req_datatype,
            sendreq->req_base.req_count,
            sendreq->req_base.req_addr,
            offset);
    }

@ -129,9 +129,9 @@ int mca_ptl_tcp_send_frag_init(

    /* fragment state */
    sendfrag->frag_owner = &ptl_peer->peer_ptl->super;
    sendfrag->super.frag_request = sendreq;
    sendfrag->super.super.frag_addr = sendfrag->frag_vec[1].iov_base;
    sendfrag->super.super.frag_size = size_out;
    sendfrag->frag_send.frag_request = sendreq;
    sendfrag->frag_send.frag_base.frag_addr = sendfrag->frag_vec[1].iov_base;
    sendfrag->frag_send.frag_base.frag_size = size_out;

    sendfrag->frag_peer = ptl_peer;
    sendfrag->frag_vec_ptr = sendfrag->frag_vec;
@ -25,11 +25,11 @@ struct mca_ptl_base_peer_t;
 * TCP send fragment derived type.
 */
struct mca_ptl_tcp_send_frag_t {
    mca_ptl_base_send_frag_t super;     /**< base send fragment descriptor */
    struct iovec *frag_vec_ptr;         /**< pointer into iovec array */
    size_t frag_vec_cnt;                /**< number of iovec structs left to process */
    struct iovec frag_vec[2];           /**< array of iovecs for send */
    volatile int frag_progressed;       /**< for threaded case - has request status been updated */
    mca_ptl_base_send_frag_t frag_send; /**< base send fragment descriptor */
    struct iovec *frag_vec_ptr;         /**< pointer into iovec array */
    size_t frag_vec_cnt;                /**< number of iovec structs left to process */
    struct iovec frag_vec[2];           /**< array of iovecs for send */
    volatile int frag_progressed;       /**< for threaded case - has request status been updated */
};
typedef struct mca_ptl_tcp_send_frag_t mca_ptl_tcp_send_frag_t;

@ -70,30 +70,33 @@ int mca_ptl_tcp_send_frag_init(

static inline void mca_ptl_tcp_send_frag_progress(mca_ptl_tcp_send_frag_t* frag)
{
    mca_pml_base_send_request_t* request = frag->super.frag_request;
    mca_pml_base_send_request_t* request = frag->frag_send.frag_request;

    /* if this is an ack - simply return to pool */
    if(request == NULL) {
        mca_ptl_tcp_send_frag_return(frag->super.super.frag_owner, frag);
        mca_ptl_tcp_send_frag_return(frag->frag_send.frag_base.frag_owner, frag);

    /* otherwise, if the message has been sent, and an ack has already been
     * received, go ahead and update the request status
     */
    } else if (frag->frag_vec_cnt == 0 &&
        ((frag->super.super.frag_header.hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) == 0 ||
        ((frag->frag_send.frag_base.frag_header.hdr_common.hdr_flags & MCA_PTL_FLAGS_ACK_MATCHED) == 0 ||
        mca_pml_base_send_request_matched(request))) {

        /* make sure this only happens once in threaded case */
        if(fetchNset(&frag->frag_progressed,1) == 0) {

            /* update request status */
            frag->super.super.frag_owner->ptl_send_progress(frag->super.super.frag_owner, request, &frag->super);
            frag->frag_send.frag_base.frag_owner->ptl_send_progress(
                frag->frag_send.frag_base.frag_owner,
                request,
                frag->frag_send.frag_base.frag_size);

            /* the first fragment is allocated with the request,
             * all others need to be returned to free list
             */
            if(frag->super.super.frag_header.hdr_frag.hdr_frag_offset != 0)
                mca_ptl_tcp_send_frag_return(frag->super.super.frag_owner, frag);
            if(frag->frag_send.frag_base.frag_header.hdr_frag.hdr_frag_offset != 0)
                mca_ptl_tcp_send_frag_return(frag->frag_send.frag_base.frag_owner, frag);
        }
    }
}
@ -105,22 +108,22 @@ static inline void mca_ptl_tcp_send_frag_init_ack(
    struct mca_ptl_base_peer_t* ptl_peer,
    mca_ptl_tcp_recv_frag_t* frag)
{
    mca_ptl_base_header_t* hdr = &ack->super.super.frag_header;
    mca_pml_base_recv_request_t* request = frag->super.frag_request;
    mca_ptl_base_header_t* hdr = &ack->frag_send.frag_base.frag_header;
    mca_pml_base_recv_request_t* request = frag->frag_recv.frag_request;
    hdr->hdr_common.hdr_type = MCA_PTL_HDR_TYPE_ACK;
    hdr->hdr_common.hdr_flags = 0;
    hdr->hdr_common.hdr_size = sizeof(mca_ptl_base_ack_header_t);
    hdr->hdr_ack.hdr_src_ptr = frag->super.super.frag_header.hdr_frag.hdr_src_ptr;
    hdr->hdr_ack.hdr_src_ptr = frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_src_ptr;
    hdr->hdr_ack.hdr_dst_match.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
    hdr->hdr_ack.hdr_dst_match.pval = request;
    hdr->hdr_ack.hdr_dst_addr.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */
    hdr->hdr_ack.hdr_dst_addr.pval = request->super.req_addr;
    hdr->hdr_ack.hdr_dst_addr.pval = request->req_base.req_addr;
    hdr->hdr_ack.hdr_dst_size = request->req_bytes_packed;
    ack->super.frag_request = 0;
    ack->super.super.frag_peer = ptl_peer;
    ack->super.super.frag_owner = ptl;
    ack->super.super.frag_addr = NULL;
    ack->super.super.frag_size = 0;
    ack->frag_send.frag_request = 0;
    ack->frag_send.frag_base.frag_peer = ptl_peer;
    ack->frag_send.frag_base.frag_owner = ptl;
    ack->frag_send.frag_base.frag_addr = NULL;
    ack->frag_send.frag_base.frag_size = 0;
    ack->frag_vec_ptr = ack->frag_vec;
    ack->frag_vec[0].iov_base = (ompi_iov_base_ptr_t)hdr;
    ack->frag_vec[0].iov_len = sizeof(mca_ptl_base_header_t);