1
1
openmpi/ompi/mca/pml/csum/pml_csum_recvfrag.c
George Bosilca 0bd2bf9aae The number of segments accepted should be bounded by MCA_BTL_DES_MAX_SEGMENTS
and not by 2.

This commit was SVN r25515.
2011-11-28 17:19:12 +00:00

875 строки
35 KiB
C

/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2011 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2007 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2008 UT-Battelle, LLC. All rights reserved.
* Copyright (c) 2006-2008 University of Houston. All rights reserved.
* Copyright (c) 2009 IBM Corporation. All rights reserved.
* Copyright (c) 2009 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2009 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2010 Oracle and/or its affiliates. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* @file
*/
#include "ompi_config.h"
#include "opal/class/opal_list.h"
#include "opal/util/crc.h"
#include "opal/threads/mutex.h"
#include "opal/prefetch.h"
#include "opal/util/output.h"
#include "orte/util/name_fns.h"
#include "orte/runtime/orte_globals.h"
#include "orte/mca/notifier/notifier.h"
#include "orte/mca/errmgr/errmgr.h"
#include "ompi/constants.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/pml/base/base.h"
#include "ompi/peruse/peruse-internal.h"
#include "ompi/memchecker.h"
#include "pml_csum.h"
#include "pml_csum_comm.h"
#include "pml_csum_recvfrag.h"
#include "pml_csum_recvreq.h"
#include "pml_csum_sendreq.h"
#include "pml_csum_hdr.h"
/*
 * Class instantiation for the two object types used by this file.
 * NOTE(review): the two NULL arguments are presumably the constructor and
 * destructor slots of OBJ_CLASS_INSTANCE -- confirm against the OPAL object
 * system header.
 */
/* mca_pml_csum_buffer_t, declared with ompi_free_list_item_t as parent class. */
OBJ_CLASS_INSTANCE( mca_pml_csum_buffer_t,
ompi_free_list_item_t,
NULL,
NULL );
/* mca_pml_csum_recv_frag_t, declared with opal_list_item_t as parent class. */
OBJ_CLASS_INSTANCE( mca_pml_csum_recv_frag_t,
opal_list_item_t,
NULL,
NULL );
/**
* Static functions.
*/
/**
 * Print the raw bytes of the given segments to stdout as a hex dump.
 *
 * Used by the receive callbacks right before aborting on a checksum
 * mismatch.  Each segment is introduced by "Segment <index>" and its bytes
 * are printed as two-digit hex values, 40 per line.
 *
 * @param segments      Array of segments to dump.
 * @param num_segments  Number of entries of @p segments to print.
 */
static void dump_csum_error_data(mca_btl_base_segment_t* segments, size_t num_segments)
{
    size_t seg_idx;

    printf("CHECKSUM ERROR DATA\n");
    for (seg_idx = 0; seg_idx < num_segments; ++seg_idx) {
        const mca_btl_base_segment_t *seg = &segments[seg_idx];
        const uint8_t *bytes = (const uint8_t*)seg->seg_addr.pval;
        size_t offset = 0;

        printf("Segment %lu", (unsigned long)seg_idx);
        while (offset < seg->seg_len) {
            /* start a fresh output line every 40 bytes (including byte 0) */
            if (0 == (offset % 40)) {
                printf("\n");
            }
            printf("%02x ", bytes[offset]);
            ++offset;
        }
    }
    printf("\nEND CHECKSUM ERROR DATA\n\n");
}
/**
 * Append an unexpected fragment to a queue.
 *
 * When the caller already owns a fragment (@p frag != NULL) it is appended
 * as-is.  Otherwise a fragment is obtained with MCA_PML_CSUM_RECV_FRAG_ALLOC,
 * filled in from the header/segments with MCA_PML_CSUM_RECV_FRAG_INIT and
 * then appended.  The fragment is not handed back to the caller.
 *
 * @param queue         List receiving the fragment.
 * @param btl           BTL module recorded in a newly initialized fragment.
 * @param hdr           Match header recorded in a newly initialized fragment.
 * @param segments      Segments recorded in a newly initialized fragment.
 * @param num_segments  Number of entries in @p segments.
 * @param frag          Existing fragment to queue, or NULL to create one.
 */
static void
append_frag_to_list(opal_list_t *queue, mca_btl_base_module_t *btl,
                    mca_pml_csum_match_hdr_t *hdr, mca_btl_base_segment_t* segments,
                    size_t num_segments, mca_pml_csum_recv_frag_t* frag)
{
    int rc;   /* written by the allocation macro; not inspected here */

    if (NULL != frag) {
        /* caller supplied a ready-made fragment: just queue it */
        opal_list_append(queue, (opal_list_item_t*)frag);
        return;
    }

    MCA_PML_CSUM_RECV_FRAG_ALLOC(frag, rc);
    MCA_PML_CSUM_RECV_FRAG_INIT(frag, hdr, segments, num_segments, btl);
    opal_list_append(queue, (opal_list_item_t*)frag);
}
/**
 * Forward declarations of the file-local matching helpers defined further
 * down in this file.
 *
 * mca_pml_csum_recv_frag_match() matches an incoming fragment against the
 * posted receives and supports out-of-order delivery.
 *
 * @param btl           (IN) BTL module the fragment arrived on.
 * @param hdr           (IN) Match header of the received fragment.
 * @param segments      (IN) Segments of the received fragment.
 * @param num_segments  (IN) Number of entries in segments.
 * @param type          (IN) Header type (MCA_PML_CSUM_HDR_TYPE_MATCH, _RNDV
 *                           or _RGET) selecting the progress routine.
 * @return OMPI_SUCCESS (the definition returns it on every path).
 */
static int mca_pml_csum_recv_frag_match( mca_btl_base_module_t *btl,
mca_pml_csum_match_hdr_t *hdr,
mca_btl_base_segment_t* segments,
size_t num_segments,
int type);
/*
 * match_one() looks for a posted receive matching hdr; see its definition
 * below.  When frag is NULL and nothing matches, a new fragment is created
 * for the unexpected queue.
 */
static mca_pml_csum_recv_request_t*
match_one(mca_btl_base_module_t *btl,
mca_pml_csum_match_hdr_t *hdr, mca_btl_base_segment_t* segments,
size_t num_segments, ompi_communicator_t *comm_ptr,
mca_pml_csum_comm_proc_t *proc,
mca_pml_csum_recv_frag_t* frag);
/**
 * BTL receive callback for MATCH fragments.
 *
 * Verifies the header checksum, then tries the fast path: if the fragment
 * carries the next expected sequence number and no out-of-order fragments
 * are queued, it is matched right here, unpacked into the receive request,
 * its data checksum is verified and the request is completed.  In every
 * other case the fragment is handed to mca_pml_csum_recv_frag_match().
 * A checksum mismatch (header or data) is logged, dumped and ends in
 * orte_errmgr.abort().
 *
 * @param btl     BTL module the descriptor arrived on.
 * @param tag     Not used in this function.
 * @param des     Receive descriptor; des_dst / des_dst_cnt describe the
 *                segments, the first of which starts with the match header.
 * @param cbdata  Not used in this function.
 */
void mca_pml_csum_recv_frag_callback_match(mca_btl_base_module_t* btl,
mca_btl_base_tag_t tag,
mca_btl_base_descriptor_t* des,
void* cbdata )
{
mca_btl_base_segment_t* segments = des->des_dst;
mca_pml_csum_match_hdr_t* hdr = (mca_pml_csum_match_hdr_t*)segments->seg_addr.pval;
ompi_communicator_t *comm_ptr;
mca_pml_csum_recv_request_t *match = NULL;
mca_pml_csum_comm_t *comm;
mca_pml_csum_comm_proc_t *proc;
size_t num_segments = des->des_dst_cnt;
size_t bytes_received = 0;
uint16_t csum_received, csum=0;
uint32_t csum_data;
/* the iov[] array used below has MCA_BTL_DES_MAX_SEGMENTS entries */
assert(num_segments <= MCA_BTL_DES_MAX_SEGMENTS);
/* silently drop anything too short to hold a match header */
if( OPAL_UNLIKELY(segments->seg_len < OMPI_PML_CSUM_MATCH_HDR_LEN) ) {
return;
}
csum_hdr_ntoh(((mca_pml_csum_hdr_t*) hdr), MCA_PML_CSUM_HDR_TYPE_MATCH);
/* The checksum is recomputed with the checksum field zeroed (and, in
 * heterogeneous builds, with the NBO flag cleared); the received value is
 * put back afterwards. */
csum_received = hdr->hdr_common.hdr_csum;
hdr->hdr_common.hdr_csum = 0;
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
hdr->hdr_common.hdr_flags &= ~MCA_PML_CSUM_HDR_FLAGS_NBO;
#endif
csum = opal_csum16(hdr, OMPI_PML_CSUM_MATCH_HDR_LEN);
hdr->hdr_common.hdr_csum = csum_received;
OPAL_OUTPUT_VERBOSE((5, mca_pml_base_output,
"%s:%s:%d common_hdr: %02x:%02x:%04x match_hdr: %04x:%04x:%08x:%08x:%08x",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), __FILE__, __LINE__,
hdr->hdr_common.hdr_type, hdr->hdr_common.hdr_flags, hdr->hdr_common.hdr_csum,
hdr->hdr_ctx, hdr->hdr_seq, hdr->hdr_src, hdr->hdr_tag, hdr->hdr_csum));
if (csum_received != csum) {
/* corrupted header: report, dump the first segment and abort */
opal_output(0, "%s:%s:%d: Invalid \'match header\' - received csum:0x%04x != computed csum:0x%04x\n",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), __FILE__, __LINE__, csum_received, csum);
orte_notifier.log(ORTE_NOTIFIER_CRIT, 1,
"Checksum header violation: job %s file %s line %d",
(NULL == orte_job_ident) ? "UNKNOWN" : orte_job_ident,
__FILE__, __LINE__);
dump_csum_error_data(segments, 1);
orte_errmgr.abort(-1,NULL);
}
/* communicator pointer */
comm_ptr = ompi_comm_lookup(hdr->hdr_ctx);
if(OPAL_UNLIKELY(NULL == comm_ptr)) {
/* This is a special case. A message for a communicator that does
 * not exist yet can arrive. Instead of matching it we temporarily
 * add it to a pending queue in the PML.
 * Later on, when the communicator is completely instantiated,
 * this pending queue will be searched and all matching fragments
 * moved to the right communicator.
 */
append_frag_to_list( &mca_pml_csum.non_existing_communicator_pending,
btl, hdr, segments, num_segments, NULL );
return;
}
comm = (mca_pml_csum_comm_t *)comm_ptr->c_pml_comm;
/* per-sender matching state */
/* NOTE(review): hdr_src is used as an index without a visible bounds check. */
proc = &comm->procs[hdr->hdr_src];
/* We generate the MSG_ARRIVED event as soon as the PML is aware
 * of a matching fragment arrival, independently of whether it is
 * received in the correct order or not. This will allow the tools to
 * figure out if the messages are not received in the correct
 * order (if multiple network interfaces).
 */
PERUSE_TRACE_MSG_EVENT(PERUSE_COMM_MSG_ARRIVED, comm_ptr,
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
/* get next expected message sequence number - if threaded
 * run, lock to make sure that if another thread is processing
 * a frag from the same message a match is made only once.
 * Also, this prevents other posted receives (for a pair of
 * end points) from being processed, and potentially "losing"
 * the fragment.
 */
OPAL_THREAD_LOCK(&comm->matching_lock);
/* Fast path only if this is the next expected sequence number (compared
 * as 16-bit values) and no out-of-order fragments are waiting. */
if(OPAL_UNLIKELY((((uint16_t) hdr->hdr_seq) != ((uint16_t) proc->expected_sequence)) ||
(opal_list_get_size(&proc->frags_cant_match) > 0 ))) {
goto slow_path;
}
/* This is the sequence number we were expecting, so we can try
 * matching it to already posted receives.
 */
/* We're now expecting the next sequence number. */
proc->expected_sequence++;
/* We generate the SEARCH_POSTED_QUEUE only when the message is
 * received in the correct sequence. Otherwise, we delay the event
 * generation until we reach the correct sequence number.
 */
PERUSE_TRACE_MSG_EVENT(PERUSE_COMM_SEARCH_POSTED_Q_BEGIN, comm_ptr,
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
/* frag == NULL: an unmatched message gets a freshly allocated fragment */
match = match_one(btl, hdr, segments, num_segments, comm_ptr, proc, NULL);
/* The match is over. We generate the SEARCH_POSTED_Q_END here so the
 * time spent searching can be told apart for each message.
 */
PERUSE_TRACE_MSG_EVENT(PERUSE_COMM_SEARCH_POSTED_Q_END, comm_ptr,
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
/* release matching lock before processing fragment */
OPAL_THREAD_UNLOCK(&comm->matching_lock);
if(OPAL_LIKELY(match)) {
/* payload of the first segment is whatever follows the match header */
bytes_received = segments->seg_len - OMPI_PML_CSUM_MATCH_HDR_LEN;
match->req_recv.req_bytes_packed = bytes_received;
MCA_PML_CSUM_RECV_REQUEST_MATCHED(match, hdr);
if(match->req_bytes_expected > 0) {
struct iovec iov[MCA_BTL_DES_MAX_SEGMENTS];
uint32_t iov_count = 1;
/*
 * Make user buffer accessible (defined) before unpacking.
 */
MEMCHECKER(
memchecker_call(&opal_memchecker_base_mem_defined,
match->req_recv.req_base.req_addr,
match->req_recv.req_base.req_count,
match->req_recv.req_base.req_datatype);
);
/* iov[0] skips the header; remaining segments are taken whole */
iov[0].iov_len = bytes_received;
iov[0].iov_base = (IOVBASE_TYPE*)((unsigned char*)segments->seg_addr.pval +
OMPI_PML_CSUM_MATCH_HDR_LEN);
while (iov_count < num_segments) {
bytes_received += segments[iov_count].seg_len;
iov[iov_count].iov_len = segments[iov_count].seg_len;
iov[iov_count].iov_base = (IOVBASE_TYPE*)((unsigned char*)segments[iov_count].seg_addr.pval);
iov_count++;
}
/* bytes_received is passed by address and may be updated by the call */
opal_convertor_unpack( &match->req_recv.req_base.req_convertor,
iov,
&iov_count,
&bytes_received );
match->req_bytes_received = bytes_received;
/*
 * Unpacking finished, make the user buffer inaccessible again.
 */
MEMCHECKER(
memchecker_call(&opal_memchecker_base_mem_noaccess,
match->req_recv.req_base.req_addr,
match->req_recv.req_base.req_count,
match->req_recv.req_base.req_datatype);
);
}
/* NOTE(review): this check also runs when the unpack block above was
 * skipped (req_bytes_expected == 0) but the first segment carried payload;
 * confirm the convertor checksum is meaningful in that case. */
if (bytes_received > 0) {
csum_data = match->req_recv.req_base.req_convertor.checksum;
OPAL_OUTPUT_VERBOSE((1, mca_pml_base_output,
"%s Received \'match\' with data csum:0x%x, header csum:0x%04x, size:%lu\n",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), hdr->hdr_csum, csum_received, (unsigned long)bytes_received));
if (csum_data != hdr->hdr_csum) {
/* corrupted payload: report, dump every segment and abort */
opal_output(0, "%s:%s:%d: Invalid \'match data\' - received csum:0x%x != computed csum:0x%x\n",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), __FILE__, __LINE__, hdr->hdr_csum, csum_data);
orte_notifier.log(ORTE_NOTIFIER_CRIT, 1,
"Checksum data violation: job %s file %s line %d",
(NULL == orte_job_ident) ? "UNKNOWN" : orte_job_ident,
__FILE__, __LINE__);
dump_csum_error_data(segments, num_segments);
orte_errmgr.abort(-1,NULL);
}
}
/* no need to check if complete we know we are.. */
/* don't need a rmb as that is for checking */
recv_request_pml_complete(match);
}
return;
slow_path:
/* drop the lock first: mca_pml_csum_recv_frag_match() takes it itself */
OPAL_THREAD_UNLOCK(&comm->matching_lock);
mca_pml_csum_recv_frag_match(btl, hdr, segments,
num_segments, MCA_PML_CSUM_HDR_TYPE_MATCH);
}
/**
 * BTL receive callback for rendezvous (RNDV) headers.
 *
 * Converts the header with csum_hdr_ntoh(), recomputes the header checksum
 * over sizeof(mca_pml_csum_rendezvous_hdr_t) bytes and, when it agrees with
 * the received one, passes the embedded match header on to
 * mca_pml_csum_recv_frag_match().  On a mismatch the error is logged, the
 * first segment is dumped and orte_errmgr.abort() is called.
 *
 * @param btl     BTL module the descriptor arrived on.
 * @param tag     Not used in this function.
 * @param des     Receive descriptor; the first destination segment starts
 *                with the header.
 * @param cbdata  Not used in this function.
 */
void mca_pml_csum_recv_frag_callback_rndv(mca_btl_base_module_t* btl,
                                          mca_btl_base_tag_t tag,
                                          mca_btl_base_descriptor_t* des,
                                          void* cbdata )
{
    mca_btl_base_segment_t* segments = des->des_dst;
    mca_pml_csum_hdr_t* hdr = (mca_pml_csum_hdr_t*)segments->seg_addr.pval;
    uint16_t csum_received, csum;

    /* The checksum below reads a complete rendezvous header, so require
     * that many bytes (not just the common header) before touching it. */
    if( OPAL_UNLIKELY(segments->seg_len < sizeof(mca_pml_csum_rendezvous_hdr_t)) ) {
        return;
    }
    csum_hdr_ntoh(hdr, MCA_PML_CSUM_HDR_TYPE_RNDV);

    /* Recompute the checksum with the checksum field zeroed (and the NBO
     * flag cleared in heterogeneous builds), then restore the field. */
    csum_received = hdr->hdr_common.hdr_csum;
    hdr->hdr_common.hdr_csum = 0;
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
    hdr->hdr_common.hdr_flags &= ~MCA_PML_CSUM_HDR_FLAGS_NBO;
#endif
    csum = opal_csum16(hdr, sizeof(mca_pml_csum_rendezvous_hdr_t));
    hdr->hdr_common.hdr_csum = csum_received;

    if (csum_received != csum) {
        opal_output(0, "%s:%s:%d: Invalid \'rndv header\' - received csum:0x%04x != computed csum:0x%04x\n",
                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), __FILE__, __LINE__, csum_received, csum);
        orte_notifier.log(ORTE_NOTIFIER_CRIT, 1,
                          "Checksum header violation: job %s file %s line %d",
                          (NULL == orte_job_ident) ? "UNKNOWN" : orte_job_ident,
                          __FILE__, __LINE__);
        dump_csum_error_data(segments, 1);
        orte_errmgr.abort(-1,NULL);
    }

    mca_pml_csum_recv_frag_match(btl, &hdr->hdr_match, segments,
                                 des->des_dst_cnt, MCA_PML_CSUM_HDR_TYPE_RNDV);
    return;
}
/**
 * BTL receive callback for RGET headers.
 *
 * Converts the header with csum_hdr_ntoh() and forwards the embedded match
 * header to mca_pml_csum_recv_frag_match().  Fragments whose first segment
 * is shorter than a common header are dropped.  No header checksum is
 * verified in this callback.
 *
 * @param btl     BTL module the descriptor arrived on.
 * @param tag     Not used in this function.
 * @param des     Receive descriptor; the first destination segment starts
 *                with the header.
 * @param cbdata  Not used in this function.
 */
void mca_pml_csum_recv_frag_callback_rget(mca_btl_base_module_t* btl,
                                          mca_btl_base_tag_t tag,
                                          mca_btl_base_descriptor_t* des,
                                          void* cbdata )
{
    mca_btl_base_segment_t* first_seg = des->des_dst;
    mca_pml_csum_hdr_t* rget_hdr;

    if( OPAL_UNLIKELY(first_seg->seg_len < sizeof(mca_pml_csum_common_hdr_t)) ) {
        return;
    }

    rget_hdr = (mca_pml_csum_hdr_t*)first_seg->seg_addr.pval;
    csum_hdr_ntoh(rget_hdr, MCA_PML_CSUM_HDR_TYPE_RGET);
    mca_pml_csum_recv_frag_match(btl, &rget_hdr->hdr_match, first_seg,
                                 des->des_dst_cnt, MCA_PML_CSUM_HDR_TYPE_RGET);
}
/**
 * BTL receive callback for ACK headers.
 *
 * Verifies the header checksum, then uses the send-request pointer carried
 * in the ACK to record the peer's receive request, account for the part of
 * the message delivered by copy in/out, and either complete the send
 * request or schedule more of it.  A checksum mismatch is logged, the first
 * segment is dumped and orte_errmgr.abort() is called.
 *
 * @param btl     BTL module the descriptor arrived on.
 * @param tag     Not used in this function.
 * @param des     Receive descriptor; the first destination segment starts
 *                with the header.
 * @param cbdata  Not used in this function.
 */
void mca_pml_csum_recv_frag_callback_ack(mca_btl_base_module_t* btl,
                                         mca_btl_base_tag_t tag,
                                         mca_btl_base_descriptor_t* des,
                                         void* cbdata )
{
    mca_btl_base_segment_t* segments = des->des_dst;
    mca_pml_csum_hdr_t* hdr = (mca_pml_csum_hdr_t*)segments->seg_addr.pval;
    mca_pml_csum_send_request_t* sendreq;
    uint16_t csum_received, csum;

    /* A complete ACK header is checksummed and dereferenced below, so
     * require that many bytes (not just the common header). */
    if( OPAL_UNLIKELY(segments->seg_len < sizeof(mca_pml_csum_ack_hdr_t)) ) {
        return;
    }
    csum_hdr_ntoh(hdr, MCA_PML_CSUM_HDR_TYPE_ACK);

    /* Recompute the checksum with the checksum field zeroed (and the NBO
     * flag cleared in heterogeneous builds), then restore the field. */
    csum_received = hdr->hdr_common.hdr_csum;
    hdr->hdr_common.hdr_csum = 0;
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
    hdr->hdr_common.hdr_flags &= ~MCA_PML_CSUM_HDR_FLAGS_NBO;
#endif
    csum = opal_csum16(hdr, sizeof(mca_pml_csum_ack_hdr_t));
    hdr->hdr_common.hdr_csum = csum_received;

    OPAL_OUTPUT_VERBOSE((1, mca_pml_base_output,
                         "%s Received \'ACK\' with header csum:0x%04x\n", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), csum));

    if (csum_received != csum) {
        opal_output(0, "%s:%s:%d: Invalid \'ACK header\' - received csum:0x%04x != computed csum:0x%04x\n",
                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), __FILE__, __LINE__, csum_received, csum);
        orte_notifier.log(ORTE_NOTIFIER_CRIT, 1,
                          "Checksum header violation: job %s file %s line %d",
                          (NULL == orte_job_ident) ? "UNKNOWN" : orte_job_ident,
                          __FILE__, __LINE__);
        dump_csum_error_data(segments, 1);
        orte_errmgr.abort(-1,NULL);
    }

    sendreq = (mca_pml_csum_send_request_t*)hdr->hdr_ack.hdr_src_req.pval;
    sendreq->req_recv = hdr->hdr_ack.hdr_dst_req;

    /* if the request should be delivered entirely by copy in/out
     * then throttle sends */
    if(hdr->hdr_common.hdr_flags & MCA_PML_CSUM_HDR_FLAGS_NORDMA) {
        sendreq->req_throttle_sends = true;
    }

    mca_pml_csum_send_request_copy_in_out(sendreq,
                                          hdr->hdr_ack.hdr_send_offset,
                                          sendreq->req_send.req_bytes_packed -
                                          hdr->hdr_ack.hdr_send_offset);

    if (sendreq->req_state != 0) {
        /* Typical receipt of an ACK message causes req_state to be
         * decremented. However, a send request that started as an
         * RGET request can become a RNDV. For example, when the
         * receiver determines that its receive buffer is not
         * contiguous and therefore cannot support the RGET
         * protocol. A send request that started with the RGET
         * protocol has req_state == 0 and as such should not be
         * decremented.
         */
        OPAL_THREAD_ADD32(&sendreq->req_state, -1);
    }

    if(send_request_pml_complete_check(sendreq) == false) {
        mca_pml_csum_send_request_schedule(sendreq);
    }
    return;
}
/**
 * BTL receive callback for FRAG headers (continuation data of a message).
 *
 * Verifies the header checksum and hands the segments to
 * mca_pml_csum_recv_request_progress_frag() for the receive request whose
 * pointer is carried in the header.  A checksum mismatch is logged, the
 * first segment is dumped and orte_errmgr.abort() is called.
 *
 * @param btl     BTL module the descriptor arrived on.
 * @param tag     Not used in this function.
 * @param des     Receive descriptor; the first destination segment starts
 *                with the header.
 * @param cbdata  Not used in this function.
 */
void mca_pml_csum_recv_frag_callback_frag(mca_btl_base_module_t* btl,
                                          mca_btl_base_tag_t tag,
                                          mca_btl_base_descriptor_t* des,
                                          void* cbdata ) {
    mca_btl_base_segment_t* segments = des->des_dst;
    mca_pml_csum_hdr_t* hdr = (mca_pml_csum_hdr_t*)segments->seg_addr.pval;
    mca_pml_csum_recv_request_t* recvreq;
    uint16_t csum_received, csum;

    /* A complete frag header is checksummed and dereferenced below, so
     * require that many bytes (not just the common header). */
    if( OPAL_UNLIKELY(segments->seg_len < sizeof(mca_pml_csum_frag_hdr_t)) ) {
        return;
    }
    csum_hdr_ntoh(hdr, MCA_PML_CSUM_HDR_TYPE_FRAG);

    /* Recompute the checksum with the checksum field zeroed (and the NBO
     * flag cleared in heterogeneous builds), then restore the field. */
    csum_received = hdr->hdr_common.hdr_csum;
    hdr->hdr_common.hdr_csum = 0;
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
    hdr->hdr_common.hdr_flags &= ~MCA_PML_CSUM_HDR_FLAGS_NBO;
#endif
    csum = opal_csum16(hdr, sizeof(mca_pml_csum_frag_hdr_t));
    hdr->hdr_common.hdr_csum = csum_received;

    if(csum_received != csum) {
        opal_output(0, "%s:%s:%d: Invalid \'frag header\' - received csum:0x%04x != computed csum:0x%04x\n",
                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), __FILE__, __LINE__, csum_received, csum);
        orte_notifier.log(ORTE_NOTIFIER_CRIT, 1,
                          "Checksum header violation: job %s file %s line %d",
                          (NULL == orte_job_ident) ? "UNKNOWN" : orte_job_ident,
                          __FILE__, __LINE__);
        dump_csum_error_data(segments, 1);
        orte_errmgr.abort(-1,NULL);
    }

    recvreq = (mca_pml_csum_recv_request_t*)hdr->hdr_frag.hdr_dst_req.pval;
    mca_pml_csum_recv_request_progress_frag(recvreq,btl,segments,des->des_dst_cnt);
    return;
}
/**
 * BTL receive callback for PUT headers.
 *
 * Verifies the header checksum and calls mca_pml_csum_send_request_put()
 * on the send request whose pointer is carried in the RDMA header.  A
 * checksum mismatch is logged, the first segment is dumped and
 * orte_errmgr.abort() is called.
 *
 * @param btl     BTL module the descriptor arrived on.
 * @param tag     Not used in this function.
 * @param des     Receive descriptor; the first destination segment starts
 *                with the header.
 * @param cbdata  Not used in this function.
 */
void mca_pml_csum_recv_frag_callback_put(mca_btl_base_module_t* btl,
                                         mca_btl_base_tag_t tag,
                                         mca_btl_base_descriptor_t* des,
                                         void* cbdata ) {
    mca_btl_base_segment_t* segments = des->des_dst;
    mca_pml_csum_hdr_t* hdr = (mca_pml_csum_hdr_t*)segments->seg_addr.pval;
    mca_pml_csum_send_request_t* sendreq;
    uint16_t csum_received, csum;

    /* A complete RDMA header is checksummed and dereferenced below, so
     * require that many bytes (not just the common header). */
    if( OPAL_UNLIKELY(segments->seg_len < sizeof(mca_pml_csum_rdma_hdr_t)) ) {
        return;
    }
    csum_hdr_ntoh(hdr, MCA_PML_CSUM_HDR_TYPE_PUT);

    /* Recompute the checksum with the checksum field zeroed (and the NBO
     * flag cleared in heterogeneous builds), then restore the field. */
    csum_received = hdr->hdr_common.hdr_csum;
    hdr->hdr_common.hdr_csum = 0;
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
    hdr->hdr_common.hdr_flags &= ~MCA_PML_CSUM_HDR_FLAGS_NBO;
#endif
    csum = opal_csum16(hdr, sizeof(mca_pml_csum_rdma_hdr_t));
    hdr->hdr_common.hdr_csum = csum_received;

    OPAL_OUTPUT_VERBOSE((1, mca_pml_base_output,
                         "%s Received \'PUT\' with header csum:0x%04x\n", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), csum));

    if(csum_received != csum) {
        opal_output(0, "%s:%s:%d: Invalid \'PUT header\' - received csum:0x%04x != computed csum:0x%04x\n",
                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), __FILE__, __LINE__, csum_received, csum);
        orte_notifier.log(ORTE_NOTIFIER_CRIT, 1,
                          "Checksum header violation: job %s file %s line %d",
                          (NULL == orte_job_ident) ? "UNKNOWN" : orte_job_ident,
                          __FILE__, __LINE__);
        dump_csum_error_data(segments, 1);
        orte_errmgr.abort(-1,NULL);
    }

    sendreq = (mca_pml_csum_send_request_t*)hdr->hdr_rdma.hdr_req.pval;
    mca_pml_csum_send_request_put(sendreq,btl,&hdr->hdr_rdma);
    return;
}
/**
 * BTL receive callback for FIN headers.
 *
 * Verifies the header checksum and invokes the completion callback of the
 * descriptor whose pointer is carried in the FIN header, passing OMPI_ERROR
 * when hdr_fail is set and OMPI_SUCCESS otherwise.  A checksum mismatch is
 * logged, the first segment is dumped and orte_errmgr.abort() is called.
 *
 * @param btl     BTL module the descriptor arrived on.
 * @param tag     Not used in this function.
 * @param des     Receive descriptor; the first destination segment starts
 *                with the header.
 * @param cbdata  Not used in this function.
 */
void mca_pml_csum_recv_frag_callback_fin(mca_btl_base_module_t* btl,
                                         mca_btl_base_tag_t tag,
                                         mca_btl_base_descriptor_t* des,
                                         void* cbdata ) {
    mca_btl_base_segment_t* segments = des->des_dst;
    mca_pml_csum_hdr_t* hdr = (mca_pml_csum_hdr_t*)segments->seg_addr.pval;
    mca_btl_base_descriptor_t* rdma;
    uint16_t csum_received, csum;

    /* A complete FIN header is checksummed below and a function pointer is
     * reached through it, so require that many bytes (not just the common
     * header). */
    if( OPAL_UNLIKELY(segments->seg_len < sizeof(mca_pml_csum_fin_hdr_t)) ) {
        return;
    }
    csum_hdr_ntoh(hdr, MCA_PML_CSUM_HDR_TYPE_FIN);

    /* Recompute the checksum with the checksum field zeroed (and the NBO
     * flag cleared in heterogeneous builds), then restore the field. */
    csum_received = hdr->hdr_common.hdr_csum;
    hdr->hdr_common.hdr_csum = 0;
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
    hdr->hdr_common.hdr_flags &= ~MCA_PML_CSUM_HDR_FLAGS_NBO;
#endif
    csum = opal_csum16(hdr, sizeof(mca_pml_csum_fin_hdr_t));
    hdr->hdr_common.hdr_csum = csum_received;

    OPAL_OUTPUT_VERBOSE((1, mca_pml_base_output,
                         "%s Received \'FIN\' with header csum:0x%04x\n",ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),csum));

    if(csum_received != csum) {
        opal_output(0, "%s:%s:%d: Invalid \'FIN header\' - received csum:0x%04x != computed csum:0x%04x\n",
                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), __FILE__, __LINE__, csum_received, csum);
        orte_notifier.log(ORTE_NOTIFIER_CRIT, 1,
                          "Checksum header violation: job %s file %s line %d",
                          (NULL == orte_job_ident) ? "UNKNOWN" : orte_job_ident,
                          __FILE__, __LINE__);
        dump_csum_error_data(segments, 1);
        orte_errmgr.abort(-1,NULL);
    }

    rdma = (mca_btl_base_descriptor_t*)hdr->hdr_fin.hdr_des.pval;
    rdma->des_cbfunc(btl, NULL, rdma,
                     hdr->hdr_fin.hdr_fail ? OMPI_ERROR : OMPI_SUCCESS);
    return;
}
/* Largest value representable in mca_pml_sequence_t; used as the "no request"
 * sentinel when walking the posted-receive queues.  Fully parenthesized and
 * without a trailing semicolon so it can be used inside any expression; the
 * outer cast undoes the integer promotion applied by '~' on narrow types. */
#define PML_MAX_SEQ ((mca_pml_sequence_t)~((mca_pml_sequence_t)0))
/**
 * Return the first receive request on a posted-receive queue.
 *
 * @param queue  Queue of posted receive requests.
 * @return The request at the head of @p queue, or NULL when the queue is empty.
 */
static inline mca_pml_csum_recv_request_t* get_posted_recv(opal_list_t *queue)
{
    return (0 == opal_list_get_size(queue))
        ? NULL
        : (mca_pml_csum_recv_request_t*)opal_list_get_first(queue);
}
/**
 * Return the receive request following @p req on a posted-receive queue.
 *
 * @param queue  Queue that @p req is linked on.
 * @param req    Current request.
 * @return The next request, or NULL when @p req was the last one.
 */
static inline mca_pml_csum_recv_request_t* get_next_posted_recv(
    opal_list_t *queue,
    mca_pml_csum_recv_request_t* req)
{
    opal_list_item_t *successor = opal_list_get_next((opal_list_item_t*)req);

    return (successor != opal_list_get_end(queue))
        ? (mca_pml_csum_recv_request_t*)successor
        : NULL;
}
/**
 * Find the posted receive that matches an incoming header.
 *
 * Walks the sender-specific queue and the communicator's wildcard queue
 * together, always inspecting the request with the smaller req_sequence
 * first.  The first request whose tag equals the message tag, or whose tag
 * is OMPI_ANY_TAG while the message tag is non-negative, is removed from
 * its queue and returned.
 *
 * @param hdr   Match header of the incoming message (only hdr_tag is read).
 * @param comm  PML communicator state holding the wildcard receives.
 * @param proc  Per-sender state holding the specific receives.
 * @return The matching request (already unlinked), or NULL if none matches.
 */
static mca_pml_csum_recv_request_t *match_incomming(
    mca_pml_csum_match_hdr_t *hdr, mca_pml_csum_comm_t *comm,
    mca_pml_csum_comm_proc_t *proc)
{
    const int msg_tag = hdr->hdr_tag;
    mca_pml_csum_recv_request_t *specific_head, *wild_head;
    mca_pml_sequence_t specific_seq, wild_seq;

    specific_head = get_posted_recv(&proc->specific_receives);
    wild_head = get_posted_recv(&comm->wild_receives);

    /* an exhausted queue is represented by the PML_MAX_SEQ sentinel */
    if (NULL != wild_head) {
        wild_seq = wild_head->req_recv.req_base.req_sequence;
    } else {
        wild_seq = PML_MAX_SEQ;
    }
    if (NULL != specific_head) {
        specific_seq = specific_head->req_recv.req_base.req_sequence;
    } else {
        specific_seq = PML_MAX_SEQ;
    }

    /* they are equal only if both are PML_MAX_SEQ */
    while (wild_seq != specific_seq) {
        /* default to the specific queue; switch to the wildcard queue
         * when its head was posted earlier */
        mca_pml_csum_recv_request_t **cursor = &specific_head;
        mca_pml_sequence_t *cursor_seq = &specific_seq;
        opal_list_t *cursor_queue = &proc->specific_receives;
        mca_pml_csum_recv_request_t *candidate;
        int posted_tag;

        if (OPAL_UNLIKELY(wild_seq < specific_seq)) {
            cursor = &wild_head;
            cursor_seq = &wild_seq;
            cursor_queue = &comm->wild_receives;
        }

        candidate = *cursor;
        posted_tag = candidate->req_recv.req_base.req_tag;
        if (posted_tag == msg_tag || (OMPI_ANY_TAG == posted_tag && msg_tag >= 0)) {
            opal_list_remove_item(cursor_queue, (opal_list_item_t*)candidate);
            PERUSE_TRACE_COMM_EVENT(PERUSE_COMM_REQ_REMOVE_FROM_POSTED_Q,
                                    &(candidate->req_recv.req_base), PERUSE_RECV);
            return candidate;
        }

        /* no match: advance on the queue we just looked at */
        candidate = get_next_posted_recv(cursor_queue, candidate);
        *cursor = candidate;
        if (NULL != candidate) {
            *cursor_seq = candidate->req_recv.req_base.req_sequence;
        } else {
            *cursor_seq = PML_MAX_SEQ;
        }
    }
    return NULL;
}
/**
 * Match one incoming message against the posted receives of a sender.
 *
 * Repeatedly asks match_incomming() for a matching request.  A matching
 * probe request is completed with mca_pml_csum_recv_request_matched_probe()
 * and the search continues; the first non-probe request is returned.  When
 * nothing matches, the message is queued on proc->unexpected_frags (reusing
 * @p frag if it is not NULL) and NULL is returned.
 *
 * @param btl           BTL module the message arrived on.
 * @param hdr           Match header of the message.
 * @param segments      Segments of the message.
 * @param num_segments  Number of entries in @p segments.
 * @param comm_ptr      Communicator the message belongs to.
 * @param proc          Per-sender matching state.
 * @param frag          Already allocated fragment for this message, or NULL.
 * @return The matched receive request, or NULL if the message was queued.
 */
static mca_pml_csum_recv_request_t*
match_one(mca_btl_base_module_t *btl,
          mca_pml_csum_match_hdr_t *hdr, mca_btl_base_segment_t* segments,
          size_t num_segments, ompi_communicator_t *comm_ptr,
          mca_pml_csum_comm_proc_t *proc,
          mca_pml_csum_recv_frag_t* frag)
{
    mca_pml_csum_comm_t *pml_comm = (mca_pml_csum_comm_t *)comm_ptr->c_pml_comm;

    for (;;) {
        mca_pml_csum_recv_request_t *candidate = match_incomming(hdr, pml_comm, proc);

        if (OPAL_UNLIKELY(NULL == candidate)) {
            /* nothing posted matches: park the message as unexpected */
            append_frag_to_list(&proc->unexpected_frags, btl, hdr, segments,
                                num_segments, frag);
            PERUSE_TRACE_MSG_EVENT(PERUSE_COMM_MSG_INSERT_IN_UNEX_Q, comm_ptr,
                                   hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
            return NULL;
        }

        candidate->req_recv.req_base.req_proc = proc->ompi_proc;

        if (OPAL_LIKELY(MCA_PML_REQUEST_PROBE != candidate->req_recv.req_base.req_type)) {
            PERUSE_TRACE_COMM_EVENT(PERUSE_COMM_MSG_MATCH_POSTED_REQ,
                                    &(candidate->req_recv.req_base), PERUSE_RECV);
            return candidate;
        }

        /* complete the probe, then loop to look for an actual receive */
        mca_pml_csum_recv_request_matched_probe(candidate, btl, segments,
                                                num_segments);
    }
}
/**
 * Look for a queued out-of-order fragment that has become matchable.
 *
 * Scans proc->frags_cant_match for the first fragment whose header sequence
 * number equals the next expected sequence number of @p proc, unlinks it
 * from the list and returns it.  The comparison is done on 16-bit values,
 * the same way the sequence check is done in the receive paths of this
 * file.  The caller in this file invokes it with the communicator's
 * matching lock held.
 *
 * @param proc  Per-sender matching state.
 * @return The now-matchable fragment (removed from the list), or NULL.
 */
static mca_pml_csum_recv_frag_t* check_cantmatch_for_match(mca_pml_csum_comm_proc_t *proc)
{
    opal_list_t *pending = &proc->frags_cant_match;
    opal_list_item_t *item;

    for (item = opal_list_get_first(pending);
         item != opal_list_get_end(pending);
         item = opal_list_get_next(item)) {
        mca_pml_csum_recv_frag_t *frag = (mca_pml_csum_recv_frag_t*)item;

        /* Compare as uint16_t on both sides so the test keeps working
         * across sequence-number wraparound. */
        if ((uint16_t)frag->hdr.hdr_match.hdr_seq != (uint16_t)proc->expected_sequence) {
            continue;
        }

        opal_list_remove_item(pending, item);
        return frag;
    }
    return NULL;
}
/**
 * RTS/CTS receive side matching.
 *
 * Tries to match a newly arrived message (MATCH, RNDV or RGET header)
 * against the posted receives, honouring per-sender ordering:
 * - a message for a communicator that does not exist yet is parked on
 *   mca_pml_csum.non_existing_communicator_pending;
 * - a message whose sequence number is not the next expected one is parked
 *   on proc->frags_cant_match;
 * - otherwise it is matched through match_one() and, on success, progressed
 *   with the routine selected by type;
 * - afterwards, fragments previously parked on frags_cant_match that have
 *   become the next expected message are processed in the same way.
 *
 * @param btl           BTL module the message arrived on (IN)
 * @param hdr           match header of the message (IN)
 * @param segments      segments of the message (IN)
 * @param num_segments  number of entries in segments (IN)
 * @param type          header type: MCA_PML_CSUM_HDR_TYPE_MATCH, _RNDV or
 *                      _RGET (IN)
 *
 * @return OMPI_SUCCESS on every path
 *
 * The following assumptions are made
 * - fragments are received out of order
 * - for long messages, e.g. more than one fragment, a RTS/CTS algorithm
 *   is used.
 * - 2nd and greater fragments include a receive descriptor pointer
 * - fragments may be dropped
 * - fragments may be corrupt
 * - this routine may be called simultaneously by more than one thread
 */
static int mca_pml_csum_recv_frag_match( mca_btl_base_module_t *btl,
mca_pml_csum_match_hdr_t *hdr,
mca_btl_base_segment_t* segments,
size_t num_segments,
int type)
{
/* local variables */
uint16_t next_msg_seq_expected, frag_msg_seq;
ompi_communicator_t *comm_ptr;
mca_pml_csum_recv_request_t *match = NULL;
mca_pml_csum_comm_t *comm;
mca_pml_csum_comm_proc_t *proc;
/* stays NULL for the newly arrived message; set when replaying a queued one */
mca_pml_csum_recv_frag_t* frag = NULL;
/* communicator pointer */
comm_ptr = ompi_comm_lookup(hdr->hdr_ctx);
if(OPAL_UNLIKELY(NULL == comm_ptr)) {
/* This is a special case. A message for a communicator that does
 * not exist yet can arrive. Instead of matching it we temporarily
 * add it to a pending queue in the PML.
 * Later on, when the communicator is completely instantiated,
 * this pending queue will be searched and all matching fragments
 * moved to the right communicator.
 */
append_frag_to_list( &mca_pml_csum.non_existing_communicator_pending,
btl, hdr, segments, num_segments, NULL );
return OMPI_SUCCESS;
}
comm = (mca_pml_csum_comm_t *)comm_ptr->c_pml_comm;
/* source sequence number */
frag_msg_seq = hdr->hdr_seq;
/* NOTE(review): hdr_src is used as an index without a visible bounds check. */
proc = &comm->procs[hdr->hdr_src];
/**
 * We generate the MSG_ARRIVED event as soon as the PML is aware of a matching
 * fragment arrival, independently of whether it is received in the correct
 * order or not. This will allow the tools to figure out if the messages are
 * not received in the correct order (if multiple network interfaces).
 */
PERUSE_TRACE_MSG_EVENT(PERUSE_COMM_MSG_ARRIVED, comm_ptr,
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
/* get next expected message sequence number - if threaded
 * run, lock to make sure that if another thread is processing
 * a frag from the same message a match is made only once.
 * Also, this prevents other posted receives (for a pair of
 * end points) from being processed, and potentially "losing"
 * the fragment.
 */
OPAL_THREAD_LOCK(&comm->matching_lock);
/* get sequence number of next message that can be processed
 * (compared as 16-bit values) */
next_msg_seq_expected = (uint16_t)proc->expected_sequence;
if(OPAL_UNLIKELY(frag_msg_seq != next_msg_seq_expected))
goto wrong_seq;
/*
 * This is the sequence number we were expecting,
 * so we can try matching it to already posted
 * receives.
 */
/* Re-entered from below, with the matching lock held, for each queued
 * fragment that has become the next expected message. */
out_of_order_match:
/* We're now expecting the next sequence number. */
proc->expected_sequence++;
/**
 * We generate the SEARCH_POSTED_QUEUE only when the message is received
 * in the correct sequence. Otherwise, we delay the event generation until
 * we reach the correct sequence number.
 */
PERUSE_TRACE_MSG_EVENT(PERUSE_COMM_SEARCH_POSTED_Q_BEGIN, comm_ptr,
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
/* a non-NULL frag is reused by match_one() if the message ends up on the
 * unexpected queue */
match = match_one(btl, hdr, segments, num_segments, comm_ptr, proc, frag);
/**
 * The match is over. We generate the SEARCH_POSTED_Q_END here, before
 * looking at the frags_cant_match list, so the time spent searching can be
 * told apart for each message.
 */
PERUSE_TRACE_MSG_EVENT(PERUSE_COMM_SEARCH_POSTED_Q_END, comm_ptr,
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
/* release matching lock before processing fragment */
OPAL_THREAD_UNLOCK(&comm->matching_lock);
if(OPAL_LIKELY(match)) {
/* NOTE(review): there is no default case; any other type value is
 * silently not progressed. */
switch(type) {
case MCA_PML_CSUM_HDR_TYPE_MATCH:
mca_pml_csum_recv_request_progress_match(match, btl, segments, num_segments);
break;
case MCA_PML_CSUM_HDR_TYPE_RNDV:
mca_pml_csum_recv_request_progress_rndv(match, btl, segments, num_segments);
break;
case MCA_PML_CSUM_HDR_TYPE_RGET:
mca_pml_csum_recv_request_progress_rget(match, btl, segments, num_segments);
break;
}
/* a replayed fragment has been consumed: give it back */
if(OPAL_UNLIKELY(frag))
MCA_PML_CSUM_RECV_FRAG_RETURN(frag);
}
/*
 * Now that a new message has arrived, check to see if
 * any fragments on the frags_cant_match list
 * may now be used to form new matches
 */
/* NOTE(review): the list size is read here without holding matching_lock;
 * the lock is only taken once the list looks non-empty. */
if(OPAL_UNLIKELY(opal_list_get_size(&proc->frags_cant_match) > 0)) {
OPAL_THREAD_LOCK(&comm->matching_lock);
if((frag = check_cantmatch_for_match(proc))) {
/* replay the queued fragment using the data stored in it */
hdr = &frag->hdr.hdr_match;
segments = frag->segments;
num_segments = frag->num_segments;
btl = frag->btl;
type = hdr->hdr_common.hdr_type;
goto out_of_order_match;
}
OPAL_THREAD_UNLOCK(&comm->matching_lock);
}
return OMPI_SUCCESS;
wrong_seq:
/*
 * This message does not carry the next expected sequence number, so it
 * arrived out of order. Save it for later.
 */
append_frag_to_list(&proc->frags_cant_match, btl, hdr, segments,
num_segments, NULL);
OPAL_THREAD_UNLOCK(&comm->matching_lock);
return OMPI_SUCCESS;
}