openmpi/ompi/mca/pml/dr/pml_dr_recvfrag.c
Jeff Squyres e7ecd56bd2 This commit represents a bunch of work on a Mercurial side branch. As
such, the commit message back to the master SVN repository is fairly
long.

= ORTE Job-Level Output Messages =

Add two new interfaces that should be used for all new code throughout
the ORTE and OMPI layers (we have already done the search-and-replace
on the existing ORTE / OMPI layers):

 * orte_output(): (and corresponding friends ORTE_OUTPUT,
   orte_output_verbose, etc.)  This function sends the output directly
   to the HNP for processing as part of a job-specific output
   channel.  It supports all the same outputs as opal_output()
   (syslog, file, stdout, stderr), but for stdout/stderr, the output
   is sent to the HNP for processing and output.  More on this below.
 * orte_show_help(): This function is a drop-in replacement for
   opal_show_help(), with two differences in functionality:
   1. the rendered text help message output is sent to the HNP for
      display (rather than outputting directly into the process' stderr
      stream)
   1. the HNP detects duplicate help messages and does not display them
      (so that you don't see the same error message N times, once from
      each of your N MPI processes); instead, it counts "new" instances
      of the help message and displays a message every ~5 seconds when
      there are new ones ("I got X new copies of the help message...")

opal_show_help and opal_output still exist, but they only output in
the current process.  The intent for the new orte_* functions is that
they can apply job-level intelligence to the output.  As such, we
recommend that all new ORTE and OMPI code use the new orte_*
functions, not the opal_* functions.

=== New code ===

For ORTE and OMPI programmers, here's what you need to do differently
in new code:

 * Do not include opal/util/show_help.h or opal/util/output.h.
   Instead, include orte/util/output.h (this one header file has
   declarations for both the orte_output() series of functions and
   orte_show_help()).
 * Effectively s/opal_output/orte_output/gi throughout your code.
   Note that orte_output_open() takes a slightly different argument
   list (as a way to pass data to the filtering stream -- see below),
   so if you explicitly call opal_output_open(), you'll need to
   slightly adapt to the new signature of orte_output_open().
 * Literally s/opal_show_help/orte_show_help/.  The function signature
   is identical.
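
For example, the mechanical conversion for a typical output call
amounts to the following (the message text is made up):

{{{
/* Before: */
#include "opal/util/output.h"
opal_output(0, "btl setup complete");

/* After: */
#include "orte/util/output.h"
orte_output(0, "btl setup complete");
}}}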

=== Notes ===

 * orte_output'ing to stream 0 will do something similar to what
   opal_output'ing to stream 0 did, so leaving a hard-coded "0" as the
   first argument is safe.
 * For systems that do not use ORTE's RML or the HNP, the effect of
   orte_output_* and orte_show_help will be identical to their opal
   counterparts (the additional information passed to
   orte_output_open() will be lost!).  Indeed, the orte_* functions
   simply become trivial wrappers around their opal_* counterparts.  Note
   that we have not tested this; the code is simple but it is quite
   possible that we mucked something up.

= Filter Framework =

Messages sent via the new orte_* functions described above and
messages output via the IOF on the HNP will now optionally be passed
through a new "filter" framework before being output to
stdout/stderr.  The "filter" OPAL MCA framework is intended to allow
preprocessing of messages before they are sent to their final
destinations.  The first component written for the filter
framework creates an XML stream, segregating all the messages
into different XML tags, etc.  This will allow 3rd party tools to read
the stdout/stderr from the HNP and be able to know exactly what each
text message is (e.g., a help message, another OMPI infrastructure
message, stdout from the user process, stderr from the user process,
etc.).

Filtering is not active by default.  Filter components must be
specifically requested, such as:

{{{
$ mpirun --mca filter xml ...
}}}

There can only be one filter component active.

= New MCA Parameters =

The new functionality described above introduces two new MCA
parameters:

 * '''orte_base_help_aggregate''': Defaults to 1 (true), meaning that
   help messages will be aggregated, as described above.  If set to 0,
   all help messages will be displayed, even if they are duplicates
   (i.e., the original behavior).
 * '''orte_base_show_output_recursions''': An MCA parameter to help
   debug one of the known issues, described below.  It is likely that
   this MCA parameter will disappear before v1.3 final.
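
For example, to disable the aggregation and restore the original
behavior:

{{{
$ mpirun --mca orte_base_help_aggregate 0 ...
}}}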

= Known Issues =

 * The XML filter component is not complete.  The current output from
   this component is preliminary and not real XML.  A bit more work
   needs to be done to have configure.m4 search for an appropriate XML
   library, link it in, and use it at run time.
 * There are possible recursion loops in the orte_output() and
   orte_show_help() functions -- e.g., if the RML send calls
   orte_output() or orte_show_help().  We have some ideas about how to
   fix these, but figured that it was ok to commit before the feature
   freeze with known issues.  The code currently contains sub-optimal
   workarounds so that this will not be a problem, but it would be
   good to actually solve the problem (rather than rely on hackish
   workarounds) before v1.3 final.

This commit was SVN r18434.
2008-05-13 20:00:55 +00:00

1009 lines
41 KiB
C

/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2007 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2006 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/**
* @file
*/
#include "ompi_config.h"
#include "opal/class/opal_list.h"
#include "opal/threads/mutex.h"
#include "opal/util/crc.h"
#include "ompi/constants.h"
#include "ompi/communicator/communicator.h"
#include "ompi/datatype/datatype.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/class/ompi_seq_tracker.h"
#include "pml_dr.h"
#include "pml_dr_comm.h"
#include "pml_dr_recvfrag.h"
#include "pml_dr_recvreq.h"
#include "pml_dr_sendreq.h"
#include "pml_dr_hdr.h"
#include "orte/mca/errmgr/errmgr.h"
#define MCA_PML_DR_HDR_VALIDATE_ACK(do_csum, hdr, type) \
do { \
mca_pml_dr_endpoint_t* ep; \
if(do_csum) { \
uint16_t csum = (uint16_t)opal_csum(hdr, sizeof(type)); \
if(hdr->hdr_common.hdr_csum != csum) { \
MCA_PML_DR_DEBUG(0, (0, "%s:%d: invalid header checksum: 0x%04x != 0x%04x\n", \
__FILE__, __LINE__, hdr->hdr_common.hdr_csum, csum)); \
return; \
} \
} \
ep = (mca_pml_dr_endpoint_t*)opal_pointer_array_get_item(&mca_pml_dr.endpoints, hdr->hdr_common.hdr_src); \
assert(ep != NULL); \
if(ompi_seq_tracker_check_duplicate(&ep->seq_sends, hdr->hdr_common.hdr_vid)) { \
MCA_PML_DR_DEBUG(0, (0, "%s:%d: dropping duplicate ack, vfrag ID %d", \
__FILE__, __LINE__, hdr->hdr_common.hdr_vid)); \
return; \
} \
MCA_PML_DR_DEBUG(1, (0, "%s:%d: couldn't find vfrag ID %d \n", \
__FILE__, __LINE__, hdr->hdr_common.hdr_vid)); \
} while (0)
OBJ_CLASS_INSTANCE(
mca_pml_dr_buffer_t,
ompi_free_list_item_t,
NULL,
NULL
);
OBJ_CLASS_INSTANCE(
mca_pml_dr_recv_frag_t,
ompi_free_list_item_t,
NULL,
NULL
);
/*
* Release resources.
*/
static void mca_pml_dr_ctl_completion(
mca_btl_base_module_t* btl,
struct mca_btl_base_endpoint_t* ep,
struct mca_btl_base_descriptor_t* des,
int status)
{
mca_bml_base_btl_t* bml_btl = (mca_bml_base_btl_t*)des->des_context;
mca_bml_base_free(bml_btl, des);
}
/**
* Callback from BTL on receive.
*/
void mca_pml_dr_recv_frag_callback(
mca_btl_base_module_t* btl,
mca_btl_base_tag_t tag,
mca_btl_base_descriptor_t* des,
void* cbdata)
{
mca_btl_base_segment_t* segments = des->des_dst;
mca_pml_dr_hdr_t* hdr = (mca_pml_dr_hdr_t*)segments->seg_addr.pval;
mca_pml_dr_comm_t *comm;
mca_pml_dr_comm_proc_t *proc;
mca_pml_dr_endpoint_t *ep;
ompi_communicator_t* ompi_comm;
uint16_t csum;
bool do_csum = mca_pml_dr.enable_csum &&
(btl->btl_flags & MCA_BTL_FLAGS_NEED_CSUM);
if(segments->seg_len < sizeof(mca_pml_dr_common_hdr_t)) {
MCA_PML_DR_DEBUG(0,(0, "%s:%d: wtf? segments->seg_len:%d < sizeof(mca_pml_dr_common_hdr_t):%lu\n",
__FILE__, __LINE__, segments->seg_len, (unsigned long)sizeof(mca_pml_dr_common_hdr_t)));
return;
}
MCA_PML_DR_DEBUG(0,(0, "%s:%d: got a hdr of type %d\n",
__FILE__, __LINE__, hdr->hdr_common.hdr_type));
switch(hdr->hdr_common.hdr_type) {
case MCA_PML_DR_HDR_TYPE_MATCH:
{
if(do_csum) {
csum = (uint16_t)opal_csum(hdr, sizeof(mca_pml_dr_match_hdr_t));
if(hdr->hdr_common.hdr_csum != csum) {
MCA_PML_DR_DEBUG(0,(0, "%s:%d: invalid header checksum: 0x%04x != 0x%04x\n",
__FILE__, __LINE__, hdr->hdr_common.hdr_csum, csum));
return;
}
}
if(hdr->hdr_common.hdr_dst != mca_pml_dr.my_rank ) {
MCA_PML_DR_DEBUG(0, (0, "%s:%d: misdelivered packet [rank %d -> rank %d]\n",
__FILE__, __LINE__, hdr->hdr_common.hdr_src, hdr->hdr_common.hdr_dst));
return;
}
ep = (mca_pml_dr_endpoint_t*)opal_pointer_array_get_item(&mca_pml_dr.endpoints, hdr->hdr_common.hdr_src);
assert(ep != NULL);
if(ompi_seq_tracker_check_duplicate(&ep->seq_recvs, hdr->hdr_common.hdr_vid)) {
MCA_PML_DR_DEBUG(0,(0, "%s:%d: got a duplicate vfrag vfrag id %d\n",
__FILE__, __LINE__, hdr->hdr_common.hdr_vid));
mca_pml_dr_recv_frag_ack(btl,
ep->bml_endpoint,
&hdr->hdr_common,
hdr->hdr_match.hdr_src_ptr.pval,
1, 0);
return;
}
ompi_comm = ompi_comm_lookup(hdr->hdr_common.hdr_ctx);
if(NULL == ompi_comm ) {
ORTE_OUTPUT((0, "%s:%d: invalid communicator %d\n",
__FILE__, __LINE__, hdr->hdr_common.hdr_ctx));
return;
}
comm = (mca_pml_dr_comm_t*)ompi_comm->c_pml_comm;
assert(hdr->hdr_common.hdr_src < opal_pointer_array_get_size(&comm->sparse_procs));
proc = (mca_pml_dr_comm_proc_t*)opal_pointer_array_get_item(&comm->sparse_procs, hdr->hdr_common.hdr_src);
assert(proc != NULL);
assert(ep == proc->pml_endpoint);
mca_pml_dr_recv_frag_match(comm,proc,btl,&hdr->hdr_match,segments,des->des_dst_cnt);
break;
}
case MCA_PML_DR_HDR_TYPE_MATCH_ACK:
{
MCA_PML_DR_HDR_VALIDATE_ACK(do_csum, hdr, mca_pml_dr_ack_hdr_t);
mca_pml_dr_send_request_match_ack(btl, &hdr->hdr_ack);
break;
}
case MCA_PML_DR_HDR_TYPE_RNDV:
{
if(do_csum) {
csum = (uint16_t)opal_csum(hdr, sizeof(mca_pml_dr_rendezvous_hdr_t));
if(hdr->hdr_common.hdr_csum != csum) {
MCA_PML_DR_DEBUG(0, (0, "%s:%d: invalid header checksum: 0x%04x != 0x%04x\n",
__FILE__, __LINE__, hdr->hdr_common.hdr_csum, csum));
return;
}
}
if(hdr->hdr_common.hdr_dst != mca_pml_dr.my_rank ) {
MCA_PML_DR_DEBUG(0, (0, "%s:%d: misdelivered packet [rank %d -> rank %d]\n",
__FILE__, __LINE__, hdr->hdr_common.hdr_src, hdr->hdr_common.hdr_dst));
return;
}
ep = (mca_pml_dr_endpoint_t*)opal_pointer_array_get_item(&mca_pml_dr.endpoints, hdr->hdr_common.hdr_src);
assert(ep != NULL);
/* seq_recvs protected by matching lock */
if(ompi_seq_tracker_check_duplicate(&ep->seq_recvs, hdr->hdr_common.hdr_vid)){
mca_pml_dr_recv_request_t* recvreq;
ompi_comm = ompi_comm_lookup(hdr->hdr_common.hdr_ctx);
if(NULL == ompi_comm) {
if(ompi_seq_tracker_check_duplicate(&ep->seq_recvs_matched, hdr->hdr_common.hdr_vid)) {
MCA_PML_DR_DEBUG(0, (0, "%s:%d: acking duplicate matched rendezvous from sequence tracker\n",
__FILE__, __LINE__));
mca_pml_dr_recv_frag_ack(btl,
ep->bml_endpoint,
&hdr->hdr_common,
hdr->hdr_match.hdr_src_ptr.pval,
~(uint64_t) 0, hdr->hdr_rndv.hdr_msg_length);
return;
} else {
ORTE_OUTPUT((0, "%s:%d: the world as we know it is bad\n", __FILE__, __LINE__));
orte_errmgr.abort(-1, NULL);
}
}
comm = (mca_pml_dr_comm_t*)ompi_comm->c_pml_comm;
assert(hdr->hdr_common.hdr_src < opal_pointer_array_get_size(&comm->sparse_procs));
proc = (mca_pml_dr_comm_proc_t*)opal_pointer_array_get_item(&comm->sparse_procs, hdr->hdr_common.hdr_src);
assert(proc != NULL);
assert(ep == proc->pml_endpoint);
/* ack only if the vfrag has been matched */
recvreq =
mca_pml_dr_comm_proc_check_matched(proc, hdr->hdr_common.hdr_vid);
if(NULL != recvreq) {
MCA_PML_DR_DEBUG(0,(0, "%s:%d: acking duplicate matched rendezvous from pending matched vfrag id %d\n",
__FILE__, __LINE__, hdr->hdr_common.hdr_vid));
mca_pml_dr_recv_request_ack(btl, recvreq, &hdr->hdr_common,
hdr->hdr_match.hdr_src_ptr, recvreq->req_bytes_received, 1);
} else {
if(ompi_seq_tracker_check_duplicate(&ep->seq_recvs_matched, hdr->hdr_common.hdr_vid)) {
MCA_PML_DR_DEBUG(0,(0, "%s:%d: acking duplicate matched rendezvous from sequence tracker\n", __FILE__, __LINE__));
mca_pml_dr_recv_frag_ack(btl,
ep->bml_endpoint,
&hdr->hdr_common,
hdr->hdr_match.hdr_src_ptr.pval,
~(uint64_t) 0, hdr->hdr_rndv.hdr_msg_length);
} else {
MCA_PML_DR_DEBUG(0,(0, "%s:%d: droping duplicate unmatched rendezvous\n", __FILE__, __LINE__));
}
}
} else {
ompi_comm = ompi_comm_lookup(hdr->hdr_common.hdr_ctx);
if(NULL == ompi_comm) {
ORTE_OUTPUT((0, "%s:%d: the world as we know it is bad\n", __FILE__, __LINE__));
orte_errmgr.abort(-1, NULL);
}
comm = (mca_pml_dr_comm_t*)ompi_comm->c_pml_comm;
assert(hdr->hdr_common.hdr_src < opal_pointer_array_get_size(&comm->sparse_procs));
proc = (mca_pml_dr_comm_proc_t*)opal_pointer_array_get_item(&comm->sparse_procs, hdr->hdr_common.hdr_src);
assert(proc != NULL);
assert(ep == proc->pml_endpoint);
mca_pml_dr_recv_frag_match(comm,proc,btl,&hdr->hdr_match,segments,des->des_dst_cnt);
}
break;
}
case MCA_PML_DR_HDR_TYPE_RNDV_ACK:
{
MCA_PML_DR_HDR_VALIDATE_ACK(do_csum, hdr, mca_pml_dr_ack_hdr_t);
mca_pml_dr_send_request_rndv_ack(btl, &hdr->hdr_ack);
break;
}
case MCA_PML_DR_HDR_TYPE_FRAG:
{
mca_pml_dr_recv_request_t* recvreq;
if(do_csum) {
csum = (uint16_t)opal_csum(hdr, sizeof(mca_pml_dr_frag_hdr_t));
if(hdr->hdr_common.hdr_csum != csum) {
MCA_PML_DR_DEBUG(0,(0, "%s:%d: invalid header checksum: 0x%04x != 0x%04x\n",
__FILE__, __LINE__, hdr->hdr_common.hdr_csum, csum));
return;
}
}
if(hdr->hdr_common.hdr_dst != mca_pml_dr.my_rank ) {
MCA_PML_DR_DEBUG(0,(0, "%s:%d: misdelivered packet [rank %d -> rank %d]\n",
__FILE__, __LINE__, hdr->hdr_common.hdr_src, hdr->hdr_common.hdr_dst));
return;
}
ep = (mca_pml_dr_endpoint_t*)opal_pointer_array_get_item(&mca_pml_dr.endpoints, hdr->hdr_common.hdr_src);
assert(ep != NULL);
/* seq_recvs protected by matching lock */
if(ompi_seq_tracker_check_duplicate(&ep->seq_recvs, hdr->hdr_common.hdr_vid)) {
MCA_PML_DR_DEBUG(0,(0, "%s:%d: acking duplicate fragment\n", __FILE__, __LINE__));
mca_pml_dr_recv_frag_ack(btl,
ep->bml_endpoint,
&hdr->hdr_common,
hdr->hdr_frag.hdr_src_ptr.pval,
~(uint64_t) 0, 0);
} else {
ompi_comm = ompi_comm_lookup(hdr->hdr_common.hdr_ctx);
if(NULL == ompi_comm) {
MCA_PML_DR_DEBUG(0,(0, "%s:%d: the world as we know it is bad\n", __FILE__, __LINE__));
orte_errmgr.abort(-1, NULL);
}
comm = (mca_pml_dr_comm_t*)ompi_comm->c_pml_comm;
assert(hdr->hdr_common.hdr_src < opal_pointer_array_get_size(&comm->sparse_procs));
proc = (mca_pml_dr_comm_proc_t*)opal_pointer_array_get_item(&comm->sparse_procs, hdr->hdr_common.hdr_src);
assert(proc != NULL);
assert(ep == proc->pml_endpoint);
recvreq = (mca_pml_dr_recv_request_t*)hdr->hdr_frag.hdr_dst_ptr.pval;
mca_pml_dr_recv_request_progress(recvreq,btl,segments,des->des_dst_cnt);
}
break;
}
case MCA_PML_DR_HDR_TYPE_FRAG_ACK:
{
MCA_PML_DR_HDR_VALIDATE_ACK(do_csum, hdr, mca_pml_dr_ack_hdr_t);
mca_pml_dr_send_request_frag_ack(btl, &hdr->hdr_ack);
break;
}
default:
MCA_PML_DR_DEBUG(0,(0, "%s:%d: dropping unknown header type\n", __FILE__,__LINE__));
break;
}
}
/**
* Try and match the incoming message fragment to the list of
* "wild" receives
*
* @param hdr Matching data from received fragment (IN)
*
* @param pml_comm Pointer to the communicator structure used for
* matching purposes. (IN)
*
* @return Matched receive
*
* This routine assumes that the appropriate matching locks are
* set by the upper level routine.
*/
#define MCA_PML_DR_CHECK_WILD_RECEIVES_FOR_MATCH(hdr,comm,proc,return_match) \
do { \
/* local parameters */ \
opal_list_t* wild_receives = &comm->wild_receives; \
mca_pml_dr_recv_request_t *wild_recv; \
int frag_tag,recv_tag; \
\
/* initialization */ \
frag_tag=hdr->hdr_tag; \
\
/* \
* Loop over the wild irecvs - no need to lock, the upper level \
* locking is protecting from having other threads trying to \
* change this list. \
*/ \
for(wild_recv = (mca_pml_dr_recv_request_t *) \
opal_list_get_first(wild_receives); \
wild_recv != (mca_pml_dr_recv_request_t *) \
opal_list_get_end(wild_receives); \
wild_recv = (mca_pml_dr_recv_request_t *) \
((opal_list_item_t *)wild_recv)->opal_list_next) { \
\
recv_tag = wild_recv->req_recv.req_base.req_tag; \
if ( \
/* exact tag match */ \
(frag_tag == recv_tag) || \
/* wild tag match - negative tags (except for \
* OMPI_ANY_TAG) are reserved for internal use, and will \
* not be matched with OMPI_ANY_TAG */ \
( (recv_tag == OMPI_ANY_TAG) && (0 <= frag_tag) ) ) \
\
{ \
/* \
* Mark that this is the matching irecv, and go to process it. \
*/ \
return_match = wild_recv; \
\
/* remove this irecv from the posted wild ireceive list */ \
opal_list_remove_item(wild_receives, \
(opal_list_item_t *)wild_recv); \
\
/* found match - no need to continue */ \
break; \
} \
} \
} while(0)
/**
* Try and match the incoming message fragment to the list of
* "specific" receives
*
* @param hdr Matching data from received fragment (IN)
*
* @param comm Pointer to the communicator structure used for
* matching purposes. (IN)
*
* @return Matched receive
*
* This routine assumes that the appropriate matching locks are
* set by the upper level routine.
*/
#define MCA_PML_DR_CHECK_SPECIFIC_RECEIVES_FOR_MATCH(hdr,comm,proc,return_match) \
do { \
/* local variables */ \
opal_list_t* specific_receives = &proc->specific_receives; \
mca_pml_dr_recv_request_t *specific_recv; \
int recv_tag,frag_tag; \
\
/* initialization */ \
frag_tag=hdr->hdr_tag; \
\
/* \
* Loop over the specific irecvs. \
*/ \
for(specific_recv = (mca_pml_dr_recv_request_t *) \
opal_list_get_first(specific_receives); \
specific_recv != (mca_pml_dr_recv_request_t *) \
opal_list_get_end(specific_receives); \
specific_recv = (mca_pml_dr_recv_request_t *) \
((opal_list_item_t *)specific_recv)->opal_list_next) { \
/* \
* Check for a match \
*/ \
recv_tag = specific_recv->req_recv.req_base.req_tag; \
if ( (frag_tag == recv_tag) || \
( (recv_tag == OMPI_ANY_TAG) && (0 <= frag_tag) ) ) { \
\
/* \
* Match made \
*/ \
return_match = specific_recv; \
\
/* remove descriptor from posted specific ireceive list */ \
opal_list_remove_item(specific_receives, \
(opal_list_item_t *)specific_recv); \
\
break; \
} \
} \
} while(0)
/**
* Try and match the incoming message fragment to the list of
* "wild" receives and "specific" receives. Used when both types
* of receives have been posted, i.e. when we need to coordinate
* between multiple lists to make sure ordered delivery occurs.
*
* @param hdr Matching data from received fragment (IN)
*
* @param comm Pointer to the communicator structure used for
* matching purposes. (IN)
*
* @return Matched receive
*
* This routine assumes that the appropriate matching locks are
* set by the upper level routine.
*/
#define MCA_PML_DR_CHECK_SPECIFIC_AND_WILD_RECEIVES_FOR_MATCH( \
hdr,comm,proc,return_match) \
do { \
/* local variables */ \
mca_pml_dr_recv_request_t *specific_recv, *wild_recv; \
mca_pml_sequence_t wild_recv_seq, specific_recv_seq; \
int frag_tag, wild_recv_tag, specific_recv_tag; \
\
/* initialization */ \
frag_tag=hdr->hdr_tag; \
\
/* \
* We know that when this is called, both specific and wild irecvs \
* have been posted. \
*/ \
specific_recv = (mca_pml_dr_recv_request_t *) \
opal_list_get_first(&(proc)->specific_receives); \
wild_recv = (mca_pml_dr_recv_request_t *) \
opal_list_get_first(&comm->wild_receives); \
\
specific_recv_seq = specific_recv->req_recv.req_base.req_sequence; \
wild_recv_seq = wild_recv->req_recv.req_base.req_sequence; \
\
while (true) { \
if (wild_recv_seq < specific_recv_seq) { \
/* \
* wild recv is earlier than the specific one. \
*/ \
/* \
* try and match \
*/ \
wild_recv_tag = wild_recv->req_recv.req_base.req_tag; \
if ( (frag_tag == wild_recv_tag) || \
( (wild_recv_tag == OMPI_ANY_TAG) && (0 <= frag_tag) ) ) { \
/* \
* Match made \
*/ \
return_match=wild_recv; \
\
/* remove this recv from the wild receive queue */ \
opal_list_remove_item(&comm->wild_receives, \
(opal_list_item_t *)wild_recv); \
break; \
} \
\
/* \
* No match, go to the next. \
*/ \
wild_recv=(mca_pml_dr_recv_request_t *) \
((opal_list_item_t *)wild_recv)->opal_list_next; \
\
/* \
* If that was the last wild one, just look at the \
* rest of the specific ones. \
*/ \
if (wild_recv == (mca_pml_dr_recv_request_t *) \
opal_list_get_end(&comm->wild_receives) ) \
{ \
MCA_PML_DR_CHECK_SPECIFIC_RECEIVES_FOR_MATCH(hdr, comm, proc, return_match); \
break; \
} \
\
/* \
* Get the sequence number for this recv, and go \
* back to the top of the loop. \
*/ \
wild_recv_seq = wild_recv->req_recv.req_base.req_sequence; \
\
} else { \
/* \
* specific recv is earlier than the wild one. \
*/ \
specific_recv_tag=specific_recv->req_recv.req_base.req_tag; \
if ( (frag_tag == specific_recv_tag) || \
( (specific_recv_tag == OMPI_ANY_TAG) && (0<=frag_tag)) ) \
{ \
/* \
* Match made \
*/ \
return_match = specific_recv; \
/* remove descriptor from specific receive list */ \
opal_list_remove_item(&(proc)->specific_receives, \
(opal_list_item_t *)specific_recv); \
break; \
} \
\
/* \
* No match, go on to the next specific irecv. \
*/ \
specific_recv = (mca_pml_dr_recv_request_t *) \
((opal_list_item_t *)specific_recv)->opal_list_next; \
\
/* \
* If that was the last specific irecv, process the \
* rest of the wild ones. \
*/ \
if (specific_recv == (mca_pml_dr_recv_request_t *) \
opal_list_get_end(&(proc)->specific_receives)) \
{ \
MCA_PML_DR_CHECK_WILD_RECEIVES_FOR_MATCH(hdr, comm, proc, return_match); \
break; \
} \
/* \
* Get the sequence number for this recv, and go \
* back to the top of the loop. \
*/ \
specific_recv_seq = specific_recv->req_recv.req_base.req_sequence; \
} \
} \
} while(0)
/*
* Specialized matching routines for internal use only.
*/
static bool mca_pml_dr_check_cantmatch_for_match(
opal_list_t *additional_matches,
mca_pml_dr_comm_t* comm,
mca_pml_dr_comm_proc_t *proc);
/**
* RTS/CTS receive side matching
*
* @param hdr list of parameters needed for matching
* This list is also embedded in frag,
* but passing it separately saves a memory copy
* when a match is made in this routine. (IN)
* @param frag pointer to receive fragment which we want
* to match (IN/OUT). If a match is not made,
* hdr is copied to frag.
* @param match_made parameter indicating if we matched frag/
* hdr (OUT)
* @param additional_matches if a match is made with frag, we
* may be able to match fragments that previously
* have arrived out-of-order. If this is the
* case, the associated fragment descriptors are
* put on this list for further processing. (OUT)
*
* @return OMPI error code
*
* This routine is used to try and match a newly arrived message fragment
* to pre-posted receives. The following assumptions are made
* - fragments are received out of order
* - for long messages, e.g. more than one fragment, an RTS/CTS algorithm
* is used.
* - 2nd and greater fragments include a receive descriptor pointer
* - fragments may be dropped
* - fragments may be corrupt
* - this routine may be called simultaneously by more than one thread
*/
bool mca_pml_dr_recv_frag_match(
mca_pml_dr_comm_t* comm,
mca_pml_dr_comm_proc_t *proc,
mca_btl_base_module_t *btl,
mca_pml_dr_match_hdr_t *hdr,
mca_btl_base_segment_t* segments,
size_t num_segments)
{
/* local variables */
uint16_t next_msg_seq_expected, frag_msg_seq;
mca_pml_dr_recv_request_t *match = NULL;
bool additional_match=false;
opal_list_t additional_matches;
ompi_proc_t* ompi_proc = proc->ompi_proc;
int rc;
uint32_t csum;
mca_pml_dr_endpoint_t* ep = (mca_pml_dr_endpoint_t*) proc->pml_endpoint;
bool do_csum = mca_pml_dr.enable_csum &&
(btl->btl_flags & MCA_BTL_FLAGS_NEED_CSUM);
/* source sequence number */
frag_msg_seq = hdr->hdr_seq;
/* get next expected message sequence number - if threaded
* run, lock to make sure that if another thread is processing
* a frag from the same message a match is made only once.
* Also, this prevents other posted receives (for a pair of
* end points) from being processed, and potentially "loosing"
* the fragment.
*/
OPAL_THREAD_LOCK(&comm->matching_lock);
/* get sequence number of next message that can be processed */
next_msg_seq_expected = (uint16_t)proc->expected_sequence;
if (frag_msg_seq == next_msg_seq_expected) {
/*
* This is the sequence number we were expecting,
* so we can try matching it to already posted
* receives.
*/
/* We're now expecting the next sequence number. */
(proc->expected_sequence)++;
rematch:
/*
* figure out what sort of matching logic to use, if need to
* look only at "specific" receives, or "wild" receives,
* or if we need to traverse both sets at the same time.
*/
if (opal_list_get_size(&proc->specific_receives) == 0 ){
/*
* There are only wild irecvs, so specialize the algorithm.
*/
MCA_PML_DR_CHECK_WILD_RECEIVES_FOR_MATCH(hdr, comm, proc, match);
} else if (opal_list_get_size(&comm->wild_receives) == 0 ) {
/*
* There are only specific irecvs, so specialize the algorithm.
*/
MCA_PML_DR_CHECK_SPECIFIC_RECEIVES_FOR_MATCH(hdr, comm, proc, match);
} else {
/*
* There are some of each.
*/
MCA_PML_DR_CHECK_SPECIFIC_AND_WILD_RECEIVES_FOR_MATCH(hdr, comm, proc, match);
}
/* if match found, process data */
if (match) {
/*
* update delivered sequence number information, if needed.
*/
MCA_PML_DR_RECV_REQUEST_MATCHED(match,comm,proc,hdr);
if( (match->req_recv.req_base.req_type == MCA_PML_REQUEST_PROBE) ) {
/* complete the probe */
mca_pml_dr_recv_request_matched_probe(match,btl,segments,num_segments);
/* retry the match */
match = NULL;
goto rematch;
}
} else {
/* if no match was found, verify the csum; if it passes, place the fragment on the unexpected queue */
mca_pml_dr_recv_frag_t* frag;
MCA_PML_DR_RECV_FRAG_ALLOC(frag, rc);
if(OMPI_SUCCESS != rc) {
OPAL_THREAD_UNLOCK(&comm->matching_lock);
return false;
}
MCA_PML_DR_RECV_FRAG_INIT(frag,ompi_proc,hdr,segments,num_segments,btl,csum);
if(do_csum && csum != hdr->hdr_csum) {
mca_pml_dr_recv_frag_ack(btl,
(mca_bml_base_endpoint_t*)ompi_proc->proc_bml,
&hdr->hdr_common, hdr->hdr_src_ptr.pval, 0, 0);
MCA_PML_DR_DEBUG(0,(0, "%s:%d: received corrupted data 0x%08x != 0x%08x (segments %lu length %lu)\n",
__FILE__, __LINE__, csum, hdr->hdr_csum, (unsigned long)num_segments,
(unsigned long)(segments[0].seg_len - mca_pml_dr_hdr_size(hdr->hdr_common.hdr_type))));
MCA_PML_DR_RECV_FRAG_RETURN(frag);
OPAL_THREAD_UNLOCK(&comm->matching_lock);
return false;
}
opal_list_append( &proc->unexpected_frags, (opal_list_item_t *)frag );
}
/*
* Now that a new message has arrived, check to see if
* any fragments on the frags_cant_match list
* may now be used to form new matches
*/
if (0 < opal_list_get_size(&proc->frags_cant_match)) {
additional_match = mca_pml_dr_check_cantmatch_for_match(&additional_matches,comm,proc);
}
} else {
/*
* This message comes after the next expected, so it
* is ahead of sequence. If it passes the csum check, save it for later.
*/
mca_pml_dr_recv_frag_t* frag;
MCA_PML_DR_RECV_FRAG_ALLOC(frag, rc);
if(OMPI_SUCCESS != rc) {
OPAL_THREAD_UNLOCK(&comm->matching_lock);
return false;
}
MCA_PML_DR_RECV_FRAG_INIT(frag,ompi_proc,hdr,segments,num_segments,btl,csum);
if(do_csum && csum != hdr->hdr_csum) {
mca_pml_dr_recv_frag_ack(btl,
(mca_bml_base_endpoint_t*)ompi_proc->proc_bml,
&hdr->hdr_common, hdr->hdr_src_ptr.pval, 0, 0);
MCA_PML_DR_DEBUG(0,(0, "%s:%d: received corrupted data 0x%08x != 0x%08x\n",
__FILE__, __LINE__, csum, hdr->hdr_csum));
MCA_PML_DR_RECV_FRAG_RETURN(frag);
OPAL_THREAD_UNLOCK(&comm->matching_lock);
return false;
}
opal_list_append(&proc->frags_cant_match, (opal_list_item_t *)frag);
}
ompi_seq_tracker_insert(&ep->seq_recvs, hdr->hdr_common.hdr_vid);
OPAL_THREAD_UNLOCK(&comm->matching_lock);
/* release matching lock before processing fragment */
if(match != NULL) {
mca_pml_dr_recv_request_progress(match,btl,segments,num_segments);
}
/* if we buffered a short message, go ahead and ack */
else if (hdr->hdr_common.hdr_type == MCA_PML_DR_HDR_TYPE_MATCH) {
MCA_PML_DR_DEBUG(1,(0, "%s:%d: received short message, acking now vfrag id: %d\n",
__FILE__, __LINE__, hdr->hdr_common.hdr_vid));
mca_pml_dr_recv_frag_ack(btl,
(mca_bml_base_endpoint_t*)ompi_proc->proc_bml,
&hdr->hdr_common, hdr->hdr_src_ptr.pval, 1, 0);
}
if(additional_match) {
opal_list_item_t* item;
while(NULL != (item = opal_list_remove_first(&additional_matches))) {
mca_pml_dr_recv_frag_t* frag = (mca_pml_dr_recv_frag_t*)item;
mca_pml_dr_recv_request_progress(frag->request,frag->btl,frag->segments,frag->num_segments);
MCA_PML_DR_RECV_FRAG_RETURN(frag);
}
}
return (match != NULL);
}
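/**
* Construct and send an ACK for a received fragment, using the same
* BTL the fragment arrived on.  If no descriptor is available or the
* send fails, the ACK is queued on mca_pml_dr.acks_pending for a
* later retry.
*/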
void mca_pml_dr_recv_frag_ack(
mca_btl_base_module_t* btl,
mca_bml_base_endpoint_t* endpoint,
mca_pml_dr_common_hdr_t* hdr,
void *src_ptr,
uint64_t mask,
uint16_t len)
{
mca_btl_base_descriptor_t* des;
mca_bml_base_btl_t* bml_btl;
mca_pml_dr_recv_frag_t* frag;
mca_pml_dr_ack_hdr_t* ack;
int rc;
bool do_csum;
/* using the same BTL for ACKs makes failover sane */
bml_btl = mca_bml_base_btl_array_find(&endpoint->btl_eager,
btl);
do_csum = mca_pml_dr.enable_csum &&
(bml_btl->btl_flags & MCA_BTL_FLAGS_NEED_CSUM);
/* allocate descriptor */
mca_bml_base_alloc(bml_btl, &des, MCA_BTL_NO_ORDER,
sizeof(mca_pml_dr_ack_hdr_t), MCA_BTL_DES_FLAGS_PRIORITY);
if(NULL == des) {
goto retry;
}
/* fill out header */
ack = (mca_pml_dr_ack_hdr_t*)des->des_src->seg_addr.pval;
ack->hdr_common.hdr_type = MCA_PML_DR_HDR_TYPE_ACK | hdr->hdr_type;
ack->hdr_common.hdr_flags = 0;
ack->hdr_common.hdr_src = hdr->hdr_dst;
ack->hdr_common.hdr_dst = hdr->hdr_src;
ack->hdr_common.hdr_vid = hdr->hdr_vid;
ack->hdr_common.hdr_ctx = hdr->hdr_ctx;
ack->hdr_vlen = len;
ack->hdr_vmask = mask;
ack->hdr_src_ptr.pval = src_ptr;
assert(ack->hdr_src_ptr.pval);
ack->hdr_dst_ptr.pval = NULL;
ack->hdr_common.hdr_csum = (uint16_t)(do_csum ?
opal_csum(ack, sizeof(mca_pml_dr_ack_hdr_t)) :
OPAL_CSUM_ZERO);
/* initialize descriptor */
des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY;
des->des_cbfunc = mca_pml_dr_ctl_completion;
MCA_PML_DR_DEBUG(1,(0, "%s:%d: sending ack, vfrag ID %d",
__FILE__, __LINE__, hdr->hdr_vid));
rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML);
if(rc != OMPI_SUCCESS) {
mca_bml_base_free(bml_btl, des);
goto retry;
}
return;
/* queue request to retry later */
retry:
MCA_PML_DR_RECV_FRAG_ALLOC(frag,rc);
/* frag->hdr.hdr_match = *hdr; */
frag->num_segments = 0;
opal_list_append(&mca_pml_dr.acks_pending, (opal_list_item_t*)frag);
}
/**
* Scan the list of frags that came in ahead of time to see if any
* can be processed at this time. If they can, try and match the
* frags.
*
* @param additional_matches List to hold new matches with fragments
* from the frags_cant_match list. (IN/OUT)
*
* @param pml_comm Pointer to the communicator structure used for
* matching purposes. (IN)
*
* This routine assumes that the appropriate matching locks are
* set by the upper level routine.
*/
static bool mca_pml_dr_check_cantmatch_for_match(
opal_list_t *additional_matches,
mca_pml_dr_comm_t* comm,
mca_pml_dr_comm_proc_t *proc)
{
/* local parameters */
int match_found;
uint16_t next_msg_seq_expected, frag_seq;
mca_pml_dr_recv_frag_t *frag;
bool match_made = false;
/*
* Loop over all the out of sequence messages. No ordering is assumed
* in the frags_cant_match list.
*/
match_found = 1;
while ((0 < opal_list_get_size(&proc->frags_cant_match)) && match_found) {
/* initialize match flag for this search */
match_found = 0;
/* get sequence number of next message that can be processed */
next_msg_seq_expected = proc->expected_sequence;
/* search the list for a fragment from the send with sequence
* number next_msg_seq_expected
*/
for(frag = (mca_pml_dr_recv_frag_t *)
opal_list_get_first(&proc->frags_cant_match);
frag != (mca_pml_dr_recv_frag_t *)
opal_list_get_end(&proc->frags_cant_match);
frag = (mca_pml_dr_recv_frag_t *)
opal_list_get_next(frag))
{
/*
* If the message has the next expected seq from that proc...
*/
frag_seq=frag->hdr.hdr_match.hdr_seq;
if (frag_seq == next_msg_seq_expected) {
mca_pml_dr_recv_request_t *match = NULL;
mca_pml_dr_match_hdr_t* hdr = &frag->hdr.hdr_match;
/* We're now expecting the next sequence number. */
(proc->expected_sequence)++;
/* signal that match was made */
match_found = 1;
/*
* remove frag from list
*/
opal_list_remove_item(&proc->frags_cant_match,
(opal_list_item_t *)frag);
rematch:
/*
* figure out what sort of matching logic to use, if need to
* look only at "specific" receives, or "wild" receives,
* or if we need to traverse both sets at the same time.
*/
proc = (mca_pml_dr_comm_proc_t*)opal_pointer_array_get_item(&comm->sparse_procs,
hdr->hdr_common.hdr_src);
if (opal_list_get_size(&proc->specific_receives) == 0 ) {
/*
* There are only wild irecvs, so specialize the algorithm.
*/
MCA_PML_DR_CHECK_WILD_RECEIVES_FOR_MATCH(hdr, comm, proc, match);
} else if (opal_list_get_size(&comm->wild_receives) == 0 ) {
/*
* There are only specific irecvs, so specialize the algorithm.
*/
MCA_PML_DR_CHECK_SPECIFIC_RECEIVES_FOR_MATCH(hdr, comm, proc, match);
} else {
/*
* There are some of each.
*/
MCA_PML_DR_CHECK_SPECIFIC_AND_WILD_RECEIVES_FOR_MATCH(hdr, comm, proc, match);
}
/* if match found, process data */
if (match) {
/*
* If this match was a probe, complete it and retry the match.
*/
if( (match->req_recv.req_base.req_type == MCA_PML_REQUEST_PROBE) ) {
/* complete the probe */
mca_pml_dr_recv_request_matched_probe(match,frag->btl,frag->segments,frag->num_segments);
/* retry the match */
match = NULL;
goto rematch;
} else {
/* associate the receive descriptor with the fragment
* descriptor */
frag->request=match;
match->req_proc = proc;
match->req_endpoint = (mca_pml_dr_endpoint_t*)proc->ompi_proc->proc_pml;
MCA_PML_DR_DEBUG(10, (0, "%s:%d: adding endpoint %p match %p\n",
__FILE__, __LINE__, (void*)proc->ompi_proc->proc_pml, (void*)match->req_endpoint));
/* add this fragment descriptor to the list of
* descriptors to be processed later
*/
if(match_made == false) {
match_made = true;
OBJ_CONSTRUCT(additional_matches, opal_list_t);
}
MCA_PML_DR_RECV_REQUEST_MATCHED(match,comm,proc,&frag->hdr.hdr_match);
opal_list_append(additional_matches, (opal_list_item_t *)frag);
}
} else {
/* if no match found, place on unexpected queue */
opal_list_append( &proc->unexpected_frags, (opal_list_item_t *)frag);
}
/* frags_cant_match is not an ordered list, so exit loop
* and re-start search for next sequence number */
break;
} /* end if (frag_seq == next_msg_seq_expected) */
} /* end for (frag) loop */
} /* end while loop */
return match_made;
}