From e7332d05212df019380e18eab80a027af4e54a96 Mon Sep 17 00:00:00 2001 From: Tim Woodall Date: Wed, 1 Jun 2005 21:09:43 +0000 Subject: [PATCH] cleanup - support for sender side scheduling (non-rdma case) This commit was SVN r5915. --- src/mca/bmi/bmi.h | 2 +- src/mca/bmi/sm/bmi_sm.c | 30 +- src/mca/bmi/sm/bmi_sm.h | 6 +- src/mca/bmi/sm/bmi_sm_component.c | 17 +- src/mca/bmi/sm/bmi_sm_frag.c | 4 +- src/mca/pml/base/pml_base_sendreq.h | 2 +- src/mca/pml/ob1/Makefile.am | 1 - src/mca/pml/ob1/pml_ob1.c | 10 +- src/mca/pml/ob1/pml_ob1.h | 7 - src/mca/pml/ob1/pml_ob1_component.c | 4 + src/mca/pml/ob1/pml_ob1_endpoint.h | 4 +- src/mca/pml/ob1/pml_ob1_match.c | 592 --------------------------- src/mca/pml/ob1/pml_ob1_match.h | 49 --- src/mca/pml/ob1/pml_ob1_recvfrag.c | 604 +++++++++++++++++++++++++++- src/mca/pml/ob1/pml_ob1_recvfrag.h | 41 +- src/mca/pml/ob1/pml_ob1_recvreq.h | 2 + src/mca/pml/ob1/pml_ob1_sendreq.c | 89 ++-- src/mca/pml/ob1/pml_ob1_sendreq.h | 43 +- 18 files changed, 702 insertions(+), 805 deletions(-) delete mode 100644 src/mca/pml/ob1/pml_ob1_match.c delete mode 100644 src/mca/pml/ob1/pml_ob1_match.h diff --git a/src/mca/bmi/bmi.h b/src/mca/bmi/bmi.h index 6c91fcc7b5..c6571193ab 100644 --- a/src/mca/bmi/bmi.h +++ b/src/mca/bmi/bmi.h @@ -456,7 +456,7 @@ struct mca_bmi_base_module_t { /* BMI common attributes */ mca_bmi_base_component_t* bmi_component; /**< pointer back to the BMI component structure */ - size_t bmi_first_frag_size; /**< maximum size of first fragment -- eager send */ + size_t bmi_eager_limit; /**< maximum size of first fragment -- eager send */ size_t bmi_min_frag_size; /**< threshold below which the BMI will not fragment */ size_t bmi_max_frag_size; /**< maximum fragment size supported by the BMI */ uint32_t bmi_exclusivity; /**< indicates this BMI should be used exclusively */ diff --git a/src/mca/bmi/sm/bmi_sm.c b/src/mca/bmi/sm/bmi_sm.c index cbc2c2e408..309256f36f 100644 --- a/src/mca/bmi/sm/bmi_sm.c +++ b/src/mca/bmi/sm/bmi_sm.c @@ -47,7 +47,7 @@ mca_bmi_sm_t mca_bmi_sm[2] = { { { &mca_bmi_sm_component.super, - 0, /* bmi_first_frag_size */ + 0, /* bmi_eager_limit */ 0, /* bmi_min_frag_size */ 0, /* bmi_max_frag_size */ 0, /* bmi_exclusivity */ @@ -70,7 +70,7 @@ mca_bmi_sm_t mca_bmi_sm[2] = { { { &mca_bmi_sm_component.super, - 0, /* bmi_first_frag_size */ + 0, /* bmi_eager_limit */ 0, /* bmi_min_frag_size */ 0, /* bmi_max_frag_size */ 0, /* bmi_exclusivity */ @@ -489,17 +489,10 @@ int mca_bmi_sm_add_procs_same_base_addr( /* some initialization happens only the first time this routine * is called, i.e. 
when bmi_inited is false */ - /* initialize fragment descriptor free list */ + /* initialize fragment descriptor free lists */ - /* - * first fragment - */ - - /* allocation will be for the fragment descriptor, payload buffer, - * and padding to ensure proper alignment can be acheived */ - length=sizeof(mca_bmi_sm_frag_t)+ - mca_bmi_sm_component.fragment_alignment+ - mca_bmi_sm_component.first_fragment_size; + /* allocation will be for the fragment descriptor and payload buffer */ + length=sizeof(mca_bmi_sm_frag_t) + mca_bmi_sm_component.eager_limit; ompi_free_list_init(&mca_bmi_sm_component.sm_frags1, length, OBJ_CLASS(mca_bmi_sm_frag1_t), mca_bmi_sm_component.sm_free_list_num, @@ -507,9 +500,7 @@ int mca_bmi_sm_add_procs_same_base_addr( mca_bmi_sm_component.sm_free_list_inc, mca_bmi_sm_component.sm_mpool); /* use shared-memory pool */ - length=sizeof(mca_bmi_sm_frag_t)+ - mca_bmi_sm_component.fragment_alignment+ - mca_bmi_sm_component.max_fragment_size; + length=sizeof(mca_bmi_sm_frag_t) + mca_bmi_sm_component.max_frag_size; ompi_free_list_init(&mca_bmi_sm_component.sm_frags2, length, OBJ_CLASS(mca_bmi_sm_frag2_t), mca_bmi_sm_component.sm_free_list_num, @@ -745,7 +736,7 @@ extern mca_bmi_base_descriptor_t* mca_bmi_sm_alloc( { mca_bmi_sm_frag_t* frag; int rc; - if(size <= mca_bmi_sm_component.first_fragment_size) { + if(size <= mca_bmi_sm_component.eager_limit) { MCA_BMI_SM_FRAG_ALLOC1(frag,rc); } else { MCA_BMI_SM_FRAG_ALLOC2(frag,rc); @@ -764,7 +755,7 @@ extern int mca_bmi_sm_free( mca_bmi_base_descriptor_t* des) { mca_bmi_sm_frag_t* frag = (mca_bmi_sm_frag_t*)des; - if(frag->size <= mca_bmi_sm_component.first_fragment_size) { + if(frag->size <= mca_bmi_sm_component.eager_limit) { MCA_BMI_SM_FRAG_RETURN1(frag); } else { MCA_BMI_SM_FRAG_RETURN2(frag); @@ -790,6 +781,7 @@ struct mca_bmi_base_descriptor_t* mca_bmi_sm_prepare_src( struct iovec iov; uint32_t iov_count = 1; uint32_t max_data = *size; + int32_t free_after; int rc; MCA_BMI_SM_FRAG_ALLOC2(frag, rc); @@ -798,12 +790,12 @@ struct mca_bmi_base_descriptor_t* mca_bmi_sm_prepare_src( } if(max_data + reserve > frag->size) { - max_data = *size - reserve; + max_data = frag->size - reserve; } iov.iov_len = max_data; iov.iov_base = (unsigned char*)(frag+1) + reserve; - rc = ompi_convertor_pack(convertor, &iov, &iov_count, &max_data, NULL); + rc = ompi_convertor_pack(convertor, &iov, &iov_count, &max_data, &free_after); if(rc < 0) { MCA_BMI_SM_FRAG_RETURN2(frag); return NULL; diff --git a/src/mca/bmi/sm/bmi_sm.h b/src/mca/bmi/sm/bmi_sm.h index c0284cb6ff..541242d49a 100644 --- a/src/mca/bmi/sm/bmi_sm.h +++ b/src/mca/bmi/sm/bmi_sm.h @@ -80,10 +80,8 @@ struct mca_bmi_sm_component_t { char* sm_mpool_name; /**< name of shared memory pool module */ mca_mpool_base_module_t* sm_mpool; /**< shared memory pool */ void* sm_mpool_base; /**< base address of shared memory pool */ - size_t first_fragment_size; /**< first fragment size */ - size_t max_fragment_size; /**< maximum (second and - beyone) fragment size */ - size_t fragment_alignment; /**< fragment alignment */ + size_t eager_limit; /**< first fragment size */ + size_t max_frag_size; /**< maximum (second and beyone) fragment size */ ompi_mutex_t sm_lock; char* sm_resouce_ctl_file; /**< name of shared memory file used to coordinate resource usage */ diff --git a/src/mca/bmi/sm/bmi_sm_component.c b/src/mca/bmi/sm/bmi_sm_component.c index 48d6c438f4..9ff9a2aadd 100644 --- a/src/mca/bmi/sm/bmi_sm_component.c +++ b/src/mca/bmi/sm/bmi_sm_component.c @@ -123,13 +123,10 @@ int 
mca_bmi_sm_component_open(void) mca_bmi_sm_param_register_int("sm_extra_procs", -1); mca_bmi_sm_component.sm_mpool_name = mca_bmi_sm_param_register_string("mpool", "sm"); - mca_bmi_sm_component.first_fragment_size = - mca_bmi_sm_param_register_int("first_fragment_size", 1024); - mca_bmi_sm_component.max_fragment_size = - mca_bmi_sm_param_register_int("max_fragment_size", 8*1024); - mca_bmi_sm_component.fragment_alignment = - mca_bmi_sm_param_register_int("fragment_alignment", - CACHE_LINE_SIZE); + mca_bmi_sm_component.eager_limit = + mca_bmi_sm_param_register_int("eager_limit", 1024); + mca_bmi_sm_component.max_frag_size = + mca_bmi_sm_param_register_int("max_frag_size", 8*1024); mca_bmi_sm_component.size_of_cb_queue = mca_bmi_sm_param_register_int("size_of_cb_queue", 128); mca_bmi_sm_component.cb_lazy_free_freq = @@ -271,9 +268,9 @@ mca_bmi_base_module_t** mca_bmi_sm_component_init( /* set scheduling parameters */ for( i=0 ; i < 2 ; i++ ) { - mca_bmi_sm[i].super.bmi_first_frag_size=mca_bmi_sm_component.first_fragment_size; - mca_bmi_sm[i].super.bmi_min_frag_size=mca_bmi_sm_component.max_fragment_size; - mca_bmi_sm[i].super.bmi_max_frag_size=mca_bmi_sm_component.max_fragment_size; + mca_bmi_sm[i].super.bmi_eager_limit=mca_bmi_sm_component.eager_limit; + mca_bmi_sm[i].super.bmi_min_frag_size=mca_bmi_sm_component.max_frag_size; + mca_bmi_sm[i].super.bmi_max_frag_size=mca_bmi_sm_component.max_frag_size; mca_bmi_sm[i].super.bmi_exclusivity=100; /* always use this ptl */ mca_bmi_sm[i].super.bmi_latency=100; /* lowest latency */ mca_bmi_sm[i].super.bmi_bandwidth=900; /* not really used now since exclusivity is set to 100 */ diff --git a/src/mca/bmi/sm/bmi_sm_frag.c b/src/mca/bmi/sm/bmi_sm_frag.c index 133eb420a0..fbfc315a85 100644 --- a/src/mca/bmi/sm/bmi_sm_frag.c +++ b/src/mca/bmi/sm/bmi_sm_frag.c @@ -14,13 +14,13 @@ static inline void mca_bmi_sm_frag_constructor(mca_bmi_sm_frag_t* frag) static void mca_bmi_sm_frag1_constructor(mca_bmi_sm_frag_t* frag) { - frag->size = mca_bmi_sm_component.first_fragment_size; + frag->size = mca_bmi_sm_component.eager_limit; mca_bmi_sm_frag_constructor(frag); } static void mca_bmi_sm_frag2_constructor(mca_bmi_sm_frag_t* frag) { - frag->size = mca_bmi_sm_component.max_fragment_size; + frag->size = mca_bmi_sm_component.max_frag_size; mca_bmi_sm_frag_constructor(frag); } diff --git a/src/mca/pml/base/pml_base_sendreq.h b/src/mca/pml/base/pml_base_sendreq.h index 847e08a100..1fdeb27d85 100644 --- a/src/mca/pml/base/pml_base_sendreq.h +++ b/src/mca/pml/base/pml_base_sendreq.h @@ -108,7 +108,7 @@ typedef struct mca_pml_base_send_request_t mca_pml_base_send_request_t; (request)->req_base.req_datatype, \ (request)->req_base.req_count, \ (request)->req_base.req_addr, \ - -1, NULL ); \ + 0, NULL ); \ ompi_convertor_get_packed_size( &(request)->req_convertor, \ (uint32_t*)&((request)->req_bytes_packed) );\ } else { \ diff --git a/src/mca/pml/ob1/Makefile.am b/src/mca/pml/ob1/Makefile.am index 88185fc6be..e9d10b41b6 100644 --- a/src/mca/pml/ob1/Makefile.am +++ b/src/mca/pml/ob1/Makefile.am @@ -28,7 +28,6 @@ libmca_pml_ob1_la_SOURCES = \ pml_ob1_iprobe.c \ pml_ob1_irecv.c \ pml_ob1_isend.c \ - pml_ob1_match.c \ pml_ob1_proc.c \ pml_ob1_proc.h \ pml_ob1_progress.c \ diff --git a/src/mca/pml/ob1/pml_ob1.c b/src/mca/pml/ob1/pml_ob1.c index 9f9af9248d..527c03dde5 100644 --- a/src/mca/pml/ob1/pml_ob1.c +++ b/src/mca/pml/ob1/pml_ob1.c @@ -28,6 +28,7 @@ #include "pml_ob1_comm.h" #include "pml_ob1_proc.h" #include "pml_ob1_hdr.h" +#include "pml_ob1_recvfrag.h" mca_pml_ob1_t 
mca_pml_ob1 = { @@ -135,7 +136,7 @@ int mca_pml_ob1_add_bmis() } /* setup callback for receive */ - rc = bmi->bmi_register(bmi, MCA_BMI_TAG_PML, mca_pml_ob1_recv_callback, NULL); + rc = bmi->bmi_register(bmi, MCA_BMI_TAG_PML, mca_pml_ob1_recv_frag_callback, NULL); if(OMPI_SUCCESS != rc) return rc; @@ -250,14 +251,19 @@ int mca_pml_ob1_add_procs(ompi_proc_t** procs, size_t nprocs) } } - /* cache the ob1 on the proc */ + /* cache the endpoint on the proc */ endpoint = mca_pml_ob1_ep_array_insert(&proc_pml->bmi_next); endpoint->bmi = bmi; + endpoint->bmi_eager_limit = bmi->bmi_eager_limit; + endpoint->bmi_min_frag_size = bmi->bmi_min_frag_size; + endpoint->bmi_max_frag_size = bmi->bmi_max_frag_size; endpoint->bmi_cache = NULL; endpoint->bmi_endpoint = bmi_endpoints[p]; endpoint->bmi_weight = 0; endpoint->bmi_alloc = bmi->bmi_alloc; endpoint->bmi_free = bmi->bmi_free; + endpoint->bmi_prepare_src = bmi->bmi_prepare_src; + endpoint->bmi_prepare_dst = bmi->bmi_prepare_dst; endpoint->bmi_send = bmi->bmi_send; endpoint->bmi_put = bmi->bmi_put; endpoint->bmi_get = bmi->bmi_get; diff --git a/src/mca/pml/ob1/pml_ob1.h b/src/mca/pml/ob1/pml_ob1.h index 480409c4d6..5bf907e849 100644 --- a/src/mca/pml/ob1/pml_ob1.h +++ b/src/mca/pml/ob1/pml_ob1.h @@ -199,13 +199,6 @@ extern int mca_pml_ob1_recv( ompi_status_public_t* status ); -extern void mca_pml_ob1_recv_callback( - mca_bmi_base_module_t* bmi, - mca_bmi_base_tag_t tag, - mca_bmi_base_descriptor_t* descriptor, - void* cbdata -); - extern int mca_pml_ob1_progress(void); extern int mca_pml_ob1_start( diff --git a/src/mca/pml/ob1/pml_ob1_component.c b/src/mca/pml/ob1/pml_ob1_component.c index 3e3193ee67..8d4aa7159d 100644 --- a/src/mca/pml/ob1/pml_ob1_component.c +++ b/src/mca/pml/ob1/pml_ob1_component.c @@ -95,6 +95,10 @@ int mca_pml_ob1_component_open(void) mca_pml_ob1_param_register_int("free_list_inc", 256); mca_pml_ob1.priority = mca_pml_ob1_param_register_int("priority", 0); + mca_pml_ob1.send_pipeline_depth = + mca_pml_ob1_param_register_int("send_pipeline_depth", 2); + mca_pml_ob1.recv_pipeline_depth = + mca_pml_ob1_param_register_int("recv_pipeline_depth", 2); return mca_bmi_base_open(); } diff --git a/src/mca/pml/ob1/pml_ob1_endpoint.h b/src/mca/pml/ob1/pml_ob1_endpoint.h index caf12541bf..281241fd7d 100644 --- a/src/mca/pml/ob1/pml_ob1_endpoint.h +++ b/src/mca/pml/ob1/pml_ob1_endpoint.h @@ -35,8 +35,8 @@ struct mca_pml_ob1_endpoint_t { int bmi_weight; /**< BMI weight for scheduling */ int bmi_flags; /**< support for put/get? */ size_t bmi_eager_limit; /**< BMI eager limit */ - size_t bmi_min_seg_size; /**< BMI min segment size */ - size_t bmi_max_seg_size; /**< BMI max segment size */ + size_t bmi_min_frag_size; /**< BMI min fragment size */ + size_t bmi_max_frag_size; /**< BMI max fragment size */ struct mca_bmi_base_module_t *bmi; /**< BMI module */ struct mca_bmi_base_endpoint_t* bmi_endpoint; /**< BMI addressing info */ struct mca_bmi_base_descriptor_t* bmi_cache; diff --git a/src/mca/pml/ob1/pml_ob1_match.c b/src/mca/pml/ob1/pml_ob1_match.c deleted file mode 100644 index 5e2aacb477..0000000000 --- a/src/mca/pml/ob1/pml_ob1_match.c +++ /dev/null @@ -1,592 +0,0 @@ -/** @file */ - -/* - * Copyright (c) 2004-2005 The Trustees of Indiana University. - * All rights reserved. - * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. - * All rights reserved. - * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, - * University of Stuttgart. All rights reserved. 
- * Copyright (c) 2004-2005 The Regents of the University of California. - * All rights reserved. - * $COPYRIGHT$ - * - * Additional copyrights may follow - * - * $HEADER$ - */ - -#include "ompi_config.h" -#include - -#include "class/ompi_list.h" -#include "threads/mutex.h" -#include "include/constants.h" -#include "communicator/communicator.h" -#include "pml_ob1.h" -#include "pml_ob1_comm.h" -#include "pml_ob1_recvfrag.h" -#include "pml_ob1_recvreq.h" -#include "pml_ob1_hdr.h" -#include "pml_ob1_match.h" - - -/** - * Try and match the incoming message fragment to the list of - * "wild" receives - * - * @param hdr Matching data from recived fragment (IN) - * - * @param pml_comm Pointer to the communicator structure used for - * matching purposes. (IN) - * - * @return Matched receive - * - * This routine assumes that the appropriate matching locks are - * set by the upper level routine. - */ - -#define MCA_PML_OB1_CHECK_WILD_RECEIVES_FOR_MATCH(hdr,comm,proc,return_match) \ -do { \ - /* local parameters */ \ - ompi_list_t* wild_receives = &comm->wild_receives; \ - mca_pml_ob1_recv_request_t *wild_recv; \ - int frag_tag,recv_tag; \ - \ - /* initialization */ \ - frag_tag=hdr->hdr_tag; \ - \ - /* \ - * Loop over the wild irecvs - no need to lock, the upper level \ - * locking is protecting from having other threads trying to \ - * change this list. \ - */ \ - for(wild_recv = (mca_pml_ob1_recv_request_t *) \ - ompi_list_get_first(wild_receives); \ - wild_recv != (mca_pml_ob1_recv_request_t *) \ - ompi_list_get_end(wild_receives); \ - wild_recv = (mca_pml_ob1_recv_request_t *) \ - ((ompi_list_item_t *)wild_recv)->ompi_list_next) { \ - \ - recv_tag = wild_recv->req_recv.req_base.req_tag; \ - if ( \ - /* exact tag match */ \ - (frag_tag == recv_tag) || \ - /* wild tag match - negative tags (except for \ - * OMPI_ANY_TAG) are reserved for internal use, and will \ - * not be matched with OMPI_ANY_TAG */ \ - ( (recv_tag == OMPI_ANY_TAG) && (0 <= frag_tag) ) ) \ - \ - { \ - /* \ - * Mark that this is the matching irecv, and go to process it. \ - */ \ - return_match = wild_recv; \ - \ - /* remove this irecv from the postd wild ireceive list */ \ - ompi_list_remove_item(wild_receives, \ - (ompi_list_item_t *)wild_recv); \ -\ - /* found match - no need to continue */ \ - break; \ - } \ - } \ -} while(0) - - -/** - * Try and match the incoming message fragment to the list of - * "specific" receives - * - * @param hdr Matching data from recived fragment (IN) - * - * @param comm Pointer to the communicator structure used for - * matching purposes. (IN) - * - * @return Matched receive - * - * This routine assumes that the appropriate matching locks are - * set by the upper level routine. - */ -#define MCA_PML_OB1_CHECK_SPECIFIC_RECEIVES_FOR_MATCH(hdr,comm,proc,return_match) \ -do { \ - /* local variables */ \ - ompi_list_t* specific_receives = &proc->specific_receives; \ - mca_pml_ob1_recv_request_t *specific_recv; \ - int recv_tag,frag_tag; \ - \ - /* initialization */ \ - frag_tag=hdr->hdr_tag; \ - \ - /* \ - * Loop over the specific irecvs. 
\ - */ \ - for(specific_recv = (mca_pml_ob1_recv_request_t *) \ - ompi_list_get_first(specific_receives); \ - specific_recv != (mca_pml_ob1_recv_request_t *) \ - ompi_list_get_end(specific_receives); \ - specific_recv = (mca_pml_ob1_recv_request_t *) \ - ((ompi_list_item_t *)specific_recv)->ompi_list_next) { \ - /* \ - * Check for a match \ - */ \ - recv_tag = specific_recv->req_recv.req_base.req_tag; \ - if ( (frag_tag == recv_tag) || \ - ( (recv_tag == OMPI_ANY_TAG) && (0 <= frag_tag) ) ) { \ - \ - /* \ - * Match made \ - */ \ - return_match = specific_recv; \ - \ - /* remove descriptor from posted specific ireceive list */ \ - ompi_list_remove_item(specific_receives, \ - (ompi_list_item_t *)specific_recv); \ - \ - break; \ - } \ - } \ -} while(0) - -/** - * Try and match the incoming message fragment to the list of - * "wild" receives and "specific" receives. Used when both types - * of receives have been posted, i.e. when we need to coordinate - * between multiple lists to make sure ordered delivery occurs. - * - * @param hdr Matching data from recived fragment (IN) - * - * @param comm Pointer to the communicator structure used for - * matching purposes. (IN) - * - * @return Matched receive - * - * This routine assumes that the appropriate matching locks are - * set by the upper level routine. - */ - -#define MCA_PML_OB1_CHECK_SPECIFIC_AND_WILD_RECEIVES_FOR_MATCH( \ - hdr,comm,proc,return_match) \ -do { \ - /* local variables */ \ - mca_pml_ob1_recv_request_t *specific_recv, *wild_recv; \ - mca_ptl_sequence_t wild_recv_seq, specific_recv_seq; \ - int frag_tag, wild_recv_tag, specific_recv_tag; \ - \ - /* initialization */ \ - frag_tag=hdr->hdr_tag; \ - \ - /* \ - * We know that when this is called, both specific and wild irecvs \ - * have been posted. \ - */ \ - specific_recv = (mca_pml_ob1_recv_request_t *) \ - ompi_list_get_first(&(proc)->specific_receives); \ - wild_recv = (mca_pml_ob1_recv_request_t *) \ - ompi_list_get_first(&comm->wild_receives); \ - \ - specific_recv_seq = specific_recv->req_recv.req_base.req_sequence; \ - wild_recv_seq = wild_recv->req_recv.req_base.req_sequence; \ - \ - while (true) { \ - if (wild_recv_seq < specific_recv_seq) { \ - /* \ - * wild recv is earlier than the specific one. \ - */ \ - /* \ - * try and match \ - */ \ - wild_recv_tag = wild_recv->req_recv.req_base.req_tag; \ - if ( (frag_tag == wild_recv_tag) || \ - ( (wild_recv_tag == OMPI_ANY_TAG) && (0 <= frag_tag) ) ) { \ - /* \ - * Match made \ - */ \ - return_match=wild_recv; \ - \ - /* remove this recv from the wild receive queue */ \ - ompi_list_remove_item(&comm->wild_receives, \ - (ompi_list_item_t *)wild_recv); \ - break; \ - } \ - \ - /* \ - * No match, go to the next. \ - */ \ - wild_recv=(mca_pml_ob1_recv_request_t *) \ - ((ompi_list_item_t *)wild_recv)->ompi_list_next; \ - \ - /* \ - * If that was the last wild one, just look at the \ - * rest of the specific ones. \ - */ \ - if (wild_recv == (mca_pml_ob1_recv_request_t *) \ - ompi_list_get_end(&comm->wild_receives) ) \ - { \ - MCA_PML_OB1_CHECK_SPECIFIC_RECEIVES_FOR_MATCH(hdr, comm, proc, return_match); \ - break; \ - } \ - \ - /* \ - * Get the sequence number for this recv, and go \ - * back to the top of the loop. \ - */ \ - wild_recv_seq = wild_recv->req_recv.req_base.req_sequence; \ - \ - } else { \ - /* \ - * specific recv is earlier than the wild one. 
\ - */ \ - specific_recv_tag=specific_recv->req_recv.req_base.req_tag; \ - if ( (frag_tag == specific_recv_tag) || \ - ( (specific_recv_tag == OMPI_ANY_TAG) && (0<=frag_tag)) ) \ - { \ - /* \ - * Match made \ - */ \ - return_match = specific_recv; \ - /* remove descriptor from specific receive list */ \ - ompi_list_remove_item(&(proc)->specific_receives, \ - (ompi_list_item_t *)specific_recv); \ - break; \ - } \ - \ - /* \ - * No match, go on to the next specific irecv. \ - */ \ - specific_recv = (mca_pml_ob1_recv_request_t *) \ - ((ompi_list_item_t *)specific_recv)->ompi_list_next; \ - \ - /* \ - * If that was the last specific irecv, process the \ - * rest of the wild ones. \ - */ \ - if (specific_recv == (mca_pml_ob1_recv_request_t *) \ - ompi_list_get_end(&(proc)->specific_receives)) \ - { \ - MCA_PML_OB1_CHECK_WILD_RECEIVES_FOR_MATCH(hdr, comm, proc, return_match); \ - break; \ - } \ - /* \ - * Get the sequence number for this recv, and go \ - * back to the top of the loop. \ - */ \ - specific_recv_seq = specific_recv->req_recv.req_base.req_sequence; \ - } \ - } \ -} while(0) - - -/* - * Specialized matching routines for internal use only. - */ - -static bool mca_pml_ob1_check_cantmatch_for_match( - ompi_list_t *additional_matches, - mca_pml_ob1_comm_t* comm, - mca_pml_ob1_comm_proc_t *proc); - - -/** - * RCS/CTS receive side matching - * - * @param hdr list of parameters needed for matching - * This list is also embeded in frag_desc, - * but this allows to save a memory copy when - * a match is made in this routine. (IN) - * @param frag_desc pointer to receive fragment which we want - * to match (IN/OUT). If a match is not made, - * hdr is copied to frag_desc. - * @param match_made parameter indicating if we matched frag_desc/ - * hdr (OUT) - * @param additional_matches if a match is made with frag_desc, we - * may be able to match fragments that previously - * have arrived out-of-order. If this is the - * case, the associated fragment descriptors are - * put on this list for further processing. (OUT) - * - * @return OMPI error code - * - * This routine is used to try and match a newly arrived message fragment - * to pre-posted receives. The following assumptions are made - * - fragments are received out of order - * - for long messages, e.g. more than one fragment, a RTS/CTS algorithm - * is used. - * - 2nd and greater fragments include a receive descriptor pointer - * - fragments may be dropped - * - fragments may be corrupt - * - this routine may be called simultaneously by more than one thread - */ -int mca_pml_ob1_match( - mca_bmi_base_module_t* bmi, - mca_pml_ob1_match_hdr_t *hdr, - mca_bmi_base_segment_t* segments, - size_t num_segments) -{ - /* local variables */ - uint16_t next_msg_seq_expected, frag_msg_seq; - ompi_communicator_t *comm_ptr; - mca_pml_ob1_recv_request_t *matched_receive = NULL; - mca_pml_ob1_comm_t *comm; - mca_pml_ob1_comm_proc_t *proc; - bool additional_match=false; - ompi_list_t additional_matches; - int rc; - - /* communicator pointer */ - comm_ptr=ompi_comm_lookup(hdr->hdr_contextid); - comm=(mca_pml_ob1_comm_t *)comm_ptr->c_pml_comm; - - /* source sequence number */ - frag_msg_seq = hdr->hdr_msg_seq; - proc = comm->procs + hdr->hdr_src; - - /* get next expected message sequence number - if threaded - * run, lock to make sure that if another thread is processing - * a frag from the same message a match is made only once. 
- * Also, this prevents other posted receives (for a pair of - * end points) from being processed, and potentially "loosing" - * the fragment. - */ - OMPI_THREAD_LOCK(&comm->matching_lock); - - /* get sequence number of next message that can be processed */ - next_msg_seq_expected = (uint16_t)proc->expected_sequence; - if (frag_msg_seq == next_msg_seq_expected) { - - /* - * This is the sequence number we were expecting, - * so we can try matching it to already posted - * receives. - */ - - /* We're now expecting the next sequence number. */ - (proc->expected_sequence)++; - - /* - * figure out what sort of matching logic to use, if need to - * look only at "specific" receives, or "wild" receives, - * or if we need to traverse both sets at the same time. - */ - if (ompi_list_get_size(&proc->specific_receives) == 0 ){ - /* - * There are only wild irecvs, so specialize the algorithm. - */ - MCA_PML_OB1_CHECK_WILD_RECEIVES_FOR_MATCH(hdr, comm, proc, matched_receive); - - } else if (ompi_list_get_size(&comm->wild_receives) == 0 ) { - /* - * There are only specific irecvs, so specialize the algorithm. - */ - MCA_PML_OB1_CHECK_SPECIFIC_RECEIVES_FOR_MATCH(hdr, comm, proc, matched_receive); - } else { - /* - * There are some of each. - */ - MCA_PML_OB1_CHECK_SPECIFIC_AND_WILD_RECEIVES_FOR_MATCH(hdr, comm, proc, matched_receive); - } - - /* if match found, process data */ - if (matched_receive) { - - /* set length of incoming message */ - matched_receive->req_recv.req_bytes_packed = hdr->hdr_msg_length; - - /* - * update delivered sequence number information, if needed. - */ - if( (matched_receive->req_recv.req_base.req_type == MCA_PML_REQUEST_PROBE) ) { - /* Match a probe, rollback the next expected sequence number */ - (proc->expected_sequence)--; - } - } else { - - /* if no match found, place on unexpected queue */ - mca_pml_ob1_recv_frag_t* frag; - MCA_PML_OB1_RECV_FRAG_ALLOC(frag, rc); - if(OMPI_SUCCESS != rc) { - OMPI_THREAD_UNLOCK(&pml_comm->matching_lock); - return rc; - } - MCA_PML_OB1_RECV_FRAG_INIT(frag,bmi,hdr,segments,num_segments); - ompi_list_append( &proc->unexpected_frags, (ompi_list_item_t *)frag ); - } - - /* - * Now that new message has arrived, check to see if - * any fragments on the c_c_frags_cant_match list - * may now be used to form new matchs - */ - if (0 < ompi_list_get_size(&proc->frags_cant_match)) { - additional_match = mca_pml_ob1_check_cantmatch_for_match(&additional_matches,comm,proc); - } - - } else { - - /* - * This message comes after the next expected, so it - * is ahead of sequence. Save it for later. 
- */ - mca_pml_ob1_recv_frag_t* frag; - MCA_PML_OB1_RECV_FRAG_ALLOC(frag, rc); - if(OMPI_SUCCESS != rc) { - OMPI_THREAD_UNLOCK(&pml_comm->matching_lock); - return rc; - } - MCA_PML_OB1_RECV_FRAG_INIT(frag,bmi,hdr,segments,num_segments); - ompi_list_append(&proc->frags_cant_match, (ompi_list_item_t *)frag); - - } - OMPI_THREAD_UNLOCK(&pml_comm->matching_lock); - - - /* release matching lock before processing fragment */ - if(matched_receive != NULL) { - mca_pml_ob1_recv_request_progress(matched_receive,bmi,segments,num_segments); - } else { - ompi_output(0, "match not found\n"); - } - if(additional_match) { - ompi_list_item_t* item; - while(NULL != (item = ompi_list_remove_first(&additional_matches))) { -#if 0 - mca_pml_ob1_recv_frag_t* frag = (mca_pml_ob1_recv_frag_t*)item; - mca_pml_ob1_recv_request_progress(frag->request,frag->bmi,frag->segments,frag->num_segments); - MCA_PML_OB1_RECV_FRAG_RETURN(frag); -#endif - } - } - return OMPI_SUCCESS; -} - - -/** - * Scan the list of frags that came in ahead of time to see if any - * can be processed at this time. If they can, try and match the - * frags. - * - * @param additional_matches List to hold new matches with fragments - * from the c_frags_cant_match list. (IN/OUT) - * - * @param pml_comm Pointer to the communicator structure used for - * matching purposes. (IN) - * - * This routine assumes that the appropriate matching locks are - * set by the upper level routine. - */ - -static bool mca_pml_ob1_check_cantmatch_for_match( - ompi_list_t *additional_matches, - mca_pml_ob1_comm_t* comm, - mca_pml_ob1_comm_proc_t *proc) -{ - /* local parameters */ - int match_found; - uint16_t next_msg_seq_expected, frag_seq; - mca_pml_ob1_recv_frag_t *frag_desc; - mca_pml_ob1_recv_request_t *matched_receive = NULL; - bool match_made = false; - - /* - * Loop over all the out of sequence messages. No ordering is assumed - * in the c_frags_cant_match list. - */ - - match_found = 1; - while ((0 < ompi_list_get_size(&proc->frags_cant_match)) && match_found) { - - /* initialize match flag for this search */ - match_found = 0; - - /* get sequence number of next message that can be processed */ - next_msg_seq_expected = proc->expected_sequence; - - /* search the list for a fragment from the send with sequence - * number next_msg_seq_expected - */ - for(frag_desc = (mca_pml_ob1_recv_frag_t *) - ompi_list_get_first(&proc->frags_cant_match); - frag_desc != (mca_pml_ob1_recv_frag_t *) - ompi_list_get_end(&proc->frags_cant_match); - frag_desc = (mca_pml_ob1_recv_frag_t *) - ompi_list_get_next(frag_desc)) - { - /* - * If the message has the next expected seq from that proc... - */ - frag_seq=frag_desc->hdr.hdr_match.hdr_msg_seq; - if (frag_seq == next_msg_seq_expected) { - mca_pml_ob1_match_hdr_t* hdr = &frag_desc->hdr.hdr_match; - - /* We're now expecting the next sequence number. */ - (proc->expected_sequence)++; - - /* signal that match was made */ - match_found = 1; - - /* - * remove frag_desc from list - */ - ompi_list_remove_item(&proc->frags_cant_match, - (ompi_list_item_t *)frag_desc); - - /* - * figure out what sort of matching logic to use, if need to - * look only at "specific" receives, or "wild" receives, - * or if we need to traverse both sets at the same time. - */ - proc = comm->procs + hdr->hdr_src; - if (ompi_list_get_size(&proc->specific_receives) == 0 ) { - /* - * There are only wild irecvs, so specialize the algorithm. 
- */ - MCA_PML_OB1_CHECK_WILD_RECEIVES_FOR_MATCH(hdr, comm, proc, matched_receive); - } else if (ompi_list_get_size(&comm->wild_receives) == 0 ) { - /* - * There are only specific irecvs, so specialize the algorithm. - */ - MCA_PML_OB1_CHECK_SPECIFIC_RECEIVES_FOR_MATCH(hdr, comm, proc, matched_receive); - } else { - /* - * There are some of each. - */ - MCA_PML_OB1_CHECK_SPECIFIC_AND_WILD_RECEIVES_FOR_MATCH(hdr, comm, proc, matched_receive); - - } - - /* if match found, process data */ - if (matched_receive) { - - /* associate the receive descriptor with the fragment - * descriptor */ - frag_desc->request=matched_receive; - - /* add this fragment descriptor to the list of - * descriptors to be processed later - */ - if(match_made == false) { - match_made = true; - OBJ_CONSTRUCT(additional_matches, ompi_list_t); - } - ompi_list_append(additional_matches, (ompi_list_item_t *)frag_desc); - - } else { - - /* if no match found, place on unexpected queue */ - ompi_list_append( &proc->unexpected_frags, (ompi_list_item_t *)frag_desc); - - } - - /* c_frags_cant_match is not an ordered list, so exit loop - * and re-start search for next sequence number */ - break; - - } /* end if (frag_seq == next_msg_seq_expected) */ - - } /* end for (frag_desc) loop */ - - } /* end while loop */ - - return match_made; -} - diff --git a/src/mca/pml/ob1/pml_ob1_match.h b/src/mca/pml/ob1/pml_ob1_match.h deleted file mode 100644 index 834d283afe..0000000000 --- a/src/mca/pml/ob1/pml_ob1_match.h +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright (c) 2004-2005 The Trustees of Indiana University. - * All rights reserved. - * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. - * All rights reserved. - * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, - * University of Stuttgart. All rights reserved. - * Copyright (c) 2004-2005 The Regents of the University of California. - * All rights reserved. - * $COPYRIGHT$ - * - * Additional copyrights may follow - * - * $HEADER$ - */ -/** - * @file - */ -#ifndef MCA_PML_OB1_MATCH_H -#define MCA_PML_OB1_MATCH_H -#if defined(c_plusplus) || defined(__cplusplus) -extern "C" { -#endif - -struct mca_pml_ob1_recv_frag_t; - - -/** - * RCS/CTS receive side matching - * Match incoming fragments against posted receives. Out of order - * delivery. - * - * @param frag_header (IN) Header of received fragment. - * @param frag_desc (IN) Received fragment descriptor. - * @param match_made (OUT) Flag indicating wether a match was made. - * @param additional_matches (OUT) List of additional matches - * @return OMPI_SUCCESS or error status on failure. 
- */ -OMPI_DECLSPEC int mca_pml_ob1_match( - mca_bmi_base_module_t* bmi, - mca_pml_ob1_match_hdr_t *hdr, - mca_bmi_base_segment_t* segments, - size_t num_segments); - -#if defined(c_plusplus) || defined(__cplusplus) -} -#endif -#endif /* MCA_PML_OB1_MATCH_H */ - diff --git a/src/mca/pml/ob1/pml_ob1_recvfrag.c b/src/mca/pml/ob1/pml_ob1_recvfrag.c index bc7b17bada..6217fad380 100644 --- a/src/mca/pml/ob1/pml_ob1_recvfrag.c +++ b/src/mca/pml/ob1/pml_ob1_recvfrag.c @@ -20,12 +20,18 @@ #include "ompi_config.h" +#include "class/ompi_list.h" +#include "threads/mutex.h" +#include "include/constants.h" +#include "communicator/communicator.h" #include "mca/pml/pml.h" +#include "pml_ob1.h" +#include "pml_ob1_comm.h" #include "pml_ob1_recvfrag.h" #include "pml_ob1_recvreq.h" #include "pml_ob1_sendreq.h" -#include "pml_ob1_proc.h" -#include "pml_ob1_match.h" +#include "pml_ob1_hdr.h" + OBJ_CLASS_INSTANCE( @@ -37,7 +43,11 @@ OBJ_CLASS_INSTANCE( -void mca_pml_ob1_recv_callback( +/** + * Callback from BMI on receive. + */ + +void mca_pml_ob1_recv_frag_callback( mca_bmi_base_module_t* bmi, mca_bmi_base_tag_t tag, mca_bmi_base_descriptor_t* des, @@ -48,20 +58,596 @@ void mca_pml_ob1_recv_callback( if(segments->seg_len < sizeof(mca_pml_ob1_common_hdr_t)) { return; } - + switch(hdr->hdr_common.hdr_type) { case MCA_PML_OB1_HDR_TYPE_MATCH: - mca_pml_ob1_match(bmi,&hdr->hdr_match,segments,des->des_src_cnt); - break; case MCA_PML_OB1_HDR_TYPE_RNDV: - mca_pml_ob1_match(bmi,&hdr->hdr_match,segments,des->des_src_cnt); + { + mca_pml_ob1_recv_frag_match(bmi,&hdr->hdr_match,segments,des->des_src_cnt); break; + } case MCA_PML_OB1_HDR_TYPE_ACK: - mca_pml_ob1_send_request_acked(bmi,&hdr->hdr_ack); + { + mca_pml_ob1_send_request_t* sendreq = (mca_pml_ob1_send_request_t*) + hdr->hdr_ack.hdr_src_req.pval; + sendreq->req_state = MCA_PML_OB1_SR_SEND; + sendreq->req_recv = hdr->hdr_ack.hdr_dst_req; + mca_pml_ob1_send_request_schedule(sendreq); break; + } + case MCA_PML_OB1_HDR_TYPE_FRAG: + { + mca_pml_ob1_recv_request_t* recvreq = (mca_pml_ob1_recv_request_t*) + hdr->hdr_frag.hdr_dst_req.pval; + mca_pml_ob1_recv_request_progress(recvreq,bmi,segments,des->des_src_cnt); + break; + } default: break; } } - + + +/** + * Try and match the incoming message fragment to the list of + * "wild" receives + * + * @param hdr Matching data from recived fragment (IN) + * + * @param pml_comm Pointer to the communicator structure used for + * matching purposes. (IN) + * + * @return Matched receive + * + * This routine assumes that the appropriate matching locks are + * set by the upper level routine. + */ + +#define MCA_PML_OB1_CHECK_WILD_RECEIVES_FOR_MATCH(hdr,comm,proc,return_match) \ +do { \ + /* local parameters */ \ + ompi_list_t* wild_receives = &comm->wild_receives; \ + mca_pml_ob1_recv_request_t *wild_recv; \ + int frag_tag,recv_tag; \ + \ + /* initialization */ \ + frag_tag=hdr->hdr_tag; \ + \ + /* \ + * Loop over the wild irecvs - no need to lock, the upper level \ + * locking is protecting from having other threads trying to \ + * change this list. 
\ + */ \ + for(wild_recv = (mca_pml_ob1_recv_request_t *) \ + ompi_list_get_first(wild_receives); \ + wild_recv != (mca_pml_ob1_recv_request_t *) \ + ompi_list_get_end(wild_receives); \ + wild_recv = (mca_pml_ob1_recv_request_t *) \ + ((ompi_list_item_t *)wild_recv)->ompi_list_next) { \ + \ + recv_tag = wild_recv->req_recv.req_base.req_tag; \ + if ( \ + /* exact tag match */ \ + (frag_tag == recv_tag) || \ + /* wild tag match - negative tags (except for \ + * OMPI_ANY_TAG) are reserved for internal use, and will \ + * not be matched with OMPI_ANY_TAG */ \ + ( (recv_tag == OMPI_ANY_TAG) && (0 <= frag_tag) ) ) \ + \ + { \ + /* \ + * Mark that this is the matching irecv, and go to process it. \ + */ \ + return_match = wild_recv; \ + \ + /* remove this irecv from the postd wild ireceive list */ \ + ompi_list_remove_item(wild_receives, \ + (ompi_list_item_t *)wild_recv); \ +\ + /* found match - no need to continue */ \ + break; \ + } \ + } \ +} while(0) + + +/** + * Try and match the incoming message fragment to the list of + * "specific" receives + * + * @param hdr Matching data from recived fragment (IN) + * + * @param comm Pointer to the communicator structure used for + * matching purposes. (IN) + * + * @return Matched receive + * + * This routine assumes that the appropriate matching locks are + * set by the upper level routine. + */ +#define MCA_PML_OB1_CHECK_SPECIFIC_RECEIVES_FOR_MATCH(hdr,comm,proc,return_match) \ +do { \ + /* local variables */ \ + ompi_list_t* specific_receives = &proc->specific_receives; \ + mca_pml_ob1_recv_request_t *specific_recv; \ + int recv_tag,frag_tag; \ + \ + /* initialization */ \ + frag_tag=hdr->hdr_tag; \ + \ + /* \ + * Loop over the specific irecvs. \ + */ \ + for(specific_recv = (mca_pml_ob1_recv_request_t *) \ + ompi_list_get_first(specific_receives); \ + specific_recv != (mca_pml_ob1_recv_request_t *) \ + ompi_list_get_end(specific_receives); \ + specific_recv = (mca_pml_ob1_recv_request_t *) \ + ((ompi_list_item_t *)specific_recv)->ompi_list_next) { \ + /* \ + * Check for a match \ + */ \ + recv_tag = specific_recv->req_recv.req_base.req_tag; \ + if ( (frag_tag == recv_tag) || \ + ( (recv_tag == OMPI_ANY_TAG) && (0 <= frag_tag) ) ) { \ + \ + /* \ + * Match made \ + */ \ + return_match = specific_recv; \ + \ + /* remove descriptor from posted specific ireceive list */ \ + ompi_list_remove_item(specific_receives, \ + (ompi_list_item_t *)specific_recv); \ + \ + break; \ + } \ + } \ +} while(0) + +/** + * Try and match the incoming message fragment to the list of + * "wild" receives and "specific" receives. Used when both types + * of receives have been posted, i.e. when we need to coordinate + * between multiple lists to make sure ordered delivery occurs. + * + * @param hdr Matching data from recived fragment (IN) + * + * @param comm Pointer to the communicator structure used for + * matching purposes. (IN) + * + * @return Matched receive + * + * This routine assumes that the appropriate matching locks are + * set by the upper level routine. + */ + +#define MCA_PML_OB1_CHECK_SPECIFIC_AND_WILD_RECEIVES_FOR_MATCH( \ + hdr,comm,proc,return_match) \ +do { \ + /* local variables */ \ + mca_pml_ob1_recv_request_t *specific_recv, *wild_recv; \ + mca_ptl_sequence_t wild_recv_seq, specific_recv_seq; \ + int frag_tag, wild_recv_tag, specific_recv_tag; \ + \ + /* initialization */ \ + frag_tag=hdr->hdr_tag; \ + \ + /* \ + * We know that when this is called, both specific and wild irecvs \ + * have been posted. 
\ + */ \ + specific_recv = (mca_pml_ob1_recv_request_t *) \ + ompi_list_get_first(&(proc)->specific_receives); \ + wild_recv = (mca_pml_ob1_recv_request_t *) \ + ompi_list_get_first(&comm->wild_receives); \ + \ + specific_recv_seq = specific_recv->req_recv.req_base.req_sequence; \ + wild_recv_seq = wild_recv->req_recv.req_base.req_sequence; \ + \ + while (true) { \ + if (wild_recv_seq < specific_recv_seq) { \ + /* \ + * wild recv is earlier than the specific one. \ + */ \ + /* \ + * try and match \ + */ \ + wild_recv_tag = wild_recv->req_recv.req_base.req_tag; \ + if ( (frag_tag == wild_recv_tag) || \ + ( (wild_recv_tag == OMPI_ANY_TAG) && (0 <= frag_tag) ) ) { \ + /* \ + * Match made \ + */ \ + return_match=wild_recv; \ + \ + /* remove this recv from the wild receive queue */ \ + ompi_list_remove_item(&comm->wild_receives, \ + (ompi_list_item_t *)wild_recv); \ + break; \ + } \ + \ + /* \ + * No match, go to the next. \ + */ \ + wild_recv=(mca_pml_ob1_recv_request_t *) \ + ((ompi_list_item_t *)wild_recv)->ompi_list_next; \ + \ + /* \ + * If that was the last wild one, just look at the \ + * rest of the specific ones. \ + */ \ + if (wild_recv == (mca_pml_ob1_recv_request_t *) \ + ompi_list_get_end(&comm->wild_receives) ) \ + { \ + MCA_PML_OB1_CHECK_SPECIFIC_RECEIVES_FOR_MATCH(hdr, comm, proc, return_match); \ + break; \ + } \ + \ + /* \ + * Get the sequence number for this recv, and go \ + * back to the top of the loop. \ + */ \ + wild_recv_seq = wild_recv->req_recv.req_base.req_sequence; \ + \ + } else { \ + /* \ + * specific recv is earlier than the wild one. \ + */ \ + specific_recv_tag=specific_recv->req_recv.req_base.req_tag; \ + if ( (frag_tag == specific_recv_tag) || \ + ( (specific_recv_tag == OMPI_ANY_TAG) && (0<=frag_tag)) ) \ + { \ + /* \ + * Match made \ + */ \ + return_match = specific_recv; \ + /* remove descriptor from specific receive list */ \ + ompi_list_remove_item(&(proc)->specific_receives, \ + (ompi_list_item_t *)specific_recv); \ + break; \ + } \ + \ + /* \ + * No match, go on to the next specific irecv. \ + */ \ + specific_recv = (mca_pml_ob1_recv_request_t *) \ + ((ompi_list_item_t *)specific_recv)->ompi_list_next; \ + \ + /* \ + * If that was the last specific irecv, process the \ + * rest of the wild ones. \ + */ \ + if (specific_recv == (mca_pml_ob1_recv_request_t *) \ + ompi_list_get_end(&(proc)->specific_receives)) \ + { \ + MCA_PML_OB1_CHECK_WILD_RECEIVES_FOR_MATCH(hdr, comm, proc, return_match); \ + break; \ + } \ + /* \ + * Get the sequence number for this recv, and go \ + * back to the top of the loop. \ + */ \ + specific_recv_seq = specific_recv->req_recv.req_base.req_sequence; \ + } \ + } \ +} while(0) + + +/* + * Specialized matching routines for internal use only. + */ + +static bool mca_pml_ob1_check_cantmatch_for_match( + ompi_list_t *additional_matches, + mca_pml_ob1_comm_t* comm, + mca_pml_ob1_comm_proc_t *proc); + + +/** + * RCS/CTS receive side matching + * + * @param hdr list of parameters needed for matching + * This list is also embeded in frag_desc, + * but this allows to save a memory copy when + * a match is made in this routine. (IN) + * @param frag_desc pointer to receive fragment which we want + * to match (IN/OUT). If a match is not made, + * hdr is copied to frag_desc. + * @param match_made parameter indicating if we matched frag_desc/ + * hdr (OUT) + * @param additional_matches if a match is made with frag_desc, we + * may be able to match fragments that previously + * have arrived out-of-order. 
If this is the + * case, the associated fragment descriptors are + * put on this list for further processing. (OUT) + * + * @return OMPI error code + * + * This routine is used to try and match a newly arrived message fragment + * to pre-posted receives. The following assumptions are made + * - fragments are received out of order + * - for long messages, e.g. more than one fragment, a RTS/CTS algorithm + * is used. + * - 2nd and greater fragments include a receive descriptor pointer + * - fragments may be dropped + * - fragments may be corrupt + * - this routine may be called simultaneously by more than one thread + */ +int mca_pml_ob1_recv_frag_match( + mca_bmi_base_module_t* bmi, + mca_pml_ob1_match_hdr_t *hdr, + mca_bmi_base_segment_t* segments, + size_t num_segments) +{ + /* local variables */ + uint16_t next_msg_seq_expected, frag_msg_seq; + ompi_communicator_t *comm_ptr; + mca_pml_ob1_recv_request_t *match = NULL; + mca_pml_ob1_comm_t *comm; + mca_pml_ob1_comm_proc_t *proc; + bool additional_match=false; + ompi_list_t additional_matches; + int rc; + + /* communicator pointer */ + comm_ptr=ompi_comm_lookup(hdr->hdr_contextid); + comm=(mca_pml_ob1_comm_t *)comm_ptr->c_pml_comm; + + /* source sequence number */ + frag_msg_seq = hdr->hdr_msg_seq; + proc = comm->procs + hdr->hdr_src; + + /* get next expected message sequence number - if threaded + * run, lock to make sure that if another thread is processing + * a frag from the same message a match is made only once. + * Also, this prevents other posted receives (for a pair of + * end points) from being processed, and potentially "loosing" + * the fragment. + */ + OMPI_THREAD_LOCK(&comm->matching_lock); + + /* get sequence number of next message that can be processed */ + next_msg_seq_expected = (uint16_t)proc->expected_sequence; + if (frag_msg_seq == next_msg_seq_expected) { + + /* + * This is the sequence number we were expecting, + * so we can try matching it to already posted + * receives. + */ + + /* We're now expecting the next sequence number. */ + (proc->expected_sequence)++; + + /* + * figure out what sort of matching logic to use, if need to + * look only at "specific" receives, or "wild" receives, + * or if we need to traverse both sets at the same time. + */ + if (ompi_list_get_size(&proc->specific_receives) == 0 ){ + /* + * There are only wild irecvs, so specialize the algorithm. + */ + MCA_PML_OB1_CHECK_WILD_RECEIVES_FOR_MATCH(hdr, comm, proc, match); + + } else if (ompi_list_get_size(&comm->wild_receives) == 0 ) { + /* + * There are only specific irecvs, so specialize the algorithm. + */ + MCA_PML_OB1_CHECK_SPECIFIC_RECEIVES_FOR_MATCH(hdr, comm, proc, match); + } else { + /* + * There are some of each. + */ + MCA_PML_OB1_CHECK_SPECIFIC_AND_WILD_RECEIVES_FOR_MATCH(hdr, comm, proc, match); + } + + /* if match found, process data */ + if (match) { + + /* set length of incoming message */ + match->req_recv.req_bytes_packed = hdr->hdr_msg_length; + + /* + * update delivered sequence number information, if needed. 
+ */ + if( (match->req_recv.req_base.req_type == MCA_PML_REQUEST_PROBE) ) { + /* Match a probe, rollback the next expected sequence number */ + (proc->expected_sequence)--; + } + } else { + + /* if no match found, place on unexpected queue */ + mca_pml_ob1_recv_frag_t* frag; + MCA_PML_OB1_RECV_FRAG_ALLOC(frag, rc); + if(OMPI_SUCCESS != rc) { + OMPI_THREAD_UNLOCK(&pml_comm->matching_lock); + return rc; + } + MCA_PML_OB1_RECV_FRAG_INIT(frag,bmi,hdr,segments,num_segments); + ompi_list_append( &proc->unexpected_frags, (ompi_list_item_t *)frag ); + } + + /* + * Now that new message has arrived, check to see if + * any fragments on the c_c_frags_cant_match list + * may now be used to form new matchs + */ + if (0 < ompi_list_get_size(&proc->frags_cant_match)) { + additional_match = mca_pml_ob1_check_cantmatch_for_match(&additional_matches,comm,proc); + } + + } else { + + /* + * This message comes after the next expected, so it + * is ahead of sequence. Save it for later. + */ + mca_pml_ob1_recv_frag_t* frag; + MCA_PML_OB1_RECV_FRAG_ALLOC(frag, rc); + if(OMPI_SUCCESS != rc) { + OMPI_THREAD_UNLOCK(&pml_comm->matching_lock); + return rc; + } + MCA_PML_OB1_RECV_FRAG_INIT(frag,bmi,hdr,segments,num_segments); + ompi_list_append(&proc->frags_cant_match, (ompi_list_item_t *)frag); + + } + OMPI_THREAD_UNLOCK(&pml_comm->matching_lock); + + + /* release matching lock before processing fragment */ + if(match != NULL) { + match->req_recv.req_bytes_packed = hdr->hdr_msg_length; + match->req_recv.req_base.req_ompi.req_status.MPI_TAG = hdr->hdr_tag; + match->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = hdr->hdr_src; + mca_pml_ob1_recv_request_progress(match,bmi,segments,num_segments); + } else { + ompi_output(0, "match not found\n"); + } + if(additional_match) { + ompi_list_item_t* item; + while(NULL != (item = ompi_list_remove_first(&additional_matches))) { + mca_pml_ob1_recv_frag_t* frag = (mca_pml_ob1_recv_frag_t*)item; + frag->request->req_recv.req_bytes_packed = hdr->hdr_msg_length; + frag->request->req_recv.req_base.req_ompi.req_status.MPI_TAG = hdr->hdr_tag; + frag->request->req_recv.req_base.req_ompi.req_status.MPI_SOURCE = hdr->hdr_src; + mca_pml_ob1_recv_request_progress(frag->request,frag->bmi,frag->segments,frag->num_segments); + MCA_PML_OB1_RECV_FRAG_RETURN(frag); + } + } + return OMPI_SUCCESS; +} + + +/** + * Scan the list of frags that came in ahead of time to see if any + * can be processed at this time. If they can, try and match the + * frags. + * + * @param additional_matches List to hold new matches with fragments + * from the c_frags_cant_match list. (IN/OUT) + * + * @param pml_comm Pointer to the communicator structure used for + * matching purposes. (IN) + * + * This routine assumes that the appropriate matching locks are + * set by the upper level routine. + */ + +static bool mca_pml_ob1_check_cantmatch_for_match( + ompi_list_t *additional_matches, + mca_pml_ob1_comm_t* comm, + mca_pml_ob1_comm_proc_t *proc) +{ + /* local parameters */ + int match_found; + uint16_t next_msg_seq_expected, frag_seq; + mca_pml_ob1_recv_frag_t *frag_desc; + mca_pml_ob1_recv_request_t *match = NULL; + bool match_made = false; + + /* + * Loop over all the out of sequence messages. No ordering is assumed + * in the c_frags_cant_match list. 
+ */ + + match_found = 1; + while ((0 < ompi_list_get_size(&proc->frags_cant_match)) && match_found) { + + /* initialize match flag for this search */ + match_found = 0; + + /* get sequence number of next message that can be processed */ + next_msg_seq_expected = proc->expected_sequence; + + /* search the list for a fragment from the send with sequence + * number next_msg_seq_expected + */ + for(frag_desc = (mca_pml_ob1_recv_frag_t *) + ompi_list_get_first(&proc->frags_cant_match); + frag_desc != (mca_pml_ob1_recv_frag_t *) + ompi_list_get_end(&proc->frags_cant_match); + frag_desc = (mca_pml_ob1_recv_frag_t *) + ompi_list_get_next(frag_desc)) + { + /* + * If the message has the next expected seq from that proc... + */ + frag_seq=frag_desc->hdr.hdr_match.hdr_msg_seq; + if (frag_seq == next_msg_seq_expected) { + mca_pml_ob1_match_hdr_t* hdr = &frag_desc->hdr.hdr_match; + + /* We're now expecting the next sequence number. */ + (proc->expected_sequence)++; + + /* signal that match was made */ + match_found = 1; + + /* + * remove frag_desc from list + */ + ompi_list_remove_item(&proc->frags_cant_match, + (ompi_list_item_t *)frag_desc); + + /* + * figure out what sort of matching logic to use, if need to + * look only at "specific" receives, or "wild" receives, + * or if we need to traverse both sets at the same time. + */ + proc = comm->procs + hdr->hdr_src; + if (ompi_list_get_size(&proc->specific_receives) == 0 ) { + /* + * There are only wild irecvs, so specialize the algorithm. + */ + MCA_PML_OB1_CHECK_WILD_RECEIVES_FOR_MATCH(hdr, comm, proc, match); + } else if (ompi_list_get_size(&comm->wild_receives) == 0 ) { + /* + * There are only specific irecvs, so specialize the algorithm. + */ + MCA_PML_OB1_CHECK_SPECIFIC_RECEIVES_FOR_MATCH(hdr, comm, proc, match); + } else { + /* + * There are some of each. + */ + MCA_PML_OB1_CHECK_SPECIFIC_AND_WILD_RECEIVES_FOR_MATCH(hdr, comm, proc, match); + + } + + /* if match found, process data */ + if (match) { + + /* associate the receive descriptor with the fragment + * descriptor */ + frag_desc->request=match; + + /* add this fragment descriptor to the list of + * descriptors to be processed later + */ + if(match_made == false) { + match_made = true; + OBJ_CONSTRUCT(additional_matches, ompi_list_t); + } + ompi_list_append(additional_matches, (ompi_list_item_t *)frag_desc); + + } else { + + /* if no match found, place on unexpected queue */ + ompi_list_append( &proc->unexpected_frags, (ompi_list_item_t *)frag_desc); + + } + + /* c_frags_cant_match is not an ordered list, so exit loop + * and re-start search for next sequence number */ + break; + + } /* end if (frag_seq == next_msg_seq_expected) */ + + } /* end for (frag_desc) loop */ + + } /* end while loop */ + + return match_made; +} diff --git a/src/mca/pml/ob1/pml_ob1_recvfrag.h b/src/mca/pml/ob1/pml_ob1_recvfrag.h index 45bbdea1cc..d8fe225e53 100644 --- a/src/mca/pml/ob1/pml_ob1_recvfrag.h +++ b/src/mca/pml/ob1/pml_ob1_recvfrag.h @@ -51,26 +51,35 @@ typedef struct mca_pml_ob1_recv_frag_t mca_pml_ob1_recv_frag_t; /** - * Called to attempt a match for new fragments. - * - * @param bmi (IN) The PTL pointer - * @param frag (IN) Receive fragment descriptor. - * @param hdr (IN) Header corresponding to the receive fragment. - * @return OMPI_SUCCESS or error status on failure. + * Callback from BMI on receipt of a fragment. 
*/ -bool mca_pml_ob1_recv_frag_match( - struct mca_pml_ob1_recv_frag_t* frag -); -int mca_pml_ob1_recv_frag_matched( - struct mca_pml_ob1_recv_frag_t* frag +OMPI_DECLSPEC void mca_pml_ob1_recv_frag_callback( + mca_bmi_base_module_t* bmi, + mca_bmi_base_tag_t tag, + mca_bmi_base_descriptor_t* descriptor, + void* cbdata ); + +/** + * Match incoming fragments against posted receives. + * Supports out of order delivery. + * + * @param frag_header (IN) Header of received fragment. + * @param frag_desc (IN) Received fragment descriptor. + * @param match_made (OUT) Flag indicating wether a match was made. + * @param additional_matches (OUT) List of additional matches + * @return OMPI_SUCCESS or error status on failure. + */ +OMPI_DECLSPEC int mca_pml_ob1_recv_frag_match( + mca_bmi_base_module_t* bmi, + mca_pml_ob1_match_hdr_t *hdr, + mca_bmi_base_segment_t* segments, + size_t num_segments); -int mca_pml_ob1_recv_frag_complete( - struct mca_bmi_base_module_t* bmi, - struct mca_pml_ob1_recv_request_t* req, - struct mca_pml_ob1_recv_frag_t* frag -); +#if defined(c_plusplus) || defined(__cplusplus) +} +#endif #endif diff --git a/src/mca/pml/ob1/pml_ob1_recvreq.h b/src/mca/pml/ob1/pml_ob1_recvreq.h index f21d3de4e5..0d706ddf91 100644 --- a/src/mca/pml/ob1/pml_ob1_recvreq.h +++ b/src/mca/pml/ob1/pml_ob1_recvreq.h @@ -165,6 +165,8 @@ void mca_pml_ob1_recv_request_match_specific(mca_pml_ob1_recv_request_t* request bytes_received, \ bytes_delivered) \ { \ + /* FIX */ \ + bytes_delivered = bytes_received; \ } diff --git a/src/mca/pml/ob1/pml_ob1_sendreq.c b/src/mca/pml/ob1/pml_ob1_sendreq.c index d13be7818b..a6cc00fca6 100644 --- a/src/mca/pml/ob1/pml_ob1_sendreq.c +++ b/src/mca/pml/ob1/pml_ob1_sendreq.c @@ -81,7 +81,8 @@ static void mca_pml_ob1_send_completion( /* check for request completion */ OMPI_THREAD_LOCK(&ompi_request_lock); - if (sendreq->req_offset == sendreq->req_send.req_bytes_packed) { + if (OMPI_THREAD_ADD32(&sendreq->req_pending,-1) == 0 && + sendreq->req_offset == sendreq->req_send.req_bytes_packed) { sendreq->req_send.req_base.req_pml_complete = true; if (sendreq->req_send.req_base.req_ompi.req_complete == false) { sendreq->req_send.req_base.req_ompi.req_status.MPI_SOURCE = sendreq->req_send.req_base.req_comm->c_my_rank; @@ -89,6 +90,7 @@ static void mca_pml_ob1_send_completion( sendreq->req_send.req_base.req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS; sendreq->req_send.req_base.req_ompi.req_status._count = sendreq->req_send.req_bytes_packed; sendreq->req_send.req_base.req_ompi.req_complete = true; + sendreq->req_state = MCA_PML_OB1_SR_COMPLETE; if(ompi_request_waiting) { ompi_condition_broadcast(&ompi_request_cond); } @@ -96,16 +98,20 @@ static void mca_pml_ob1_send_completion( MCA_PML_OB1_FREE((ompi_request_t**)&sendreq); } else if (sendreq->req_send.req_send_mode == MCA_PML_BASE_SEND_BUFFERED) { mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq); + sendreq->req_state = MCA_PML_OB1_SR_COMPLETE; } - sendreq->req_state = MCA_PML_OB1_SR_COMPLETE; } OMPI_THREAD_UNLOCK(&ompi_request_lock); - /* advance based on request state */ - MCA_PML_OB1_SEND_REQUEST_ADVANCE(sendreq); - - /* check for pending requests that need to be progressed */ - while(ompi_list_get_size(&mca_pml_ob1.send_pending) != 0) { + /* advance pending requests */ + while(NULL != sendreq) { + switch(sendreq->req_state) { + case MCA_PML_OB1_SR_SEND: + mca_pml_ob1_send_request_schedule(sendreq); + break; + default: + break; + } OMPI_THREAD_LOCK(&mca_pml_ob1.ob1_lock); sendreq = 
(mca_pml_ob1_send_request_t*)ompi_list_remove_first(&mca_pml_ob1.send_pending); OMPI_THREAD_UNLOCK(&mca_pml_ob1.ob1_lock); @@ -214,6 +220,7 @@ int mca_pml_ob1_send_request_start_copy( /* if an acknowledgment is not required - can get by w/ shorter hdr */ if (flags == 0) { + int32_t free_after; hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_MATCH; /* pack the data into the supplied buffer */ @@ -226,13 +233,14 @@ int mca_pml_ob1_send_request_start_copy( &iov, &iov_count, &max_data, - NULL)) < 0) { + &free_after)) < 0) { endpoint->bmi_free(endpoint->bmi, descriptor); return rc; } /* update length w/ number of bytes actually packed */ segment->seg_len = sizeof(mca_pml_ob1_match_hdr_t) + max_data; + sendreq->req_offset = max_data; if(sendreq->req_offset == sendreq->req_send.req_bytes_packed) { ompi_request_complete((ompi_request_t*)sendreq); @@ -240,6 +248,7 @@ int mca_pml_ob1_send_request_start_copy( /* rendezvous header is required */ } else { + int32_t free_after; hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_RNDV; hdr->hdr_rndv.hdr_src_req.lval = 0; /* for VALGRIND/PURIFY - REPLACE WITH MACRO */ hdr->hdr_rndv.hdr_src_req.pval = sendreq; @@ -254,7 +263,7 @@ int mca_pml_ob1_send_request_start_copy( &iov, &iov_count, &max_data, - NULL)) < 0) { + &free_after)) < 0) { endpoint->bmi_free(endpoint->bmi, descriptor); return rc; } @@ -265,6 +274,7 @@ int mca_pml_ob1_send_request_start_copy( } } descriptor->des_cbdata = sendreq; + OMPI_THREAD_ADD32(&sendreq->req_pending,1); /* send */ rc = endpoint->bmi_send( @@ -283,7 +293,7 @@ int mca_pml_ob1_send_request_start_copy( * */ -int mca_pml_ob1_send_request_send(mca_pml_ob1_send_request_t* sendreq) +int mca_pml_ob1_send_request_schedule(mca_pml_ob1_send_request_t* sendreq) { /* * Only allow one thread in this routine for a given request. @@ -293,6 +303,7 @@ int mca_pml_ob1_send_request_send(mca_pml_ob1_send_request_t* sendreq) */ if(OMPI_THREAD_ADD32(&sendreq->req_lock,1) == 1) { mca_pml_ob1_proc_t* proc = sendreq->req_proc; + size_t num_bmi_avail = mca_pml_ob1_ep_array_get_size(&proc->bmi_next); do { /* allocate remaining bytes to PTLs */ size_t bytes_remaining = sendreq->req_send.req_bytes_packed - sendreq->req_offset; @@ -307,7 +318,7 @@ int mca_pml_ob1_send_request_send(mca_pml_ob1_send_request_t* sendreq) * size, then go ahead and give the rest of the message to this PTL. */ size_t size; - if(bytes_remaining < ep->bmi_min_seg_size) { + if(bytes_remaining < ep->bmi_min_frag_size) { size = bytes_remaining; /* otherwise attempt to give the PTL a percentage of the message @@ -315,13 +326,15 @@ int mca_pml_ob1_send_request_send(mca_pml_ob1_send_request_t* sendreq) * a percentage of the overall message length (regardless of amount * previously assigned) */ - } else { + } else if (num_bmi_avail > 1) { size = (ep->bmi_weight * bytes_remaining) / 100; + } else { + size = ep->bmi_min_frag_size; } /* makes sure that we don't exceed ptl_max_frag_size */ - if(ep->bmi_max_seg_size != 0 && size > ep->bmi_max_seg_size) - size = ep->bmi_max_seg_size; + if (ep->bmi_max_frag_size != 0 && size > ep->bmi_max_frag_size) + size = ep->bmi_max_frag_size - sizeof(mca_pml_ob1_frag_hdr_t); /* pack into a descriptor */ des = ep->bmi_prepare_src( @@ -331,19 +344,24 @@ int mca_pml_ob1_send_request_send(mca_pml_ob1_send_request_t* sendreq) sizeof(mca_pml_ob1_frag_hdr_t), &size); if(des == NULL) { + /* queue for retry? 
*/ break; } + des->des_cbfunc = mca_pml_ob1_send_completion; + des->des_cbdata = sendreq; - /* prepare header */ + /* setup header */ hdr = (mca_pml_ob1_frag_hdr_t*)des->des_src->seg_addr.pval; + hdr->hdr_common.hdr_flags = 0; + hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_FRAG; hdr->hdr_frag_length = size; hdr->hdr_frag_offset = sendreq->req_offset; hdr->hdr_src_req.pval = sendreq; - hdr->hdr_dst_req.pval = sendreq->req_peer.pval; + hdr->hdr_dst_req = sendreq->req_recv; /* update state */ sendreq->req_offset += size; - sendreq->req_pending++; + OMPI_THREAD_ADD32(&sendreq->req_pending,1); /* initiate send - note that this may complete before the call returns */ rc = ep->bmi_send(ep->bmi, ep->bmi_endpoint, des, MCA_BMI_TAG_PML); @@ -352,6 +370,7 @@ int mca_pml_ob1_send_request_send(mca_pml_ob1_send_request_t* sendreq) } else { sendreq->req_offset -= size; sendreq->req_pending--; + /* queue for retry? */ break; } } @@ -361,39 +380,3 @@ int mca_pml_ob1_send_request_send(mca_pml_ob1_send_request_t* sendreq) } - -/** - * - */ - -int mca_pml_ob1_send_request_acked( - mca_bmi_base_module_t* bmi, - mca_pml_ob1_ack_hdr_t* hdr) -{ - mca_pml_ob1_send_request_t* sendreq = (mca_pml_ob1_send_request_t*) - hdr->hdr_src_req.pval; - if(hdr->hdr_common.hdr_flags & MCA_PML_OB1_HDR_FLAGS_PUT) { - sendreq->req_state = MCA_PML_OB1_SR_PUT; - mca_pml_ob1_send_request_put(sendreq); - } else { - sendreq->req_state = MCA_PML_OB1_SR_SEND; - mca_pml_ob1_send_request_send(sendreq); - } - return OMPI_SUCCESS; -} - - -/* - * - */ - -int mca_pml_ob1_send_request_put( - mca_pml_ob1_send_request_t* sendreq) -{ - return OMPI_SUCCESS; -} - - - - - diff --git a/src/mca/pml/ob1/pml_ob1_sendreq.h b/src/mca/pml/ob1/pml_ob1_sendreq.h index acf01ec60a..c33b9c97f8 100644 --- a/src/mca/pml/ob1/pml_ob1_sendreq.h +++ b/src/mca/pml/ob1/pml_ob1_sendreq.h @@ -31,7 +31,7 @@ extern "C" { typedef enum { MCA_PML_OB1_SR_INIT, - MCA_PML_OB1_SR_WAIT, + MCA_PML_OB1_SR_START, MCA_PML_OB1_SR_SEND, MCA_PML_OB1_SR_PUT, MCA_PML_OB1_SR_COMPLETE @@ -43,7 +43,7 @@ struct mca_pml_ob1_send_request_t { mca_pml_ob1_proc_t* req_proc; mca_pml_ob1_endpoint_t* req_endpoint; mca_pml_ob1_send_request_state_t req_state; - ompi_ptr_t req_peer; + ompi_ptr_t req_recv; int32_t req_lock; size_t req_pending; size_t req_offset; @@ -85,6 +85,7 @@ OBJ_CLASS_DECLARATION(mca_pml_ob1_send_request_t); sendmode, \ persistent) \ { \ + sendreq->req_state = MCA_PML_OB1_SR_INIT; \ MCA_PML_BASE_SEND_REQUEST_INIT(&sendreq->req_send, \ buf, \ count, \ @@ -108,6 +109,8 @@ OBJ_CLASS_DECLARATION(mca_pml_ob1_send_request_t); /* select next endpoint */ \ endpoint = mca_pml_ob1_ep_array_get_next(&proc->bmi_first); \ sendreq->req_offset = 0; \ + sendreq->req_pending = 0; \ + sendreq->req_state = MCA_PML_OB1_SR_START; \ sendreq->req_send.req_base.req_ompi.req_complete = false; \ sendreq->req_send.req_base.req_ompi.req_state = OMPI_REQUEST_ACTIVE; \ sendreq->req_send.req_base.req_sequence = OMPI_THREAD_ADD32(&proc->proc_sequence,1); \ @@ -129,24 +132,6 @@ OBJ_CLASS_DECLARATION(mca_pml_ob1_send_request_t); * Advance a request */ -#define MCA_PML_OB1_SEND_REQUEST_ADVANCE(sendreq) - -#if 0 - switch(sendreq->req_state) { - case MCA_PML_OB1_SR_WAIT: - mca_pml_ob1_send_request_wait(sendreq); - break; - case MCA_PML_OB1_SR_SEND: - mca_pml_ob1_send_request_send(sendreq); - break; - case MCA_PML_OB1_SR_PUT: - mca_pml_ob1_send_request_put(sendreq); - break; - default: - break; - } -#endif - #define MCA_PML_OB1_SEND_REQUEST_RETURN(sendreq) \ { \ @@ -176,26 +161,10 @@ int 
mca_pml_ob1_send_request_start_copy( mca_pml_ob1_send_request_t* sendreq, mca_pml_ob1_endpoint_t* endpoint); - -/** - * - */ - -int mca_pml_ob1_send_request_acked( - mca_bmi_base_module_t* bmi, - struct mca_pml_ob1_ack_hdr_t* hdr); - - /** * */ -int mca_pml_ob1_send_request_send( - mca_pml_ob1_send_request_t* sendreq); - -/** - * - */ -int mca_pml_ob1_send_request_put( +int mca_pml_ob1_send_request_schedule( mca_pml_ob1_send_request_t* sendreq); #if defined(c_plusplus) || defined(__cplusplus)
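
Notes on the changes above.

The sm BMI hunks rename first_fragment_size/max_fragment_size to eager_limit/max_frag_size and drop the explicit alignment padding, but the allocation policy is unchanged: requests at or below the eager limit are served from the small (first-fragment) free list, everything else from the large one. The self-contained sketch below models that policy; frag_t, frag_alloc() and the use of plain malloc() are hypothetical stand-ins for mca_bmi_sm_frag_t and the ompi_free_list machinery, with the pool sizes taken from the patch's defaults (eager_limit = 1024, max_frag_size = 8*1024).

/* Minimal model (hypothetical names) of the two-free-list allocation
 * policy in mca_bmi_sm_alloc(): size <= eager_limit selects the
 * first-fragment pool, anything larger the big-fragment pool. */
#include <stdio.h>
#include <stdlib.h>

#define EAGER_LIMIT   1024u      /* default sm eager_limit in the patch  */
#define MAX_FRAG_SIZE (8u*1024)  /* default sm max_frag_size in the patch */

typedef struct {
    size_t        size;          /* capacity stamped by the constructor */
    unsigned char payload[];     /* data follows the descriptor         */
} frag_t;

static frag_t *frag_alloc(size_t requested)
{
    /* Pick the pool exactly as the sm BMI does. */
    size_t pool = (requested <= EAGER_LIMIT) ? EAGER_LIMIT : MAX_FRAG_SIZE;
    frag_t *f = malloc(sizeof(frag_t) + pool);
    if (f != NULL)
        f->size = pool;
    return f;
}

int main(void)
{
    frag_t *a = frag_alloc(512);
    frag_t *b = frag_alloc(4096);
    if (a == NULL || b == NULL)
        return 1;
    printf("512 -> pool %zu, 4096 -> pool %zu\n", a->size, b->size);
    free(a);
    free(b);
    return 0;
}

In the real code the two pools are ompi_free_list_t instances backed by the shared-memory mpool, so the choice of list also decides which constructor (frag1 vs frag2) stamped frag->size.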
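On the receive path, mca_pml_ob1_recv_callback() is replaced by mca_pml_ob1_recv_frag_callback(), which dispatches on the common header type: MATCH and RNDV fragments go through matching, an ACK moves the originating send request into the SEND state and re-enters the scheduler, and FRAG packets carry the receive-request pointer so they can be progressed without matching. The sketch below models that dispatch with stubbed handlers; the enum values and handler bodies are illustrative, not the real mca_pml_ob1_hdr_t definitions.

/* Simplified model of the header-type dispatch added to
 * mca_pml_ob1_recv_frag_callback().  Handlers are stubs. */
#include <stdio.h>
#include <stdint.h>

enum { HDR_TYPE_MATCH = 1, HDR_TYPE_RNDV, HDR_TYPE_ACK, HDR_TYPE_FRAG };

typedef struct {
    uint8_t hdr_type;            /* plus flags etc. in the real header */
} common_hdr_t;

static void recv_frag_callback(const common_hdr_t *hdr)
{
    switch (hdr->hdr_type) {
    case HDR_TYPE_MATCH:
    case HDR_TYPE_RNDV:
        /* first fragment: run the matching logic */
        puts("match against posted receives");
        break;
    case HDR_TYPE_ACK:
        /* receiver acked the rendezvous: mark the send request
         * MCA_PML_OB1_SR_SEND and (re)enter the scheduling loop */
        puts("schedule remaining fragments");
        break;
    case HDR_TYPE_FRAG:
        /* 2nd..nth fragment: header carries the recv request pointer,
         * so no matching is needed */
        puts("progress the receive request");
        break;
    default:
        break;
    }
}

int main(void)
{
    common_hdr_t h = { HDR_TYPE_ACK };
    recv_frag_callback(&h);
    return 0;
}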
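mca_pml_ob1_recv_frag_match(), moved here from the deleted pml_ob1_match.c, keys matching off a per-peer expected_sequence counter: only the fragment carrying the next expected sequence number is matched immediately, anything ahead of sequence is parked on frags_cant_match, and mca_pml_ob1_check_cantmatch_for_match() rescans that unordered list each time the counter advances. The following is a toy, self-contained model of that mechanism, using a fixed-size array instead of an ompi_list_t and bare sequence numbers instead of fragment descriptors (both simplifications).

/* Toy model of ahead-of-sequence fragment handling. */
#include <stdio.h>
#include <stdint.h>

#define MAX_PARKED 16

static uint16_t parked[MAX_PARKED];   /* stands in for frags_cant_match */
static int      nparked  = 0;
static uint16_t expected = 0;         /* per-peer expected_sequence     */

static void drain_parked(void)
{
    int progressed = 1;
    while (progressed) {              /* list is unordered: rescan after a hit */
        progressed = 0;
        for (int i = 0; i < nparked; i++) {
            if (parked[i] == expected) {
                printf("matched parked frag seq=%u\n", parked[i]);
                parked[i] = parked[--nparked];  /* unordered remove */
                expected++;
                progressed = 1;
                break;
            }
        }
    }
}

static void on_frag(uint16_t seq)
{
    if (seq == expected) {
        printf("matched frag seq=%u\n", seq);
        expected++;
        drain_parked();               /* earlier arrivals may now match */
    } else if (nparked < MAX_PARKED) {
        parked[nparked++] = seq;      /* ahead of sequence: save for later */
    }
}

int main(void)
{
    uint16_t arrivals[] = { 1, 2, 0, 4, 3 };
    for (int i = 0; i < 5; i++)
        on_frag(arrivals[i]);
    return 0;
}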
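The heart of the change is mca_pml_ob1_send_request_schedule() (the renamed mca_pml_ob1_send_request_send): once the receiver's ACK arrives, the unsent remainder of the message is striped across the available BMIs. Each endpoint gets the whole tail when less than its minimum fragment size remains, a weight-proportional share when multiple BMIs are available, or a fixed min_frag_size stride when there is only one, always clamped to the endpoint's maximum fragment size. A minimal stand-alone model of that loop follows; endpoint_t, schedule_message() and the demo values are hypothetical stand-ins for mca_pml_ob1_endpoint_t and the descriptor plumbing, and the real loop additionally reserves sizeof(mca_pml_ob1_frag_hdr_t) in each fragment, bumps req_pending atomically per fragment, and stops when descriptor allocation or bmi_send() fails.

/* Simplified model of the weighted fragment-scheduling loop in
 * mca_pml_ob1_send_request_schedule().  Weights are percentages
 * and are assumed to sum to at most 100. */
#include <stdio.h>
#include <stddef.h>

typedef struct {
    const char *name;
    int         weight;          /* percentage of traffic for this BMI  */
    size_t      min_frag_size;   /* below this, don't bother splitting  */
    size_t      max_frag_size;   /* hard per-fragment cap (0 = no cap)  */
} endpoint_t;

static void schedule_message(endpoint_t *eps, size_t neps, size_t bytes_packed)
{
    size_t offset = 0, i = 0;
    while (offset < bytes_packed) {
        endpoint_t *ep = &eps[i++ % neps];       /* round-robin endpoints */
        size_t remaining = bytes_packed - offset;
        size_t size;

        if (remaining < ep->min_frag_size)
            size = remaining;                    /* hand over the tail */
        else if (neps > 1)
            size = (ep->weight * remaining) / 100; /* weighted share */
        else
            size = ep->min_frag_size;            /* single BMI: fixed stride */

        if (ep->max_frag_size != 0 && size > ep->max_frag_size)
            size = ep->max_frag_size;            /* respect the BMI cap */
        if (size == 0)
            size = remaining;                    /* avoid stalling on weight 0 */

        printf("%s: frag offset=%zu len=%zu\n", ep->name, offset, size);
        offset += size;                          /* advance req_offset */
    }
}

int main(void)
{
    endpoint_t eps[] = {
        { "bmi0", 60, 4096, 32768 },
        { "bmi1", 40, 4096, 16384 },
    };
    schedule_message(eps, 2, 100000);
    return 0;
}

Note the num_bmi_avail > 1 guard the patch introduces: with a single BMI a percentage split would be pointless, so the loop simply advances in min_frag_size strides.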