
Rework the data reliability PML; it still needs quite a bit of work.

Working on creating a uniform retransmission mechanism; otherwise each type of
send ends up needing a special case for retransmission.
Removed NACKs for individual transmissions; we just aggregate acks and send
them at the end of a vfrag.

This commit was SVN r9141.
This commit is contained in:
Galen Shipman 2006-02-24 17:08:14 +00:00
parent 6e57e4c370
Commit 05140c5f8f
10 changed files with 262 additions and 150 deletions
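Illustration (not part of the diff): a minimal C sketch of the aggregated-ack idea described in the commit message. The type and function names are invented for the example; in the actual code this bookkeeping lives in mca_pml_dr_vfrag_t's vf_mask / vf_mask_processed / vf_ack fields, as the hunks below show.

#include <stdint.h>
#include <stdbool.h>

/* Hypothetical stand-in for the per-vfrag ack bookkeeping. */
struct vfrag_ack_sketch {
    uint64_t vf_mask;            /* one bit per fragment expected in this vfrag */
    uint64_t vf_mask_processed;  /* fragments received (good or bad csum)       */
    uint64_t vf_ack;             /* fragments received with a good csum         */
};

/* Record one received fragment; return true once every fragment has arrived,
 * at which point a single aggregate ACK carrying vf_ack is sent back. */
static bool vfrag_record_fragment(struct vfrag_ack_sketch *vf,
                                  unsigned frag_idx, bool csum_ok)
{
    uint64_t bit = (uint64_t)1 << frag_idx;
    vf->vf_mask_processed |= bit;
    if (csum_ok)
        vf->vf_ack |= bit;
    return (vf->vf_mask_processed & vf->vf_mask) == vf->vf_mask;
}

The sender then retransmits exactly the fragments whose bits are missing from the vf_ack mask carried in the aggregate ACK, which is why the separate per-fragment NACK header type could be dropped.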

View file

@ -69,6 +69,12 @@ struct mca_pml_dr_t {
ompi_free_list_t recv_frags;
ompi_free_list_t vfrags;
ompi_free_list_t buffers;
int timer_wdog_sec;
int timer_wdog_usec;
int timer_ack_sec;
int timer_ack_usec;
};
typedef struct mca_pml_dr_t mca_pml_dr_t;

View file

@ -91,6 +91,15 @@ int mca_pml_dr_component_open(void)
mca_pml_dr_param_register_int("eager_limit", 128 * 1024);
mca_pml_dr.send_pipeline_depth =
mca_pml_dr_param_register_int("send_pipeline_depth", 3);
mca_pml_dr.timer_wdog_sec =
mca_pml_dr_param_register_int("timer_wdog_sec", 1);
mca_pml_dr.timer_wdog_usec =
mca_pml_dr_param_register_int("timer_wdog_usec", 0);
mca_pml_dr.timer_ack_sec =
mca_pml_dr_param_register_int("timer_ack_sec", 1);
mca_pml_dr.timer_ack_usec =
mca_pml_dr_param_register_int("timer_ack_usec", 0);
OBJ_CONSTRUCT(&mca_pml_dr.lock, opal_mutex_t);
/* requests */

View file

@ -32,8 +32,7 @@
#define MCA_PML_DR_HDR_TYPE_MATCH 1
#define MCA_PML_DR_HDR_TYPE_RNDV 2
#define MCA_PML_DR_HDR_TYPE_ACK 3
#define MCA_PML_DR_HDR_TYPE_NACK 4
#define MCA_PML_DR_HDR_TYPE_FRAG 5
#define MCA_PML_DR_HDR_TYPE_FRAG 4
#define MCA_PML_DR_HDR_FLAGS_ACK 1 /* is an ack required */
#define MCA_PML_DR_HDR_FLAGS_NBO 2 /* is the hdr in network byte order */

View file

@ -24,6 +24,7 @@
#include "opal/class/opal_list.h"
#include "opal/threads/mutex.h"
#include "opal/util/crc.h"
#include "ompi/constants.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/pml/pml.h"
@ -77,39 +78,58 @@ void mca_pml_dr_recv_frag_callback(
{
mca_btl_base_segment_t* segments = des->des_dst;
mca_pml_dr_hdr_t* hdr = (mca_pml_dr_hdr_t*)segments->seg_addr.pval;
if(segments->seg_len < sizeof(mca_pml_dr_common_hdr_t)) {
return;
}
switch(hdr->hdr_common.hdr_type) {
case MCA_PML_DR_HDR_TYPE_MATCH:
{
if(hdr->hdr_common.hdr_csum != (uint16_t) opal_csum(hdr, sizeof(mca_pml_dr_match_hdr_t))) {
assert(0);
return;
}
mca_pml_dr_recv_frag_match(btl, &hdr->hdr_match, segments,des->des_dst_cnt);
break;
}
case MCA_PML_DR_HDR_TYPE_RNDV:
{
if(hdr->hdr_common.hdr_csum != (uint16_t) opal_csum(hdr, sizeof(mca_pml_dr_rendezvous_hdr_t))) {
assert(0);
return;
}
mca_pml_dr_recv_frag_match(btl, &hdr->hdr_match, segments,des->des_dst_cnt);
break;
}
case MCA_PML_DR_HDR_TYPE_ACK:
{
mca_pml_dr_send_request_t* sendreq = (mca_pml_dr_send_request_t*)
hdr->hdr_ack.hdr_src_req.pval;
mca_pml_dr_send_request_acked(sendreq, &hdr->hdr_ack);
break;
mca_pml_dr_send_request_t* sendreq;
if(hdr->hdr_common.hdr_csum != (uint16_t) opal_csum(hdr, sizeof(mca_pml_dr_ack_hdr_t))) {
assert(0);
return;
}
case MCA_PML_DR_HDR_TYPE_NACK:
{
mca_pml_dr_send_request_t* sendreq = (mca_pml_dr_send_request_t*)
sendreq = (mca_pml_dr_send_request_t*)
hdr->hdr_ack.hdr_src_req.pval;
mca_pml_dr_send_request_nacked(sendreq, &hdr->hdr_ack);
mca_pml_dr_send_request_acked(sendreq, &hdr->hdr_ack);
break;
}
case MCA_PML_DR_HDR_TYPE_FRAG:
{
mca_pml_dr_recv_request_t* recvreq = (mca_pml_dr_recv_request_t*)
mca_pml_dr_recv_request_t* recvreq;
if(hdr->hdr_common.hdr_csum != (uint16_t) opal_csum(hdr, sizeof(mca_pml_dr_frag_hdr_t))) {
assert(0);
return;
}
recvreq = (mca_pml_dr_recv_request_t*)
hdr->hdr_frag.hdr_dst_req.pval;
mca_pml_dr_recv_request_progress(recvreq,btl,segments,des->des_dst_cnt);
break;
}
default:
return; /* drop it on the floor.. */
break;
}
}
@ -697,6 +717,7 @@ rematch:
void mca_pml_dr_recv_frag_ack(mca_pml_dr_recv_frag_t* frag)
{
mca_pml_dr_ack_hdr_t* ack;
mca_btl_base_descriptor_t* des;
mca_bml_base_endpoint_t* bml_endpoint;
@ -722,6 +743,7 @@ void mca_pml_dr_recv_frag_ack(mca_pml_dr_recv_frag_t* frag)
ack->hdr_src_req = frag->hdr.hdr_match.hdr_src_req;
assert(ack->hdr_src_req.pval);
ack->hdr_dst_req.pval = NULL;
ack->hdr_common.hdr_csum = opal_csum(ack, sizeof(mca_pml_dr_ack_hdr_t));
/* initialize descriptor */
des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;

View file

@ -17,6 +17,7 @@
*/
#include "ompi_config.h"
#include "opal/util/crc.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/bml/bml.h"
@ -139,7 +140,7 @@ static void mca_pml_dr_ctl_completion(
static void mca_pml_dr_recv_request_matched(
mca_pml_dr_recv_request_t* recvreq,
mca_pml_dr_rendezvous_hdr_t* hdr,
uint8_t type)
uint8_t mask)
{
ompi_proc_t* proc = recvreq->req_proc;
mca_bml_base_endpoint_t* bml_endpoint = NULL;
@ -169,10 +170,11 @@ static void mca_pml_dr_recv_request_matched(
ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK;
ack->hdr_common.hdr_flags = MCA_PML_DR_HDR_FLAGS_MATCH;
ack->hdr_vid = hdr->hdr_match.hdr_vid;
ack->hdr_vmask = 0x1;
ack->hdr_vmask = mask;
ack->hdr_src_req = hdr->hdr_match.hdr_src_req;
assert(ack->hdr_src_req.pval);
ack->hdr_dst_req.pval = recvreq;
ack->hdr_common.hdr_csum = opal_csum(ack, sizeof(mca_pml_dr_ack_hdr_t));
/* initialize descriptor */
des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
@ -194,50 +196,6 @@ retry:
opal_list_append(&mca_pml_dr.acks_pending, (opal_list_item_t*)frag);
}
/*
* Generate an ack to the peer after first fragment is matched.
*/
static void mca_pml_dr_recv_request_nack(
mca_pml_dr_recv_request_t* recvreq,
mca_pml_dr_frag_hdr_t* hdr)
{
ompi_proc_t* proc = recvreq->req_proc;
mca_bml_base_endpoint_t* bml_endpoint = NULL;
mca_btl_base_descriptor_t* des;
mca_bml_base_btl_t* bml_btl;
mca_pml_dr_ack_hdr_t* nack;
int rc;
bml_endpoint = (mca_bml_base_endpoint_t*) proc->proc_pml;
bml_btl = mca_bml_base_btl_array_get_next(&bml_endpoint->btl_eager);
/* allocate descriptor */
MCA_PML_DR_DES_ALLOC(bml_btl, des, sizeof(mca_pml_dr_ack_hdr_t));
if(NULL == des) {
return;
}
/* fill out header */
nack = (mca_pml_dr_ack_hdr_t*)des->des_src->seg_addr.pval;
nack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_NACK;
nack->hdr_common.hdr_flags = MCA_PML_DR_HDR_FLAGS_MATCH;
nack->hdr_vid = hdr->hdr_vid;
nack->hdr_vmask = 1 << hdr->hdr_frag_idx;
nack->hdr_src_req = hdr->hdr_src_req;
assert(nack->hdr_src_req.pval);
nack->hdr_dst_req.pval = recvreq;
/* initialize descriptor */
des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
des->des_cbfunc = mca_pml_dr_ctl_completion;
rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML);
if(rc != OMPI_SUCCESS) {
mca_bml_base_free(bml_btl, des);
}
}
/*
* Generate an ack w/ the current vfrag status.
@ -272,6 +230,7 @@ static void mca_pml_dr_recv_request_vfrag_ack(
ack->hdr_vmask = vfrag->vf_ack;
ack->hdr_src_req = recvreq->req_vfrag0.vf_send;
ack->hdr_dst_req.pval = recvreq;
ack->hdr_common.hdr_csum = opal_csum(ack, sizeof(mca_pml_dr_ack_hdr_t));
/* initialize descriptor */
des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
@ -302,6 +261,8 @@ void mca_pml_dr_recv_request_progress(
mca_pml_dr_hdr_t* hdr = (mca_pml_dr_hdr_t*)segments->seg_addr.pval;
size_t i;
uint32_t csum = 0;
uint64_t bit;
mca_pml_dr_vfrag_t* vfrag;
for(i=0; i<num_segments; i++)
bytes_received += segments[i].seg_len;
@ -338,14 +299,17 @@ void mca_pml_dr_recv_request_progress(
bytes_received,
bytes_delivered,
csum);
mca_pml_dr_recv_request_matched(recvreq, &hdr->hdr_rndv,
MCA_PML_DR_HDR_TYPE_ACK);
/* mca_pml_dr_recv_request_matched(recvreq, &hdr->hdr_rndv, */
/* (csum == hdr->hdr_match.hdr_csum) ? MCA_PML_DR_HDR_TYPE_ACK : MCA_PML_DR_HDR_TYPE_NACK); */
/* MCA_PML_DR_HDR_TYPE_ACK); */
if(csum != hdr->hdr_match.hdr_csum) {
assert(0);
}
mca_pml_dr_recv_request_matched(recvreq, &hdr->hdr_rndv,
(csum == hdr->hdr_match.hdr_csum));
break;
case MCA_PML_DR_HDR_TYPE_FRAG:
bytes_received -= sizeof(mca_pml_dr_frag_hdr_t);
data_offset = hdr->hdr_frag.hdr_frag_offset;
MCA_PML_DR_RECV_REQUEST_UNPACK(
@ -358,27 +322,25 @@ void mca_pml_dr_recv_request_progress(
bytes_delivered,
csum);
/* if checksum fails - immediately nack this fragment */
/* if(csum != hdr->hdr_frag.hdr_frag_csum) { */
if(0) {
bytes_received = bytes_delivered = 0;
mca_pml_dr_recv_request_nack(recvreq, &hdr->hdr_frag);
} else {
mca_pml_dr_vfrag_t* vfrag;
uint64_t bit = ((uint64_t)1 << hdr->hdr_frag.hdr_frag_idx);
bit = ((uint64_t)1 << hdr->hdr_frag.hdr_frag_idx);
/* update vfrag status */
MCA_PML_DR_RECV_REQUEST_VFRAG_LOOKUP(recvreq, &hdr->hdr_frag, vfrag);
if((vfrag->vf_ack & bit) == 0) {
/* update the mask to show that this vfrag was received,
note that it might still fail the checksum though */
vfrag->vf_mask_processed |= bit;
if(csum == hdr->hdr_frag.hdr_frag_csum) {
/* this part of the vfrag passed the checksum,
mark it so that we ack it after receiving the
entire vfrag */
vfrag->vf_ack |= bit;
if((vfrag->vf_ack & vfrag->vf_mask) == vfrag->vf_mask) {
/* done w/ this vfrag - ack it */
if((vfrag->vf_mask_processed & vfrag->vf_mask) == vfrag->vf_mask) {
/* we have received all the pieces of the vfrag, ack
everything that passed the checksum */
mca_pml_dr_recv_request_vfrag_ack(recvreq, vfrag);
}
} else {
/* duplicate fragment - send an ack w/ the frags completed */
mca_pml_dr_recv_request_vfrag_ack(recvreq, vfrag);
}
bytes_received = bytes_delivered = 0;
}
break;

View file

@ -245,10 +245,11 @@ do {
&max_data, \
&free_after); \
bytes_delivered = max_data; \
csum = request->req_recv.req_convertor.checksum; \
} else { \
bytes_delivered = 0; \
csum = 0; \
} \
csum = request->req_recv.req_convertor.checksum; \
} while (0)
@ -310,6 +311,7 @@ do { \
(vfrag)->vf_id = (hdr)->hdr_vid; \
(vfrag)->vf_len = (hdr)->hdr_vlen; \
(vfrag)->vf_ack = 0; \
(vfrag)->vf_mask_processed = 0; \
if((hdr)->hdr_vlen == 64) { \
(vfrag)->vf_mask = ~(uint64_t)0; \
} else { \

View file

@ -82,6 +82,8 @@ static void mca_pml_dr_send_request_construct(mca_pml_dr_send_request_t* req)
req->req_vfrag0.vf_len = 1;
req->req_vfrag0.vf_idx = 1;
req->req_vfrag0.vf_mask = 1;
req->req_vfrag0.vf_mask_processed = 0;
req->req_vfrag0.sendreq = req;
req->req_send.req_base.req_type = MCA_PML_REQUEST_SEND;
req->req_send.req_base.req_ompi.req_fini = mca_pml_dr_send_request_fini;
req->req_send.req_base.req_ompi.req_free = mca_pml_dr_send_request_free;
@ -212,10 +214,25 @@ static void mca_pml_dr_frag_completion(
struct mca_btl_base_descriptor_t* descriptor,
int status)
{
mca_pml_dr_send_request_t* sendreq = (mca_pml_dr_send_request_t*)descriptor->des_cbdata;
mca_bml_base_btl_t* bml_btl = (mca_bml_base_btl_t*) descriptor->des_context;
mca_pml_dr_vfrag_t* vfrag = (mca_pml_dr_vfrag_t*) descriptor->des_cbdata;
mca_pml_dr_send_request_t* sendreq = (mca_pml_dr_send_request_t*) vfrag->sendreq;
mca_bml_base_btl_t* bml_btl = vfrag->bml_btl;
mca_pml_dr_frag_hdr_t* hdr = (mca_pml_dr_frag_hdr_t*)descriptor->des_src->seg_addr.pval;
bool schedule;
uint64_t bit = ((uint64_t)1 << hdr->hdr_frag_idx);
vfrag->vf_mask_processed |= bit;
/* when we have local completion of the entire vfrag
we stop the local wdog timers and set our ack timer
as the peer should be sending us an ack for the vfrag
*/
if(vfrag->vf_mask_processed == vfrag->vf_mask) {
MCA_PML_DR_VFRAG_WDOG_STOP(vfrag);
/* MCA_PML_DR_VFRAG_ACK_START(vfrag); */
} else {
MCA_PML_DR_VFRAG_WDOG_RESET(vfrag);
}
/* check completion status */
if(OMPI_SUCCESS != status) {
/* TSW - FIX */
@ -267,6 +284,7 @@ int mca_pml_dr_send_request_start_buffered(
size_t max_data;
int32_t free_after;
int rc;
uint32_t csum;
/* allocate descriptor */
mca_bml_base_alloc(bml_btl, &descriptor, sizeof(mca_pml_dr_rendezvous_hdr_t) + size);
@ -291,10 +309,12 @@ int mca_pml_dr_send_request_start_buffered(
return rc;
}
csum = size > 0 ? sendreq->req_send.req_convertor.checksum : 0;
/* update lengths */
segment->seg_len = sizeof(mca_pml_dr_rendezvous_hdr_t) + max_data;
sendreq->req_send_offset = max_data;
sendreq->req_vfrag0.vf_size = max_data;
sendreq->req_vfrag0.bml_btl = bml_btl;
descriptor->des_cbfunc = mca_pml_dr_rndv_completion;
descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
@ -330,7 +350,7 @@ int mca_pml_dr_send_request_start_buffered(
hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag;
hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence;
hdr->hdr_match.hdr_csum = sendreq->req_send.req_convertor.checksum;
hdr->hdr_match.hdr_csum = csum;
hdr->hdr_match.hdr_src_req.pval = sendreq;
hdr->hdr_rndv.hdr_msg_length = sendreq->req_send.req_bytes_packed;
hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_rendezvous_hdr_t));
@ -408,15 +428,16 @@ int mca_pml_dr_send_request_start_copy(
hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag;
hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence;
hdr->hdr_match.hdr_csum = sendreq->req_send.req_convertor.checksum;
hdr->hdr_match.hdr_csum = size > 0 ? sendreq->req_send.req_convertor.checksum : 0;
hdr->hdr_match.hdr_src_req.pval = sendreq;
hdr->hdr_match.hdr_vid = sendreq->req_vfrag0.vf_id;
hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_match_hdr_t));
/* update lengths */
segment->seg_len = sizeof(mca_pml_dr_match_hdr_t) + max_data;
sendreq->req_send_offset = max_data;
sendreq->req_vfrag0.vf_size = max_data;
sendreq->req_vfrag0.bml_btl = bml_btl;
/* short message */
descriptor->des_cbfunc = mca_pml_dr_match_completion_free;
descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
@ -472,8 +493,9 @@ int mca_pml_dr_send_request_start_prepare(
hdr->hdr_match.hdr_src = sendreq->req_send.req_base.req_comm->c_my_rank;
hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag;
hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence;
hdr->hdr_match.hdr_csum = sendreq->req_send.req_convertor.checksum;
hdr->hdr_match.hdr_csum = size > 0 ? sendreq->req_send.req_convertor.checksum : 0; /* nope */
hdr->hdr_match.hdr_src_req.pval = sendreq;
hdr->hdr_match.hdr_vid = sendreq->req_vfrag0.vf_id;
hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_match_hdr_t));
/* short message */
@ -484,7 +506,7 @@ int mca_pml_dr_send_request_start_prepare(
/* update lengths */
sendreq->req_send_offset = size;
sendreq->req_vfrag0.vf_size = size;
sendreq->req_vfrag0.bml_btl = bml_btl;
/* send */
rc = mca_bml_base_send(bml_btl, descriptor, MCA_BTL_TAG_PML);
if(OMPI_SUCCESS != rc) {
@ -541,7 +563,10 @@ int mca_pml_dr_send_request_start_rndv(
hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag;
hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence;
hdr->hdr_match.hdr_src_req.pval = sendreq;
hdr->hdr_match.hdr_csum = size > 0 ? sendreq->req_send.req_convertor.checksum : 0;
hdr->hdr_match.hdr_vid = sendreq->req_vfrag0.vf_id;
hdr->hdr_rndv.hdr_msg_length = sendreq->req_send.req_bytes_packed;
hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_rendezvous_hdr_t));
/* first fragment of a long message */
des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
@ -549,6 +574,7 @@ int mca_pml_dr_send_request_start_rndv(
des->des_cbfunc = mca_pml_dr_rndv_completion;
sendreq->req_send_offset = size;
sendreq->req_vfrag0.vf_size = size;
sendreq->req_vfrag0.bml_btl = bml_btl;
/* send */
rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML);
@ -559,6 +585,7 @@ int mca_pml_dr_send_request_start_rndv(
}
/**
* Schedule pipeline of send descriptors for the given request,
* using send protocol.
@ -583,7 +610,7 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
mca_pml_dr_frag_hdr_t* hdr;
mca_btl_base_descriptor_t* des;
mca_bml_base_btl_t* bml_btl = mca_bml_base_btl_array_get_next(&bml_endpoint->btl_send);
mca_bml_base_btl_t* bml_btl = NULL;
mca_pml_dr_vfrag_t* vfrag = sendreq->req_vfrag;
size_t size = bytes_remaining;
/* offset tells us how much of the vfrag has been scheduled */
@ -593,6 +620,7 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
/* do we need to allocate a new vfrag
(we scheduled all the vfrag already) */
if(vfrag->vf_size == offset) {
bml_btl = mca_bml_base_btl_array_get_next(&bml_endpoint->btl_send);
MCA_PML_DR_VFRAG_ALLOC(vfrag,rc);
if(NULL == vfrag) {
OPAL_THREAD_LOCK(&mca_pml_dr.lock);
@ -601,8 +629,12 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
break;
}
MCA_PML_DR_SEND_REQUEST_VFRAG_INIT(sendreq,bml_endpoint,bytes_remaining,vfrag);
vfrag->bml_btl = bml_btl;
vfrag->sendreq = sendreq;
offset = 0;
sendreq->req_num_vfrags++;
} else { /* always schedule the vfrag across the same btl */
bml_btl = vfrag->bml_btl;
}
/* makes sure that we don't exceed vfrag size */
@ -630,8 +662,7 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
break;
}
des->des_cbfunc = mca_pml_dr_frag_completion;
des->des_cbdata = sendreq;
des->des_cbdata = vfrag;
/* setup header */
hdr = (mca_pml_dr_frag_hdr_t*)des->des_src->seg_addr.pval;
hdr->hdr_common.hdr_flags = 0;
@ -643,6 +674,7 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
hdr->hdr_frag_csum = sendreq->req_send.req_convertor.checksum;
hdr->hdr_frag_offset = sendreq->req_send_offset;
hdr->hdr_src_req.pval = sendreq;
hdr->hdr_dst_req = sendreq->req_vfrag0.vf_recv;
hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_frag_hdr_t));
@ -651,9 +683,10 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
sendreq->req_send_offset += size;
OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,1);
/* start vfrag watchdog timer */
/* start vfrag watchdog timer if this is the first part of the vfrag */
if(vfrag->vf_idx == 0) {
MCA_PML_DR_VFRAG_WDOG_START(vfrag);
}
/* initiate send - note that this may complete before the call returns */
rc = mca_bml_base_send( bml_btl, des, MCA_BTL_TAG_PML);
@ -683,14 +716,16 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
*/
while(vfrag->vf_idx < vfrag->vf_len &&
sendreq->req_pipeline_depth < mca_pml_dr.send_pipeline_depth) {
if(((1 << vfrag->vf_idx) & vfrag->vf_ack) == 0) {
mca_bml_base_btl_t* bml_btl = mca_bml_base_btl_array_get_next(&bml_endpoint->btl_send);
if(((1 << vfrag->vf_idx) & vfrag->vf_mask_processed) == 0) {
mca_bml_base_btl_t* bml_btl = vfrag->bml_btl;
mca_pml_dr_frag_hdr_t* hdr;
mca_btl_base_descriptor_t* des;
size_t offset = vfrag->vf_offset + (vfrag->vf_max_send_size * vfrag->vf_idx);
size_t size;
int rc;
if(vfrag->vf_idx == vfrag->vf_len - 1) {
size = vfrag->vf_size - offset;
} else {
@ -714,7 +749,7 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
break;
}
des->des_cbfunc = mca_pml_dr_frag_completion;
des->des_cbdata = sendreq;
des->des_cbdata = vfrag;
/* setup header */
hdr = (mca_pml_dr_frag_hdr_t*)des->des_src->seg_addr.pval;
@ -735,8 +770,8 @@ int mca_pml_dr_send_request_schedule(mca_pml_dr_send_request_t* sendreq)
sendreq->req_send_offset += size;
OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth,1);
/* start vfrag watchdog timer */
MCA_PML_DR_VFRAG_WDOG_START(vfrag);
/* reset the vfrag watchdog timer due to retransmission */
MCA_PML_DR_VFRAG_WDOG_RESET(vfrag);
/* initiate send - note that this may complete before the call returns */
rc = mca_bml_base_send( bml_btl, des, MCA_BTL_TAG_PML);
@ -777,17 +812,25 @@ void mca_pml_dr_send_request_acked(
mca_pml_dr_send_request_t* sendreq,
mca_pml_dr_ack_hdr_t* ack)
{
mca_pml_dr_vfrag_t* vfrag;
assert(sendreq);
MCA_PML_DR_SEND_REQUEST_VFRAG_PENDING(sendreq, ack, vfrag);
/* MCA_PML_DR_VFRAG_ACK_STOP(vfrag); */
if(ack->hdr_common.hdr_flags & MCA_PML_DR_HDR_FLAGS_MATCH) {
if(ack->hdr_vmask) {
sendreq->req_vfrag0.vf_recv = ack->hdr_dst_req;
MCA_PML_DR_SEND_REQUEST_ADVANCE(sendreq);
} else {
mca_pml_dr_vfrag_t* vfrag;
MCA_PML_DR_SEND_REQUEST_VFRAG_PENDING(sendreq, ack, vfrag);
if(NULL == vfrag) {
return;
}
else {
vfrag->vf_idx = 0;
vfrag->vf_ack = 0;
/* retransmit missing fragments */
/* OPAL_THREAD_LOCK(&sendreq->req_mutex); */
/* opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag); */
/* OPAL_THREAD_UNLOCK(&sendreq->req_mutex); */
/* mca_pml_dr_send_request_schedule(sendreq); */
}
} else {
/* add in acknowledged fragments */
vfrag->vf_ack |= ack->hdr_vmask;
@ -810,6 +853,9 @@ void mca_pml_dr_send_request_acked(
} else {
/* retransmit missing fragments */
/* reset local completion flags to only those that have been
successfully acked */
vfrag->vf_mask_processed = vfrag->vf_ack;
OPAL_THREAD_LOCK(&sendreq->req_mutex);
vfrag->vf_idx = 0;
opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag);
@ -818,26 +864,3 @@ void mca_pml_dr_send_request_acked(
}
}
}
void mca_pml_dr_send_request_nacked(
mca_pml_dr_send_request_t* sendreq,
mca_pml_dr_ack_hdr_t* ack)
{
mca_pml_dr_vfrag_t* vfrag;
MCA_PML_DR_SEND_REQUEST_VFRAG_PENDING(sendreq, ack, vfrag);
if(NULL == vfrag) {
return;
}
/* removed nacked bits from acknowledged fragments */
vfrag->vf_idx = 0;
vfrag->vf_ack &= ~ack->hdr_vmask;
/* retransmit missing fragments */
OPAL_THREAD_LOCK(&sendreq->req_mutex);
opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag);
OPAL_THREAD_UNLOCK(&sendreq->req_mutex);
mca_pml_dr_send_request_schedule(sendreq);
}

View file

@ -21,6 +21,7 @@
#ifndef OMPI_PML_DR_SEND_REQUEST_H
#define OMPI_PML_DR_SEND_REQUEST_H
#include "opal/util/crc.h"
#include "ompi_config.h"
#include "ompi/datatype/convertor.h"
#include "ompi/mca/btl/btl.h"
@ -32,6 +33,7 @@
#include "pml_dr_comm.h"
#include "pml_dr_hdr.h"
#include "pml_dr_vfrag.h"
#include "opal/event/event.h"
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
@ -60,6 +62,8 @@ struct mca_pml_dr_send_request_t {
opal_mutex_t req_mutex;
size_t req_num_acks;
size_t req_num_vfrags;
};
typedef struct mca_pml_dr_send_request_t mca_pml_dr_send_request_t;
@ -167,6 +171,8 @@ do {
return OMPI_ERR_OUT_OF_RESOURCE; \
} \
segment = descriptor->des_src; \
/* setup vfrag */ \
sendreq->req_vfrag0.vf_size = 0; \
\
/* build hdr */ \
hdr = (mca_pml_dr_hdr_t*)segment->seg_addr.pval; \
@ -177,6 +183,8 @@ do {
hdr->hdr_match.hdr_tag = sendreq->req_send.req_base.req_tag; \
hdr->hdr_match.hdr_seq = sendreq->req_send.req_base.req_sequence; \
hdr->hdr_match.hdr_src_req.pval = sendreq; \
hdr->hdr_match.hdr_vid = sendreq->req_vfrag0.vf_id; \
hdr->hdr_common.hdr_csum = opal_csum(hdr, sizeof(mca_pml_dr_match_hdr_t)); \
\
/* short message */ \
descriptor->des_cbfunc = mca_pml_dr_match_completion_cache; \
@ -331,6 +339,7 @@ do {
vfrag->vf_mask = (((uint64_t)1 << vfrag->vf_len) - (uint64_t)1); \
} \
\
vfrag->vf_mask_processed = 0; \
vfrag->vf_id = OPAL_THREAD_ADD32(&proc->vfrag_id,1); \
vfrag->vf_ack = 0; \
vfrag->vf_offset = sendreq->req_send_offset; \

View file

@ -18,7 +18,9 @@
#include "ompi_config.h"
#include "pml_dr_vfrag.h"
#include "pml_dr_sendreq.h"
void mca_pml_dr_send_request_wdog_timeout(int fd, short event, void* vfrag);
void mca_pml_dr_send_request_ack_timeout(int fd, short event, void* vfrag);
static void mca_pml_dr_vfrag_construct(mca_pml_dr_vfrag_t* vfrag)
{
@ -32,7 +34,12 @@ static void mca_pml_dr_vfrag_construct(mca_pml_dr_vfrag_t* vfrag)
vfrag->vf_max_send_size = 0;
vfrag->vf_ack = 0;
vfrag->vf_mask = 0;
memset(&vfrag->vf_event, 0, sizeof(vfrag->vf_event));
vfrag->tv_wdog.tv_sec = mca_pml_dr.timer_wdog_sec;
vfrag->tv_wdog.tv_usec = mca_pml_dr.timer_wdog_usec;
vfrag->tv_ack.tv_sec = mca_pml_dr.timer_ack_sec;
vfrag->tv_ack.tv_usec = mca_pml_dr.timer_ack_usec;
opal_evtimer_set(&vfrag->ev_wdog, mca_pml_dr_send_request_wdog_timeout, (void*) vfrag);
opal_evtimer_set(&vfrag->ev_ack, mca_pml_dr_send_request_ack_timeout, (void*) vfrag);
}
@ -50,4 +57,34 @@ OBJ_CLASS_INSTANCE(
);
/**
* The wdog timer expired, better do something about it, like resend the current part of the vfrag
*/
void mca_pml_dr_send_request_wdog_timeout(int fd, short event, void* data) {
mca_pml_dr_vfrag_t* vfrag = (mca_pml_dr_vfrag_t*) data;
mca_pml_dr_send_request_t* sendreq = vfrag->sendreq;
OPAL_THREAD_LOCK(&sendreq->req_mutex);
vfrag->vf_idx = 0;
opal_list_remove_item(&sendreq->req_pending, (opal_list_item_t*)vfrag);
opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag);
OPAL_THREAD_UNLOCK(&sendreq->req_mutex);
mca_pml_dr_send_request_schedule(sendreq);
}
/**
* The ack timer expired, better do something about it, like resend the entire vfrag?
*/
void mca_pml_dr_send_request_ack_timeout(int fd, short event, void* data) {
mca_pml_dr_vfrag_t* vfrag = (mca_pml_dr_vfrag_t*) data;
mca_pml_dr_send_request_t* sendreq = vfrag->sendreq;
/* reset it all, so it will all retransmit */
vfrag->vf_ack = vfrag->vf_mask_processed = 0;
OPAL_THREAD_LOCK(&sendreq->req_mutex);
vfrag->vf_idx = 0;
opal_list_remove_item(&sendreq->req_pending, (opal_list_item_t*)vfrag);
opal_list_append(&sendreq->req_retrans, (opal_list_item_t*)vfrag);
OPAL_THREAD_UNLOCK(&sendreq->req_mutex);
mca_pml_dr_send_request_schedule(sendreq);
}
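Reading aid (not part of the diff): the watchdog/ack timer lifecycle is spread across pml_dr_sendreq.c and this file, so here is a minimal sketch of the sender-side decision at each fragment's local completion. The names are invented for the example and the event-library plumbing is omitted.

#include <stdint.h>

/* Hypothetical timer states for one vfrag on the sender side. */
enum vfrag_timer { TIMER_NONE, TIMER_WDOG, TIMER_ACK };

struct vfrag_timer_sketch {
    uint64_t vf_mask;            /* all fragments that make up the vfrag */
    uint64_t vf_mask_processed;  /* fragments that completed locally     */
    enum vfrag_timer armed;      /* which timer is currently running     */
};

/* Called from the fragment-completion callback. */
static void on_local_completion(struct vfrag_timer_sketch *vf, unsigned frag_idx)
{
    vf->vf_mask_processed |= (uint64_t)1 << frag_idx;

    if (vf->vf_mask_processed == vf->vf_mask) {
        /* The entire vfrag has completed locally: stop the watchdog and
         * arm the ack timer while waiting for the peer's aggregate ACK. */
        vf->armed = TIMER_ACK;
    } else {
        /* More fragments still in flight: re-arm the watchdog. */
        vf->armed = TIMER_WDOG;
    }
}

If either timer fires, the vfrag is moved to the request's retransmission list and rescheduled, much as the two timeout handlers above do.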

View file

@ -42,7 +42,21 @@ struct mca_pml_dr_vfrag_t {
size_t vf_max_send_size;
uint64_t vf_ack;
uint64_t vf_mask;
opal_event_t vf_event;
uint64_t vf_mask_processed;
struct mca_pml_dr_send_request_t* sendreq;
struct mca_bml_base_btl_t* bml_btl;
/* we need a timer for the vfrag for:
1) a watchdog timer for local completion of the current
operation
2) a timeout for ACK of the VFRAG
*/
struct timeval tv_wdog;
struct timeval tv_ack;
opal_event_t ev_ack;
opal_event_t ev_wdog;
uint8_t cnt_wdog;
uint8_t cnt_ack;
uint8_t cnt_nack;
};
typedef struct mca_pml_dr_vfrag_t mca_pml_dr_vfrag_t;
@ -61,17 +75,46 @@ do { \
OMPI_FREE_LIST_RETURN(&mca_pml_dr.vfrags, (opal_list_item_t*)vfrag); \
} while(0)
#if 0
#define MCA_PML_DR_VFRAG_WDOG_START(vfrag) \
do { \
\
opal_event_add(&vfrag->ev_wdog, &vfrag->tv_wdog); \
} while(0)
#define MCA_PML_DR_VFRAG_WDOG_RESET(vfrag) \
do { \
opal_event_del(&vfrag->ev_wdog); \
opal_event_add(&vfrag->ev_wdog, &vfrag->tv_wdog); \
} while(0)
#define MCA_PML_DR_VFRAG_WDOG_STOP(vfrag) \
do { \
opal_event_del(&vfrag->ev_wdog); \
\
} while(0)
#define MCA_PML_DR_VFRAG_ACK_START(vfrag) \
do { \
opal_event_add(&vfrag->ev_ack, &vfrag->tv_ack); \
} while(0)
#define MCA_PML_DR_VFRAG_ACK_STOP(vfrag) \
do { \
opal_event_del(&vfrag->ev_ack); \
\
} while(0)
#endif
#if 1
#define MCA_PML_DR_VFRAG_WDOG_START(vfrag)
#define MCA_PML_DR_VFRAG_WDOG_RESET(vfrag)
#define MCA_PML_DR_VFRAG_WDOG_STOP(vfrag)
#define MCA_PML_DR_VFRAG_ACK_START(vfrag)
#define MCA_PML_DR_VFRAG_ACK_STOP(vfrag)
#endif
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif