
first cut at ack/retrans protocol

This commit was SVN r8570.
This commit is contained in:
Tim Woodall 2005-12-20 21:42:58 +00:00
parent b06d79d4fe
commit 8c1027d974
17 changed files with 923 additions and 311 deletions

View file

@ -23,8 +23,6 @@ ob1_sources = \
pml_dr_comm.h \ pml_dr_comm.h \
pml_dr_component.c \ pml_dr_component.c \
pml_dr_component.h \ pml_dr_component.h \
pml_dr_endpoint.c \
pml_dr_endpoint.h \
pml_dr_hdr.h \ pml_dr_hdr.h \
pml_dr_iprobe.c \ pml_dr_iprobe.c \
pml_dr_irecv.c \ pml_dr_irecv.c \
@ -38,7 +36,8 @@ ob1_sources = \
pml_dr_recvreq.h \ pml_dr_recvreq.h \
pml_dr_sendreq.c \ pml_dr_sendreq.c \
pml_dr_sendreq.h \ pml_dr_sendreq.h \
pml_dr_start.c pml_dr_start.c \
pml_dr_vfrag.c
if OMPI_BUILD_pml_dr_DSO if OMPI_BUILD_pml_dr_DSO
component_noinst = component_noinst =

View file

@ -74,7 +74,7 @@ int mca_pml_dr_add_comm(ompi_communicator_t* comm)
if (NULL == pml_comm) { if (NULL == pml_comm) {
return OMPI_ERR_OUT_OF_RESOURCE; return OMPI_ERR_OUT_OF_RESOURCE;
} }
mca_pml_dr_comm_init_size(pml_comm, comm->c_remote_group->grp_proc_count); mca_pml_dr_comm_init(pml_comm, comm);
comm->c_pml_comm = pml_comm; comm->c_pml_comm = pml_comm;
comm->c_pml_procs = (mca_pml_proc_t**)malloc( comm->c_pml_procs = (mca_pml_proc_t**)malloc(
comm->c_remote_group->grp_proc_count * sizeof(mca_pml_proc_t)); comm->c_remote_group->grp_proc_count * sizeof(mca_pml_proc_t));

View file

@ -53,6 +53,9 @@ struct mca_pml_dr_t {
size_t send_pipeline_depth; size_t send_pipeline_depth;
bool enabled; bool enabled;
time_t tout_ack;
time_t tout_watch_dog;
/* lock queue access */ /* lock queue access */
opal_mutex_t lock; opal_mutex_t lock;
@ -64,6 +67,7 @@ struct mca_pml_dr_t {
ompi_free_list_t send_requests; ompi_free_list_t send_requests;
ompi_free_list_t recv_requests; ompi_free_list_t recv_requests;
ompi_free_list_t recv_frags; ompi_free_list_t recv_frags;
ompi_free_list_t vfrags;
ompi_free_list_t buffers; ompi_free_list_t buffers;
}; };
typedef struct mca_pml_dr_t mca_pml_dr_t; typedef struct mca_pml_dr_t mca_pml_dr_t;

View file

@ -27,6 +27,7 @@
static void mca_pml_dr_comm_proc_construct(mca_pml_dr_comm_proc_t* proc) static void mca_pml_dr_comm_proc_construct(mca_pml_dr_comm_proc_t* proc)
{ {
proc->expected_sequence = 1; proc->expected_sequence = 1;
proc->vfrag_id = 1;
proc->send_sequence = 0; proc->send_sequence = 0;
OBJ_CONSTRUCT(&proc->frags_cant_match, opal_list_t); OBJ_CONSTRUCT(&proc->frags_cant_match, opal_list_t);
OBJ_CONSTRUCT(&proc->specific_receives, opal_list_t); OBJ_CONSTRUCT(&proc->specific_receives, opal_list_t);
@ -78,19 +79,21 @@ OBJ_CLASS_INSTANCE(
mca_pml_dr_comm_destruct); mca_pml_dr_comm_destruct);
int mca_pml_dr_comm_init_size(mca_pml_dr_comm_t* comm, size_t size) int mca_pml_dr_comm_init(mca_pml_dr_comm_t* dr_comm, ompi_communicator_t* ompi_comm)
{ {
size_t i; size_t i;
size_t size = ompi_comm->c_remote_group->grp_proc_count;
/* send message sequence-number support - sender side */ /* send message sequence-number support - sender side */
comm->procs = malloc(sizeof(mca_pml_dr_comm_proc_t)*size); dr_comm->procs = malloc(sizeof(mca_pml_dr_comm_proc_t)*size);
if(NULL == comm->procs) { if(NULL == dr_comm->procs) {
return OMPI_ERR_OUT_OF_RESOURCE; return OMPI_ERR_OUT_OF_RESOURCE;
} }
for(i=0; i<size; i++) { for(i=0; i<size; i++) {
OBJ_CONSTRUCT(comm->procs+i, mca_pml_dr_comm_proc_t); OBJ_CONSTRUCT(dr_comm->procs+i, mca_pml_dr_comm_proc_t);
dr_comm->procs[i].ompi_proc = ompi_comm->c_remote_group->grp_proc_pointers[i];
} }
comm->num_procs = size; dr_comm->num_procs = size;
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }

View file

@ -25,6 +25,8 @@
#include "opal/threads/condition.h" #include "opal/threads/condition.h"
#include "mca/ptl/ptl.h" #include "mca/ptl/ptl.h"
#include "opal/class/opal_list.h" #include "opal/class/opal_list.h"
#include "ompi/communicator/communicator.h"
#include "ompi/proc/proc.h"
#if defined(c_plusplus) || defined(__cplusplus) #if defined(c_plusplus) || defined(__cplusplus)
extern "C" { extern "C" {
#endif #endif
@ -33,6 +35,7 @@ extern "C" {
struct mca_pml_dr_comm_proc_t { struct mca_pml_dr_comm_proc_t {
opal_object_t super; opal_object_t super;
uint16_t expected_sequence; /**< send message sequence number - receiver side */ uint16_t expected_sequence; /**< send message sequence number - receiver side */
uint32_t vfrag_id; /**< virtual fragment identifier */
#if OMPI_HAVE_THREAD_SUPPORT #if OMPI_HAVE_THREAD_SUPPORT
volatile int32_t send_sequence; /**< send side sequence number */ volatile int32_t send_sequence; /**< send side sequence number */
#else #else
@ -41,6 +44,7 @@ struct mca_pml_dr_comm_proc_t {
opal_list_t frags_cant_match; /**< out-of-order fragment queues */ opal_list_t frags_cant_match; /**< out-of-order fragment queues */
opal_list_t specific_receives; /**< queues of unmatched specific receives */ opal_list_t specific_receives; /**< queues of unmatched specific receives */
opal_list_t unexpected_frags; /**< unexpected fragment queues */ opal_list_t unexpected_frags; /**< unexpected fragment queues */
ompi_proc_t* ompi_proc; /**< back pointer to ompi_proc_t */
}; };
typedef struct mca_pml_dr_comm_proc_t mca_pml_dr_comm_proc_t; typedef struct mca_pml_dr_comm_proc_t mca_pml_dr_comm_proc_t;
@ -69,12 +73,12 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_pml_dr_comm_t);
/** /**
* Initialize an instance of mca_pml_dr_comm_t based on the communicator size. * Initialize an instance of mca_pml_dr_comm_t based on the communicator size.
* *
* @param comm Instance of mca_pml_dr_comm_t * @param dr_comm Instance of mca_pml_dr_comm_t
* @param size Size of communicator * @param pml_comm Communicator
* @return OMPI_SUCCESS or error status on failure. * @return OMPI_SUCCESS or error status on failure.
*/ */
OMPI_DECLSPEC extern int mca_pml_dr_comm_init_size(mca_pml_dr_comm_t* comm, size_t size); OMPI_DECLSPEC extern int mca_pml_dr_comm_init(mca_pml_dr_comm_t* dr_comm, ompi_communicator_t* ompi_comm);
#if defined(c_plusplus) || defined(__cplusplus) #if defined(c_plusplus) || defined(__cplusplus)
} }

View file

@ -125,6 +125,16 @@ int mca_pml_dr_component_open(void)
mca_pml_dr.free_list_inc, mca_pml_dr.free_list_inc,
NULL); NULL);
OBJ_CONSTRUCT(&mca_pml_dr.vfrags, ompi_free_list_t);
ompi_free_list_init(
&mca_pml_dr.vfrags,
sizeof(mca_pml_dr_vfrag_t),
OBJ_CLASS(mca_pml_dr_vfrag_t),
mca_pml_dr.free_list_num,
mca_pml_dr.free_list_max,
mca_pml_dr.free_list_inc,
NULL);
OBJ_CONSTRUCT(&mca_pml_dr.send_pending, opal_list_t); OBJ_CONSTRUCT(&mca_pml_dr.send_pending, opal_list_t);
OBJ_CONSTRUCT(&mca_pml_dr.acks_pending, opal_list_t); OBJ_CONSTRUCT(&mca_pml_dr.acks_pending, opal_list_t);
OBJ_CONSTRUCT(&mca_pml_dr.buffers, ompi_free_list_t); OBJ_CONSTRUCT(&mca_pml_dr.buffers, ompi_free_list_t);

View file

@ -1,35 +0,0 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* @file
*/
#ifndef MCA_PML_DR_ENDPOINT_H
#define MCA_PML_DR_ENDPOINT_H
#include "opal/util/output.h"
#include "mca/btl/btl.h"
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
#endif

View file

@ -31,15 +31,13 @@
#define MCA_PML_DR_HDR_TYPE_MATCH 1 #define MCA_PML_DR_HDR_TYPE_MATCH 1
#define MCA_PML_DR_HDR_TYPE_RNDV 2 #define MCA_PML_DR_HDR_TYPE_RNDV 2
#define MCA_PML_DR_HDR_TYPE_ACK 4 #define MCA_PML_DR_HDR_TYPE_ACK 3
#define MCA_PML_DR_HDR_TYPE_NACK 5 #define MCA_PML_DR_HDR_TYPE_NACK 4
#define MCA_PML_DR_HDR_TYPE_FRAG 6 #define MCA_PML_DR_HDR_TYPE_FRAG 5
#define MCA_PML_DR_HDR_TYPE_MAX 10
#define MCA_PML_DR_HDR_FLAGS_ACK 1 /* is an ack required */ #define MCA_PML_DR_HDR_FLAGS_ACK 1 /* is an ack required */
#define MCA_PML_DR_HDR_FLAGS_NBO 2 /* is the hdr in network byte order */ #define MCA_PML_DR_HDR_FLAGS_NBO 2 /* is the hdr in network byte order */
#define MCA_PML_DR_HDR_FLAGS_PIN 4 /* is user buffer pinned */ #define MCA_PML_DR_HDR_FLAGS_MATCH 4 /* is the ack in response to a match */
#define MCA_PML_DR_HDR_FLAGS_CONTIG 8 /* is user buffer contiguous */
/* /*
@ -88,6 +86,7 @@ static inline uint64_t ntoh64(uint64_t val)
struct mca_pml_dr_common_hdr_t { struct mca_pml_dr_common_hdr_t {
uint8_t hdr_type; /**< type of envelope */ uint8_t hdr_type; /**< type of envelope */
uint8_t hdr_flags; /**< flags indicating how fragment should be processed */ uint8_t hdr_flags; /**< flags indicating how fragment should be processed */
uint16_t hdr_csum; /**< checksum over header */
}; };
typedef struct mca_pml_dr_common_hdr_t mca_pml_dr_common_hdr_t; typedef struct mca_pml_dr_common_hdr_t mca_pml_dr_common_hdr_t;
@ -99,11 +98,14 @@ typedef struct mca_pml_dr_common_hdr_t mca_pml_dr_common_hdr_t;
* attributes required to match the corresponding posted receive. * attributes required to match the corresponding posted receive.
*/ */
struct mca_pml_dr_match_hdr_t { struct mca_pml_dr_match_hdr_t {
mca_pml_dr_common_hdr_t hdr_common; /**< common attributes */ mca_pml_dr_common_hdr_t hdr_common; /**< common attributes */
uint32_t hdr_vid; /**< vfrag id */
uint16_t hdr_ctx; /**< communicator index */ uint16_t hdr_ctx; /**< communicator index */
uint16_t hdr_seq; /**< message sequence number */
int32_t hdr_src; /**< source rank */ int32_t hdr_src; /**< source rank */
int32_t hdr_tag; /**< user tag */ int32_t hdr_tag; /**< user tag */
uint16_t hdr_seq; /**< message sequence number */ uint32_t hdr_csum; /**< checksum over data */
ompi_ptr_t hdr_src_req; /**< pointer to source request - returned in ack */
}; };
typedef struct mca_pml_dr_match_hdr_t mca_pml_dr_match_hdr_t; typedef struct mca_pml_dr_match_hdr_t mca_pml_dr_match_hdr_t;
@ -133,7 +135,6 @@ typedef struct mca_pml_dr_match_hdr_t mca_pml_dr_match_hdr_t;
struct mca_pml_dr_rendezvous_hdr_t { struct mca_pml_dr_rendezvous_hdr_t {
mca_pml_dr_match_hdr_t hdr_match; mca_pml_dr_match_hdr_t hdr_match;
uint64_t hdr_msg_length; /**< message length */ uint64_t hdr_msg_length; /**< message length */
ompi_ptr_t hdr_src_req; /**< pointer to source request - returned in ack */
}; };
typedef struct mca_pml_dr_rendezvous_hdr_t mca_pml_dr_rendezvous_hdr_t; typedef struct mca_pml_dr_rendezvous_hdr_t mca_pml_dr_rendezvous_hdr_t;
@ -153,10 +154,14 @@ typedef struct mca_pml_dr_rendezvous_hdr_t mca_pml_dr_rendezvous_hdr_t;
* Header for subsequent fragments. * Header for subsequent fragments.
*/ */
struct mca_pml_dr_frag_hdr_t { struct mca_pml_dr_frag_hdr_t {
mca_pml_dr_common_hdr_t hdr_common; /**< common attributes */ mca_pml_dr_common_hdr_t hdr_common; /**< common attributes */
uint64_t hdr_frag_offset; /**< offset into message */ uint32_t hdr_vid; /**< virtual frag id */
ompi_ptr_t hdr_src_req; /**< pointer to source request */ uint16_t hdr_vlen; /**< length of entire vfrag */
ompi_ptr_t hdr_dst_req; /**< pointer to matched receive */ uint16_t hdr_frag_idx; /**< bit index of this frag w/in vfrag */
uint32_t hdr_frag_csum; /**< checksum over data */
uint64_t hdr_frag_offset; /**< absolute offset of this fragment */
ompi_ptr_t hdr_src_req; /**< pointer to source req */
ompi_ptr_t hdr_dst_req; /**< pointer to receive req */
}; };
typedef struct mca_pml_dr_frag_hdr_t mca_pml_dr_frag_hdr_t; typedef struct mca_pml_dr_frag_hdr_t mca_pml_dr_frag_hdr_t;
@ -179,6 +184,8 @@ typedef struct mca_pml_dr_frag_hdr_t mca_pml_dr_frag_hdr_t;
struct mca_pml_dr_ack_hdr_t { struct mca_pml_dr_ack_hdr_t {
mca_pml_dr_common_hdr_t hdr_common; /**< common attributes */ mca_pml_dr_common_hdr_t hdr_common; /**< common attributes */
uint32_t hdr_vid; /**< virtual fragment id */
uint64_t hdr_vmask; /**< acknowledged frags */
ompi_ptr_t hdr_src_req; /**< source request */ ompi_ptr_t hdr_src_req; /**< source request */
ompi_ptr_t hdr_dst_req; /**< matched receive request */ ompi_ptr_t hdr_dst_req; /**< matched receive request */
}; };
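The reworked headers above carry the state the ack/retrans protocol needs: the match/rendezvous header gains a vfrag id and a data checksum, each data fragment names its bit index within a vfrag, and the ack/nack header returns a 64-bit hdr_vmask of fragment bits. A minimal sketch of how a sender could interpret such a mask to decide what to resend; the helper and variable names below are illustrative stand-ins, not the pml_dr API:

#include <stdint.h>
#include <stdio.h>

/* Given the "all fragments" mask for a vfrag and the acked bits returned
 * in hdr_vmask, report which fragment indices still need retransmission. */
static void print_missing(uint64_t vf_mask, uint64_t vf_ack, uint16_t vf_len)
{
    uint64_t missing = vf_mask & ~vf_ack;
    uint16_t idx;
    for (idx = 0; idx < vf_len; idx++) {
        if (missing & ((uint64_t)1 << idx))
            printf("retransmit fragment %u\n", (unsigned)idx);
    }
}

int main(void)
{
    uint16_t vf_len  = 4;                              /* fragments in this vfrag */
    uint64_t vf_mask = ((uint64_t)1 << vf_len) - 1;    /* 0xF: all four fragments */
    uint64_t vf_ack  = 0x5;                            /* peer acked fragments 0 and 2 */
    print_missing(vf_mask, vf_ack, vf_len);            /* prints fragments 1 and 3 */
    return 0;
}

A NACK reuses the same field with a single bit set (1 << hdr_frag_idx, as in the recvreq changes below) to name the one fragment that failed its checksum.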

View file

@ -25,7 +25,6 @@
#include "communicator/communicator.h" #include "communicator/communicator.h"
#include "group/group.h" #include "group/group.h"
#include "proc/proc.h" #include "proc/proc.h"
#include "pml_dr_endpoint.h"
#if defined(c_plusplus) || defined(__cplusplus) #if defined(c_plusplus) || defined(__cplusplus)
extern "C" { extern "C" {
@ -40,57 +39,6 @@ struct mca_pml_dr_proc_t {
typedef struct mca_pml_dr_proc_t mca_pml_dr_proc_t; typedef struct mca_pml_dr_proc_t mca_pml_dr_proc_t;
OMPI_COMP_EXPORT extern opal_class_t mca_pml_dr_proc_t_class; OMPI_COMP_EXPORT extern opal_class_t mca_pml_dr_proc_t_class;
/**
* Return the mca_pml_proc_t instance cached in the communicators local group.
*
* @param comm Communicator
* @param rank Peer rank
* @return mca_pml_proc_t instance
*/
/* static inline mca_pml_dr_proc_t* mca_pml_dr_proc_lookup_local(ompi_communicator_t* comm, int rank) */
/* { */
/* return (mca_pml_dr_proc_t*) comm->c_local_group->grp_proc_pointers[rank]->proc_pml; */
/* } */
/* /\** */
/* * Return the mca_pml_proc_t instance cached on the communicators remote group. */
/* * */
/* * @param comm Communicator */
/* * @param rank Peer rank */
/* * @return mca_pml_proc_t instance */
/* *\/ */
/* static inline mca_pml_dr_proc_t* mca_pml_dr_proc_lookup_remote(ompi_communicator_t* comm, int rank) */
/* { */
/* return (mca_pml_dr_proc_t*) comm->c_pml_procs[rank]; */
/* } */
/* /\** */
/* * Return the mca_btl_peer_t instance corresponding to the process/btl combination. */
/* * */
/* * @param comm Communicator */
/* * @param rank Peer rank */
/* * @return mca_pml_proc_t instance */
/* *\/ */
/* static inline struct mca_btl_base_endpoint_t* mca_pml_dr_proc_lookup_remote_endpoint( */
/* ompi_communicator_t* comm, */
/* int rank, */
/* struct mca_btl_base_module_t* btl) */
/* { */
/* mca_pml_dr_proc_t* proc = (mca_pml_dr_proc_t*) comm->c_pml_procs[rank]; */
/* size_t i, size = mca_pml_dr_ep_array_get_size(&proc->btl_eager); */
/* mca_pml_dr_endpoint_t* endpoint = proc->btl_eager.arr_endpoints; */
/* for(i = 0; i < size; i++) { */
/* if(endpoint->btl == btl) { */
/* return endpoint->btl_endpoint; */
/* } */
/* endpoint++; */
/* } */
/* return NULL; */
/* } */
#if defined(c_plusplus) || defined(__cplusplus) #if defined(c_plusplus) || defined(__cplusplus)
} }

View file

@ -50,6 +50,19 @@ OBJ_CLASS_INSTANCE(
NULL NULL
); );
/*
* Release resources.
*/
static void mca_pml_dr_ctl_completion(
mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* ep,
struct mca_btl_base_descriptor_t* des,
int status)
{
mca_bml_base_btl_t* bml_btl = (mca_bml_base_btl_t*)des->des_context;
MCA_BML_BASE_BTL_DES_RETURN(bml_btl, des);
}
/** /**
@ -79,8 +92,14 @@ void mca_pml_dr_recv_frag_callback(
{ {
mca_pml_dr_send_request_t* sendreq = (mca_pml_dr_send_request_t*) mca_pml_dr_send_request_t* sendreq = (mca_pml_dr_send_request_t*)
hdr->hdr_ack.hdr_src_req.pval; hdr->hdr_ack.hdr_src_req.pval;
sendreq->req_recv = hdr->hdr_ack.hdr_dst_req; mca_pml_dr_send_request_acked(sendreq, &hdr->hdr_ack);
MCA_PML_DR_SEND_REQUEST_ADVANCE(sendreq); break;
}
case MCA_PML_DR_HDR_TYPE_NACK:
{
mca_pml_dr_send_request_t* sendreq = (mca_pml_dr_send_request_t*)
hdr->hdr_ack.hdr_src_req.pval;
mca_pml_dr_send_request_nacked(sendreq, &hdr->hdr_ack);
break; break;
} }
case MCA_PML_DR_HDR_TYPE_FRAG: case MCA_PML_DR_HDR_TYPE_FRAG:
@ -382,7 +401,7 @@ static bool mca_pml_dr_check_cantmatch_for_match(
* - fragments may be corrupt * - fragments may be corrupt
* - this routine may be called simultaneously by more than one thread * - this routine may be called simultaneously by more than one thread
*/ */
int mca_pml_dr_recv_frag_match( bool mca_pml_dr_recv_frag_match(
mca_btl_base_module_t *btl, mca_btl_base_module_t *btl,
mca_pml_dr_match_hdr_t *hdr, mca_pml_dr_match_hdr_t *hdr,
mca_btl_base_segment_t* segments, mca_btl_base_segment_t* segments,
@ -396,6 +415,7 @@ int mca_pml_dr_recv_frag_match(
mca_pml_dr_comm_proc_t *proc; mca_pml_dr_comm_proc_t *proc;
bool additional_match=false; bool additional_match=false;
opal_list_t additional_matches; opal_list_t additional_matches;
ompi_proc_t* ompi_proc;
int rc; int rc;
/* communicator pointer */ /* communicator pointer */
@ -405,6 +425,7 @@ int mca_pml_dr_recv_frag_match(
/* source sequence number */ /* source sequence number */
frag_msg_seq = hdr->hdr_seq; frag_msg_seq = hdr->hdr_seq;
proc = comm->procs + hdr->hdr_src; proc = comm->procs + hdr->hdr_src;
ompi_proc = proc->ompi_proc;
/* get next expected message sequence number - if threaded /* get next expected message sequence number - if threaded
* run, lock to make sure that if another thread is processing * run, lock to make sure that if another thread is processing
@ -427,6 +448,7 @@ int mca_pml_dr_recv_frag_match(
/* We're now expecting the next sequence number. */ /* We're now expecting the next sequence number. */
(proc->expected_sequence)++; (proc->expected_sequence)++;
rematch:
/* /*
* figure out what sort of matching logic to use, if need to * figure out what sort of matching logic to use, if need to
@ -459,15 +481,12 @@ int mca_pml_dr_recv_frag_match(
*/ */
if( (match->req_recv.req_base.req_type == MCA_PML_REQUEST_PROBE) ) { if( (match->req_recv.req_base.req_type == MCA_PML_REQUEST_PROBE) ) {
/* Match a probe, rollback the next expected sequence number */
(proc->expected_sequence)--;
OPAL_THREAD_UNLOCK(&comm->matching_lock);
/* complete the probe */ /* complete the probe */
mca_pml_dr_recv_request_matched_probe(match,btl,segments,num_segments); mca_pml_dr_recv_request_matched_probe(match,btl,segments,num_segments);
/* attempt to match actual request */ /* retry the match */
return mca_pml_dr_recv_frag_match(btl,hdr,segments,num_segments); match = NULL;
goto rematch;
} }
} else { } else {
@ -478,7 +497,7 @@ int mca_pml_dr_recv_frag_match(
OPAL_THREAD_UNLOCK(&comm->matching_lock); OPAL_THREAD_UNLOCK(&comm->matching_lock);
return rc; return rc;
} }
MCA_PML_DR_RECV_FRAG_INIT(frag,hdr,segments,num_segments,btl); MCA_PML_DR_RECV_FRAG_INIT(frag,proc->ompi_proc,hdr,segments,num_segments,btl);
opal_list_append( &proc->unexpected_frags, (opal_list_item_t *)frag ); opal_list_append( &proc->unexpected_frags, (opal_list_item_t *)frag );
} }
@ -503,9 +522,8 @@ int mca_pml_dr_recv_frag_match(
OPAL_THREAD_UNLOCK(&comm->matching_lock); OPAL_THREAD_UNLOCK(&comm->matching_lock);
return rc; return rc;
} }
MCA_PML_DR_RECV_FRAG_INIT(frag,hdr,segments,num_segments,btl); MCA_PML_DR_RECV_FRAG_INIT(frag,proc->ompi_proc,hdr,segments,num_segments,btl);
opal_list_append(&proc->frags_cant_match, (opal_list_item_t *)frag); opal_list_append(&proc->frags_cant_match, (opal_list_item_t *)frag);
} }
OPAL_THREAD_UNLOCK(&comm->matching_lock); OPAL_THREAD_UNLOCK(&comm->matching_lock);
@ -522,7 +540,7 @@ int mca_pml_dr_recv_frag_match(
MCA_PML_DR_RECV_FRAG_RETURN(frag); MCA_PML_DR_RECV_FRAG_RETURN(frag);
} }
} }
return OMPI_SUCCESS; return (match != NULL);
} }
@ -550,7 +568,6 @@ static bool mca_pml_dr_check_cantmatch_for_match(
int match_found; int match_found;
uint16_t next_msg_seq_expected, frag_seq; uint16_t next_msg_seq_expected, frag_seq;
mca_pml_dr_recv_frag_t *frag; mca_pml_dr_recv_frag_t *frag;
mca_pml_dr_recv_request_t *match = NULL;
bool match_made = false; bool match_made = false;
/* /*
@ -582,6 +599,7 @@ static bool mca_pml_dr_check_cantmatch_for_match(
*/ */
frag_seq=frag->hdr.hdr_match.hdr_seq; frag_seq=frag->hdr.hdr_match.hdr_seq;
if (frag_seq == next_msg_seq_expected) { if (frag_seq == next_msg_seq_expected) {
mca_pml_dr_recv_request_t *match = NULL;
mca_pml_dr_match_hdr_t* hdr = &frag->hdr.hdr_match; mca_pml_dr_match_hdr_t* hdr = &frag->hdr.hdr_match;
/* We're now expecting the next sequence number. */ /* We're now expecting the next sequence number. */
@ -596,6 +614,7 @@ static bool mca_pml_dr_check_cantmatch_for_match(
opal_list_remove_item(&proc->frags_cant_match, opal_list_remove_item(&proc->frags_cant_match,
(opal_list_item_t *)frag); (opal_list_item_t *)frag);
rematch:
/* /*
* figure out what sort of matching logic to use, if need to * figure out what sort of matching logic to use, if need to
* look only at "specific" receives, or "wild" receives, * look only at "specific" receives, or "wild" receives,
@ -623,18 +642,33 @@ static bool mca_pml_dr_check_cantmatch_for_match(
/* if match found, process data */ /* if match found, process data */
if (match) { if (match) {
/* associate the receive descriptor with the fragment /*
* descriptor */ * If this was a probe need to queue fragment on unexpected list
frag->request=match;
/* add this fragment descriptor to the list of
* descriptors to be processed later
*/ */
if(match_made == false) { if( (match->req_recv.req_base.req_type == MCA_PML_REQUEST_PROBE) ) {
match_made = true;
OBJ_CONSTRUCT(additional_matches, opal_list_t); /* complete the probe */
mca_pml_dr_recv_request_matched_probe(match,frag->btl,frag->segments,frag->num_segments);
/* retry the match */
match = NULL;
goto rematch;
} else {
/* associate the receive descriptor with the fragment
* descriptor */
frag->request=match;
/* add this fragment descriptor to the list of
* descriptors to be processed later
*/
if(match_made == false) {
match_made = true;
OBJ_CONSTRUCT(additional_matches, opal_list_t);
}
opal_list_append(additional_matches, (opal_list_item_t *)frag);
} }
opal_list_append(additional_matches, (opal_list_item_t *)frag);
} else { } else {
@ -656,3 +690,45 @@ static bool mca_pml_dr_check_cantmatch_for_match(
return match_made; return match_made;
} }
/**
* Generate an acknowledgement to the request
*/
void mca_pml_dr_recv_frag_ack(mca_pml_dr_recv_frag_t* frag)
{
mca_pml_dr_ack_hdr_t* ack;
mca_btl_base_descriptor_t* des;
mca_bml_base_endpoint_t* bml_endpoint;
mca_bml_base_btl_t* bml_btl;
int rc;
/* lookup btl */
bml_endpoint = (mca_bml_base_endpoint_t*)frag->proc->proc_pml;
bml_btl = mca_bml_base_btl_array_get_next(&bml_endpoint->btl_eager);
/* allocate descriptor */
MCA_PML_DR_DES_ALLOC(bml_btl, des, sizeof(mca_pml_dr_ack_hdr_t));
if(NULL == des) {
return;
}
/* fill out header */
ack = (mca_pml_dr_ack_hdr_t*)des->des_src->seg_addr.pval;
ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK;
ack->hdr_common.hdr_flags = 0;
ack->hdr_vmask = 1;
ack->hdr_vid = frag->hdr.hdr_match.hdr_vid;
ack->hdr_src_req = frag->hdr.hdr_match.hdr_src_req;
ack->hdr_dst_req.pval = NULL;
/* initialize descriptor */
des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
des->des_cbfunc = mca_pml_dr_ctl_completion;
rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML);
if(rc != OMPI_SUCCESS) {
mca_bml_base_free(bml_btl, des);
}
}

View file

@ -41,7 +41,9 @@ struct mca_pml_dr_recv_frag_t {
mca_pml_dr_hdr_t hdr; mca_pml_dr_hdr_t hdr;
struct mca_pml_dr_recv_request_t* request; struct mca_pml_dr_recv_request_t* request;
size_t num_segments; size_t num_segments;
uint32_t csum;
mca_btl_base_module_t* btl; mca_btl_base_module_t* btl;
ompi_proc_t* proc;
mca_btl_base_segment_t segments[MCA_BTL_DES_MAX_SEGMENTS]; mca_btl_base_segment_t segments[MCA_BTL_DES_MAX_SEGMENTS];
mca_pml_dr_buffer_t* buffers[MCA_BTL_DES_MAX_SEGMENTS]; mca_pml_dr_buffer_t* buffers[MCA_BTL_DES_MAX_SEGMENTS];
}; };
@ -50,30 +52,33 @@ typedef struct mca_pml_dr_recv_frag_t mca_pml_dr_recv_frag_t;
OBJ_CLASS_DECLARATION(mca_pml_dr_recv_frag_t); OBJ_CLASS_DECLARATION(mca_pml_dr_recv_frag_t);
#define MCA_PML_DR_RECV_FRAG_ALLOC(frag,rc) \ #define MCA_PML_DR_RECV_FRAG_ALLOC(frag,rc) \
do { \ do { \
opal_list_item_t* item; \ opal_list_item_t* item; \
OMPI_FREE_LIST_WAIT(&mca_pml_dr.recv_frags, item, rc); \ OMPI_FREE_LIST_WAIT(&mca_pml_dr.recv_frags, item, rc); \
frag = (mca_pml_dr_recv_frag_t*)item; \ frag = (mca_pml_dr_recv_frag_t*)item; \
} while(0) } while(0)
#define MCA_PML_DR_RECV_FRAG_INIT(frag, hdr,segs,cnt,btl) \ #define MCA_PML_DR_RECV_FRAG_INIT(frag,oproc,hdr,segs,cnt,btl) \
do { \ do { \
size_t i; \ size_t i; \
mca_btl_base_segment_t* macro_segments = frag->segments; \ mca_btl_base_segment_t* macro_segments = frag->segments; \
mca_pml_dr_buffer_t** buffers = frag->buffers; \ mca_pml_dr_buffer_t** buffers = frag->buffers; \
\ \
/* init recv_frag */ \ /* init recv_frag */ \
frag->btl = btl; \ frag->btl = btl; \
frag->hdr = *(mca_pml_dr_hdr_t*)hdr; \ frag->hdr = *(mca_pml_dr_hdr_t*)hdr; \
frag->num_segments = cnt; \ frag->num_segments = cnt; \
frag->csum = 0; \
frag->proc = oproc; \
\
/* copy over data */ \ /* copy over data */ \
for(i=0; i<cnt; i++) { \ for(i=0; i<cnt; i++) { \
opal_list_item_t* item; \ opal_list_item_t* item; \
mca_pml_dr_buffer_t* buff; \ mca_pml_dr_buffer_t* buff; \
OMPI_FREE_LIST_WAIT(&mca_pml_dr.buffers, item, rc); \ OMPI_FREE_LIST_WAIT(&mca_pml_dr.buffers, item, rc); \
buff = (mca_pml_dr_buffer_t*)item; \ buff = (mca_pml_dr_buffer_t*)item; \
buffers[i] = buff; \ buffers[i] = buff; \
macro_segments[i].seg_addr.pval = buff->addr; \ macro_segments[i].seg_addr.pval = buff->addr; \
macro_segments[i].seg_len = segs[i].seg_len; \ macro_segments[i].seg_len = segs[i].seg_len; \
@ -81,23 +86,24 @@ do { \
segs[i].seg_addr.pval, \ segs[i].seg_addr.pval, \
segs[i].seg_len); \ segs[i].seg_len); \
} \ } \
mca_pml_dr_recv_frag_ack(frag); \
\ \
} while(0) } while(0)
#define MCA_PML_DR_RECV_FRAG_RETURN(frag) \ #define MCA_PML_DR_RECV_FRAG_RETURN(frag) \
do { \ do { \
size_t i; \ size_t i; \
\ \
/* return buffers */ \ /* return buffers */ \
for(i=0; i<frag->num_segments; i++) { \ for(i=0; i<frag->num_segments; i++) { \
OMPI_FREE_LIST_RETURN(&mca_pml_dr.buffers, \ OMPI_FREE_LIST_RETURN(&mca_pml_dr.buffers, \
(opal_list_item_t*)frag->buffers[i]); \ (opal_list_item_t*)frag->buffers[i]); \
} \ } \
frag->num_segments = 0; \ frag->num_segments = 0; \
\ \
/* return recv_frag */ \ /* return recv_frag */ \
OMPI_FREE_LIST_RETURN(&mca_pml_dr.recv_frags, \ OMPI_FREE_LIST_RETURN(&mca_pml_dr.recv_frags, \
(opal_list_item_t*)frag); \ (opal_list_item_t*)frag); \
} while(0) } while(0)
@ -107,11 +113,10 @@ do { \
*/ */
OMPI_DECLSPEC void mca_pml_dr_recv_frag_callback( OMPI_DECLSPEC void mca_pml_dr_recv_frag_callback(
mca_btl_base_module_t *btl, mca_btl_base_module_t *btl,
mca_btl_base_tag_t tag, mca_btl_base_tag_t tag,
mca_btl_base_descriptor_t* descriptor, mca_btl_base_descriptor_t* descriptor,
void* cbdata void* cbdata);
);
/** /**
* Match incoming recv_frags against posted receives. * Match incoming recv_frags against posted receives.
@ -123,11 +128,18 @@ OMPI_DECLSPEC void mca_pml_dr_recv_frag_callback(
* @param additional_matches (OUT) List of additional matches * @param additional_matches (OUT) List of additional matches
* @return OMPI_SUCCESS or error status on failure. * @return OMPI_SUCCESS or error status on failure.
*/ */
OMPI_DECLSPEC int mca_pml_dr_recv_frag_match( OMPI_DECLSPEC bool mca_pml_dr_recv_frag_match(
mca_btl_base_module_t* btl, mca_btl_base_module_t* btl,
mca_pml_dr_match_hdr_t *hdr, mca_pml_dr_match_hdr_t *hdr,
mca_btl_base_segment_t* segments, mca_btl_base_segment_t* segments,
size_t num_segments); size_t num_segments);
/**
* Generate an acknowledgment for an unexpected/out-of-order fragment
*/
OMPI_DECLSPEC void mca_pml_dr_recv_frag_ack(
mca_pml_dr_recv_frag_t* frag);
#if defined(c_plusplus) || defined(__cplusplus) #if defined(c_plusplus) || defined(__cplusplus)

View file

@ -79,8 +79,8 @@ static int mca_pml_dr_recv_request_cancel(struct ompi_request_t* ompi_request, i
ompi_request->req_status._cancelled = true; ompi_request->req_status._cancelled = true;
ompi_request->req_complete = true; /* mark it as completed so all the test/wait functions ompi_request->req_complete = true; /* mark it as completed so all the test/wait functions
* on this particular request will finish */ * on this particular request will finish */
/* Now we have a problem if we are in a multi-threaded environment. We shou ld /* Now we have a problem if we are in a multi-threaded environment. We should
* broadcast the condition on the request in order to allow the other threa ds * broadcast the condition on the request in order to allow the other threads
* to complete their test/wait functions. * to complete their test/wait functions.
*/ */
ompi_request_completed++; ompi_request_completed++;
@ -97,10 +97,16 @@ static void mca_pml_dr_recv_request_construct(mca_pml_dr_recv_request_t* request
request->req_recv.req_base.req_ompi.req_fini = mca_pml_dr_recv_request_fini; request->req_recv.req_base.req_ompi.req_fini = mca_pml_dr_recv_request_fini;
request->req_recv.req_base.req_ompi.req_free = mca_pml_dr_recv_request_free; request->req_recv.req_base.req_ompi.req_free = mca_pml_dr_recv_request_free;
request->req_recv.req_base.req_ompi.req_cancel = mca_pml_dr_recv_request_cancel; request->req_recv.req_base.req_ompi.req_cancel = mca_pml_dr_recv_request_cancel;
OBJ_CONSTRUCT(&request->req_vfrag0, mca_pml_dr_vfrag_t);
OBJ_CONSTRUCT(&request->req_vfrags, opal_list_t);
OBJ_CONSTRUCT(&request->req_mutex, opal_mutex_t);
} }
static void mca_pml_dr_recv_request_destruct(mca_pml_dr_recv_request_t* request) static void mca_pml_dr_recv_request_destruct(mca_pml_dr_recv_request_t* request)
{ {
OBJ_DESTRUCT(&request->req_vfrag0);
OBJ_DESTRUCT(&request->req_vfrags);
OBJ_DESTRUCT(&request->req_mutex);
} }
@ -127,15 +133,15 @@ static void mca_pml_dr_ctl_completion(
/* /*
* * Generate an ack to the peer after first fragment is matched.
*/ */
static void mca_pml_dr_recv_request_ack( static void mca_pml_dr_recv_request_matched(
mca_pml_dr_recv_request_t* recvreq, mca_pml_dr_recv_request_t* recvreq,
mca_pml_dr_rendezvous_hdr_t* hdr, mca_pml_dr_rendezvous_hdr_t* hdr,
size_t bytes_received) uint8_t type)
{ {
ompi_proc_t* proc = (ompi_proc_t*) recvreq->req_proc; ompi_proc_t* proc = recvreq->req_proc;
mca_bml_base_endpoint_t* bml_endpoint = NULL; mca_bml_base_endpoint_t* bml_endpoint = NULL;
mca_btl_base_descriptor_t* des; mca_btl_base_descriptor_t* des;
mca_bml_base_btl_t* bml_btl; mca_bml_base_btl_t* bml_btl;
@ -160,9 +166,11 @@ static void mca_pml_dr_recv_request_ack(
/* fill out header */ /* fill out header */
ack = (mca_pml_dr_ack_hdr_t*)des->des_src->seg_addr.pval; ack = (mca_pml_dr_ack_hdr_t*)des->des_src->seg_addr.pval;
ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK; ack->hdr_common.hdr_type = type;
ack->hdr_common.hdr_flags = 0; ack->hdr_common.hdr_flags = MCA_PML_DR_HDR_FLAGS_MATCH;
ack->hdr_src_req = hdr->hdr_src_req; ack->hdr_vid = hdr->hdr_match.hdr_vid;
ack->hdr_vmask = 0x1;
ack->hdr_src_req = hdr->hdr_match.hdr_src_req;
ack->hdr_dst_req.pval = recvreq; ack->hdr_dst_req.pval = recvreq;
/* initialize descriptor */ /* initialize descriptor */
@ -185,6 +193,94 @@ retry:
opal_list_append(&mca_pml_dr.acks_pending, (opal_list_item_t*)frag); opal_list_append(&mca_pml_dr.acks_pending, (opal_list_item_t*)frag);
} }
/*
* Generate an ack to the peer after first fragment is matched.
*/
static void mca_pml_dr_recv_request_nack(
mca_pml_dr_recv_request_t* recvreq,
mca_pml_dr_frag_hdr_t* hdr)
{
ompi_proc_t* proc = recvreq->req_proc;
mca_bml_base_endpoint_t* bml_endpoint = NULL;
mca_btl_base_descriptor_t* des;
mca_bml_base_btl_t* bml_btl;
mca_pml_dr_ack_hdr_t* nack;
int rc;
bml_endpoint = (mca_bml_base_endpoint_t*) proc->proc_pml;
bml_btl = mca_bml_base_btl_array_get_next(&bml_endpoint->btl_eager);
/* allocate descriptor */
MCA_PML_DR_DES_ALLOC(bml_btl, des, sizeof(mca_pml_dr_ack_hdr_t));
if(NULL == des) {
return;
}
/* fill out header */
nack = (mca_pml_dr_ack_hdr_t*)des->des_src->seg_addr.pval;
nack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_NACK;
nack->hdr_common.hdr_flags = MCA_PML_DR_HDR_FLAGS_MATCH;
nack->hdr_vid = hdr->hdr_vid;
nack->hdr_vmask = 1 << hdr->hdr_frag_idx;
nack->hdr_src_req = hdr->hdr_src_req;
nack->hdr_dst_req.pval = recvreq;
/* initialize descriptor */
des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
des->des_cbfunc = mca_pml_dr_ctl_completion;
rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML);
if(rc != OMPI_SUCCESS) {
mca_bml_base_free(bml_btl, des);
}
}
/*
* Generate an ack w/ the current vfrag status.
*/
static void mca_pml_dr_recv_request_vfrag_ack(
mca_pml_dr_recv_request_t* recvreq,
mca_pml_dr_vfrag_t* vfrag)
{
ompi_proc_t* proc = recvreq->req_proc;
mca_bml_base_endpoint_t* bml_endpoint = NULL;
mca_btl_base_descriptor_t* des;
mca_bml_base_btl_t* bml_btl;
mca_pml_dr_ack_hdr_t* ack;
int rc;
bml_endpoint = (mca_bml_base_endpoint_t*) proc->proc_pml;
bml_btl = mca_bml_base_btl_array_get_next(&bml_endpoint->btl_eager);
/* allocate descriptor */
MCA_PML_DR_DES_ALLOC(bml_btl, des, sizeof(mca_pml_dr_ack_hdr_t));
if(NULL == des) {
return;
}
/* fill out header */
ack = (mca_pml_dr_ack_hdr_t*)des->des_src->seg_addr.pval;
ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK;
ack->hdr_common.hdr_flags = 0;
ack->hdr_vid = vfrag->vf_id;
ack->hdr_vmask = vfrag->vf_ack;
ack->hdr_src_req = recvreq->req_vfrag0.vf_send;
ack->hdr_dst_req.pval = recvreq;
/* initialize descriptor */
des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
des->des_cbfunc = mca_pml_dr_ctl_completion;
rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML);
if(rc != OMPI_SUCCESS) {
mca_bml_base_free(bml_btl, des);
}
}
/* /*
* Update the recv request status to reflect the number of bytes * Update the recv request status to reflect the number of bytes
@ -202,6 +298,7 @@ void mca_pml_dr_recv_request_progress(
size_t data_offset = 0; size_t data_offset = 0;
mca_pml_dr_hdr_t* hdr = (mca_pml_dr_hdr_t*)segments->seg_addr.pval; mca_pml_dr_hdr_t* hdr = (mca_pml_dr_hdr_t*)segments->seg_addr.pval;
size_t i; size_t i;
uint32_t csum = 0;
for(i=0; i<num_segments; i++) for(i=0; i<num_segments; i++)
bytes_received += segments[i].seg_len; bytes_received += segments[i].seg_len;
@ -219,16 +316,16 @@ void mca_pml_dr_recv_request_progress(
sizeof(mca_pml_dr_match_hdr_t), sizeof(mca_pml_dr_match_hdr_t),
data_offset, data_offset,
bytes_received, bytes_received,
bytes_delivered); bytes_delivered,
csum);
break; break;
case MCA_PML_DR_HDR_TYPE_RNDV: case MCA_PML_DR_HDR_TYPE_RNDV:
bytes_received -= sizeof(mca_pml_dr_rendezvous_hdr_t); bytes_received -= sizeof(mca_pml_dr_rendezvous_hdr_t);
recvreq->req_recv.req_bytes_packed = hdr->hdr_rndv.hdr_msg_length; recvreq->req_recv.req_bytes_packed = hdr->hdr_rndv.hdr_msg_length;
recvreq->req_send = hdr->hdr_rndv.hdr_src_req; recvreq->req_vfrag0.vf_send = hdr->hdr_match.hdr_src_req;
MCA_PML_DR_RECV_REQUEST_MATCHED(recvreq,&hdr->hdr_match); MCA_PML_DR_RECV_REQUEST_MATCHED(recvreq,&hdr->hdr_match);
mca_pml_dr_recv_request_ack(recvreq, &hdr->hdr_rndv, bytes_received);
MCA_PML_DR_RECV_REQUEST_UNPACK( MCA_PML_DR_RECV_REQUEST_UNPACK(
recvreq, recvreq,
segments, segments,
@ -236,7 +333,10 @@ void mca_pml_dr_recv_request_progress(
sizeof(mca_pml_dr_rendezvous_hdr_t), sizeof(mca_pml_dr_rendezvous_hdr_t),
data_offset, data_offset,
bytes_received, bytes_received,
bytes_delivered); bytes_delivered,
csum);
mca_pml_dr_recv_request_matched(recvreq, &hdr->hdr_rndv,
(csum == hdr->hdr_match.hdr_csum) ? MCA_PML_DR_HDR_TYPE_ACK : MCA_PML_DR_HDR_TYPE_NACK);
break; break;
case MCA_PML_DR_HDR_TYPE_FRAG: case MCA_PML_DR_HDR_TYPE_FRAG:
@ -250,7 +350,30 @@ void mca_pml_dr_recv_request_progress(
sizeof(mca_pml_dr_frag_hdr_t), sizeof(mca_pml_dr_frag_hdr_t),
data_offset, data_offset,
bytes_received, bytes_received,
bytes_delivered); bytes_delivered,
csum);
/* if checksum fails - immediately nack this fragment */
if(csum != hdr->hdr_frag.hdr_frag_csum) {
bytes_received = bytes_delivered = 0;
mca_pml_dr_recv_request_nack(recvreq, &hdr->hdr_frag);
} else {
mca_pml_dr_vfrag_t* vfrag;
uint64_t bit = ((uint64_t)1 << hdr->hdr_frag.hdr_frag_idx);
/* update vfrag status */
MCA_PML_DR_RECV_REQUEST_VFRAG_LOOKUP(recvreq, &hdr->hdr_frag, vfrag);
if((vfrag->vf_ack & bit) == 0) {
vfrag->vf_ack |= bit;
if((vfrag->vf_ack & vfrag->vf_mask) == vfrag->vf_mask) {
/* done w/ this vfrag - ack it */
mca_pml_dr_recv_request_vfrag_ack(recvreq, vfrag);
}
} else {
/* duplicate fragment - send an ack w/ the frags completed */
mca_pml_dr_recv_request_vfrag_ack(recvreq, vfrag);
}
}
break; break;
default: default:
@ -468,3 +591,4 @@ static mca_pml_dr_recv_frag_t* mca_pml_dr_recv_request_match_specific_proc(
return frag; return frag;
} }
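The MCA_PML_DR_HDR_TYPE_FRAG branch of mca_pml_dr_recv_request_progress above is the receiver's half of the protocol: a failed per-fragment checksum triggers an immediate NACK for that one bit, a good fragment sets its bit in the vfrag's ack mask, and a complete mask (or a duplicate arrival) triggers an ACK carrying the current mask. A condensed sketch of that decision, assuming the checksums are already computed and with the control messages reduced to printouts; all names are illustrative:

#include <stdint.h>
#include <stdio.h>

struct vfrag_state {
    uint64_t ack;   /* bits of fragments received so far */
    uint64_t mask;  /* bits expected for a complete vfrag */
};

/* Handle one incoming data fragment of a vfrag. */
static void on_fragment(struct vfrag_state *vf, uint16_t frag_idx,
                        uint32_t wire_csum, uint32_t local_csum)
{
    uint64_t bit = (uint64_t)1 << frag_idx;

    if (local_csum != wire_csum) {
        printf("csum mismatch: NACK fragment %u\n", (unsigned)frag_idx);
    } else if ((vf->ack & bit) == 0) {
        vf->ack |= bit;
        if ((vf->ack & vf->mask) == vf->mask)
            printf("vfrag complete: ACK mask 0x%llx\n", (unsigned long long)vf->ack);
    } else {
        /* duplicate fragment: re-ack what has already arrived */
        printf("duplicate: ACK mask 0x%llx\n", (unsigned long long)vf->ack);
    }
}

int main(void)
{
    struct vfrag_state vf = { 0, 0x3 };   /* two-fragment vfrag */
    on_fragment(&vf, 0, 0xabcd, 0xabcd);  /* good fragment 0 */
    on_fragment(&vf, 1, 0xabcd, 0x0000);  /* corrupted fragment 1 -> NACK */
    on_fragment(&vf, 1, 0xabcd, 0xabcd);  /* retransmission completes the vfrag */
    return 0;
}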

View file

@ -21,11 +21,14 @@
#ifndef OMPI_PML_DR_RECV_REQUEST_H #ifndef OMPI_PML_DR_RECV_REQUEST_H
#define OMPI_PML_DR_RECV_REQUEST_H #define OMPI_PML_DR_RECV_REQUEST_H
#include "pml_dr.h" #include "ompi_config.h"
#include "pml_dr_proc.h"
#include "mca/mpool/base/base.h" #include "mca/mpool/base/base.h"
#include "mca/pml/base/pml_base_recvreq.h" #include "mca/pml/base/pml_base_recvreq.h"
#include "pml_dr.h"
#include "pml_dr_proc.h"
#include "pml_dr_vfrag.h"
#if defined(c_plusplus) || defined(__cplusplus) #if defined(c_plusplus) || defined(__cplusplus)
extern "C" { extern "C" {
#endif #endif
@ -34,15 +37,19 @@ extern "C" {
struct mca_pml_dr_recv_request_t { struct mca_pml_dr_recv_request_t {
mca_pml_base_recv_request_t req_recv; mca_pml_base_recv_request_t req_recv;
struct ompi_proc_t *req_proc; struct ompi_proc_t *req_proc;
ompi_ptr_t req_send;
#if OMPI_HAVE_THREAD_SUPPORT #if OMPI_HAVE_THREAD_SUPPORT
volatile int32_t req_lock; volatile int32_t req_lock;
#else #else
int32_t req_lock; int32_t req_lock;
#endif #endif
size_t req_pipeline_depth; size_t req_pipeline_depth;
size_t req_bytes_received; size_t req_bytes_received;
size_t req_bytes_delivered; size_t req_bytes_delivered;
mca_pml_dr_vfrag_t *req_vfrag;
mca_pml_dr_vfrag_t req_vfrag0;
opal_list_t req_vfrags;
opal_mutex_t req_mutex;
}; };
typedef struct mca_pml_dr_recv_request_t mca_pml_dr_recv_request_t; typedef struct mca_pml_dr_recv_request_t mca_pml_dr_recv_request_t;
@ -56,12 +63,12 @@ OBJ_CLASS_DECLARATION(mca_pml_dr_recv_request_t);
* @param rc (OUT) OMPI_SUCCESS or error status on failure. * @param rc (OUT) OMPI_SUCCESS or error status on failure.
* @return Receive request. * @return Receive request.
*/ */
#define MCA_PML_DR_RECV_REQUEST_ALLOC(recvreq, rc) \ #define MCA_PML_DR_RECV_REQUEST_ALLOC(recvreq, rc) \
do { \ do { \
opal_list_item_t* item; \ opal_list_item_t* item; \
rc = OMPI_SUCCESS; \ rc = OMPI_SUCCESS; \
OMPI_FREE_LIST_GET(&mca_pml_dr.recv_requests, item, rc); \ OMPI_FREE_LIST_GET(&mca_pml_dr.recv_requests, item, rc); \
recvreq = (mca_pml_dr_recv_request_t*)item; \ recvreq = (mca_pml_dr_recv_request_t*)item; \
} while(0) } while(0)
@ -77,7 +84,7 @@ do { \
* @param comm (IN) Communicator. * @param comm (IN) Communicator.
* @param persistent (IN) Is this a ersistent request. * @param persistent (IN) Is this a ersistent request.
*/ */
#define MCA_PML_DR_RECV_REQUEST_INIT( \ #define MCA_PML_DR_RECV_REQUEST_INIT( \
request, \ request, \
addr, \ addr, \
count, \ count, \
@ -105,6 +112,16 @@ do { \
*/ */
#define MCA_PML_DR_RECV_REQUEST_RETURN(recvreq) \ #define MCA_PML_DR_RECV_REQUEST_RETURN(recvreq) \
do { \ do { \
opal_list_item_t* item; \
\
/* return vfrags */ \
OPAL_THREAD_LOCK(&(recvreq)->req_mutex); \
while(NULL != (item = opal_list_remove_first(&(recvreq)->req_vfrags))) { \
OMPI_FREE_LIST_RETURN(&mca_pml_dr.vfrags, item); \
} \
OPAL_THREAD_UNLOCK(&(recvreq)->req_mutex); \
\
/* decrement reference counts */ \
MCA_PML_BASE_RECV_REQUEST_FINI(&(recvreq)->req_recv); \ MCA_PML_BASE_RECV_REQUEST_FINI(&(recvreq)->req_recv); \
OMPI_FREE_LIST_RETURN(&mca_pml_dr.recv_requests, (opal_list_item_t*)(recvreq)); \ OMPI_FREE_LIST_RETURN(&mca_pml_dr.recv_requests, (opal_list_item_t*)(recvreq)); \
} while(0) } while(0)
@ -131,7 +148,7 @@ void mca_pml_dr_recv_request_match_specific(mca_pml_dr_recv_request_t* request);
* @param request Receive request. * @param request Receive request.
* @return OMPI_SUCESS or error status on failure. * @return OMPI_SUCESS or error status on failure.
*/ */
#define MCA_PML_DR_RECV_REQUEST_START(request) \ #define MCA_PML_DR_RECV_REQUEST_START(request) \
do { \ do { \
/* init/re-init the request */ \ /* init/re-init the request */ \
(request)->req_bytes_received = 0; \ (request)->req_bytes_received = 0; \
@ -141,6 +158,7 @@ do {
(request)->req_recv.req_base.req_pml_complete = false; \ (request)->req_recv.req_base.req_pml_complete = false; \
(request)->req_recv.req_base.req_ompi.req_complete = false; \ (request)->req_recv.req_base.req_ompi.req_complete = false; \
(request)->req_recv.req_base.req_ompi.req_state = OMPI_REQUEST_ACTIVE; \ (request)->req_recv.req_base.req_ompi.req_state = OMPI_REQUEST_ACTIVE; \
(request)->req_vfrag = &(request)->req_vfrag0; \
\ \
/* always set the req_status.MPI_TAG to ANY_TAG before starting the \ /* always set the req_status.MPI_TAG to ANY_TAG before starting the \
* request. This field is used if cancelled to find out if the request \ * request. This field is used if cancelled to find out if the request \
@ -152,9 +170,9 @@ do {
\ \
/* attempt to match posted recv */ \ /* attempt to match posted recv */ \
if((request)->req_recv.req_base.req_peer == OMPI_ANY_SOURCE) { \ if((request)->req_recv.req_base.req_peer == OMPI_ANY_SOURCE) { \
mca_pml_dr_recv_request_match_wild(request); \ mca_pml_dr_recv_request_match_wild(request); \
} else { \ } else { \
mca_pml_dr_recv_request_match_specific(request); \ mca_pml_dr_recv_request_match_specific(request); \
} \ } \
} while (0) } while (0)
@ -163,7 +181,7 @@ do {
* *
*/ */
#define MCA_PML_DR_RECV_REQUEST_MATCHED( \ #define MCA_PML_DR_RECV_REQUEST_MATCHED( \
request, \ request, \
hdr) \ hdr) \
do { \ do { \
@ -190,14 +208,15 @@ do {
* *
*/ */
#define MCA_PML_DR_RECV_REQUEST_UNPACK( \ #define MCA_PML_DR_RECV_REQUEST_UNPACK( \
request, \ request, \
segments, \ segments, \
num_segments, \ num_segments, \
seg_offset, \ seg_offset, \
data_offset, \ data_offset, \
bytes_received, \ bytes_received, \
bytes_delivered) \ bytes_delivered, \
csum) \
do { \ do { \
if(request->req_recv.req_bytes_packed > 0) { \ if(request->req_recv.req_bytes_packed > 0) { \
struct iovec iov[MCA_BTL_DES_MAX_SEGMENTS]; \ struct iovec iov[MCA_BTL_DES_MAX_SEGMENTS]; \
@ -259,6 +278,49 @@ void mca_pml_dr_recv_request_matched_probe(
void mca_pml_dr_recv_request_schedule( void mca_pml_dr_recv_request_schedule(
mca_pml_dr_recv_request_t* req); mca_pml_dr_recv_request_t* req);
/*
*
*/
#define MCA_PML_DR_RECV_REQUEST_VFRAG_LOOKUP(recvreq,hdr,vfrag) \
do { \
if((recvreq)->req_vfrag->vf_id == (hdr)->hdr_vid) { \
vfrag = (recvreq)->req_vfrag; \
} else if ((hdr)->hdr_frag_offset == 0) { \
vfrag = &(recvreq)->req_vfrag0; \
} else { \
opal_list_item_t* item; \
int rc; \
\
vfrag = NULL; \
OPAL_THREAD_LOCK(&(recvreq)->req_mutex); \
for(item = opal_list_get_first(&(recvreq)->req_vfrags); \
item != opal_list_get_end(&(recvreq)->req_vfrags); \
item = opal_list_get_next(item)) { \
mca_pml_dr_vfrag_t* vf = (mca_pml_dr_vfrag_t*)item; \
if(vf->vf_id == (hdr)->hdr_vid) { \
vfrag = vf; \
break; \
} \
} \
if(NULL == vfrag) { \
MCA_PML_DR_VFRAG_ALLOC(vfrag,rc); \
if(NULL != vfrag) { \
(vfrag)->vf_id = (hdr)->hdr_vid; \
(vfrag)->vf_len = (hdr)->hdr_vlen; \
(vfrag)->vf_ack = 0; \
if((hdr)->hdr_vlen == 64) { \
(vfrag)->vf_mask = ~(uint64_t)0; \
} else { \
(vfrag)->vf_mask = (((uint64_t)1 << (hdr)->hdr_vlen)-1); \
} \
opal_list_append(&(recvreq)->req_vfrags, (opal_list_item_t*)vfrag); \
(recvreq)->req_vfrag = vfrag; \
} \
} \
OPAL_THREAD_UNLOCK(&(recvreq)->req_mutex); \
} \
} while(0)
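The lookup macro above finds or creates the vfrag descriptor for an incoming fragment by hdr_vid and, when allocating a new one, derives the completion mask from hdr_vlen. The special case for a 64-fragment vfrag exists because shifting a 64-bit value by 64 is undefined behavior in C. A standalone check of the mask rule, mirroring the macro (the function name is illustrative):

#include <stdint.h>
#include <assert.h>

/* Completion mask for a vfrag of vlen fragments, 1 <= vlen <= 64,
 * mirroring the vf_mask initialization in the lookup macro above. */
static uint64_t vf_mask_for_len(uint16_t vlen)
{
    return (vlen == 64) ? ~(uint64_t)0 : (((uint64_t)1 << vlen) - 1);
}

int main(void)
{
    assert(vf_mask_for_len(1)  == 0x1);
    assert(vf_mask_for_len(3)  == 0x7);
    assert(vf_mask_for_len(64) == UINT64_MAX);
    return 0;
}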
#if defined(c_plusplus) || defined(__cplusplus) #if defined(c_plusplus) || defined(__cplusplus)
} }

View file

@ -20,6 +20,7 @@
/*%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%*/ /*%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%*/
#include "ompi_config.h" #include "ompi_config.h"
#include "opal/util/crc.h"
#include "ompi/include/constants.h" #include "ompi/include/constants.h"
#include "mca/pml/pml.h" #include "mca/pml/pml.h"
#include "mca/btl/btl.h" #include "mca/btl/btl.h"
@ -30,10 +31,9 @@
#include "pml_dr_proc.h" #include "pml_dr_proc.h"
#include "pml_dr_sendreq.h" #include "pml_dr_sendreq.h"
#include "pml_dr_recvreq.h" #include "pml_dr_recvreq.h"
#include "pml_dr_endpoint.h"
#include "mca/bml/base/base.h" #include "mca/bml/base/base.h"
static int mca_pml_dr_send_request_fini(struct ompi_request_t** request) static int mca_pml_dr_send_request_fini(struct ompi_request_t** request)
{ {
mca_pml_dr_send_request_t* sendreq = *(mca_pml_dr_send_request_t**)(request); mca_pml_dr_send_request_t* sendreq = *(mca_pml_dr_send_request_t**)(request);
@ -73,14 +73,26 @@ static int mca_pml_dr_send_request_cancel(struct ompi_request_t* request, int co
static void mca_pml_dr_send_request_construct(mca_pml_dr_send_request_t* req) static void mca_pml_dr_send_request_construct(mca_pml_dr_send_request_t* req)
{ {
req->req_vfrag0.vf_len = 1;
req->req_vfrag0.vf_idx = 1;
req->req_vfrag0.vf_mask = 1;
req->req_send.req_base.req_type = MCA_PML_REQUEST_SEND; req->req_send.req_base.req_type = MCA_PML_REQUEST_SEND;
req->req_send.req_base.req_ompi.req_fini = mca_pml_dr_send_request_fini; req->req_send.req_base.req_ompi.req_fini = mca_pml_dr_send_request_fini;
req->req_send.req_base.req_ompi.req_free = mca_pml_dr_send_request_free; req->req_send.req_base.req_ompi.req_free = mca_pml_dr_send_request_free;
req->req_send.req_base.req_ompi.req_cancel = mca_pml_dr_send_request_cancel; req->req_send.req_base.req_ompi.req_cancel = mca_pml_dr_send_request_cancel;
OBJ_CONSTRUCT(&req->req_vfrag0, mca_pml_dr_vfrag_t);
OBJ_CONSTRUCT(&req->req_pending, opal_list_t);
OBJ_CONSTRUCT(&req->req_retrans, opal_list_t);
OBJ_CONSTRUCT(&req->req_mutex, opal_mutex_t);
} }
static void mca_pml_dr_send_request_destruct(mca_pml_dr_send_request_t* req) static void mca_pml_dr_send_request_destruct(mca_pml_dr_send_request_t* req)
{ {
OBJ_DESTRUCT(&req->req_vfrag0);
OBJ_DESTRUCT(&req->req_pending);
OBJ_DESTRUCT(&req->req_retrans);
OBJ_DESTRUCT(&req->req_mutex);
} }
@ -115,7 +127,9 @@ void mca_pml_dr_match_completion_cache(
/* signal request completion */ /* signal request completion */
OPAL_THREAD_LOCK(&ompi_request_lock); OPAL_THREAD_LOCK(&ompi_request_lock);
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); if(sendreq->req_num_acks == sendreq->req_num_vfrags) {
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
}
OPAL_THREAD_UNLOCK(&ompi_request_lock); OPAL_THREAD_UNLOCK(&ompi_request_lock);
} }
@ -144,7 +158,9 @@ void mca_pml_dr_match_completion_free(
/* signal request completion */ /* signal request completion */
OPAL_THREAD_LOCK(&ompi_request_lock); OPAL_THREAD_LOCK(&ompi_request_lock);
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); if(sendreq->req_num_acks == sendreq->req_num_vfrags) {
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
}
OPAL_THREAD_UNLOCK(&ompi_request_lock); OPAL_THREAD_UNLOCK(&ompi_request_lock);
} }
@ -211,7 +227,8 @@ static void mca_pml_dr_frag_completion(
/* count bytes of user data actually delivered */ /* count bytes of user data actually delivered */
MCA_PML_DR_SEND_REQUEST_SET_BYTES_DELIVERED(sendreq,descriptor,sizeof(mca_pml_dr_frag_hdr_t)); MCA_PML_DR_SEND_REQUEST_SET_BYTES_DELIVERED(sendreq,descriptor,sizeof(mca_pml_dr_frag_hdr_t));
if (OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,-1) == 0 && if (OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,-1) == 0 &&
sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed) { sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed &&
sendreq->req_num_acks == sendreq->req_num_vfrags) {
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
schedule = false; schedule = false;
} else { } else {
@ -272,20 +289,10 @@ int mca_pml_dr_send_request_start_buffered(
return rc; return rc;
} }
/* build rendezvous header */
hdr = (mca_pml_dr_hdr_t*)segment->seg_addr.pval;
hdr->hdr_common.hdr_flags = 0;
hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_RNDV;
hdr->hdr_match.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid;
hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag;
hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence;
hdr->hdr_rndv.hdr_msg_length = sendreq->req_send.req_bytes_packed;
hdr->hdr_rndv.hdr_src_req.pval = sendreq;
/* update lengths */ /* update lengths */
segment->seg_len = sizeof(mca_pml_dr_rendezvous_hdr_t) + max_data; segment->seg_len = sizeof(mca_pml_dr_rendezvous_hdr_t) + max_data;
sendreq->req_send_offset = max_data; sendreq->req_send_offset = max_data;
sendreq->req_vfrag0.vf_size = max_data;
descriptor->des_cbfunc = mca_pml_dr_rndv_completion; descriptor->des_cbfunc = mca_pml_dr_rndv_completion;
descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
@ -311,6 +318,21 @@ int mca_pml_dr_send_request_start_buffered(
return rc; return rc;
} }
/* build rendezvous header */
hdr = (mca_pml_dr_hdr_t*)segment->seg_addr.pval;
hdr->hdr_common.hdr_flags = 0;
hdr->hdr_common.hdr_csum = 0;
hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_RNDV;
hdr->hdr_match.hdr_vid = sendreq->req_vfrag0.vf_id;
hdr->hdr_match.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid;
hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag;
hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence;
hdr->hdr_match.hdr_csum = sendreq->req_send.req_convertor.checksum;
hdr->hdr_match.hdr_src_req.pval = sendreq;
hdr->hdr_rndv.hdr_msg_length = sendreq->req_send.req_bytes_packed;
hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_rendezvous_hdr_t));
/* re-init convertor for packed data */ /* re-init convertor for packed data */
ompi_convertor_prepare_for_send( ompi_convertor_prepare_for_send(
&sendreq->req_send.req_convertor, &sendreq->req_send.req_convertor,
@ -376,15 +398,19 @@ int mca_pml_dr_send_request_start_copy(
/* build match header */ /* build match header */
hdr = (mca_pml_dr_hdr_t*)segment->seg_addr.pval; hdr = (mca_pml_dr_hdr_t*)segment->seg_addr.pval;
hdr->hdr_common.hdr_flags = 0; hdr->hdr_common.hdr_flags = 0;
hdr->hdr_common.hdr_csum = 0;
hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_MATCH; hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_MATCH;
hdr->hdr_match.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid; hdr->hdr_match.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid;
hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag;
hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence; hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence;
hdr->hdr_match.hdr_csum = sendreq->req_send.req_convertor.checksum;
hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_match_hdr_t));
/* update lengths */ /* update lengths */
segment->seg_len = sizeof(mca_pml_dr_match_hdr_t) + max_data; segment->seg_len = sizeof(mca_pml_dr_match_hdr_t) + max_data;
sendreq->req_send_offset = max_data; sendreq->req_send_offset = max_data;
sendreq->req_vfrag0.vf_size = max_data;
/* short message */ /* short message */
descriptor->des_cbfunc = mca_pml_dr_match_completion_free; descriptor->des_cbfunc = mca_pml_dr_match_completion_free;
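In each of the start paths above the header checksum is produced the same way: hdr_csum is zeroed, the remaining header fields are filled in, and opal_csum is then run over the whole fixed-size header, so the stored value is the checksum of the header with a zero csum field. A self-contained sketch of that compute-then-verify ordering; toy_csum is only a stand-in for opal_csum (whose algorithm is not shown here), and the receiver-side re-zeroing is an assumption this diff does not show:

#include <stdint.h>
#include <string.h>
#include <assert.h>

/* Stand-in checksum: a simple byte sum. The real code uses opal_csum(). */
static uint16_t toy_csum(const void *buf, size_t len)
{
    const uint8_t *p = buf;
    uint16_t sum = 0;
    while (len--)
        sum = (uint16_t)(sum + *p++);
    return sum;
}

struct toy_hdr {
    uint8_t  type;
    uint8_t  flags;
    uint16_t csum;   /* must be zero while the checksum is computed */
    uint32_t vid;
};

int main(void)
{
    struct toy_hdr hdr;
    memset(&hdr, 0, sizeof(hdr));            /* csum field (and padding) start at zero */
    hdr.type = 2;                            /* e.g. MCA_PML_DR_HDR_TYPE_RNDV */
    hdr.vid  = 1;
    hdr.csum = toy_csum(&hdr, sizeof(hdr));  /* checksum over the zero-csum header */

    /* Receiver side (assumed): save the wire value, re-zero the field, recompute. */
    uint16_t wire = hdr.csum;
    hdr.csum = 0;
    assert(toy_csum(&hdr, sizeof(hdr)) == wire);
    return 0;
}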
@ -435,11 +461,14 @@ int mca_pml_dr_send_request_start_prepare(
/* build match header */ /* build match header */
hdr = (mca_pml_dr_hdr_t*)segment->seg_addr.pval; hdr = (mca_pml_dr_hdr_t*)segment->seg_addr.pval;
hdr->hdr_common.hdr_flags = 0; hdr->hdr_common.hdr_flags = 0;
hdr->hdr_common.hdr_csum = 0;
hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_MATCH; hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_MATCH;
hdr->hdr_match.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid; hdr->hdr_match.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid;
hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag;
hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence; hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence;
hdr->hdr_match.hdr_csum = sendreq->req_send.req_convertor.checksum;
hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_match_hdr_t));
/* short message */ /* short message */
descriptor->des_cbfunc = mca_pml_dr_match_completion_free; descriptor->des_cbfunc = mca_pml_dr_match_completion_free;
@ -448,6 +477,7 @@ int mca_pml_dr_send_request_start_prepare(
/* update lengths */ /* update lengths */
sendreq->req_send_offset = size; sendreq->req_send_offset = size;
sendreq->req_vfrag0.vf_size = size;
/* send */ /* send */
rc = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_PML); rc = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_PML);
@ -504,14 +534,15 @@ int mca_pml_dr_send_request_start_rndv(
hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag;
hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence; hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence;
hdr->hdr_match.hdr_src_req.pval = sendreq;
hdr->hdr_rndv.hdr_msg_length = sendreq->req_send.req_bytes_packed; hdr->hdr_rndv.hdr_msg_length = sendreq->req_send.req_bytes_packed;
hdr->hdr_rndv.hdr_src_req.pval = sendreq;
/* first fragment of a long message */ /* first fragment of a long message */
des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
des->des_cbdata = sendreq; des->des_cbdata = sendreq;
des->des_cbfunc = mca_pml_dr_rndv_completion; des->des_cbfunc = mca_pml_dr_rndv_completion;
sendreq->req_send_offset = size; sendreq->req_send_offset = size;
sendreq->req_vfrag0.vf_size = size;
/* send */ /* send */
rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML); rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML);
@ -537,7 +568,6 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
*/ */
mca_bml_base_endpoint_t* bml_endpoint = sendreq->req_endpoint; mca_bml_base_endpoint_t* bml_endpoint = sendreq->req_endpoint;
if(OPAL_THREAD_ADD32(&sendreq->req_lock,1) == 1) { if(OPAL_THREAD_ADD32(&sendreq->req_lock,1) == 1) {
do { do {
/* allocate remaining bytes to BTLs */ /* allocate remaining bytes to BTLs */
@ -547,58 +577,36 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
mca_pml_dr_frag_hdr_t* hdr; mca_pml_dr_frag_hdr_t* hdr;
mca_btl_base_descriptor_t* des; mca_btl_base_descriptor_t* des;
int rc;
size_t size;
mca_bml_base_btl_t* bml_btl = mca_bml_base_btl_array_get_next(&bml_endpoint->btl_send); mca_bml_base_btl_t* bml_btl = mca_bml_base_btl_array_get_next(&bml_endpoint->btl_send);
size_t num_btl_avail = bml_endpoint->btl_send.arr_size; mca_pml_dr_vfrag_t* vfrag = sendreq->req_vfrag;
size_t size = bytes_remaining;
if(num_btl_avail == 1 || bytes_remaining < bml_btl->btl_min_send_size) { size_t offset = sendreq->req_send_offset - vfrag->vf_offset;
size = bytes_remaining; int rc;
/* otherwise attempt to give the BTL a percentage of the message /* do we need to allocate a new vfrag */
* based on a weighting factor. for simplicity calculate this as if(vfrag->vf_size == offset) {
* a percentage of the overall message length (regardless of amount MCA_PML_DR_VFRAG_ALLOC(vfrag,rc);
* previously assigned) if(NULL == vfrag) {
*/ OPAL_THREAD_LOCK(&mca_pml_dr.lock);
} else { opal_list_append(&mca_pml_dr.send_pending, (opal_list_item_t*)sendreq);
size = (size_t)(bml_btl->btl_weight * bytes_remaining); OPAL_THREAD_UNLOCK(&mca_pml_dr.lock);
} break;
/* makes sure that we don't exceed BTL max send size */
if (bml_btl->btl_max_send_size != 0 &&
size > bml_btl->btl_max_send_size - sizeof(mca_pml_dr_frag_hdr_t)) {
size = bml_btl->btl_max_send_size - sizeof(mca_pml_dr_frag_hdr_t);
/* very expensive - however for buffered sends we need to send on a
* boundary that the receiver will be able to unpack completely
* using the native datatype
*/
if(sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED) {
ompi_convertor_t convertor;
size_t position = sendreq->req_send_offset + size;
/*
* We need this local convertor in order to correctly compute
* the correct position. Therefore we have to correctly construct and
* destruct it.
*/
OBJ_CONSTRUCT( &convertor, ompi_convertor_t );
ompi_convertor_copy_and_prepare_for_send(
&sendreq->req_send.req_convertor,
sendreq->req_send.req_base.req_datatype,
sendreq->req_send.req_base.req_count,
sendreq->req_send.req_base.req_addr,
&convertor);
ompi_convertor_set_position(&convertor, &position);
OBJ_DESTRUCT( &convertor );
size = position - sendreq->req_send_offset;
} }
MCA_PML_DR_SEND_REQUEST_VFRAG_INIT(sendreq,bml_endpoint,bytes_remaining,vfrag);
offset = 0;
sendreq->req_num_vfrags++;
} }
/* makes sure that we don't exceed vfrag size */
if (size > vfrag->vf_max_send_size) {
size = vfrag->vf_max_send_size;
}
if (size > vfrag->vf_size - offset) {
size = vfrag->vf_size - offset;
}
/* pack into a descriptor */ /* pack into a descriptor */
ompi_convertor_set_position(&sendreq->req_send.req_convertor, ompi_convertor_set_position(&sendreq->req_send.req_convertor, &sendreq->req_send_offset);
&sendreq->req_send_offset);
mca_bml_base_prepare_src( mca_bml_base_prepare_src(
bml_btl, bml_btl,
NULL, NULL,
@ -607,7 +615,6 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
&size, &size,
&des &des
); );
if(des == NULL) { if(des == NULL) {
OPAL_THREAD_LOCK(&mca_pml_dr.lock); OPAL_THREAD_LOCK(&mca_pml_dr.lock);
opal_list_append(&mca_pml_dr.send_pending, (opal_list_item_t*)sendreq); opal_list_append(&mca_pml_dr.send_pending, (opal_list_item_t*)sendreq);
@ -620,15 +627,25 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
/* setup header */ /* setup header */
hdr = (mca_pml_dr_frag_hdr_t*)des->des_src->seg_addr.pval; hdr = (mca_pml_dr_frag_hdr_t*)des->des_src->seg_addr.pval;
hdr->hdr_common.hdr_flags = 0; hdr->hdr_common.hdr_flags = 0;
hdr->hdr_common.hdr_csum = 0;
hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_FRAG; hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_FRAG;
hdr->hdr_vid = vfrag->vf_id;
hdr->hdr_vlen = vfrag->vf_len;
hdr->hdr_frag_idx = vfrag->vf_idx;
hdr->hdr_frag_csum = sendreq->req_send.req_convertor.checksum;
hdr->hdr_frag_offset = sendreq->req_send_offset; hdr->hdr_frag_offset = sendreq->req_send_offset;
hdr->hdr_src_req.pval = sendreq; hdr->hdr_src_req.pval = sendreq;
hdr->hdr_dst_req = sendreq->req_recv; hdr->hdr_dst_req = sendreq->req_vfrag0.vf_recv;
hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_frag_hdr_t));
/* update state */ /* update state */
vfrag->vf_idx++;
sendreq->req_send_offset += size; sendreq->req_send_offset += size;
OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,1); OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,1);
/* start vfrag watchdog timer */
MCA_PML_DR_VFRAG_WDOG_START(vfrag);
/* initiate send - note that this may complete before the call returns */ /* initiate send - note that this may complete before the call returns */
rc = mca_bml_base_send( bml_btl, des, MCA_BTL_TAG_PML); rc = mca_bml_base_send( bml_btl, des, MCA_BTL_TAG_PML);
@ -645,8 +662,173 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
} }
mca_pml_dr_progress(); mca_pml_dr_progress();
} }
/*
* VFrags w/ nacks or that timed out
*/
while(opal_list_get_size(&sendreq->req_retrans) &&
sendreq->req_pipeline_depth < mca_pml_dr.send_pipeline_depth) {
mca_pml_dr_vfrag_t* vfrag = (mca_pml_dr_vfrag_t*)opal_list_get_first(&sendreq->req_retrans);
/*
* Retransmit fragments that have not been acked.
*/
while(vfrag->vf_idx < vfrag->vf_len &&
sendreq->req_pipeline_depth < mca_pml_dr.send_pipeline_depth) {
if(((1 << vfrag->vf_idx) & vfrag->vf_ack) == 0) {
mca_bml_base_btl_t* bml_btl = mca_bml_base_btl_array_get_next(&bml_endpoint->btl_send);
mca_pml_dr_frag_hdr_t* hdr;
mca_btl_base_descriptor_t* des;
size_t offset = vfrag->vf_offset + (vfrag->vf_max_send_size * vfrag->vf_idx);
size_t size;
int rc;
if(vfrag->vf_idx == vfrag->vf_len - 1) {
size = vfrag->vf_size - offset;
} else {
size = vfrag->vf_max_send_size;
}
/* pack into a descriptor */
ompi_convertor_set_position(&sendreq->req_send.req_convertor, &offset);
mca_bml_base_prepare_src(
bml_btl,
NULL,
&sendreq->req_send.req_convertor,
sizeof(mca_pml_dr_frag_hdr_t),
&size,
&des
);
if(des == NULL) {
OPAL_THREAD_LOCK(&mca_pml_dr.lock);
opal_list_append(&mca_pml_dr.send_pending, (opal_list_item_t*)sendreq);
OPAL_THREAD_UNLOCK(&mca_pml_dr.lock);
break;
}
des->des_cbfunc = mca_pml_dr_frag_completion;
des->des_cbdata = sendreq;
/* setup header */
hdr = (mca_pml_dr_frag_hdr_t*)des->des_src->seg_addr.pval;
hdr->hdr_common.hdr_flags = 0;
hdr->hdr_common.hdr_csum = 0;
hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_FRAG;
hdr->hdr_vid = vfrag->vf_id;
hdr->hdr_vlen = vfrag->vf_len;
hdr->hdr_frag_idx = vfrag->vf_idx;
hdr->hdr_frag_csum = sendreq->req_send.req_convertor.checksum;
hdr->hdr_frag_offset = offset;
hdr->hdr_src_req.pval = sendreq;
hdr->hdr_dst_req = sendreq->req_vfrag0.vf_recv;
hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_frag_hdr_t));
/* update state */
vfrag->vf_idx++;
sendreq->req_send_offset += size;
OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,1);
/* start vfrag watchdog timer */
MCA_PML_DR_VFRAG_WDOG_START(vfrag);
/* initiate send - note that this may complete before the call returns */
rc = mca_bml_base_send( bml_btl, des, MCA_BTL_TAG_PML);
if(rc == OMPI_SUCCESS) {
bytes_remaining -= size;
} else {
vfrag->vf_idx--;
OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,-1);
mca_bml_base_free(bml_btl,des);
OPAL_THREAD_LOCK(&mca_pml_dr.lock);
opal_list_append(&mca_pml_dr.send_pending, (opal_list_item_t*)sendreq);
OPAL_THREAD_UNLOCK(&mca_pml_dr.lock);
break;
}
} else {
    /* fragment already acknowledged - skip it */
    vfrag->vf_idx++;
}
}
/* move from retrans to pending list */
if(vfrag->vf_idx == vfrag->vf_len) {
OPAL_THREAD_LOCK(&sendreq->req_mutex);
opal_list_remove_item(&sendreq->req_retrans, (opal_list_item_t*)vfrag);
opal_list_append(&sendreq->req_pending, (opal_list_item_t*)vfrag);
OPAL_THREAD_UNLOCK(&sendreq->req_mutex);
}
}
} while (OPAL_THREAD_ADD32(&sendreq->req_lock,-1) > 0); } while (OPAL_THREAD_ADD32(&sendreq->req_lock,-1) > 0);
} }
return OMPI_SUCCESS; return OMPI_SUCCESS;
} }
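The retransmission pass above walks a vfrag's ack bitmask, skips fragments whose bit is already set, and recomputes each remaining fragment's offset and length from the vfrag geometry. A standalone sketch of that selection step (struct vfrag_sketch, next_unacked and the sample numbers are illustrative only, not code from this commit; the last fragment's length is computed here relative to the vfrag):

/* Illustrative only: report offset/length of every fragment of a vfrag
 * whose ack bit is still clear, mirroring the retransmit loop above. */
#include <stdint.h>
#include <stddef.h>
#include <stdio.h>

struct vfrag_sketch {
    uint16_t vf_idx;           /* next fragment index to examine */
    uint16_t vf_len;           /* number of fragments in the vfrag */
    size_t   vf_offset;        /* offset of the vfrag within the message */
    size_t   vf_size;          /* bytes covered by the vfrag */
    size_t   vf_max_send_size; /* payload bytes per fragment */
    uint64_t vf_ack;           /* one bit per acknowledged fragment */
};

/* Returns 1 and fills *offset/*length for the next unacked fragment,
 * or 0 once every fragment has been examined. */
static int next_unacked(struct vfrag_sketch* vf, size_t* offset, size_t* length)
{
    while (vf->vf_idx < vf->vf_len) {
        uint16_t idx = vf->vf_idx++;
        if (((uint64_t)1 << idx) & vf->vf_ack)
            continue;                                  /* already acked */
        *offset = vf->vf_offset + (size_t)idx * vf->vf_max_send_size;
        *length = (idx == vf->vf_len - 1)
                ? vf->vf_size - (size_t)idx * vf->vf_max_send_size
                : vf->vf_max_send_size;
        return 1;
    }
    return 0;
}

int main(void)
{
    struct vfrag_sketch vf = {
        .vf_idx = 0, .vf_len = 4,
        .vf_offset = 65536, .vf_size = 3 * 16384 + 100,
        .vf_max_send_size = 16384,
        .vf_ack = 0x2                 /* fragment 1 already acknowledged */
    };
    size_t off, len;
    while (next_unacked(&vf, &off, &len))
        printf("resend offset=%zu length=%zu\n", off, len);
    return 0;
}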
/**
* Acknowledgment of vfrag
*/
void mca_pml_dr_send_request_acked(
mca_pml_dr_send_request_t* sendreq,
mca_pml_dr_ack_hdr_t* ack)
{
if(ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCH) {
sendreq->req_vfrag0.vf_recv = ack->hdr_dst_req;
MCA_PML_DR_SEND_REQUEST_ADVANCE(sendreq);
} else {
mca_pml_dr_vfrag_t* vfrag;
MCA_PML_DR_SEND_REQUEST_VFRAG_PENDING(sendreq, ack, vfrag);
if(NULL == vfrag) {
return;
}
/* add in acknowledged fragments */
vfrag->vf_ack |= ack->hdr_vmask;
/* have all fragments w/in this vfrag been acked? */
if((vfrag->vf_ack & vfrag->vf_mask) == vfrag->vf_mask) {
/* return vfrag */
if (vfrag != &sendreq->req_vfrag0) {
MCA_PML_DR_VFRAG_RETURN(vfrag);
}
/* are we done with this request */
OPAL_THREAD_LOCK(&ompi_request_lock);
sendreq->req_num_acks++;
if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed &&
sendreq->req_num_acks == sendreq->req_num_vfrags) {
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq);
}
OPAL_THREAD_UNLOCK(&ompi_request_lock);
} else {
/* retransmit missing fragments */
OPAL_THREAD_LOCK(&sendreq->req_mutex);
vfrag->vf_idx = 0;
opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag);
OPAL_THREAD_UNLOCK(&sendreq->req_mutex);
mca_pml_dr_send_request_schedule(sendreq);
}
}
}
void mca_pml_dr_send_request_nacked(
mca_pml_dr_send_request_t* sendreq,
mca_pml_dr_ack_hdr_t* ack)
{
mca_pml_dr_vfrag_t* vfrag;
MCA_PML_DR_SEND_REQUEST_VFRAG_PENDING(sendreq, ack, vfrag);
if(NULL == vfrag) {
return;
}
/* remove nacked bits from the mask of acknowledged fragments */
vfrag->vf_idx = 0;
vfrag->vf_ack &= ~ack->hdr_vmask;
/* retransmit missing fragments */
OPAL_THREAD_LOCK(&sendreq->req_mutex);
opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag);
OPAL_THREAD_UNLOCK(&sendreq->req_mutex);
mca_pml_dr_send_request_schedule(sendreq);
}
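Both handlers above reduce to bit operations on vf_ack against vf_mask: an ack ORs in the peer's hdr_vmask, a nack clears bits so the scheduler resends them. A small self-contained example of how the masks evolve (the values are made up):

/* Illustrative only: ack accumulation and nack rollback on a 4-fragment vfrag. */
#include <stdint.h>
#include <assert.h>

int main(void)
{
    uint64_t vf_mask = ((uint64_t)1 << 4) - 1;   /* 4 fragments expected: 0x0f */
    uint64_t vf_ack  = 0;

    vf_ack |= 0x3;                               /* ack covering fragments 0,1 */
    assert((vf_ack & vf_mask) != vf_mask);       /* vfrag not yet complete */

    vf_ack |= 0xc;                               /* ack covering fragments 2,3 */
    assert((vf_ack & vf_mask) == vf_mask);       /* all fragments acknowledged */

    vf_ack &= ~(uint64_t)0x4;                    /* nack clears fragment 2 */
    assert((vf_ack & vf_mask) != vf_mask);       /* fragment 2 must be resent */
    return 0;
}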

View file

@ -21,14 +21,17 @@
#ifndef OMPI_PML_DR_SEND_REQUEST_H #ifndef OMPI_PML_DR_SEND_REQUEST_H
#define OMPI_PML_DR_SEND_REQUEST_H #define OMPI_PML_DR_SEND_REQUEST_H
#include "mca/btl/btl.h" #include "ompi_config.h"
#include "mca/pml/base/pml_base_sendreq.h" #include "ompi/datatype/convertor.h"
#include "mca/mpool/base/base.h" #include "ompi/mca/btl/btl.h"
#include "ompi/mca/pml/base/pml_base_sendreq.h"
#include "ompi/mca/mpool/base/base.h"
#include "ompi/mca/bml/bml.h"
#include "pml_dr_proc.h" #include "pml_dr_proc.h"
#include "pml_dr_comm.h" #include "pml_dr_comm.h"
#include "pml_dr_hdr.h" #include "pml_dr_hdr.h"
#include "datatype/convertor.h" #include "pml_dr_vfrag.h"
#include "mca/bml/bml.h"
#if defined(c_plusplus) || defined(__cplusplus) #if defined(c_plusplus) || defined(__cplusplus)
extern "C" { extern "C" {
@ -39,17 +42,24 @@ struct mca_pml_dr_send_request_t {
mca_pml_base_send_request_t req_send; mca_pml_base_send_request_t req_send;
ompi_proc_t* req_proc; ompi_proc_t* req_proc;
mca_bml_base_endpoint_t* req_endpoint; mca_bml_base_endpoint_t* req_endpoint;
ompi_ptr_t req_recv;
#if OMPI_HAVE_THREAD_SUPPORT #if OMPI_HAVE_THREAD_SUPPORT
volatile int32_t req_state; volatile int32_t req_state;
volatile int32_t req_lock; volatile int32_t req_lock;
#else #else
volatile int32_t req_state; int32_t req_state;
volatile int32_t req_lock; int32_t req_lock;
#endif #endif
size_t req_pipeline_depth; size_t req_pipeline_depth;
size_t req_bytes_delivered; size_t req_bytes_delivered;
size_t req_send_offset; size_t req_send_offset;
mca_pml_dr_vfrag_t* req_vfrag;
mca_pml_dr_vfrag_t req_vfrag0;
opal_list_t req_pending;
opal_list_t req_retrans;
opal_mutex_t req_mutex;
size_t req_num_acks;
size_t req_num_vfrags;
}; };
typedef struct mca_pml_dr_send_request_t mca_pml_dr_send_request_t; typedef struct mca_pml_dr_send_request_t mca_pml_dr_send_request_t;
@ -57,7 +67,7 @@ typedef struct mca_pml_dr_send_request_t mca_pml_dr_send_request_t;
OBJ_CLASS_DECLARATION(mca_pml_dr_send_request_t); OBJ_CLASS_DECLARATION(mca_pml_dr_send_request_t);
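The new fields give each send request an inline first vfrag (req_vfrag0), a pointer to the vfrag currently being filled, and pending/retransmission lists guarded by req_mutex. A hedged guess at how the request constructor elsewhere in this commit might set them up; the function name is hypothetical, only the field and type names come from the struct above:

/* Hypothetical sketch - the real constructor lives in pml_dr_sendreq.c,
 * which is not shown in this excerpt. */
static void sendreq_construct_sketch(mca_pml_dr_send_request_t* req)
{
    OBJ_CONSTRUCT(&req->req_vfrag0, mca_pml_dr_vfrag_t);  /* first vfrag, inline */
    OBJ_CONSTRUCT(&req->req_pending, opal_list_t);        /* vfrags awaiting acks */
    OBJ_CONSTRUCT(&req->req_retrans, opal_list_t);        /* vfrags to resend */
    OBJ_CONSTRUCT(&req->req_mutex, opal_mutex_t);
    req->req_vfrag = &req->req_vfrag0;                    /* current vfrag */
    req->req_num_acks = 0;
    req->req_num_vfrags = 0;
}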
#define MCA_PML_DR_SEND_REQUEST_ALLOC( \ #define MCA_PML_DR_SEND_REQUEST_ALLOC( \
comm, \ comm, \
dst, \ dst, \
sendreq, \ sendreq, \
@ -71,14 +81,14 @@ OBJ_CLASS_DECLARATION(mca_pml_dr_send_request_t);
rc = OMPI_ERR_OUT_OF_RESOURCE; \ rc = OMPI_ERR_OUT_OF_RESOURCE; \
} else { \ } else { \
rc = OMPI_SUCCESS; \ rc = OMPI_SUCCESS; \
OMPI_FREE_LIST_WAIT(&mca_pml_dr.send_requests, item, rc); \ OMPI_FREE_LIST_WAIT(&mca_pml_dr.send_requests, item, rc); \
sendreq = (mca_pml_dr_send_request_t*)item; \ sendreq = (mca_pml_dr_send_request_t*)item; \
sendreq->req_proc = proc; \ sendreq->req_proc = proc; \
} \ } \
} }
#define MCA_PML_DR_SEND_REQUEST_INIT( \ #define MCA_PML_DR_SEND_REQUEST_INIT( \
sendreq, \ sendreq, \
buf, \ buf, \
count, \ count, \
@ -108,6 +118,7 @@ OBJ_CLASS_DECLARATION(mca_pml_dr_send_request_t);
#define MCA_PML_DR_SEND_REQUEST_START(sendreq, rc) \ #define MCA_PML_DR_SEND_REQUEST_START(sendreq, rc) \
do { \ do { \
mca_pml_dr_comm_t* comm = sendreq->req_send.req_base.req_comm->c_pml_comm; \ mca_pml_dr_comm_t* comm = sendreq->req_send.req_base.req_comm->c_pml_comm; \
mca_pml_dr_comm_proc_t* proc = comm->procs + sendreq->req_send.req_base.req_peer; \
mca_bml_base_endpoint_t* endpoint = (mca_bml_base_endpoint_t*)sendreq->req_proc->proc_pml; \ mca_bml_base_endpoint_t* endpoint = (mca_bml_base_endpoint_t*)sendreq->req_proc->proc_pml; \
mca_bml_base_btl_t* bml_btl; \ mca_bml_base_btl_t* bml_btl; \
size_t size = sendreq->req_send.req_bytes_packed; \ size_t size = sendreq->req_send.req_bytes_packed; \
@ -126,9 +137,12 @@ do {
sendreq->req_send.req_base.req_ompi.req_complete = false; \ sendreq->req_send.req_base.req_ompi.req_complete = false; \
sendreq->req_send.req_base.req_ompi.req_state = OMPI_REQUEST_ACTIVE; \ sendreq->req_send.req_base.req_ompi.req_state = OMPI_REQUEST_ACTIVE; \
sendreq->req_send.req_base.req_ompi.req_status._cancelled = 0; \ sendreq->req_send.req_base.req_ompi.req_status._cancelled = 0; \
sendreq->req_send.req_base.req_sequence = OPAL_THREAD_ADD32( \ sendreq->req_send.req_base.req_sequence = OPAL_THREAD_ADD32(&proc->send_sequence,1); \
&comm->procs[sendreq->req_send.req_base.req_peer].send_sequence,1); \
sendreq->req_endpoint = endpoint; \ sendreq->req_endpoint = endpoint; \
sendreq->req_vfrag0.vf_id = OPAL_THREAD_ADD32(&proc->vfrag_id,1); \
sendreq->req_vfrag = &sendreq->req_vfrag0; \
sendreq->req_num_acks = 0; \
sendreq->req_num_vfrags = 0; \
\ \
/* select a btl */ \ /* select a btl */ \
bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager); \ bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager); \
@ -137,32 +151,32 @@ do {
if(size == 0 && sendreq->req_send.req_send_mode != MCA_PML_BASE_SEND_SYNCHRONOUS) { \ if(size == 0 && sendreq->req_send.req_send_mode != MCA_PML_BASE_SEND_SYNCHRONOUS) { \
mca_btl_base_descriptor_t* descriptor; \ mca_btl_base_descriptor_t* descriptor; \
mca_btl_base_segment_t* segment; \ mca_btl_base_segment_t* segment; \
mca_pml_dr_hdr_t* hdr; \ mca_pml_dr_hdr_t* hdr; \
\ \
/* allocate a descriptor */ \ /* allocate a descriptor */ \
MCA_PML_DR_DES_ALLOC(bml_btl, descriptor, sizeof(mca_pml_dr_match_hdr_t)); \ MCA_PML_DR_DES_ALLOC(bml_btl, descriptor, sizeof(mca_pml_dr_match_hdr_t)); \
if(NULL == descriptor) { \ if(NULL == descriptor) { \
return OMPI_ERR_OUT_OF_RESOURCE; \ return OMPI_ERR_OUT_OF_RESOURCE; \
} \ } \
segment = descriptor->des_src; \ segment = descriptor->des_src; \
\ \
/* build hdr */ \ /* build hdr */ \
hdr = (mca_pml_dr_hdr_t*)segment->seg_addr.pval; \ hdr = (mca_pml_dr_hdr_t*)segment->seg_addr.pval; \
hdr->hdr_common.hdr_flags = 0; \ hdr->hdr_common.hdr_flags = 0; \
hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_MATCH; \ hdr->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_MATCH; \
hdr->hdr_match.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid; \ hdr->hdr_match.hdr_ctx = sendreq->req_send.req_base.req_comm->c_contextid; \
hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; \ hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank; \
hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; \ hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; \
hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence; \ hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence; \
\ \
/* short message */ \ /* short message */ \
descriptor->des_cbfunc = mca_pml_dr_match_completion_cache; \ descriptor->des_cbfunc = mca_pml_dr_match_completion_cache; \
descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; \ descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; \
descriptor->des_cbdata = sendreq; \ descriptor->des_cbdata = sendreq; \
\ \
/* request is complete at mpi level */ \ /* request is complete at mpi level */ \
OPAL_THREAD_LOCK(&ompi_request_lock); \ OPAL_THREAD_LOCK(&ompi_request_lock); \
MCA_PML_DR_SEND_REQUEST_MPI_COMPLETE(sendreq); \ MCA_PML_DR_SEND_REQUEST_MPI_COMPLETE(sendreq); \
OPAL_THREAD_UNLOCK(&ompi_request_lock); \ OPAL_THREAD_UNLOCK(&ompi_request_lock); \
\ \
/* send */ \ /* send */ \
@ -205,7 +219,7 @@ do {
* Mark a send request as completed at the MPI level. * Mark a send request as completed at the MPI level.
*/ */
#define MCA_PML_DR_SEND_REQUEST_MPI_COMPLETE(sendreq) \ #define MCA_PML_DR_SEND_REQUEST_MPI_COMPLETE(sendreq) \
do { \ do { \
(sendreq)->req_send.req_base.req_ompi.req_status.MPI_SOURCE = \ (sendreq)->req_send.req_base.req_ompi.req_status.MPI_SOURCE = \
(sendreq)->req_send.req_base.req_comm->c_my_rank; \ (sendreq)->req_send.req_base.req_comm->c_my_rank; \
@ -226,7 +240,7 @@ do {
* may have been orphaned by the user or have already completed * may have been orphaned by the user or have already completed
* at the MPI level. * at the MPI level.
*/ */
#define MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq) \ #define MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq) \
do { \ do { \
/* request completed at pml level */ \ /* request completed at pml level */ \
(sendreq)->req_send.req_base.req_pml_complete = true; \ (sendreq)->req_send.req_base.req_pml_complete = true; \
@ -238,10 +252,10 @@ do {
(sendreq)->req_send.req_addr != (sendreq)->req_send.req_base.req_addr) { \ (sendreq)->req_send.req_addr != (sendreq)->req_send.req_base.req_addr) { \
mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq); \ mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq); \
} \ } \
MCA_PML_DR_SEND_REQUEST_RETURN(sendreq); \ MCA_PML_DR_SEND_REQUEST_RETURN(sendreq); \
/* is request complete at mpi level */ \ /* is request complete at mpi level */ \
} else if ((sendreq)->req_send.req_base.req_ompi.req_complete == false) { \ } else if ((sendreq)->req_send.req_base.req_ompi.req_complete == false) { \
MCA_PML_DR_SEND_REQUEST_MPI_COMPLETE(sendreq); \ MCA_PML_DR_SEND_REQUEST_MPI_COMPLETE(sendreq); \
/* buffered send - release any resources */ \ /* buffered send - release any resources */ \
} else if ((sendreq)->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED && \ } else if ((sendreq)->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED && \
(sendreq)->req_send.req_addr != (sendreq)->req_send.req_base.req_addr) { \ (sendreq)->req_send.req_addr != (sendreq)->req_send.req_base.req_addr) { \
@ -256,7 +270,7 @@ do {
* However, these events may occur in either order. * However, these events may occur in either order.
*/ */
#define MCA_PML_DR_SEND_REQUEST_ADVANCE(sendreq) \ #define MCA_PML_DR_SEND_REQUEST_ADVANCE(sendreq) \
do { \ do { \
bool schedule = false; \ bool schedule = false; \
\ \
@ -264,7 +278,7 @@ do {
if(OPAL_THREAD_ADD32(&sendreq->req_state, 1) == 2) { \ if(OPAL_THREAD_ADD32(&sendreq->req_state, 1) == 2) { \
OPAL_THREAD_LOCK(&ompi_request_lock); \ OPAL_THREAD_LOCK(&ompi_request_lock); \
if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed) { \ if(sendreq->req_bytes_delivered == sendreq->req_send.req_bytes_packed) { \
MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); \ MCA_PML_DR_SEND_REQUEST_PML_COMPLETE(sendreq); \
} else { \ } else { \
schedule = true; \ schedule = true; \
} \ } \
@ -273,7 +287,7 @@ do {
\ \
/* additional data to schedule */ \ /* additional data to schedule */ \
if(schedule == true) { \ if(schedule == true) { \
mca_pml_dr_send_request_schedule(sendreq); \ mca_pml_dr_send_request_schedule(sendreq); \
} \ } \
} while (0) } while (0)
@ -281,20 +295,111 @@ do {
* Release resources associated with a request * Release resources associated with a request
*/ */
#define MCA_PML_DR_SEND_REQUEST_RETURN(sendreq) \ #define MCA_PML_DR_SEND_REQUEST_RETURN(sendreq) \
{ \ do { \
/* Let the base handle the reference counts */ \ /* Let the base handle the reference counts */ \
MCA_PML_BASE_SEND_REQUEST_FINI((&(sendreq)->req_send)); \ MCA_PML_BASE_SEND_REQUEST_FINI((&(sendreq)->req_send)); \
OMPI_FREE_LIST_RETURN( \ OMPI_FREE_LIST_RETURN( \
&mca_pml_dr.send_requests, (opal_list_item_t*)sendreq); \ &mca_pml_dr.send_requests, (opal_list_item_t*)sendreq); \
} } while(0)
/*
* Lookup/allocate a vfrag for the pending send
*/
#define MCA_PML_DR_SEND_REQUEST_VFRAG_INIT(sendreq, endpoint, size, vfrag) \
do { \
mca_pml_dr_comm_t* comm = sendreq->req_send.req_base.req_comm->c_pml_comm; \
mca_pml_dr_comm_proc_t* proc = comm->procs + sendreq->req_send.req_base.req_peer; \
size_t max_send_size = endpoint->btl_max_send_size - sizeof(mca_pml_dr_frag_hdr_t); \
size_t div = size / max_send_size; \
\
if(div == 0) { \
vfrag->vf_len = 1; \
vfrag->vf_size = size; \
vfrag->vf_mask = 1; \
} else if(div > 64) { \
vfrag->vf_len = 64; \
vfrag->vf_size = (max_send_size << 6); /* max_send_size * 64 */ \
vfrag->vf_mask = ~(uint64_t)0; \
} else if (div == 64) { \
size_t mod = size % max_send_size; \
vfrag->vf_len = 64; \
vfrag->vf_size = (mod ? (size - mod) : size); \
vfrag->vf_mask = ~(uint64_t)0; \
} else { \
size_t mod = size % max_send_size; \
vfrag->vf_len = div + (mod ? 1 : 0); \
vfrag->vf_size = size; \
vfrag->vf_mask = (((uint64_t)1 << vfrag->vf_len) - (uint64_t)1); \
} \
\
vfrag->vf_id = OPAL_THREAD_ADD32(&proc->vfrag_id,1); \
vfrag->vf_ack = 0; \
vfrag->vf_offset = sendreq->req_send_offset; \
vfrag->vf_idx = 0; \
vfrag->vf_max_send_size = max_send_size; \
opal_list_append(&sendreq->req_pending, (opal_list_item_t*)vfrag); \
sendreq->req_vfrag = vfrag; \
} while(0)
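A worked, standalone example of the sizing rules in MCA_PML_DR_SEND_REQUEST_VFRAG_INIT for the common 1..63 fragment case (the numbers are made up):

/* Illustrative only: 100 KB scheduled with a 16 KB per-fragment payload. */
#include <stdint.h>
#include <stdio.h>

int main(void)
{
    size_t max_send_size = 16384;        /* btl_max_send_size minus frag header */
    size_t size = 102400;                /* bytes handed to this vfrag */
    size_t div = size / max_send_size;   /* 6 full fragments */
    size_t mod = size % max_send_size;   /* 4096 bytes left over */

    uint16_t vf_len  = (uint16_t)(div + (mod ? 1 : 0));          /* 7 fragments */
    size_t   vf_size = size;                                     /* whole chunk fits */
    uint64_t vf_mask = (((uint64_t)1 << vf_len) - (uint64_t)1);  /* 0x7f */

    printf("vf_len=%u vf_size=%zu vf_mask=0x%llx\n",
           (unsigned)vf_len, vf_size, (unsigned long long)vf_mask);
    return 0;
}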
/*
 * Locate the vfrag on this request that matches the ack/nack header's
 * hdr_vid: check the first (inline) vfrag, then the current vfrag,
 * then scan the req_pending list, removing the match when found there.
 */
#define MCA_PML_DR_SEND_REQUEST_VFRAG_PENDING(sendreq,hdr,vfrag) \
do { \
if ((hdr)->hdr_vid == (sendreq)->req_vfrag0.vf_id) { \
vfrag = &(sendreq)->req_vfrag0; \
} else if((sendreq)->req_vfrag->vf_id == (hdr)->hdr_vid) { \
vfrag = (sendreq)->req_vfrag; \
opal_list_remove_item(&(sendreq)->req_pending,(opal_list_item_t*)vfrag); \
} else { \
opal_list_item_t* item; \
vfrag = NULL; \
OPAL_THREAD_LOCK(&(sendreq)->req_mutex); \
for(item = opal_list_get_first(&(sendreq)->req_pending); \
item != opal_list_get_end(&(sendreq)->req_pending); \
item = opal_list_get_next(item)) { \
mca_pml_dr_vfrag_t* vf = (mca_pml_dr_vfrag_t*)item; \
if(vf->vf_id == (hdr)->hdr_vid) { \
opal_list_remove_item(&(sendreq)->req_pending,item); \
vfrag = vf; \
break; \
} \
} \
OPAL_THREAD_UNLOCK(&(sendreq)->req_mutex); \
} \
} while(0)
/*
 * Locate a vfrag on the req_retrans list by the header's hdr_vid.
 */
#define MCA_PML_DR_SEND_REQUEST_VFRAG_RETRANS(sendreq,hdr,vfrag) \
do { \
opal_list_item_t* item; \
vfrag = NULL; \
OPAL_THREAD_LOCK(&(sendreq)->req_mutex); \
for(item = opal_list_get_first(&(sendreq)->req_retrans); \
item != opal_list_get_end(&(sendreq)->req_retrans); \
item = opal_list_get_next(item)) { \
mca_pml_dr_vfrag_t* vf = (mca_pml_dr_vfrag_t*)item; \
if(vf->vf_id == (hdr)->hdr_vid) { \
vfrag = vf; \
break; \
} \
} \
OPAL_THREAD_UNLOCK(&(sendreq)->req_mutex); \
} while(0)
/* /*
* Update bytes delivered on request based on supplied descriptor * Update bytes delivered on request based on supplied descriptor
*/ */
#define MCA_PML_DR_SEND_REQUEST_SET_BYTES_DELIVERED(sendreq, descriptor, hdrlen) \ #define MCA_PML_DR_SEND_REQUEST_SET_BYTES_DELIVERED(sendreq, descriptor, hdrlen) \
do { \ do { \
size_t i; \ size_t i; \
mca_btl_base_segment_t* segments = descriptor->des_src; \ mca_btl_base_segment_t* segments = descriptor->des_src; \
@ -310,18 +415,18 @@ do {
* Attempt to process any pending requests * Attempt to process any pending requests
*/ */
#define MCA_PML_DR_SEND_REQUEST_PROCESS_PENDING() \ #define MCA_PML_DR_SEND_REQUEST_PROCESS_PENDING() \
do { \ do { \
/* advance pending requests */ \ /* advance pending requests */ \
while(opal_list_get_size(&mca_pml_dr.send_pending)) { \ while(opal_list_get_size(&mca_pml_dr.send_pending)) { \
mca_pml_dr_send_request_t* sendreq; \ mca_pml_dr_send_request_t* sendreq; \
OPAL_THREAD_LOCK(&mca_pml_dr.lock); \ OPAL_THREAD_LOCK(&mca_pml_dr.lock); \
sendreq = (mca_pml_dr_send_request_t*) \ sendreq = (mca_pml_dr_send_request_t*) \
opal_list_remove_first(&mca_pml_dr.send_pending); \ opal_list_remove_first(&mca_pml_dr.send_pending); \
OPAL_THREAD_UNLOCK(&mca_pml_dr.lock); \ OPAL_THREAD_UNLOCK(&mca_pml_dr.lock); \
if(NULL == sendreq) \ if(NULL == sendreq) \
break; \ break; \
mca_pml_dr_send_request_schedule(sendreq); \ mca_pml_dr_send_request_schedule(sendreq); \
} \ } \
} while (0) } while (0)
@ -357,6 +462,21 @@ int mca_pml_dr_send_request_start_rndv(
int mca_pml_dr_send_request_schedule( int mca_pml_dr_send_request_schedule(
mca_pml_dr_send_request_t* sendreq); mca_pml_dr_send_request_t* sendreq);
int mca_pml_dr_send_request_reschedule(
mca_pml_dr_send_request_t* sendreq,
mca_pml_dr_vfrag_t* vfrag);
/**
* Acknowledgment of vfrag
*/
void mca_pml_dr_send_request_acked(
mca_pml_dr_send_request_t* sendreq,
mca_pml_dr_ack_hdr_t*);
void mca_pml_dr_send_request_nacked(
mca_pml_dr_send_request_t* sendreq,
mca_pml_dr_ack_hdr_t*);
/** /**
* Completion callback on match header * Completion callback on match header
* Cache descriptor. * Cache descriptor.

View file

@ -5,22 +5,39 @@
* Copyright (c) 2004-2005 The University of Tennessee and The University * Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights * of Tennessee Research Foundation. All rights
* reserved. * reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved. * University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California. * Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved. * All rights reserved.
* $COPYRIGHT$ * $COPYRIGHT$
* *
* Additional copyrights may follow * Additional copyrights may follow
* *
* $HEADER$ * $HEADER$
*/ */
#include "ompi_config.h" #include "ompi_config.h"
#include "pml_dr_vfrag.h"
#include <string.h>
#include "mca/pml/pml.h" static void mca_pml_dr_vfrag_construct(mca_pml_dr_vfrag_t* vfrag)
#include "pml_dr_endpoint.h" {
memset(&vfrag->vf_event, 0, sizeof(vfrag->vf_event));
}
static void mca_pml_dr_vfrag_destruct(mca_pml_dr_vfrag_t* vfrag)
{
}
OBJ_CLASS_INSTANCE(
mca_pml_dr_vfrag_t,
opal_list_item_t,
mca_pml_dr_vfrag_construct,
mca_pml_dr_vfrag_destruct
);

79
ompi/mca/pml/dr/pml_dr_vfrag.h (new file)
View file

@ -0,0 +1,79 @@
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* @file
*/
#ifndef OMPI_PML_DR_VFRAG_H_
#define OMPI_PML_DR_VFRAG_H_
#include "ompi_config.h"
#include "opal/event/event.h"
#include "pml_dr.h"
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
struct mca_pml_dr_vfrag_t {
    opal_list_item_t super;
    ompi_ptr_t vf_send;         /* handle on the sender-side request */
    ompi_ptr_t vf_recv;         /* handle on the receiver-side request */
    uint32_t vf_id;             /* vfrag id (per-peer sequence) */
    uint16_t vf_idx;            /* index of the next fragment to send */
    uint16_t vf_len;            /* number of fragments in this vfrag (<= 64) */
    size_t vf_offset;           /* offset of this vfrag within the message */
    size_t vf_size;             /* bytes covered by this vfrag */
    size_t vf_max_send_size;    /* payload bytes per fragment */
    uint64_t vf_ack;            /* bitmask of acknowledged fragments */
    uint64_t vf_mask;           /* bitmask expected when fully acknowledged */
    opal_event_t vf_event;      /* timer event for watchdog/ack timeouts */
};
typedef struct mca_pml_dr_vfrag_t mca_pml_dr_vfrag_t;
OBJ_CLASS_DECLARATION(mca_pml_dr_vfrag_t);
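The receiver side is not part of this excerpt; presumably it builds the hdr_vmask returned in an ack by setting one bit per fragment that arrived with a good checksum, capped at 64 fragments per vfrag because the masks are single 64-bit words. A minimal illustration of that assumption (helper names are hypothetical):

/* Assumption, not taken from the commit: receiver-side mask bookkeeping. */
#include <stdint.h>

static uint64_t mark_received(uint64_t vmask, uint16_t vf_idx)
{
    return vmask | ((uint64_t)1 << vf_idx);    /* record fragment vf_idx */
}

static int vfrag_complete(uint64_t vmask, uint16_t vf_len)
{
    uint64_t full = (vf_len == 64) ? ~(uint64_t)0
                                   : (((uint64_t)1 << vf_len) - 1);
    return (vmask & full) == full;             /* every fragment arrived */
}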
#define MCA_PML_DR_VFRAG_ALLOC(vfrag,rc) \
do { \
opal_list_item_t* item; \
OMPI_FREE_LIST_WAIT(&mca_pml_dr.vfrags, item, rc); \
vfrag = (mca_pml_dr_vfrag_t*)item; \
} while(0)
#define MCA_PML_DR_VFRAG_RETURN(vfrag) \
do { \
OMPI_FREE_LIST_RETURN(&mca_pml_dr.vfrags, (opal_list_item_t*)vfrag); \
} while(0)
#define MCA_PML_DR_VFRAG_WDOG_START(vfrag) \
do { \
\
} while(0)
#define MCA_PML_DR_VFRAG_WDOG_STOP(vfrag) \
do { \
\
} while(0)
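MCA_PML_DR_VFRAG_WDOG_START/STOP are still empty stubs in this first cut. One plausible shape for them, sketched against the opal event API that vf_event suggests; the opal_evtimer_* wrappers, the callback, the use of mca_pml_dr.tout_watch_dog, and vf_send pointing back at the owning send request are all assumptions, not code from this commit:

/* Hedged sketch only - none of this is in the commit. */
static void vfrag_wdog_expired(int fd, short event, void* arg)
{
    mca_pml_dr_vfrag_t* vfrag = (mca_pml_dr_vfrag_t*)arg;
    mca_pml_dr_send_request_t* sendreq =
        (mca_pml_dr_send_request_t*)vfrag->vf_send.pval;  /* assumed back pointer */

    /* no ack arrived in time: queue the vfrag for retransmission */
    OPAL_THREAD_LOCK(&sendreq->req_mutex);
    vfrag->vf_idx = 0;
    opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag);
    OPAL_THREAD_UNLOCK(&sendreq->req_mutex);
    mca_pml_dr_send_request_schedule(sendreq);
}

static void vfrag_wdog_start(mca_pml_dr_vfrag_t* vfrag)
{
    struct timeval tv;                         /* assumed opal_evtimer_* API */
    tv.tv_sec = mca_pml_dr.tout_watch_dog;
    tv.tv_usec = 0;
    opal_evtimer_set(&vfrag->vf_event, vfrag_wdog_expired, vfrag);
    opal_evtimer_add(&vfrag->vf_event, &tv);
}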
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
#endif