openmpi/ompi/mca/pml/teg/pml_teg_recvreq.c
George Bosilca d916e0c5b4 The (I hope) final solution for the convertor problem. As all the PMLs
inherit the base send and receive requests from pml_base, we can solve our
problem by constructing the convertor attached to each request in the
pml_base_construct function. At the end of each request's lifetime (here,
lifetime means a single use, without taking the cache into account) we
release all information attached to the convertors in the _FINI macro by
calling ompi_convertor_cleanup.

This commit was SVN r7910.
2005-10-28 03:26:36 +00:00
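As a minimal standalone sketch of the pattern the commit describes (the type
layouts and the simplified convertor_cleanup below are illustrative stand-ins,
not the real OMPI definitions): the convertor embedded in the base request is
constructed once, when the request object is created, and its per-use state is
released by the _FINI macro at the end of each use, so cached request objects
can be reused safely.

/* Sketch only: simplified stand-ins for the OMPI types and functions. */
#include <stdlib.h>

typedef struct {
    void* pStack;                       /* per-use conversion state */
} convertor_t;                          /* stand-in for ompi_convertor_t */

typedef struct {
    convertor_t req_convertor;          /* embedded in the base request */
} base_request_t;

/* stand-in for ompi_convertor_cleanup(): release per-use state only */
static void convertor_cleanup(convertor_t* conv)
{
    free(conv->pStack);
    conv->pStack = NULL;
}

/* runs once, when the request object is first created (pml_base_construct) */
static void base_request_construct(base_request_t* req)
{
    req->req_convertor.pStack = NULL;
}

/* runs at the end of each use of the request (the _FINI macro) */
static void base_request_fini(base_request_t* req)
{
    convertor_cleanup(&req->req_convertor);
}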


/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University.
 *                         All rights reserved.
 * Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
 *                         All rights reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */
#include "ompi_config.h"
#include "mca/pml/pml.h"
#include "mca/ptl/ptl.h"
#include "mca/ptl/base/ptl_base_comm.h"
#include "pml_teg_recvreq.h"
#include "pml_teg_recvfrag.h"
#include "pml_teg_sendreq.h"
static mca_ptl_base_recv_frag_t* mca_pml_teg_recv_request_match_specific_proc(
    mca_ptl_base_recv_request_t* request, int proc);

static int mca_pml_teg_recv_request_fini(struct ompi_request_t** request)
{
    MCA_PML_TEG_FINI(request);
    return OMPI_SUCCESS;
}

static int mca_pml_teg_recv_request_free(struct ompi_request_t** request)
{
    MCA_PML_TEG_FREE(request);
    return OMPI_SUCCESS;
}
static int mca_pml_teg_recv_request_cancel(struct ompi_request_t* request, int complete)
{
    mca_pml_base_request_t* teg_request = (mca_pml_base_request_t*)request;
    ompi_communicator_t* ompi_comm = teg_request->req_comm;
    mca_pml_ptl_comm_t* pml_comm = (mca_pml_ptl_comm_t*)ompi_comm->c_pml_comm;

    if( true == request->req_complete ) {  /* way too late to cancel this one */
        return OMPI_SUCCESS;
    }

    /* The rest should be protected behind the match logic lock */
    OPAL_THREAD_LOCK(&pml_comm->c_matching_lock);
    if( OMPI_ANY_TAG == request->req_status.MPI_TAG ) {  /* the match has not been done yet */
        if( teg_request->req_peer == OMPI_ANY_SOURCE ) {
            opal_list_remove_item( &(pml_comm->c_wild_receives),
                                   (opal_list_item_t*)request );
        } else {
            opal_list_remove_item( pml_comm->c_specific_receives + teg_request->req_peer,
                                   (opal_list_item_t*)request );
        }
    }
    OPAL_THREAD_UNLOCK(&pml_comm->c_matching_lock);

    request->req_status._cancelled = true;
    request->req_complete = true;  /* mark it as completed so all the test/wait functions
                                    * on this particular request will finish */
    /* Now we have a problem if we are in a multi-threaded environment. We should
     * broadcast the condition on the request in order to allow the other threads
     * to complete their test/wait functions.
     */
    if(ompi_request_waiting) {
        opal_condition_broadcast(&ompi_request_cond);
    }
    return OMPI_SUCCESS;
}
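
/* Class construct: runs once when a request object is first created, and
 * installs the ompi_request_t callbacks invoked by the request free and
 * cancel paths. */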
static void mca_pml_teg_recv_request_construct(mca_ptl_base_recv_request_t* request)
{
    request->req_recv.req_base.req_type = MCA_PML_REQUEST_RECV;
    request->req_recv.req_base.req_ompi.req_fini = mca_pml_teg_recv_request_fini;
    request->req_recv.req_base.req_ompi.req_free = mca_pml_teg_recv_request_free;
    request->req_recv.req_base.req_ompi.req_cancel = mca_pml_teg_recv_request_cancel;
}

static void mca_pml_teg_recv_request_destruct(mca_ptl_base_recv_request_t* request)
{
}

OBJ_CLASS_INSTANCE(
    mca_pml_teg_recv_request_t,
    mca_ptl_base_recv_request_t,
    mca_pml_teg_recv_request_construct,
    mca_pml_teg_recv_request_destruct);
/*
 * Update the recv request status to reflect the number of bytes
 * received and actually delivered to the application.
 */
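/* req_bytes_packed holds the total message length, taken from the matched
 * fragment header in mca_pml_teg_recv_request_match_specific_proc(), so the
 * request completes once req_bytes_received reaches that length. */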
void mca_pml_teg_recv_request_progress(
    struct mca_ptl_base_module_t* ptl,
    mca_ptl_base_recv_request_t* req,
    size_t bytes_received,
    size_t bytes_delivered)
{
    OPAL_THREAD_LOCK(&ompi_request_lock);
    req->req_bytes_received += bytes_received;
    req->req_bytes_delivered += bytes_delivered;
    if (req->req_bytes_received >= req->req_recv.req_bytes_packed) {
        /* initialize request status */
        req->req_recv.req_base.req_ompi.req_status._count = req->req_bytes_delivered;
        req->req_recv.req_base.req_pml_complete = true;
        req->req_recv.req_base.req_ompi.req_complete = true;
        if(ompi_request_waiting) {
            opal_condition_broadcast(&ompi_request_cond);
        }
    }
    OPAL_THREAD_UNLOCK(&ompi_request_lock);
}
/*
 * This routine is used to match a posted receive when the source process
 * is specified.
 */
void mca_pml_teg_recv_request_match_specific(mca_ptl_base_recv_request_t* request)
{
    ompi_communicator_t *comm = request->req_recv.req_base.req_comm;
    mca_pml_ptl_comm_t* pml_comm = comm->c_pml_comm;
    int req_peer = request->req_recv.req_base.req_peer;
    mca_ptl_base_recv_frag_t* frag;

    /* check for a specific match */
    OPAL_THREAD_LOCK(&pml_comm->c_matching_lock);

    /* assign sequence number */
    request->req_recv.req_base.req_sequence = pml_comm->c_recv_seq++;

    if (opal_list_get_size(&pml_comm->c_unexpected_frags[req_peer]) > 0 &&
        (frag = mca_pml_teg_recv_request_match_specific_proc(request, req_peer)) != NULL) {
        mca_ptl_base_module_t* ptl = frag->frag_base.frag_owner;
        /* setup pointer to ptls peer */
        if(NULL == frag->frag_base.frag_peer)
            frag->frag_base.frag_peer = mca_pml_teg_proc_lookup_remote_peer(comm,req_peer,ptl);
        OPAL_THREAD_UNLOCK(&pml_comm->c_matching_lock);
        if( !((MCA_PML_REQUEST_IPROBE == request->req_recv.req_base.req_type) ||
              (MCA_PML_REQUEST_PROBE == request->req_recv.req_base.req_type)) ) {
            MCA_PML_TEG_RECV_MATCHED( ptl, frag );
        }
        return; /* match found */
    }

    /* We didn't find any matches.  Record this irecv so we can match
     * it when the message comes in.
     */
    if(request->req_recv.req_base.req_type != MCA_PML_REQUEST_IPROBE) {
        opal_list_append(pml_comm->c_specific_receives+req_peer, (opal_list_item_t*)request);
    }
    OPAL_THREAD_UNLOCK(&pml_comm->c_matching_lock);
}
/*
 * This routine is used to try to match a wild posted receive, where
 * "wild" means the source was posted as OMPI_ANY_SOURCE.
 */
void mca_pml_teg_recv_request_match_wild(mca_ptl_base_recv_request_t* request)
{
    ompi_communicator_t *comm = request->req_recv.req_base.req_comm;
    mca_pml_ptl_comm_t* pml_comm = comm->c_pml_comm;
    int proc_count = comm->c_remote_group->grp_proc_count;
    int proc;

    /*
     * Loop over all the outstanding messages to find one that matches.
     * There is an outer loop over lists of messages from each
     * process, then an inner loop over the messages from the
     * process.
     */
    OPAL_THREAD_LOCK(&pml_comm->c_matching_lock);

    /* assign sequence number */
    request->req_recv.req_base.req_sequence = pml_comm->c_recv_seq++;

    for (proc = 0; proc < proc_count; proc++) {
        mca_ptl_base_recv_frag_t* frag;

        /* continue if no frags to match */
        if (opal_list_get_size(&pml_comm->c_unexpected_frags[proc]) == 0)
            continue;

        /* loop over messages from the current proc */
        if ((frag = mca_pml_teg_recv_request_match_specific_proc(request, proc)) != NULL) {
            mca_ptl_base_module_t* ptl = frag->frag_base.frag_owner;
            /* if required - setup pointer to ptls peer */
            if(NULL == frag->frag_base.frag_peer)
                frag->frag_base.frag_peer = mca_pml_teg_proc_lookup_remote_peer(comm,proc,ptl);
            OPAL_THREAD_UNLOCK(&pml_comm->c_matching_lock);
            if( !((MCA_PML_REQUEST_IPROBE == request->req_recv.req_base.req_type) ||
                  (MCA_PML_REQUEST_PROBE == request->req_recv.req_base.req_type)) ) {
                MCA_PML_TEG_RECV_MATCHED( ptl, frag );
            }
            return; /* match found */
        }
    }

    /* We didn't find any matches.  Record this irecv so we can match
     * it when the message comes in.
     */
    if(request->req_recv.req_base.req_type != MCA_PML_REQUEST_IPROBE)
        opal_list_append(&pml_comm->c_wild_receives, (opal_list_item_t*)request);
    OPAL_THREAD_UNLOCK(&pml_comm->c_matching_lock);
}
/*
 * This routine searches the unexpected fragments from the given proc for
 * one that matches the posted receive.  On a match the fragment is removed
 * from the unexpected list (except for probes) and returned.
 */
static mca_ptl_base_recv_frag_t* mca_pml_teg_recv_request_match_specific_proc(
    mca_ptl_base_recv_request_t* request, int proc)
{
    mca_pml_ptl_comm_t *pml_comm = request->req_recv.req_base.req_comm->c_pml_comm;
    opal_list_t* unexpected_frags = pml_comm->c_unexpected_frags+proc;
    mca_ptl_base_recv_frag_t* frag;
    mca_ptl_base_match_header_t* header;
    int tag = request->req_recv.req_base.req_tag;

    if( OMPI_ANY_TAG == tag ) {
        for (frag = (mca_ptl_base_recv_frag_t*)opal_list_get_first(unexpected_frags);
             frag != (mca_ptl_base_recv_frag_t*)opal_list_get_end(unexpected_frags);
             frag = (mca_ptl_base_recv_frag_t*)opal_list_get_next(frag)) {
            header = &(frag->frag_base.frag_header.hdr_match);
            /* check first frag - we assume that process matching has been done already */
            if( header->hdr_tag >= 0 ) {
                goto find_fragment;
            }
        }
    } else {
        for (frag = (mca_ptl_base_recv_frag_t*)opal_list_get_first(unexpected_frags);
             frag != (mca_ptl_base_recv_frag_t*)opal_list_get_end(unexpected_frags);
             frag = (mca_ptl_base_recv_frag_t*)opal_list_get_next(frag)) {
            header = &(frag->frag_base.frag_header.hdr_match);
            /* check first frag - we assume that process matching has been done already */
            if ( tag == header->hdr_tag ) {
                /* we assume the tag is valid from the MPI point of view (i.e. >= 0) */
                goto find_fragment;
            }
        }
    }
    return NULL;

 find_fragment:
    request->req_recv.req_bytes_packed = header->hdr_msg_length;
    request->req_recv.req_base.req_ompi.req_status.MPI_TAG = header->hdr_tag;
    request->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = header->hdr_src;

    if( !((MCA_PML_REQUEST_IPROBE == request->req_recv.req_base.req_type) ||
          (MCA_PML_REQUEST_PROBE == request->req_recv.req_base.req_type)) ) {
        opal_list_remove_item(unexpected_frags, (opal_list_item_t*)frag);
        frag->frag_request = request;
    } else {
        /* it's a probe, therefore report its completion */
        mca_pml_teg_recv_request_progress( NULL, request, header->hdr_msg_length, header->hdr_msg_length );
    }
    return frag;
}