diff --git a/src/mca/pml/uniq/.ompi_ignore b/src/mca/pml/uniq/.ompi_ignore new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/mca/pml/uniq/Makefile.am b/src/mca/pml/uniq/Makefile.am new file mode 100644 index 0000000000..6cca71d5f9 --- /dev/null +++ b/src/mca/pml/uniq/Makefile.am @@ -0,0 +1,49 @@ +# +# Copyright (c) 2004-2005 The Trustees of Indiana University. +# All rights reserved. +# Copyright (c) 2004-2005 The Trustees of the University of Tennessee. +# All rights reserved. +# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, +# University of Stuttgart. All rights reserved. +# Copyright (c) 2004-2005 The Regents of the University of California. +# All rights reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +# Use the top-level Makefile.options + +include $(top_ompi_srcdir)/config/Makefile.options + +# Make the output library in this directory, and name it either +# mca__.la (for DSO builds) or libmca__.la +# (for static builds). + +if OMPI_BUILD_pml_uniq_DSO +component_noinst = +component_install = mca_pml_uniq.la +else +component_noinst = libmca_pml_uniq.la +component_install = +endif + +mcacomponentdir = $(libdir)/openmpi +mcacomponent_LTLIBRARIES = $(component_install) +mca_pml_uniq_la_SOURCES = $(pml_uniq_la_sources) +mca_pml_uniq_la_LIBADD = +mca_pml_uniq_la_LDFLAGS = -module -avoid-version + +noinst_LTLIBRARIES = $(component_noinst) +libmca_pml_uniq_la_SOURCES = $(pml_uniq_la_sources) +libmca_pml_uniq_la_LIBADD = +libmca_pml_uniq_la_LDFLAGS = -module -avoid-version + +pml_uniq_la_sources = pml_uniq.c pml_uniq.h pml_uniq_cancel.c pml_uniq_component.c \ + pml_uniq_component.h pml_uniq_iprobe.c pml_uniq_irecv.c pml_uniq_isend.c \ + pml_uniq_ptl.c pml_uniq_ptl.h pml_uniq_proc.c pml_uniq_proc.h pml_uniq_progress.c \ + pml_uniq_recvfrag.c pml_uniq_recvfrag.h pml_uniq_recvreq.c pml_uniq_recvreq.h \ + pml_uniq_sendreq.c pml_uniq_sendreq.h pml_uniq_start.c pml_ptl_array.c pml_ptl_array.h + diff --git a/src/mca/pml/uniq/configure.params b/src/mca/pml/uniq/configure.params new file mode 100644 index 0000000000..0ab31ac070 --- /dev/null +++ b/src/mca/pml/uniq/configure.params @@ -0,0 +1,22 @@ +# -*- shell-script -*- +# +# Copyright (c) 2004-2005 The Trustees of Indiana University. +# All rights reserved. +# Copyright (c) 2004-2005 The Trustees of the University of Tennessee. +# All rights reserved. +# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, +# University of Stuttgart. All rights reserved. +# Copyright (c) 2004-2005 The Regents of the University of California. +# All rights reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +# Specific to this module + +PARAM_INIT_FILE=pml_uniq.c +PARAM_CONFIG_HEADER_FILE="uniq_config.h" +PARAM_CONFIG_FILES="Makefile" diff --git a/src/mca/pml/uniq/pml_ptl_array.c b/src/mca/pml/uniq/pml_ptl_array.c new file mode 100644 index 0000000000..94752393b7 --- /dev/null +++ b/src/mca/pml/uniq/pml_ptl_array.c @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include + +#include "mca/pml/pml.h" +#include "pml_ptl_array.h" + + +static void mca_ptl_array_construct(mca_ptl_array_t* array) +{ + array->ptl_procs = 0; + array->ptl_size = 0; + array->ptl_index = 0; + array->ptl_reserve = 0; +} + + +static void mca_ptl_array_destruct(mca_ptl_array_t* array) +{ + if(array->ptl_procs != 0) + free(array->ptl_procs); +} + +OBJ_CLASS_INSTANCE( + mca_pml_uniq_ptl_array_t, + ompi_object_t, + mca_ptl_array_construct, + mca_ptl_array_destruct +); + +int mca_ptl_array_reserve(mca_ptl_array_t* array, size_t size) +{ + mca_ptl_proc_t *procs; + if(array->ptl_reserve >= size) + return OMPI_SUCCESS; + + procs = (mca_ptl_proc_t *)realloc(array->ptl_procs, sizeof(mca_ptl_proc_t)*size); + if(NULL == procs) + return OMPI_ERR_OUT_OF_RESOURCE; + array->ptl_procs = procs; + array->ptl_reserve = size; + memset(array->ptl_procs+array->ptl_size, 0, (size-array->ptl_size)*sizeof(mca_ptl_proc_t)); + return OMPI_SUCCESS; +} + diff --git a/src/mca/pml/uniq/pml_ptl_array.h b/src/mca/pml/uniq/pml_ptl_array.h new file mode 100644 index 0000000000..9b3e99c5f0 --- /dev/null +++ b/src/mca/pml/uniq/pml_ptl_array.h @@ -0,0 +1,146 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ +/** + * @file + */ +#ifndef OMPI_PTL_ARRAY_H +#define OMPI_PTL_ARRAY_H + +#include "util/output.h" +#include "mca/ptl/ptl.h" +#if defined(c_plusplus) || defined(__cplusplus) +extern "C" { +#endif + +extern ompi_class_t mca_pml_uniq_ptl_array_t_class; + +/** + * A data structure associated with a ompi_proc_t that caches + * addressing/scheduling attributes for a specific PTL instance + * that can be used to reach the process. + */ +struct mca_ptl_proc_t { + int ptl_weight; /**< PTL weight for scheduling */ + struct mca_ptl_base_peer_t* ptl_peer; /**< PTL addressing info */ + struct mca_pml_base_ptl_t* ptl_base; /**< PML specific PTL info */ + mca_ptl_base_module_t *ptl; /**< PTL module */ +}; +typedef struct mca_ptl_proc_t mca_ptl_proc_t; + +/** + * A dynamically growable array of mca_ptl_proc_t instances. + * Maintains an index into the array that is used for round-robin + * scheduling across contents. + */ +struct mca_ptl_array_t { + ompi_object_t super; + mca_ptl_proc_t* ptl_procs; /**< array of ptl procs */ + size_t ptl_size; /**< number available */ + size_t ptl_reserve; /**< size of allocated ptl_proc array */ + size_t ptl_index; /**< last used index*/ +}; +typedef struct mca_ptl_array_t mca_ptl_array_t; +typedef struct mca_ptl_array_t mca_pml_uniq_ptl_array_t; + + +/** + * If required, reallocate (grow) the array to the indicate size. + * + * @param array (IN) + * @param size (IN) + */ +int mca_ptl_array_reserve(mca_ptl_array_t*, size_t); + +static inline size_t mca_ptl_array_get_size(mca_ptl_array_t* array) +{ + return array->ptl_size; +} + +/** + * Grow the array if required, and set the size. 
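+ * The backing storage is grown via mca_ptl_array_reserve().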
+ * + * @param array (IN) + * @param size (IN) + */ +static inline void mca_ptl_array_set_size(mca_ptl_array_t* array, size_t size) +{ + if(array->ptl_size > array->ptl_reserve) + mca_ptl_array_reserve(array, size); + array->ptl_size = size; +} + +/** + * Grow the array size by one and return the item at that index. + * + * @param array (IN) + */ +static inline mca_ptl_proc_t* mca_ptl_array_insert(mca_ptl_array_t* array) +{ +#if OMPI_ENABLE_DEBUG + if(array->ptl_size >= array->ptl_reserve) { + ompi_output(0, "mca_ptl_array_insert: invalid array index %d >= %d", + array->ptl_size, array->ptl_reserve); + return 0; + } +#endif + return &array->ptl_procs[array->ptl_size++]; +} + +/** + * Return an array item at the specified index. + * + * @param array (IN) + * @param index (IN) + */ +static inline mca_ptl_proc_t* mca_ptl_array_get_index(mca_ptl_array_t* array, size_t index) +{ +#if OMPI_ENABLE_DEBUG + if(index >= array->ptl_size) { + ompi_output(0, "mca_ptl_array_get_index: invalid array index %d >= %d", + index, array->ptl_size); + return 0; + } +#endif + return &array->ptl_procs[index]; +} + +/** + * Return the next LRU index in the array. + * + * @param array (IN) + * @param index (IN) + */ +static inline mca_ptl_proc_t* mca_ptl_array_get_next(mca_ptl_array_t* array) +{ + mca_ptl_proc_t* ptl_proc; +#if OMPI_ENABLE_DEBUG + if(array->ptl_size == 0) { + ompi_output(0, "mca_ptl_array_get_next: invalid array size"); + return 0; + } +#endif + ptl_proc = &array->ptl_procs[array->ptl_index++]; + if(array->ptl_index == array->ptl_size) + array->ptl_index = 0; + return ptl_proc; +} + +#if defined(c_plusplus) || defined(__cplusplus) +} +#endif + +#endif + diff --git a/src/mca/pml/uniq/pml_uniq.c b/src/mca/pml/uniq/pml_uniq.c new file mode 100644 index 0000000000..39fc2332bf --- /dev/null +++ b/src/mca/pml/uniq/pml_uniq.c @@ -0,0 +1,431 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include +#include + +#include "class/ompi_bitmap.h" +#include "mca/pml/pml.h" +#include "mca/ptl/ptl.h" +#include "mca/ptl/base/base.h" +#include "mca/ptl/base/ptl_base_comm.h" +#include "mca/ptl/base/ptl_base_header.h" +#include "mca/ptl/base/ptl_base_recvfrag.h" +#include "mca/ptl/base/ptl_base_sendfrag.h" +#include "pml_uniq.h" +#include "pml_uniq_component.h" +#include "pml_uniq_proc.h" +#include "pml_uniq_ptl.h" +#include "pml_uniq_recvreq.h" +#include "pml_uniq_sendreq.h" +#include "pml_uniq_recvfrag.h" + + +mca_pml_uniq_t mca_pml_uniq = { + { + mca_pml_uniq_add_procs, + mca_pml_uniq_del_procs, + mca_pml_uniq_add_ptls, + mca_pml_uniq_control, + mca_pml_uniq_progress, + mca_pml_uniq_add_comm, + mca_pml_uniq_del_comm, + mca_pml_uniq_irecv_init, + mca_pml_uniq_irecv, + mca_pml_uniq_recv, + mca_pml_uniq_isend_init, + mca_pml_uniq_isend, + mca_pml_uniq_send, + mca_pml_uniq_iprobe, + mca_pml_uniq_probe, + mca_pml_uniq_start + } +}; + + +int mca_pml_uniq_add_comm(ompi_communicator_t* comm) +{ + /* allocate pml specific comm data */ + mca_pml_ptl_comm_t* pml_comm = OBJ_NEW(mca_pml_ptl_comm_t); + if (NULL == pml_comm) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + mca_pml_ptl_comm_init_size(pml_comm, comm->c_remote_group->grp_proc_count); + comm->c_pml_comm = pml_comm; + return OMPI_SUCCESS; +} + +int mca_pml_uniq_del_comm(ompi_communicator_t* comm) +{ + OBJ_RELEASE(comm->c_pml_comm); + comm->c_pml_comm = 0; + return OMPI_SUCCESS; +} + +static int ptl_exclusivity_compare(const void* arg1, const void* arg2) +{ + mca_ptl_base_module_t* ptl1 = *(struct mca_ptl_base_module_t**)arg1; + mca_ptl_base_module_t* ptl2 = *(struct mca_ptl_base_module_t**)arg2; + if( ptl1->ptl_exclusivity > ptl2->ptl_exclusivity ) { + return -1; + } else if (ptl1->ptl_exclusivity == ptl2->ptl_exclusivity ) { + return 0; + } else { + return 1; + } +} + + +int mca_pml_uniq_add_ptls(ompi_list_t *ptls) +{ + /* build an array of ptls and ptl modules */ + mca_ptl_base_selected_module_t* selected_ptl; + size_t num_ptls = ompi_list_get_size(ptls); + size_t cache_bytes = 0; + mca_pml_uniq.uniq_num_ptl_modules = 0; + mca_pml_uniq.uniq_num_ptl_progress = 0; + mca_pml_uniq.uniq_num_ptl_components = 0; + mca_pml_uniq.uniq_ptl_modules = (mca_ptl_base_module_t **)malloc(sizeof(mca_ptl_base_module_t*) * num_ptls); + mca_pml_uniq.uniq_ptl_progress = (mca_ptl_base_component_progress_fn_t*)malloc(sizeof(mca_ptl_base_component_progress_fn_t) * num_ptls); + mca_pml_uniq.uniq_ptl_components = (mca_ptl_base_component_t **)malloc(sizeof(mca_ptl_base_component_t*) * num_ptls); + if (NULL == mca_pml_uniq.uniq_ptl_modules || + NULL == mca_pml_uniq.uniq_ptl_progress || + NULL == mca_pml_uniq.uniq_ptl_components) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + for(selected_ptl = (mca_ptl_base_selected_module_t*)ompi_list_get_first(ptls); + selected_ptl != (mca_ptl_base_selected_module_t*)ompi_list_get_end(ptls); + selected_ptl = (mca_ptl_base_selected_module_t*)ompi_list_get_next(selected_ptl)) { + mca_ptl_base_module_t *ptl = selected_ptl->pbsm_module; + size_t i; + + mca_pml_uniq.uniq_ptl_modules[mca_pml_uniq.uniq_num_ptl_modules++] = ptl; + for(i=0; iptl_component) { + break; + } + } + if(i == mca_pml_uniq.uniq_num_ptl_components) { + mca_pml_uniq.uniq_ptl_components[mca_pml_uniq.uniq_num_ptl_components++] = ptl->ptl_component; + } + + /* + *setup ptl + */ + + /* set pointer to fragment matching logic routine, if this + * not already set by the 
ptl */ + if( NULL == ptl->ptl_match) + ptl->ptl_match = mca_pml_uniq_recv_frag_match; + ptl->ptl_send_progress = mca_pml_uniq_send_request_progress; + ptl->ptl_recv_progress = mca_pml_uniq_recv_request_progress; + ptl->ptl_stack = ptl; + ptl->ptl_base = NULL; + + /* find maximum required size for cache */ + if(ptl->ptl_cache_bytes > cache_bytes) { + cache_bytes = ptl->ptl_cache_bytes; + } + } + + /* setup send fragments based on largest required send request */ + ompi_free_list_init( + &mca_pml_uniq.uniq_send_requests, + sizeof(mca_pml_uniq_send_request_t) + cache_bytes, + OBJ_CLASS(mca_pml_uniq_send_request_t), + mca_pml_uniq.uniq_free_list_num, + mca_pml_uniq.uniq_free_list_max, + mca_pml_uniq.uniq_free_list_inc, + NULL); + + /* sort ptl list by exclusivity */ + qsort(mca_pml_uniq.uniq_ptl_modules, mca_pml_uniq.uniq_num_ptl_modules, sizeof(struct mca_ptl_t*), ptl_exclusivity_compare); + return OMPI_SUCCESS; +} + +/* + * Pass control information through to all PTL modules. + */ + +int mca_pml_uniq_control(int param, void* value, size_t size) +{ + size_t i; + for( i = 0; i < mca_pml_uniq.uniq_num_ptl_components; i++ ) { + if(NULL != mca_pml_uniq.uniq_ptl_components[i]->ptlm_control) { + int rc = mca_pml_uniq.uniq_ptl_components[i]->ptlm_control(param,value,size); + if(rc != OMPI_SUCCESS) + return rc; + } + } + return OMPI_SUCCESS; +} + +/* + * For each proc setup a datastructure that indicates the PTLs + * that can be used to reach the destination. + * + */ + +int mca_pml_uniq_add_procs(ompi_proc_t** procs, size_t nprocs) +{ + size_t p; + ompi_bitmap_t reachable; + struct mca_ptl_base_peer_t** ptl_peers = NULL; + int rc; + size_t p_index; + + if( nprocs == 0 ) + return OMPI_SUCCESS; + + OBJ_CONSTRUCT( &reachable, ompi_bitmap_t ); + rc = ompi_bitmap_init( &reachable, nprocs ); + if( OMPI_SUCCESS != rc ) + return rc; + + /* iterate through each of the procs and set the peers architecture */ + for( p = 0; p < nprocs; p++ ) { + uint32_t* proc_arch; + size_t size = sizeof(uint32_t); + rc = mca_base_modex_recv(&mca_pml_uniq_component.pmlm_version, procs[p], + (void**)&proc_arch, &size); + if(rc != OMPI_SUCCESS) + return rc; + if(size != sizeof(uint32_t)) + return OMPI_ERROR; + procs[p]->proc_arch = ntohl(*proc_arch); + free(proc_arch); + } + + /* attempt to add all procs to each ptl */ + ptl_peers = (struct mca_ptl_base_peer_t **)malloc(nprocs * sizeof(struct mca_ptl_base_peer_t*)); + for( p_index = 0; p_index < mca_pml_uniq.uniq_num_ptl_modules; p_index++ ) { + mca_ptl_base_module_t* ptl = mca_pml_uniq.uniq_ptl_modules[p_index]; + int ptl_inuse = 0; + + /* if the ptl can reach the destination proc it sets the + * corresponding bit (proc index) in the reachable bitmap + * and can return addressing information for each proc + * that is passed back to the ptl on data transfer calls + */ + ompi_bitmap_clear_all_bits(&reachable); + memset(ptl_peers, 0, nprocs * sizeof(struct mca_ptl_base_peer_t*)); + rc = ptl->ptl_add_procs(ptl, nprocs, procs, ptl_peers, &reachable); + if(OMPI_SUCCESS != rc) { + free(ptl_peers); + return rc; + } + + /* for each proc that is reachable - add the ptl to the procs array(s) */ + for( p = 0; p < nprocs; p++) { + ompi_proc_t *proc; + mca_pml_proc_t* proc_pml; + mca_ptl_proc_t* proc_ptl; + size_t size; + + if( !ompi_bitmap_is_set_bit(&reachable, p) ) continue; + + proc = procs[p]; + proc_pml = proc->proc_pml; + + /* this ptl can be used */ + ptl_inuse++; + + /* initialize each proc */ + if(NULL == proc_pml) { + + /* allocate pml specific proc data */ + proc_pml = 
OBJ_NEW(mca_pml_uniq_proc_t); + if (NULL == proc_pml) { + ompi_output(0, "mca_pml_uniq_add_procs: unable to allocate resources"); + free(ptl_peers); + return OMPI_ERR_OUT_OF_RESOURCE; + } +#ifdef TOTO + /* preallocate space in array for max number of ptls */ + mca_ptl_array_reserve(&proc_pml->proc_ptl_first, mca_pml_uniq.uniq_num_ptl_modules); + mca_ptl_array_reserve(&proc_pml->proc_ptl_next, mca_pml_uniq.uniq_num_ptl_modules); +#endif /* TOTO */ + proc_pml->proc_ompi = proc; + proc->proc_pml = proc_pml; + /* it's the first PTL so add it to both first and next */ + proc_pml->proc_ptl_flags |= ptl->ptl_flags; + proc_pml->proc_ptl_first = proc_ptl; + proc_pml->proc_ptl_next = proc_ptl; + } else { + /* choose the best for first and next. For the first look at the latency when + * for the next at the maximum bandwidth. + */ + } + /* dont allow an additional PTL with a lower exclusivity ranking */ + size = mca_ptl_array_get_size(&proc_pml->proc_ptl_next); + if( size > 0 ) { + proc_ptl = mca_ptl_array_get_index(&proc_pml->proc_ptl_next, size-1); + /* skip this ptl if the exclusivity is less than the previous */ + if(proc_ptl->ptl->ptl_exclusivity > ptl->ptl_exclusivity) { + if(ptl_peers[p] != NULL) { + ptl->ptl_del_procs(ptl, 1, &proc, &ptl_peers[p]); + } + continue; + } + } + + /* cache the ptl on the proc */ + proc_ptl = mca_ptl_array_insert(&proc_pml->proc_ptl_next); + proc_ptl->ptl = ptl; + proc_ptl->ptl_peer = ptl_peers[p]; + proc_ptl->ptl_weight = 0; + proc_pml->proc_ptl_flags |= ptl->ptl_flags; + } + + if(ptl_inuse > 0 && NULL != ptl->ptl_component->ptlm_progress) { + size_t p; + bool found = false; + for( p = 0; p < mca_pml_uniq.uniq_num_ptl_progress; p++ ) { + if(mca_pml_uniq.uniq_ptl_progress[p] == ptl->ptl_component->ptlm_progress) { + found = true; + break; + } + } + if(found == false) { + mca_pml_uniq.uniq_ptl_progress[mca_pml_uniq.uniq_num_ptl_progress] = + ptl->ptl_component->ptlm_progress; + mca_pml_uniq.uniq_num_ptl_progress++; + } + } + } + free(ptl_peers); + + /* iterate back through procs and compute metrics for registered ptls */ + for( p = 0; p < nprocs; p++ ) { + ompi_proc_t *proc = procs[p]; + mca_pml_proc_t* proc_pml = proc->proc_pml; + double total_bandwidth = 0; + uint32_t latency = 0; + size_t n_index; + size_t n_size; + + /* skip over procs w/ no ptls registered */ + if(NULL == proc_pml) + continue; + + /* (1) determine the total bandwidth available across all ptls + * note that we need to do this here, as we may already have ptls configured + * (2) determine the highest priority ranking for latency + */ + n_size = mca_ptl_array_get_size(&proc_pml->proc_ptl_next); + for(n_index = 0; n_index < n_size; n_index++) { + struct mca_ptl_proc_t* proc_ptl = mca_ptl_array_get_index(&proc_pml->proc_ptl_next, n_index); + struct mca_ptl_base_module_t* ptl = proc_ptl->ptl; + total_bandwidth += proc_ptl->ptl->ptl_bandwidth; + if(ptl->ptl_latency > latency) + latency = ptl->ptl_latency; + } + + /* (1) set the weight of each ptl as a percentage of overall bandwidth + * (2) copy all ptl instances at the highest priority ranking into the + * list of ptls used for first fragments + */ + + for(n_index = 0; n_index < n_size; n_index++) { + struct mca_ptl_proc_t* proc_ptl = mca_ptl_array_get_index(&proc_pml->proc_ptl_next, n_index); + struct mca_ptl_base_module_t *ptl = proc_ptl->ptl; + double weight; + + /* compute weighting factor for this ptl */ + if(ptl->ptl_bandwidth) + weight = proc_ptl->ptl->ptl_bandwidth / total_bandwidth; + else + weight = 1.0 / n_size; + proc_ptl->ptl_weight = 
(int)(weight * 100); + + /* + * save/create ptl extension for use by pml + */ + proc_ptl->ptl_base = ptl->ptl_base; + if (NULL == proc_ptl->ptl_base && + ptl->ptl_cache_bytes > 0 && + NULL != ptl->ptl_request_init && + NULL != ptl->ptl_request_fini) { + + mca_pml_base_ptl_t* ptl_base = OBJ_NEW(mca_pml_base_ptl_t); + ptl_base->ptl = ptl; + ptl_base->ptl_cache_size = ptl->ptl_cache_size; + proc_ptl->ptl_base = ptl->ptl_base = ptl_base; + } + + /* check to see if this ptl is already in the array of ptls used for first + * fragments - if not add it. + */ + if(ptl->ptl_latency == latency) { + struct mca_ptl_proc_t* proc_new = mca_ptl_array_insert(&proc_pml->proc_ptl_first); + *proc_new = *proc_ptl; + } + + } + } + return OMPI_SUCCESS; +} + +/* + * iterate through each proc and notify any PTLs associated + * with the proc that it is/has gone away + */ + +int mca_pml_uniq_del_procs(ompi_proc_t** procs, size_t nprocs) +{ + size_t p; + int rc; + for(p = 0; p < nprocs; p++) { + ompi_proc_t *proc = procs[p]; + mca_pml_proc_t* proc_pml = proc->proc_pml; + mca_ptl_proc_t* ptl_proc; + mca_ptl_base_module_t* ptl; + + /* If the PTL used for the first fragment and the one use for the others is not + * the same then we have to remove the processor from both of them. + */ + + ptl_proc = proc_pml->proc_ptl_first; + ptl = ptl_proc->ptl; + rc = ptl->ptl_del_procs( ptl, 1, &proc, &ptl_proc->ptl_peer ); + if( OMPI_SUCCESS != rc ) { + return rc; + } + if( proc_pml->proc_ptl_first != proc_pml->proc_ptl_next ) { + ptl_proc = proc_pml->proc_ptl_next; + ptl = ptl_proc->ptl; + rc = ptl->ptl_del_procs( ptl, 1, &proc, &ptl_proc->ptl_peer ); + if( OMPI_SUCCESS != rc ) { + return rc; + } + } + + /* do any required cleanup */ + OBJ_RELEASE(proc_pml); + proc->proc_pml = NULL; + } + return OMPI_SUCCESS; +} + +int mca_pml_uniq_component_fini(void) +{ + /* FIX */ + return OMPI_SUCCESS; +} + diff --git a/src/mca/pml/uniq/pml_uniq.h b/src/mca/pml/uniq/pml_uniq.h new file mode 100644 index 0000000000..0e2bfea627 --- /dev/null +++ b/src/mca/pml/uniq/pml_uniq.h @@ -0,0 +1,262 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ +/** + * @file + */ + +#ifndef MCA_PML_TEG_H +#define MCA_PML_TEG_H + +#include "threads/thread.h" +#include "threads/condition.h" +#include "class/ompi_free_list.h" +#include "util/cmd_line.h" +#include "request/request.h" +#include "mca/pml/pml.h" +#include "mca/pml/base/pml_base_request.h" +#include "mca/pml/base/pml_base_bsend.h" +#include "mca/pml/base/pml_base_sendreq.h" +#include "mca/ptl/ptl.h" + + +#if defined(c_plusplus) || defined(__cplusplus) +extern "C" { +#endif +/** + * TEG PML module + */ + +struct mca_pml_uniq_t { + mca_pml_base_module_t super; + + mca_ptl_base_component_t **uniq_ptl_components; + size_t uniq_num_ptl_components; + + mca_ptl_base_module_t** uniq_ptl_modules; + size_t uniq_num_ptl_modules; + + mca_ptl_base_component_progress_fn_t* uniq_ptl_progress; + size_t uniq_num_ptl_progress; + + ompi_list_t uniq_procs; + ompi_mutex_t uniq_lock; + + int uniq_free_list_num; /* initial size of free list */ + int uniq_free_list_max; /* maximum size of free list */ + int uniq_free_list_inc; /* number of elements to grow free list */ + int uniq_poll_iterations; /* number of iterations to poll for completion */ + + /* free list of requests */ + ompi_free_list_t uniq_send_requests; + ompi_free_list_t uniq_recv_requests; + + /* list of pending send requests */ + ompi_list_t uniq_send_pending; +}; +typedef struct mca_pml_uniq_t mca_pml_uniq_t; + +extern mca_pml_uniq_t mca_pml_uniq; + + +/* + * PML module functions. + */ + + +extern int mca_pml_uniq_component_open(void); +extern int mca_pml_uniq_component_close(void); + +extern mca_pml_base_module_t* mca_pml_uniq_component_init( + int *priority, + bool enable_progress_threads, + bool enable_mpi_threads +); + +extern int mca_pml_uniq_component_fini(void); + + + +/* + * PML interface functions. 
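+ * Implemented across the pml_uniq_*.c source files; most of these entries are
+ * installed in the mca_pml_uniq function table (see pml_uniq.c).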
+ */ + +extern int mca_pml_uniq_add_comm( + struct ompi_communicator_t* comm +); + +extern int mca_pml_uniq_del_comm( + struct ompi_communicator_t* comm +); + +extern int mca_pml_uniq_add_procs( + struct ompi_proc_t **procs, + size_t nprocs +); + +extern int mca_pml_uniq_del_procs( + struct ompi_proc_t **procs, + size_t nprocs +); + +extern int mca_pml_uniq_add_ptls( + ompi_list_t *ptls +); + +extern int mca_pml_uniq_control( + int param, + void *size, + size_t value +); + +extern int mca_pml_uniq_progress(void); + +extern int mca_pml_uniq_iprobe( + int dst, + int tag, + struct ompi_communicator_t* comm, + int *matched, + ompi_status_public_t* status +); + +extern int mca_pml_uniq_probe( + int dst, + int tag, + struct ompi_communicator_t* comm, + ompi_status_public_t* status +); + +extern int mca_pml_uniq_cancelled( + ompi_request_t* request, + int *flag +); + + +extern int mca_pml_uniq_isend_init( + void *buf, + size_t count, + ompi_datatype_t *datatype, + int dst, + int tag, + mca_pml_base_send_mode_t mode, + struct ompi_communicator_t* comm, + struct ompi_request_t **request +); + +extern int mca_pml_uniq_isend( + void *buf, + size_t count, + ompi_datatype_t *datatype, + int dst, + int tag, + mca_pml_base_send_mode_t mode, + struct ompi_communicator_t* comm, + struct ompi_request_t **request +); + +extern int mca_pml_uniq_send( + void *buf, + size_t count, + ompi_datatype_t *datatype, + int dst, + int tag, + mca_pml_base_send_mode_t mode, + struct ompi_communicator_t* comm +); + +extern int mca_pml_uniq_irecv_init( + void *buf, + size_t count, + ompi_datatype_t *datatype, + int src, + int tag, + struct ompi_communicator_t* comm, + struct ompi_request_t **request +); + +extern int mca_pml_uniq_irecv( + void *buf, + size_t count, + ompi_datatype_t *datatype, + int src, + int tag, + struct ompi_communicator_t* comm, + struct ompi_request_t **request +); + +extern int mca_pml_uniq_recv( + void *buf, + size_t count, + ompi_datatype_t *datatype, + int src, + int tag, + struct ompi_communicator_t* comm, + ompi_status_public_t* status +); + +extern int mca_pml_uniq_progress(void); + +extern int mca_pml_uniq_start( + size_t count, + ompi_request_t** requests +); + +#if defined(c_plusplus) || defined(__cplusplus) +} +#endif + +#define MCA_PML_TEG_FREE(request) \ +{ \ + mca_pml_base_request_t* pml_request = *(mca_pml_base_request_t**)(request); \ + pml_request->req_free_called = true; \ + if( pml_request->req_pml_complete == true) \ + { \ + OMPI_REQUEST_FINI(*(request)); \ + switch(pml_request->req_type) { \ + case MCA_PML_REQUEST_SEND: \ + { \ + mca_pml_base_send_request_t* sendreq = (mca_pml_base_send_request_t*)pml_request; \ + while(sendreq->req_lock > 0); \ + if(sendreq->req_send_mode == MCA_PML_BASE_SEND_BUFFERED) { \ + mca_pml_base_bsend_request_fini((ompi_request_t*)sendreq); \ + } \ + MCA_PML_TEG_SEND_REQUEST_RETURN(sendreq); \ + break; \ + } \ + case MCA_PML_REQUEST_RECV: \ + { \ + mca_pml_base_recv_request_t* recvreq = (mca_pml_base_recv_request_t*)pml_request; \ + MCA_PML_TEG_RECV_REQUEST_RETURN(recvreq); \ + break; \ + } \ + default: \ + break; \ + } \ + } \ + *(request) = MPI_REQUEST_NULL; \ +} + +#define MCA_PML_TEG_FINI(request) \ +{ \ + mca_pml_base_request_t* pml_request = *(mca_pml_base_request_t**)(request); \ + if( (pml_request->req_persistent) && !(pml_request->req_free_called) ) { \ + pml_request->req_ompi.req_state = OMPI_REQUEST_INACTIVE; \ + } else { \ + MCA_PML_TEG_FREE(request); \ + } \ +} + +#endif + diff --git a/src/mca/pml/uniq/pml_uniq_cancel.c 
b/src/mca/pml/uniq/pml_uniq_cancel.c new file mode 100644 index 0000000000..7677f3ddd8 --- /dev/null +++ b/src/mca/pml/uniq/pml_uniq_cancel.c @@ -0,0 +1,28 @@ +/* -*- Mode: C; c-basic-offset:4 ; -*- */ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "pml_uniq.h" + +int mca_pml_uniq_cancelled(ompi_request_t* request, int* flag) +{ + if(NULL != flag) + *flag = (true == request->req_status._cancelled ? 1 : 0); + return OMPI_SUCCESS; +} + diff --git a/src/mca/pml/uniq/pml_uniq_component.c b/src/mca/pml/uniq/pml_uniq_component.c new file mode 100644 index 0000000000..537eec67c8 --- /dev/null +++ b/src/mca/pml/uniq/pml_uniq_component.c @@ -0,0 +1,175 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "event/event.h" +#include "mpi.h" +#include "mca/pml/pml.h" +#include "mca/ptl/ptl.h" +#include "mca/base/mca_base_param.h" +#include "mca/pml/base/pml_base_bsend.h" +#include "pml_uniq.h" +#include "pml_uniq_proc.h" +#include "pml_uniq_sendreq.h" +#include "pml_uniq_recvreq.h" + + +mca_pml_base_component_1_0_0_t mca_pml_uniq_component = { + + /* First, the mca_base_component_t struct containing meta + information about the component itself */ + + { + /* Indicate that we are a pml v1.0.0 component (which also implies + a specific MCA version) */ + + MCA_PML_BASE_VERSION_1_0_0, + + "uniq", /* MCA component name */ + 1, /* MCA component major version */ + 0, /* MCA component minor version */ + 0, /* MCA component release version */ + mca_pml_uniq_component_open, /* component open */ + mca_pml_uniq_component_close /* component close */ + }, + + /* Next the MCA v1.0.0 component meta data */ + + { + /* Whether the component is checkpointable or not */ + false + }, + + mca_pml_uniq_component_init, /* component init */ + mca_pml_uniq_component_fini /* component finalize */ +}; + + + +static inline int mca_pml_uniq_param_register_int( + const char* param_name, + int default_value) +{ + int id = mca_base_param_register_int("pml","uniq",param_name,NULL,default_value); + int param_value = default_value; + mca_base_param_lookup_int(id,¶m_value); + return param_value; +} + + +int mca_pml_uniq_component_open(void) +{ +#ifdef WIN32 + WSADATA win_sock_data; + if (WSAStartup(MAKEWORD(2,2), &win_sock_data) != 0) { + ompi_output (0, "mca_oob_tcp_component_init: failed to initialise windows sockets: %d\n", WSAGetLastError()); + return OMPI_ERROR; + } +#endif + OBJ_CONSTRUCT(&mca_pml_uniq.uniq_lock, ompi_mutex_t); + OBJ_CONSTRUCT(&mca_pml_uniq.uniq_send_requests, ompi_free_list_t); + OBJ_CONSTRUCT(&mca_pml_uniq.uniq_recv_requests, ompi_free_list_t); + 
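+    /* list of cached procs and the queue of pending send requests */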
OBJ_CONSTRUCT(&mca_pml_uniq.uniq_procs, ompi_list_t); + OBJ_CONSTRUCT(&mca_pml_uniq.uniq_send_pending, ompi_list_t); + + mca_pml_uniq.uniq_ptl_components = NULL; + mca_pml_uniq.uniq_num_ptl_components = 0; + mca_pml_uniq.uniq_ptl_modules = NULL; + mca_pml_uniq.uniq_num_ptl_modules = 0; + mca_pml_uniq.uniq_ptl_progress = NULL; + mca_pml_uniq.uniq_num_ptl_progress = 0; + + mca_pml_uniq.uniq_free_list_num = + mca_pml_uniq_param_register_int("free_list_num", 256); + mca_pml_uniq.uniq_free_list_max = + mca_pml_uniq_param_register_int("free_list_max", -1); + mca_pml_uniq.uniq_free_list_inc = + mca_pml_uniq_param_register_int("free_list_inc", 256); + mca_pml_uniq.uniq_poll_iterations = + mca_pml_uniq_param_register_int("poll_iterations", 100000); + + return OMPI_SUCCESS; +} + + +int mca_pml_uniq_component_close(void) +{ +#ifdef WIN32 + WSACleanup(); +#endif + +#if OMPI_ENABLE_DEBUG + if (mca_pml_uniq.uniq_recv_requests.fl_num_allocated != + mca_pml_uniq.uniq_recv_requests.super.ompi_list_length) { + ompi_output(0, "uniq recv requests: %d allocated %d returned\n", + mca_pml_uniq.uniq_recv_requests.fl_num_allocated, + mca_pml_uniq.uniq_recv_requests.super.ompi_list_length); + } +#endif + + if(NULL != mca_pml_uniq.uniq_ptl_components) { + free(mca_pml_uniq.uniq_ptl_components); + } + if(NULL != mca_pml_uniq.uniq_ptl_modules) { + free(mca_pml_uniq.uniq_ptl_modules); + } + if(NULL != mca_pml_uniq.uniq_ptl_progress) { + free(mca_pml_uniq.uniq_ptl_progress); + } + OBJ_DESTRUCT(&mca_pml_uniq.uniq_send_pending); + OBJ_DESTRUCT(&mca_pml_uniq.uniq_send_requests); + OBJ_DESTRUCT(&mca_pml_uniq.uniq_recv_requests); + OBJ_DESTRUCT(&mca_pml_uniq.uniq_procs); + OBJ_DESTRUCT(&mca_pml_uniq.uniq_lock); + return OMPI_SUCCESS; +} + + +mca_pml_base_module_t* mca_pml_uniq_component_init(int* priority, + bool enable_progress_threads, + bool enable_mpi_threads) +{ + uint32_t proc_arch; + int rc; + *priority = 0; + + /* recv requests */ + ompi_free_list_init( + &mca_pml_uniq.uniq_recv_requests, + sizeof(mca_pml_uniq_recv_request_t), + OBJ_CLASS(mca_pml_uniq_recv_request_t), + mca_pml_uniq.uniq_free_list_num, + mca_pml_uniq.uniq_free_list_max, + mca_pml_uniq.uniq_free_list_inc, + NULL); + + /* buffered send */ + if(OMPI_SUCCESS != mca_pml_base_bsend_init(enable_mpi_threads)) { + ompi_output(0, "mca_pml_uniq_component_init: mca_pml_bsend_init failed\n"); + return NULL; + } + + /* post this processes datatype */ + proc_arch = ompi_proc_local()->proc_arch; + proc_arch = htonl(proc_arch); + rc = mca_base_modex_send(&mca_pml_uniq_component.pmlm_version, &proc_arch, sizeof(proc_arch)); + if(rc != OMPI_SUCCESS) + return NULL; + + return &mca_pml_uniq.super; +} + diff --git a/src/mca/pml/uniq/pml_uniq_component.h b/src/mca/pml/uniq/pml_uniq_component.h new file mode 100644 index 0000000000..5635c1aef1 --- /dev/null +++ b/src/mca/pml/uniq/pml_uniq_component.h @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ +/** + * @file + */ + +#ifndef MCA_PML_TEG_COMPONENT_H +#define MCA_PML_TEG_COMPONENT_H + +/* + * PML module functions. 
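+ * Only the component descriptor is declared here; the open/close/init/fini
+ * prototypes live in pml_uniq.h.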
+ */ + +OMPI_COMP_EXPORT extern mca_pml_base_component_1_0_0_t mca_pml_uniq_component; + +#endif diff --git a/src/mca/pml/uniq/pml_uniq_iprobe.c b/src/mca/pml/uniq/pml_uniq_iprobe.c new file mode 100644 index 0000000000..f10e8a593c --- /dev/null +++ b/src/mca/pml/uniq/pml_uniq_iprobe.c @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" +#include "request/request.h" +#include "pml_uniq_recvreq.h" + + +int mca_pml_uniq_iprobe(int src, + int tag, + struct ompi_communicator_t *comm, + int *matched, ompi_status_public_t * status) +{ + int rc; + mca_pml_base_recv_request_t recvreq; + + OBJ_CONSTRUCT( &(recvreq), mca_pml_base_recv_request_t ); + recvreq.req_base.req_ompi.req_type = OMPI_REQUEST_PML; + recvreq.req_base.req_type = MCA_PML_REQUEST_IPROBE; + MCA_PML_BASE_RECV_REQUEST_INIT(&recvreq, NULL, 0, &ompi_mpi_char, src, tag, comm, true); + + *matched = 0; + if ((rc = mca_pml_uniq_recv_request_start(&recvreq)) == OMPI_SUCCESS) { + if( recvreq.req_base.req_ompi.req_complete == true ) { + if( NULL != status ) { + *status = recvreq.req_base.req_ompi.req_status; + } + *matched = 1; + } else { + /* we are supposed to progress ... */ + ompi_progress(); + } + } + MCA_PML_BASE_RECV_REQUEST_RETURN( &recvreq ); + return rc; +} + + +int mca_pml_uniq_probe(int src, + int tag, + struct ompi_communicator_t *comm, + ompi_status_public_t * status) +{ + int rc; + mca_pml_base_recv_request_t recvreq; + + OBJ_CONSTRUCT( &(recvreq), mca_pml_base_recv_request_t ); + recvreq.req_base.req_ompi.req_type = OMPI_REQUEST_PML; + recvreq.req_base.req_type = MCA_PML_REQUEST_PROBE; + MCA_PML_BASE_RECV_REQUEST_INIT(&recvreq, NULL, 0, &ompi_mpi_char, src, tag, comm, true); + + if ((rc = mca_pml_uniq_recv_request_start(&recvreq)) != OMPI_SUCCESS) { + MCA_PML_BASE_RECV_REQUEST_RETURN( &recvreq ); + return rc; + } + + if (recvreq.req_base.req_ompi.req_complete == false) { + /* give up and sleep until completion */ + if (ompi_using_threads()) { + ompi_mutex_lock(&ompi_request_lock); + ompi_request_waiting++; + while (recvreq.req_base.req_ompi.req_complete == false) + ompi_condition_wait(&ompi_request_cond, &ompi_request_lock); + ompi_request_waiting--; + ompi_mutex_unlock(&ompi_request_lock); + } else { + ompi_request_waiting++; + while (recvreq.req_base.req_ompi.req_complete == false) + ompi_condition_wait(&ompi_request_cond, &ompi_request_lock); + ompi_request_waiting--; + } + } + + if (NULL != status) { + *status = recvreq.req_base.req_ompi.req_status; + } + MCA_PML_BASE_RECV_REQUEST_RETURN( &recvreq ); + return OMPI_SUCCESS; +} + diff --git a/src/mca/pml/uniq/pml_uniq_irecv.c b/src/mca/pml/uniq/pml_uniq_irecv.c new file mode 100644 index 0000000000..df674edc1b --- /dev/null +++ b/src/mca/pml/uniq/pml_uniq_irecv.c @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. 
+ * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" +#include "request/request.h" +#include "pml_uniq_recvreq.h" + + +int mca_pml_uniq_irecv_init(void *addr, + size_t count, + ompi_datatype_t * datatype, + int src, + int tag, + struct ompi_communicator_t *comm, + struct ompi_request_t **request) +{ + int rc; + mca_pml_base_recv_request_t *recvreq; + MCA_PML_TEG_RECV_REQUEST_ALLOC(recvreq, rc); + if (NULL == recvreq) + return rc; + + MCA_PML_BASE_RECV_REQUEST_INIT(recvreq, + addr, + count, datatype, src, tag, comm, true); + + *request = (ompi_request_t *) recvreq; + return OMPI_SUCCESS; +} + +int mca_pml_uniq_irecv(void *addr, + size_t count, + ompi_datatype_t * datatype, + int src, + int tag, + struct ompi_communicator_t *comm, + struct ompi_request_t **request) +{ + int rc; + + mca_pml_base_recv_request_t *recvreq; + MCA_PML_TEG_RECV_REQUEST_ALLOC(recvreq, rc); + if (NULL == recvreq) + return rc; + + MCA_PML_BASE_RECV_REQUEST_INIT(recvreq, + addr, + count, datatype, src, tag, comm, false); + + if ((rc = mca_pml_uniq_recv_request_start(recvreq)) != OMPI_SUCCESS) { + MCA_PML_TEG_RECV_REQUEST_RETURN(recvreq); + return rc; + } + *request = (ompi_request_t *) recvreq; + return OMPI_SUCCESS; +} + + +int mca_pml_uniq_recv(void *addr, + size_t count, + ompi_datatype_t * datatype, + int src, + int tag, + struct ompi_communicator_t *comm, + ompi_status_public_t * status) +{ + int rc; + mca_pml_base_recv_request_t *recvreq; + MCA_PML_TEG_RECV_REQUEST_ALLOC(recvreq, rc); + if (NULL == recvreq) + return rc; + + MCA_PML_BASE_RECV_REQUEST_INIT(recvreq, + addr, + count, datatype, src, tag, comm, false); + + if ((rc = mca_pml_uniq_recv_request_start(recvreq)) != OMPI_SUCCESS) { + goto recv_finish; + } + + if (recvreq->req_base.req_ompi.req_complete == false) { + /* give up and sleep until completion */ + if (ompi_using_threads()) { + ompi_mutex_lock(&ompi_request_lock); + ompi_request_waiting++; + while (recvreq->req_base.req_ompi.req_complete == false) + ompi_condition_wait(&ompi_request_cond, &ompi_request_lock); + ompi_request_waiting--; + ompi_mutex_unlock(&ompi_request_lock); + } else { + ompi_request_waiting++; + while (recvreq->req_base.req_ompi.req_complete == false) + ompi_condition_wait(&ompi_request_cond, &ompi_request_lock); + ompi_request_waiting--; + } + } + recv_finish: + if (NULL != status) { /* return status */ + *status = recvreq->req_base.req_ompi.req_status; + } + + MCA_PML_TEG_RECV_REQUEST_RETURN(recvreq); + return recvreq->req_base.req_ompi.req_status.MPI_ERROR; +} diff --git a/src/mca/pml/uniq/pml_uniq_isend.c b/src/mca/pml/uniq/pml_uniq_isend.c new file mode 100644 index 0000000000..1ce8d79d34 --- /dev/null +++ b/src/mca/pml/uniq/pml_uniq_isend.c @@ -0,0 +1,128 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "pml_uniq.h" +#include "pml_uniq_proc.h" +#include "pml_uniq_sendreq.h" +#include "pml_uniq_recvreq.h" + + +int mca_pml_uniq_isend_init(void *buf, + size_t count, + ompi_datatype_t * datatype, + int dst, + int tag, + mca_pml_base_send_mode_t sendmode, + ompi_communicator_t * comm, + ompi_request_t ** request) +{ + int rc; + + mca_pml_base_send_request_t *sendreq; + MCA_PML_TEG_SEND_REQUEST_ALLOC(comm, dst, sendreq, rc); + if (rc != OMPI_SUCCESS) + return rc; + + MCA_PML_BASE_SEND_REQUEST_INIT(sendreq, + buf, + count, + datatype, + dst, tag, + comm, sendmode, true); + + *request = (ompi_request_t *) sendreq; + return OMPI_SUCCESS; +} + + +int mca_pml_uniq_isend(void *buf, + size_t count, + ompi_datatype_t * datatype, + int dst, + int tag, + mca_pml_base_send_mode_t sendmode, + ompi_communicator_t * comm, + ompi_request_t ** request) +{ + int rc; + mca_pml_base_send_request_t *sendreq; + MCA_PML_TEG_SEND_REQUEST_ALLOC(comm, dst, sendreq, rc); + if (rc != OMPI_SUCCESS) + return rc; + MCA_PML_BASE_SEND_REQUEST_INIT(sendreq, + buf, + count, + datatype, + dst, tag, + comm, sendmode, false); + + MCA_PML_TEG_SEND_REQUEST_START(sendreq, rc); + *request = (ompi_request_t *) sendreq; + return rc; +} + + +int mca_pml_uniq_send(void *buf, + size_t count, + ompi_datatype_t * datatype, + int dst, + int tag, + mca_pml_base_send_mode_t sendmode, + ompi_communicator_t * comm) +{ + int rc; + mca_pml_base_send_request_t *sendreq; + MCA_PML_TEG_SEND_REQUEST_ALLOC(comm, dst, sendreq, rc); + if (rc != OMPI_SUCCESS) + return rc; + + MCA_PML_BASE_SEND_REQUEST_INIT(sendreq, + buf, + count, + datatype, + dst, tag, + comm, sendmode, false); + + MCA_PML_TEG_SEND_REQUEST_START(sendreq, rc); + if (rc != OMPI_SUCCESS) { + MCA_PML_TEG_FREE((ompi_request_t **) & sendreq); + return rc; + } + + if (sendreq->req_base.req_ompi.req_complete == false) { + /* give up and sleep until completion */ + if (ompi_using_threads()) { + ompi_mutex_lock(&ompi_request_lock); + ompi_request_waiting++; + while (sendreq->req_base.req_ompi.req_complete == false) + ompi_condition_wait(&ompi_request_cond, &ompi_request_lock); + ompi_request_waiting--; + ompi_mutex_unlock(&ompi_request_lock); + } else { + ompi_request_waiting++; + while (sendreq->req_base.req_ompi.req_complete == false) + ompi_condition_wait(&ompi_request_cond, &ompi_request_lock); + ompi_request_waiting--; + } + } + + /* return request to pool */ + MCA_PML_TEG_FREE((ompi_request_t **) & sendreq); + return OMPI_SUCCESS; +} + diff --git a/src/mca/pml/uniq/pml_uniq_proc.c b/src/mca/pml/uniq/pml_uniq_proc.c new file mode 100644 index 0000000000..8d1a6c8a84 --- /dev/null +++ b/src/mca/pml/uniq/pml_uniq_proc.c @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "include/sys/atomic.h" +#include "pml_uniq.h" +#include "pml_uniq_proc.h" +#include "pml_ptl_array.h" + + +static void mca_pml_uniq_proc_construct(mca_pml_proc_t* proc) +{ + proc->proc_ompi = NULL; + proc->proc_ptl_flags = 0; + OBJ_CONSTRUCT(&proc->proc_lock, ompi_mutex_t); +#ifdef TOTO + OBJ_CONSTRUCT(&proc->proc_ptl_first, mca_pml_uniq_ptl_array_t); + OBJ_CONSTRUCT(&proc->proc_ptl_next, mca_pml_uniq_ptl_array_t); +#endif /* TOTO */ + proc->proc_ptl_first = NULL; + proc->proc_ptl_next = NULL; + + OMPI_THREAD_LOCK(&mca_pml_uniq.uniq_lock); + ompi_list_append(&mca_pml_uniq.uniq_procs, (ompi_list_item_t*)proc); + OMPI_THREAD_UNLOCK(&mca_pml_uniq.uniq_lock); +} + + +static void mca_pml_uniq_proc_destruct(mca_pml_proc_t* proc) +{ + OMPI_THREAD_LOCK(&mca_pml_uniq.uniq_lock); + ompi_list_remove_item(&mca_pml_uniq.uniq_procs, (ompi_list_item_t*)proc); + OMPI_THREAD_UNLOCK(&mca_pml_uniq.uniq_lock); + + OBJ_DESTRUCT(&proc->proc_lock); +#ifdef TOTO + OBJ_DESTRUCT(&proc->proc_ptl_first); + OBJ_DESTRUCT(&proc->proc_ptl_next); +#endif /* TOTO */ +} + +OBJ_CLASS_INSTANCE( + mca_pml_uniq_proc_t, + ompi_list_item_t, + mca_pml_uniq_proc_construct, + mca_pml_uniq_proc_destruct +); + diff --git a/src/mca/pml/uniq/pml_uniq_proc.h b/src/mca/pml/uniq/pml_uniq_proc.h new file mode 100644 index 0000000000..e391285329 --- /dev/null +++ b/src/mca/pml/uniq/pml_uniq_proc.h @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ +/** + * @file + */ +#ifndef MCA_PML_PROC_H +#define MCA_PML_PROC_H + +#include "threads/mutex.h" +#include "communicator/communicator.h" +#include "group/group.h" +#include "proc/proc.h" +#include "pml_ptl_array.h" + +#if defined(c_plusplus) || defined(__cplusplus) +extern "C" { +#endif +/** + * Structure associated w/ ompi_proc_t that contains data specific + * to the PML. Note that this name is not PML specific. + */ +struct mca_pml_proc_t { + ompi_list_item_t super; + ompi_proc_t *proc_ompi; /**< back-pointer to ompi_proc_t */ + ompi_mutex_t proc_lock; /**< lock to protect against concurrent access */ + mca_ptl_proc_t* proc_ptl_first; /**< ptl for the first fragment */ + mca_ptl_proc_t* proc_ptl_next; /**< ptl for the remaining fragments */ +#ifdef TOTO + mca_ptl_array_t proc_ptl_first; /**< array of ptls to use for first fragments */ + mca_ptl_array_t proc_ptl_next; /**< array of ptls to use for remaining fragments */ +#endif /* TOTO */ + uint32_t proc_ptl_flags; /**< aggregate ptl flags */ +}; +typedef struct mca_pml_proc_t mca_pml_proc_t; + + +OMPI_COMP_EXPORT extern ompi_class_t mca_pml_uniq_proc_t_class; +typedef struct mca_pml_proc_t mca_pml_uniq_proc_t; + +/** + * Return the mca_pml_proc_t instance cached in the communicators local group. 
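+ * The instance is created by mca_pml_uniq_add_procs() and cached on the
+ * ompi_proc_t as proc->proc_pml.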
+ * + * @param comm Communicator + * @param rank Peer rank + * @return mca_pml_proc_t instance + */ + +static inline mca_pml_proc_t* mca_pml_uniq_proc_lookup_local(ompi_communicator_t* comm, int rank) +{ + ompi_proc_t* proc = comm->c_local_group->grp_proc_pointers[rank]; + return proc->proc_pml; +} + +/** + * Return the mca_pml_proc_t instance cached on the communicators remote group. + * + * @param comm Communicator + * @param rank Peer rank + * @return mca_pml_proc_t instance + */ + +static inline mca_pml_proc_t* mca_pml_uniq_proc_lookup_remote(ompi_communicator_t* comm, int rank) +{ + ompi_proc_t* proc = comm->c_remote_group->grp_proc_pointers[rank]; + return proc->proc_pml; +} + +/** + * Return the mca_ptl_peer_t instance corresponding to the process/ptl combination. + * + * @param comm Communicator + * @param rank Peer rank + * @return mca_pml_proc_t instance + */ + +static inline struct mca_ptl_base_peer_t* mca_pml_uniq_proc_lookup_remote_peer( + ompi_communicator_t* comm, + int rank, + struct mca_ptl_base_module_t* ptl) +{ + ompi_proc_t* proc = comm->c_remote_group->grp_proc_pointers[rank]; + mca_pml_proc_t* proc_pml = proc->proc_pml; + if( proc_pml->proc_ptl_first->ptl == ptl ) + return proc_pml->proc_ptl_first->ptl_peer; + if( proc_pml->proc_ptl_next->ptl == ptl ) + return proc_pml->proc_ptl_next->ptl_peer; +#ifdef TOTO + size_t i, size = mca_ptl_array_get_size(&proc_pml->proc_ptl_first); + mca_ptl_proc_t* proc_ptl = proc_pml->proc_ptl_first.ptl_procs; + for(i = 0; i < size; i++) { + if(proc_ptl->ptl == ptl) { + return proc_ptl->ptl_peer; + } + proc_ptl++; + } +#endif /* TOTO */ + return NULL; +} + +#if defined(c_plusplus) || defined(__cplusplus) +} +#endif +#endif + diff --git a/src/mca/pml/uniq/pml_uniq_progress.c b/src/mca/pml/uniq/pml_uniq_progress.c new file mode 100644 index 0000000000..2cde3bafe8 --- /dev/null +++ b/src/mca/pml/uniq/pml_uniq_progress.c @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "pml_uniq.h" +#include "pml_uniq_sendreq.h" + + +int mca_pml_uniq_progress(void) +{ + mca_ptl_tstamp_t tstamp = 0; + size_t i; + int count = 0; + + /* + * Progress each of the PTL modules + */ + for(i=0; i 0) { + count += rc; + } + } + return count; +} + diff --git a/src/mca/pml/uniq/pml_uniq_ptl.c b/src/mca/pml/uniq/pml_uniq_ptl.c new file mode 100644 index 0000000000..141c07a893 --- /dev/null +++ b/src/mca/pml/uniq/pml_uniq_ptl.c @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "pml_uniq_ptl.h" + + +static void mca_pml_base_ptl_construct(mca_pml_base_ptl_t* ptl) +{ + OBJ_CONSTRUCT(&ptl->ptl_cache, ompi_list_t); + OBJ_CONSTRUCT(&ptl->ptl_cache_lock, ompi_mutex_t); + ptl->ptl = NULL; + ptl->ptl_cache_size = 0; + ptl->ptl_cache_alloc = 0; +} + +static void mca_pml_base_ptl_destruct(mca_pml_base_ptl_t* ptl) +{ + OBJ_DESTRUCT(&ptl->ptl_cache); + OBJ_DESTRUCT(&ptl->ptl_cache_lock); +} + +OBJ_CLASS_INSTANCE( + mca_pml_base_ptl_t, + ompi_list_t, + mca_pml_base_ptl_construct, + mca_pml_base_ptl_destruct +); + diff --git a/src/mca/pml/uniq/pml_uniq_ptl.h b/src/mca/pml/uniq/pml_uniq_ptl.h new file mode 100644 index 0000000000..400da7c4a9 --- /dev/null +++ b/src/mca/pml/uniq/pml_uniq_ptl.h @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef _MCA_PML_BASE_PTL_ +#define _MCA_PML_BASE_PTL_ + +#include "mca/pml/pml.h" +#include "mca/ptl/ptl.h" +#include "threads/condition.h" +#if defined(c_plusplus) || defined(__cplusplus) +extern "C" { +#endif + + +struct mca_pml_base_ptl_t { + ompi_list_t ptl_cache; /**< cache of send requests */ + size_t ptl_cache_size; /**< maximum size of cache */ + size_t ptl_cache_alloc; /**< current number of allocated items */ + ompi_mutex_t ptl_cache_lock; /**< lock for queue access */ + struct mca_ptl_base_module_t* ptl; /**< back pointer to ptl */ +}; +typedef struct mca_pml_base_ptl_t mca_pml_base_ptl_t; + +OBJ_CLASS_DECLARATION(mca_pml_base_ptl_t); + +#if defined(c_plusplus) || defined(__cplusplus) +} +#endif + +#endif + diff --git a/src/mca/pml/uniq/pml_uniq_recvfrag.c b/src/mca/pml/uniq/pml_uniq_recvfrag.c new file mode 100644 index 0000000000..b2f25a75de --- /dev/null +++ b/src/mca/pml/uniq/pml_uniq_recvfrag.c @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +/** + * @file + */ + +#include "ompi_config.h" + +#include "mca/pml/pml.h" +#include "pml_uniq_recvfrag.h" +#include "pml_uniq_proc.h" + + +OMPI_DECLSPEC extern ompi_class_t mca_ptl_base_recv_frag_t_class; + + +/** + * Called by the PTL to match attempt a match for new fragments. + * + * @param ptl (IN) The PTL pointer + * @param frag (IN) Receive fragment descriptor. + * @param header (IN) Header corresponding to the receive fragment. + * @return OMPI_SUCCESS or error status on failure. 
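+ *         (Note: the actual return type is bool, true when the fragment was matched.)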
+ */ +bool mca_pml_uniq_recv_frag_match( + mca_ptl_base_module_t* ptl, + mca_ptl_base_recv_frag_t* frag, + mca_ptl_base_match_header_t* header) +{ + bool matched; + bool matches = false; + ompi_list_t matched_frags; + if((matched = mca_ptl_base_match(header, frag, &matched_frags, &matches)) == false) { + frag = (matches ? (mca_ptl_base_recv_frag_t*)ompi_list_remove_first(&matched_frags) : NULL); + } + + while(NULL != frag) { + mca_ptl_base_module_t* ptl = frag->frag_base.frag_owner; + mca_pml_base_recv_request_t *request = frag->frag_request; + mca_ptl_base_match_header_t *header = &frag->frag_base.frag_header.hdr_match; + + /* + * Initialize request status. + */ + request->req_bytes_packed = header->hdr_msg_length; + request->req_base.req_ompi.req_status.MPI_SOURCE = header->hdr_src; + request->req_base.req_ompi.req_status.MPI_TAG = header->hdr_tag; + + /* + * If probe - signal request is complete - but don't notify PTL + */ + if(request->req_base.req_type == MCA_PML_REQUEST_PROBE) { + + ptl->ptl_recv_progress( ptl, + request, + header->hdr_msg_length, + header->hdr_msg_length ); + matched = mca_pml_uniq_recv_frag_match( ptl, frag, header ); + + } else { + + /* if required - setup pointer to ptls peer */ + if (NULL == frag->frag_base.frag_peer) { + frag->frag_base.frag_peer = mca_pml_uniq_proc_lookup_remote_peer(request->req_base.req_comm,header->hdr_src,ptl); + } + + /* notify ptl of match */ + ptl->ptl_matched(ptl, frag); + + }; + + /* process any additional fragments that arrived out of order */ + frag = (matches ? (mca_ptl_base_recv_frag_t*)ompi_list_remove_first(&matched_frags) : NULL); + }; + return matched; +} + + diff --git a/src/mca/pml/uniq/pml_uniq_recvfrag.h b/src/mca/pml/uniq/pml_uniq_recvfrag.h new file mode 100644 index 0000000000..c68c222c58 --- /dev/null +++ b/src/mca/pml/uniq/pml_uniq_recvfrag.h @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ +/** + * @file + */ + +#ifndef MCA_PML_TEG_RECVFRAG_H +#define MCA_PML_TEG_RECVFRAG_H + +#include "mca/ptl/ptl.h" +#include "mca/pml/base/pml_base_recvreq.h" +#include "mca/ptl/base/ptl_base_recvfrag.h" + +/** + * Called by the PTL to match attempt a match for new fragments. + * + * @param ptl (IN) The PTL pointer + * @param frag (IN) Receive fragment descriptor. + * @param header (IN) Header corresponding to the receive fragment. + * @return OMPI_SUCCESS or error status on failure. + */ +bool mca_pml_uniq_recv_frag_match( + mca_ptl_base_module_t* ptl, + mca_ptl_base_recv_frag_t* frag, + mca_ptl_base_match_header_t* header +); + +#endif + diff --git a/src/mca/pml/uniq/pml_uniq_recvreq.c b/src/mca/pml/uniq/pml_uniq_recvreq.c new file mode 100644 index 0000000000..3e70cd0d82 --- /dev/null +++ b/src/mca/pml/uniq/pml_uniq_recvreq.c @@ -0,0 +1,280 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. 
+ * Copyright (c) 2004-2005 The Regents of the University of California.
+ * All rights reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#include "ompi_config.h"
+
+#include "mca/pml/pml.h"
+#include "mca/ptl/ptl.h"
+#include "mca/ptl/base/ptl_base_comm.h"
+#include "pml_uniq_recvreq.h"
+#include "pml_uniq_sendreq.h"
+
+
+static mca_ptl_base_recv_frag_t* mca_pml_uniq_recv_request_match_specific_proc(
+    mca_pml_base_recv_request_t* request, int proc);
+
+
+static int mca_pml_uniq_recv_request_fini(struct ompi_request_t** request)
+{
+    MCA_PML_TEG_FINI(request);
+    return OMPI_SUCCESS;
+}
+
+static int mca_pml_uniq_recv_request_free(struct ompi_request_t** request)
+{
+    MCA_PML_TEG_FREE(request);
+    return OMPI_SUCCESS;
+}
+
+
+static int mca_pml_uniq_recv_request_cancel(struct ompi_request_t* request, int complete)
+{
+    mca_pml_base_request_t* uniq_request = (mca_pml_base_request_t*)request;
+    ompi_communicator_t* ompi_comm = uniq_request->req_comm;
+    mca_pml_ptl_comm_t* pml_comm = (mca_pml_ptl_comm_t*)ompi_comm->c_pml_comm;
+
+    if( true == request->req_complete ) { /* way too late to cancel this one */
+        return OMPI_SUCCESS;
+    }
+
+    /* The rest should be protected behind the match logic lock */
+    OMPI_THREAD_LOCK(&pml_comm->c_matching_lock);
+
+    if( OMPI_ANY_TAG == request->req_status.MPI_TAG ) { /* the match has not been done yet */
+
+        if( uniq_request->req_peer == OMPI_ANY_SOURCE ) {
+            ompi_list_remove_item( &(pml_comm->c_wild_receives),
+                                   (ompi_list_item_t*)request );
+        } else {
+            ompi_list_remove_item( pml_comm->c_specific_receives + uniq_request->req_peer,
+                                   (ompi_list_item_t*)request );
+        }
+    }
+
+    OMPI_THREAD_UNLOCK(&pml_comm->c_matching_lock);
+
+    request->req_status._cancelled = true;
+    request->req_complete = true;  /* mark it as completed so all the test/wait functions
+                                    * on this particular request will finish */
+    /* Now we have a problem if we are in a multi-threaded environment. We should
+     * broadcast the condition on the request in order to allow the other threads
+     * to complete their test/wait functions.
+     */
+    if(ompi_request_waiting) {
+        ompi_condition_broadcast(&ompi_request_cond);
+    }
+    return OMPI_SUCCESS;
+}
+
+static void mca_pml_uniq_recv_request_construct(mca_pml_base_recv_request_t* request)
+{
+    request->req_base.req_type = MCA_PML_REQUEST_RECV;
+    request->req_base.req_ompi.req_fini = mca_pml_uniq_recv_request_fini;
+    request->req_base.req_ompi.req_free = mca_pml_uniq_recv_request_free;
+    request->req_base.req_ompi.req_cancel = mca_pml_uniq_recv_request_cancel;
+}
+
+static void mca_pml_uniq_recv_request_destruct(mca_pml_base_recv_request_t* request)
+{
+}
+
+OBJ_CLASS_INSTANCE(
+    mca_pml_uniq_recv_request_t,
+    mca_pml_base_recv_request_t,
+    mca_pml_uniq_recv_request_construct,
+    mca_pml_uniq_recv_request_destruct);
+
+
+/*
+ * Update the recv request status to reflect the number of bytes
+ * received and actually delivered to the application.
+ */ + +void mca_pml_uniq_recv_request_progress( + struct mca_ptl_base_module_t* ptl, + mca_pml_base_recv_request_t* req, + size_t bytes_received, + size_t bytes_delivered) +{ + OMPI_THREAD_LOCK(&ompi_request_lock); + req->req_bytes_received += bytes_received; + req->req_bytes_delivered += bytes_delivered; + if (req->req_bytes_received >= req->req_bytes_packed) { + /* initialize request status */ + req->req_base.req_ompi.req_status._count = req->req_bytes_delivered; + req->req_base.req_pml_complete = true; + req->req_base.req_ompi.req_complete = true; + if(ompi_request_waiting) { + ompi_condition_broadcast(&ompi_request_cond); + } + } + OMPI_THREAD_UNLOCK(&ompi_request_lock); +} + + + +/* + * This routine is used to match a posted receive when the source process + * is specified. +*/ + +void mca_pml_uniq_recv_request_match_specific(mca_pml_base_recv_request_t* request) +{ + ompi_communicator_t *comm = request->req_base.req_comm; + mca_pml_ptl_comm_t* pml_comm = comm->c_pml_comm; + int req_peer = request->req_base.req_peer; + mca_ptl_base_recv_frag_t* frag; + + /* check for a specific match */ + OMPI_THREAD_LOCK(&pml_comm->c_matching_lock); + + /* assign sequence number */ + request->req_base.req_sequence = pml_comm->c_recv_seq++; + + if (ompi_list_get_size(&pml_comm->c_unexpected_frags[req_peer]) > 0 && + (frag = mca_pml_uniq_recv_request_match_specific_proc(request, req_peer)) != NULL) { + mca_ptl_base_module_t* ptl = frag->frag_base.frag_owner; + /* setup pointer to ptls peer */ + if(NULL == frag->frag_base.frag_peer) + frag->frag_base.frag_peer = mca_pml_uniq_proc_lookup_remote_peer(comm,req_peer,ptl); + OMPI_THREAD_UNLOCK(&pml_comm->c_matching_lock); + if( !((MCA_PML_REQUEST_IPROBE == request->req_base.req_type) || + (MCA_PML_REQUEST_PROBE == request->req_base.req_type)) ) { + ptl->ptl_matched(ptl, frag); + } + return; /* match found */ + } + + /* We didn't find any matches. Record this irecv so we can match + * it when the message comes in. + */ + if(request->req_base.req_type != MCA_PML_REQUEST_IPROBE) { + ompi_list_append(pml_comm->c_specific_receives+req_peer, (ompi_list_item_t*)request); + } + OMPI_THREAD_UNLOCK(&pml_comm->c_matching_lock); +} + + +/* + * this routine is used to try and match a wild posted receive - where + * wild is determined by the value assigned to the source process +*/ + +void mca_pml_uniq_recv_request_match_wild(mca_pml_base_recv_request_t* request) +{ + ompi_communicator_t *comm = request->req_base.req_comm; + mca_pml_ptl_comm_t* pml_comm = comm->c_pml_comm; + int proc_count = comm->c_remote_group->grp_proc_count; + int proc; + + /* + * Loop over all the outstanding messages to find one that matches. + * There is an outer loop over lists of messages from each + * process, then an inner loop over the messages from the + * process. 
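A minimal standalone sketch (not part of the patch) of the completion-signaling pattern used by mca_pml_uniq_recv_request_progress() above: byte counters are updated under a global request lock, and the condition variable is broadcast only when a waiter may actually be blocked. Plain pthreads and the toy_* names are stand-ins for OMPI_THREAD_LOCK, ompi_condition_broadcast, ompi_request_waiting and the real request structures.

#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

struct toy_request {
    size_t bytes_received;   /* mirrors req_bytes_received */
    size_t bytes_expected;   /* mirrors req_bytes_packed   */
    bool   complete;
};

static pthread_mutex_t toy_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  toy_cond = PTHREAD_COND_INITIALIZER;
static int toy_waiting = 0;  /* like ompi_request_waiting: threads blocked in wait */

void toy_progress(struct toy_request *req, size_t bytes)
{
    pthread_mutex_lock(&toy_lock);
    req->bytes_received += bytes;
    if (req->bytes_received >= req->bytes_expected) {
        req->complete = true;
        if (toy_waiting)                 /* wake waiters only when someone is blocked */
            pthread_cond_broadcast(&toy_cond);
    }
    pthread_mutex_unlock(&toy_lock);
}

void toy_wait(struct toy_request *req)
{
    pthread_mutex_lock(&toy_lock);
    toy_waiting++;
    while (!req->complete)
        pthread_cond_wait(&toy_cond, &toy_lock);
    toy_waiting--;
    pthread_mutex_unlock(&toy_lock);
}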
+ */ + OMPI_THREAD_LOCK(&pml_comm->c_matching_lock); + + /* assign sequence number */ + request->req_base.req_sequence = pml_comm->c_recv_seq++; + + for (proc = 0; proc < proc_count; proc++) { + mca_ptl_base_recv_frag_t* frag; + + /* continue if no frags to match */ + if (ompi_list_get_size(&pml_comm->c_unexpected_frags[proc]) == 0) + continue; + + /* loop over messages from the current proc */ + if ((frag = mca_pml_uniq_recv_request_match_specific_proc(request, proc)) != NULL) { + mca_ptl_base_module_t* ptl = frag->frag_base.frag_owner; + /* if required - setup pointer to ptls peer */ + if(NULL == frag->frag_base.frag_peer) + frag->frag_base.frag_peer = mca_pml_uniq_proc_lookup_remote_peer(comm,proc,ptl); + OMPI_THREAD_UNLOCK(&pml_comm->c_matching_lock); + if( !((MCA_PML_REQUEST_IPROBE == request->req_base.req_type) || + (MCA_PML_REQUEST_PROBE == request->req_base.req_type)) ) { + ptl->ptl_matched(ptl, frag); + } + return; /* match found */ + } + } + + /* We didn't find any matches. Record this irecv so we can match to + * it when the message comes in. + */ + + if(request->req_base.req_type != MCA_PML_REQUEST_IPROBE) + ompi_list_append(&pml_comm->c_wild_receives, (ompi_list_item_t*)request); + OMPI_THREAD_UNLOCK(&pml_comm->c_matching_lock); +} + + +/* + * this routine tries to match a posted receive. If a match is found, + * it places the request in the appropriate matched receive list. +*/ + +static mca_ptl_base_recv_frag_t* mca_pml_uniq_recv_request_match_specific_proc( + mca_pml_base_recv_request_t* request, int proc) +{ + mca_pml_ptl_comm_t *pml_comm = request->req_base.req_comm->c_pml_comm; + ompi_list_t* unexpected_frags = pml_comm->c_unexpected_frags+proc; + mca_ptl_base_recv_frag_t* frag; + mca_ptl_base_match_header_t* header; + int tag = request->req_base.req_tag; + + if( OMPI_ANY_TAG == tag ) { + for (frag = (mca_ptl_base_recv_frag_t*)ompi_list_get_first(unexpected_frags); + frag != (mca_ptl_base_recv_frag_t*)ompi_list_get_end(unexpected_frags); + frag = (mca_ptl_base_recv_frag_t*)ompi_list_get_next(frag)) { + header = &(frag->frag_base.frag_header.hdr_match); + + /* check first frag - we assume that process matching has been done already */ + if( header->hdr_tag >= 0 ) { + goto find_fragment; + } + } + } else { + for (frag = (mca_ptl_base_recv_frag_t*)ompi_list_get_first(unexpected_frags); + frag != (mca_ptl_base_recv_frag_t*)ompi_list_get_end(unexpected_frags); + frag = (mca_ptl_base_recv_frag_t*)ompi_list_get_next(frag)) { + header = &(frag->frag_base.frag_header.hdr_match); + + /* check first frag - we assume that process matching has been done already */ + if ( tag == header->hdr_tag ) { + /* we assume that the tag is correct from MPI point of view (ie. 
>= 0 ) */
+                goto find_fragment;
+            }
+        }
+    }
+    return NULL;
+ find_fragment:
+    request->req_bytes_packed = header->hdr_msg_length;
+    request->req_base.req_ompi.req_status.MPI_TAG = header->hdr_tag;
+    request->req_base.req_ompi.req_status.MPI_SOURCE = header->hdr_src;
+
+    if( !((MCA_PML_REQUEST_IPROBE == request->req_base.req_type) ||
+          (MCA_PML_REQUEST_PROBE == request->req_base.req_type)) ) {
+        ompi_list_remove_item(unexpected_frags, (ompi_list_item_t*)frag);
+        frag->frag_request = request;
+    } else {
+        /* it's a probe, therefore report its completion */
+        mca_pml_uniq_recv_request_progress( NULL, request, header->hdr_msg_length, header->hdr_msg_length );
+    }
+    return frag;
+}
+
diff --git a/src/mca/pml/uniq/pml_uniq_recvreq.h b/src/mca/pml/uniq/pml_uniq_recvreq.h
new file mode 100644
index 0000000000..3dcac7acad
--- /dev/null
+++ b/src/mca/pml/uniq/pml_uniq_recvreq.h
@@ -0,0 +1,125 @@
+/*
+ * Copyright (c) 2004-2005 The Trustees of Indiana University.
+ * All rights reserved.
+ * Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
+ * All rights reserved.
+ * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
+ * University of Stuttgart. All rights reserved.
+ * Copyright (c) 2004-2005 The Regents of the University of California.
+ * All rights reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+/**
+ * @file
+ */
+#ifndef OMPI_PML_TEG_RECV_REQUEST_H
+#define OMPI_PML_TEG_RECV_REQUEST_H
+
+#include "pml_uniq.h"
+#include "pml_uniq_proc.h"
+#include "mca/pml/base/pml_base_recvreq.h"
+#include "mca/ptl/base/ptl_base_recvfrag.h"
+
+#if defined(c_plusplus) || defined(__cplusplus)
+extern "C" {
+#endif
+typedef mca_pml_base_recv_request_t mca_pml_uniq_recv_request_t;
+
+OBJ_CLASS_DECLARATION(mca_pml_uniq_recv_request_t);
+
+
+/**
+ * Allocate a recv request from the module's free list.
+ *
+ * @param rc (OUT)  OMPI_SUCCESS or error status on failure.
+ * @return          Receive request.
+ */
+#define MCA_PML_TEG_RECV_REQUEST_ALLOC(recvreq, rc) \
+    do { \
+        ompi_list_item_t* item; \
+        OMPI_FREE_LIST_GET(&mca_pml_uniq.uniq_recv_requests, item, rc); \
+        recvreq = (mca_pml_base_recv_request_t*)item; \
+    } while(0)
+
+/**
+ * Return a recv request to the module's free list.
+ *
+ * @param request (IN)  Receive request.
+ */
+#define MCA_PML_TEG_RECV_REQUEST_RETURN(request) \
+    do { \
+        MCA_PML_BASE_RECV_REQUEST_RETURN( request ); \
+        OMPI_FREE_LIST_RETURN(&mca_pml_uniq.uniq_recv_requests, (ompi_list_item_t*)request); \
+    } while(0)
+
+/**
+ * Attempt to match the request against the unexpected fragment list
+ * for all source ranks within the communicator.
+ *
+ * @param request (IN)  Request to match.
+ */
+void mca_pml_uniq_recv_request_match_wild(mca_pml_base_recv_request_t* request);
+
+/**
+ * Attempt to match the request against the unexpected fragment list
+ * for a specific source rank.
+ *
+ * @param request (IN)  Request to match.
+ */
+void mca_pml_uniq_recv_request_match_specific(mca_pml_base_recv_request_t* request);
+
+/**
+ * Start an initialized request.
+ *
+ * @param request  Receive request.
+ * @return         OMPI_SUCCESS or error status on failure.
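The tag test inside mca_pml_uniq_recv_request_match_specific_proc() boils down to the rule below. The helper is an illustration only (hypothetical name, not part of the patch); it assumes source matching has already happened by virtue of walking the per-peer unexpected-fragment list, and that negative tags are reserved for internal traffic that a wildcard receive must never match.

/* Illustration only: the matching rule applied above. */
int frag_matches_posted_recv(int posted_tag, int frag_tag, int any_tag)
{
    if (posted_tag == any_tag)      /* wildcard receive (MPI_ANY_TAG)        */
        return frag_tag >= 0;       /* may only match user-level (>= 0) tags */
    return frag_tag == posted_tag;  /* otherwise require an exact tag match  */
}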
+ */ +static inline int mca_pml_uniq_recv_request_start(mca_pml_base_recv_request_t* request) +{ + /* init/re-init the request */ + request->req_bytes_received = 0; + request->req_bytes_delivered = 0; + request->req_base.req_pml_complete = false; + request->req_base.req_ompi.req_complete = false; + request->req_base.req_ompi.req_state = OMPI_REQUEST_ACTIVE; + /* always set the req_status.MPI_TAG to ANY_TAG before starting the request. This field + * is used on the cancel part in order to find out if the request has been matched or not. + */ + request->req_base.req_ompi.req_status.MPI_TAG = OMPI_ANY_TAG; + request->req_base.req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS; + request->req_base.req_ompi.req_status._cancelled = 0; + + /* attempt to match posted recv */ + if(request->req_base.req_peer == OMPI_ANY_SOURCE) { + mca_pml_uniq_recv_request_match_wild(request); + } else { + mca_pml_uniq_recv_request_match_specific(request); + } + return OMPI_SUCCESS; +} + +/** + * Update status of a recv request based on the completion status of + * the receive fragment. + * + * @param ptl (IN) The PTL pointer. + * @param request (IN) Receive request. + * @param bytes_received (IN) Bytes received from peer. + * @param bytes_delivered (IN) Bytes delivered to application. + */ +void mca_pml_uniq_recv_request_progress( + struct mca_ptl_base_module_t* ptl, + mca_pml_base_recv_request_t* request, + size_t bytes_received, + size_t bytes_delivered +); + +#if defined(c_plusplus) || defined(__cplusplus) +} +#endif +#endif + diff --git a/src/mca/pml/uniq/pml_uniq_sendreq.c b/src/mca/pml/uniq/pml_uniq_sendreq.c new file mode 100644 index 0000000000..bad29a79e8 --- /dev/null +++ b/src/mca/pml/uniq/pml_uniq_sendreq.c @@ -0,0 +1,190 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + + +/*%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%*/ + +#include "ompi_config.h" +#include "include/constants.h" +#include "mca/pml/pml.h" +#include "mca/ptl/ptl.h" +#include "pml_uniq.h" +#include "pml_uniq_proc.h" +#include "pml_uniq_sendreq.h" +#include "pml_uniq_recvreq.h" + + + +static int mca_pml_uniq_send_request_fini(struct ompi_request_t** request) +{ + MCA_PML_TEG_FINI(request); + return OMPI_SUCCESS; +} + +static int mca_pml_uniq_send_request_free(struct ompi_request_t** request) +{ + MCA_PML_TEG_FREE(request); + return OMPI_SUCCESS; +} + +static int mca_pml_uniq_send_request_cancel(struct ompi_request_t* request, int complete) +{ + /* we dont cancel send requests by now */ + return OMPI_SUCCESS; +} + + +static void mca_pml_uniq_send_request_construct(mca_pml_base_send_request_t* req) +{ + req->req_base.req_type = MCA_PML_REQUEST_SEND; + req->req_base.req_ompi.req_fini = mca_pml_uniq_send_request_fini; + req->req_base.req_ompi.req_free = mca_pml_uniq_send_request_free; + req->req_base.req_ompi.req_cancel = mca_pml_uniq_send_request_cancel; +} + + +static void mca_pml_uniq_send_request_destruct(mca_pml_base_send_request_t* req) +{ +} + + +OBJ_CLASS_INSTANCE( + mca_pml_uniq_send_request_t, + mca_pml_base_send_request_t, + mca_pml_uniq_send_request_construct, + mca_pml_uniq_send_request_destruct); + + + +/** + * Schedule message delivery across potentially multiple PTLs. + * + * @param request (IN) Request to schedule + * @return status Error status + * + */ + + +int mca_pml_uniq_send_request_schedule(mca_pml_base_send_request_t* req) +{ + ompi_proc_t *proc = ompi_comm_peer_lookup(req->req_base.req_comm, req->req_base.req_peer); + mca_pml_proc_t* proc_pml = proc->proc_pml; + int send_count = 0, rc; + size_t bytes_remaining; + + /* + * Only allow one thread in this routine for a given request. + * However, we cannot block callers on a mutex, so simply keep track + * of the number of times the routine has been called and run through + * the scheduling logic once for every call. + */ + if(OMPI_THREAD_ADD32(&req->req_lock,1) == 1) { + mca_ptl_proc_t* ptl_proc = proc_pml->proc_ptl_next; + mca_ptl_base_module_t* ptl = ptl_proc->ptl; + /* allocate remaining bytes to PTLs */ + bytes_remaining = req->req_bytes_packed - req->req_offset; + /* The rest of the message will be scheduled over the same PTL (the one in the next field). We try + * to be PTL friendly here so we will respect the maximum size accepted by the PTL. + */ + if( bytes_remaining > ptl->ptl_max_frag_size) { + bytes_remaining = ptl->ptl_max_frag_size; + } + + rc = ptl->ptl_put(ptl, ptl_proc->ptl_peer, req, req->req_offset, bytes_remaining, 0); + if(rc == OMPI_SUCCESS) { + send_count++; + bytes_remaining = req->req_bytes_packed - req->req_offset; + } + + /* unable to complete send - queue for later */ + if(send_count == 0) { + OMPI_THREAD_LOCK(&mca_pml_uniq.uniq_lock); + ompi_list_append(&mca_pml_uniq.uniq_send_pending, (ompi_list_item_t*)req); + OMPI_THREAD_UNLOCK(&mca_pml_uniq.uniq_lock); + req->req_lock = 0; + return OMPI_ERR_OUT_OF_RESOURCE; + } + + /* free the request if completed while in the scheduler */ + if (req->req_base.req_free_called && req->req_base.req_pml_complete) { + MCA_PML_TEG_FREE((ompi_request_t**)&req); + } + } + return OMPI_SUCCESS; +} + + +/** + * Update the status of the send request to reflect the number of bytes + * "actually" sent (and acknowledged). 
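The comment inside mca_pml_uniq_send_request_schedule() describes a lock-free re-entrancy guard: every caller atomically bumps a per-request counter, only the caller that takes it from 0 to 1 runs the scheduling logic, and the logic is re-run once per call that arrived in the meantime. A sketch of that pattern, not part of the patch, with C11 atomics and hypothetical toy_* names standing in for OMPI_THREAD_ADD32 and the real request:

#include <stdatomic.h>

struct toy_sendreq {
    atomic_int sched_calls;          /* plays the role of req_lock */
};

static void toy_do_schedule(struct toy_sendreq *req)
{
    (void)req;                       /* stand-in for handing the next fragment to the PTL */
}

void toy_schedule(struct toy_sendreq *req)
{
    /* only the caller that moves the counter from 0 to 1 enters */
    if (atomic_fetch_add(&req->sched_calls, 1) == 0) {
        do {
            toy_do_schedule(req);
            /* drop our call; loop again if more calls arrived while scheduling */
        } while (atomic_fetch_sub(&req->sched_calls, 1) > 1);
    }
}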
This should be called by the + * lower layer PTL after the fragment is actually delivered and has been + * acknowledged (if required). Note that this routine should NOT be called + * directly by the PTL, a function pointer is setup on the PTL at init to + * enable upcalls into the PML w/out directly linking to a specific PML + * implementation. + */ + +void mca_pml_uniq_send_request_progress( + struct mca_ptl_base_module_t* ptl, + mca_pml_base_send_request_t* req, + size_t bytes_sent) +{ + bool schedule = false; + + OMPI_THREAD_LOCK(&ompi_request_lock); + req->req_bytes_sent += bytes_sent; + if (req->req_bytes_sent >= req->req_bytes_packed) { + req->req_base.req_pml_complete = true; + if (req->req_base.req_ompi.req_complete == false) { + req->req_base.req_ompi.req_status.MPI_SOURCE = req->req_base.req_comm->c_my_rank; + req->req_base.req_ompi.req_status.MPI_TAG = req->req_base.req_tag; + req->req_base.req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS; + req->req_base.req_ompi.req_status._count = req->req_bytes_sent; + req->req_base.req_ompi.req_complete = true; + if(ompi_request_waiting) { + ompi_condition_broadcast(&ompi_request_cond); + } + } else if(req->req_base.req_free_called) { + /* don't free the request if in the scheduler */ + if(req->req_lock == 0) { + MCA_PML_TEG_FREE((ompi_request_t**)&req); + } + } else if (req->req_send_mode == MCA_PML_BASE_SEND_BUFFERED) { + mca_pml_base_bsend_request_fini((ompi_request_t*)req); + } + /* test to see if we have scheduled the entire request */ + } else if (req->req_offset < req->req_bytes_packed) { + schedule = true; + } + OMPI_THREAD_UNLOCK(&ompi_request_lock); + + /* schedule remaining fragments of this request */ + if(schedule) { + mca_pml_uniq_send_request_schedule(req); + } + + /* check for pending requests that need to be progressed */ + while(ompi_list_get_size(&mca_pml_uniq.uniq_send_pending) != 0) { + OMPI_THREAD_LOCK(&mca_pml_uniq.uniq_lock); + req = (mca_pml_base_send_request_t*)ompi_list_remove_first(&mca_pml_uniq.uniq_send_pending); + OMPI_THREAD_UNLOCK(&mca_pml_uniq.uniq_lock); + if(req == NULL) + break; + if(mca_pml_uniq_send_request_schedule(req) != OMPI_SUCCESS) + break; + } +} + diff --git a/src/mca/pml/uniq/pml_uniq_sendreq.h b/src/mca/pml/uniq/pml_uniq_sendreq.h new file mode 100644 index 0000000000..834e70c627 --- /dev/null +++ b/src/mca/pml/uniq/pml_uniq_sendreq.h @@ -0,0 +1,184 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ +/** + * @file + */ +#ifndef OMPI_PML_TEG_SEND_REQUEST_H +#define OMPI_PML_TEG_SEND_REQUEST_H + +#include "mca/ptl/ptl.h" +#include "mca/pml/base/pml_base_sendreq.h" +#include "mca/ptl/base/ptl_base_sendfrag.h" +#include "mca/ptl/base/ptl_base_comm.h" +#include "pml_uniq_proc.h" +#include "pml_uniq_ptl.h" + +#if defined(c_plusplus) || defined(__cplusplus) +extern "C" { +#endif +typedef mca_pml_base_send_request_t mca_pml_uniq_send_request_t; + +OBJ_CLASS_DECLARATION(mca_pml_uniq_send_request_t); + + +#define MCA_PML_TEG_SEND_REQUEST_ALLOC( \ + comm, \ + dst, \ + sendreq, \ + rc) \ +{ \ + mca_pml_proc_t *proc = mca_pml_uniq_proc_lookup_remote(comm,dst); \ + mca_pml_base_ptl_t* ptl_base; \ + \ + if(NULL == proc) { \ + return OMPI_ERR_OUT_OF_RESOURCE; \ + } \ + ptl_base = proc->proc_ptl_first->ptl_base; \ + /* \ + * check to see if there is a cache of send requests associated with \ + * this ptl - if so try the allocation from there. \ + */ \ + if(NULL != ptl_base) { \ + OMPI_THREAD_LOCK(&ptl_base->ptl_cache_lock); \ + sendreq = (mca_pml_base_send_request_t*) \ + ompi_list_remove_first(&ptl_base->ptl_cache); \ + if(NULL != sendreq) { \ + OMPI_THREAD_UNLOCK(&ptl_base->ptl_cache_lock); \ + rc = OMPI_SUCCESS; \ + } else if (ptl_base->ptl_cache_alloc < ptl_base->ptl_cache_size) { \ + /* \ + * allocate an additional request to the cache \ + */ \ + mca_ptl_base_module_t* ptl = ptl_base->ptl; \ + ompi_list_item_t* item; \ + OMPI_FREE_LIST_WAIT(&mca_pml_uniq.uniq_send_requests, item, rc); \ + sendreq = (mca_pml_base_send_request_t*)item; \ + sendreq->req_ptl = ptl; \ + if(ptl->ptl_request_init(ptl, sendreq) == OMPI_SUCCESS) { \ + sendreq->req_cached = true; \ + ptl_base->ptl_cache_alloc++; \ + } \ + OMPI_THREAD_UNLOCK(&ptl_base->ptl_cache_lock); \ + } else { \ + /* \ + * take a request from the global pool \ + */ \ + ompi_list_item_t* item; \ + OMPI_THREAD_UNLOCK(&ptl_base->ptl_cache_lock); \ + OMPI_FREE_LIST_WAIT(&mca_pml_uniq.uniq_send_requests, item, rc); \ + sendreq = (mca_pml_base_send_request_t*)item; \ + sendreq->req_ptl = proc->proc_ptl_first->ptl; \ + } \ + \ + /* otherwise - take the allocation from the global list */ \ + } else { \ + ompi_list_item_t* item; \ + OMPI_FREE_LIST_WAIT(&mca_pml_uniq.uniq_send_requests, item, rc); \ + sendreq = (mca_pml_base_send_request_t*)item; \ + sendreq->req_ptl = proc->proc_ptl_first->ptl; \ + } \ + /* update request to point to current peer */ \ + sendreq->req_peer = proc->proc_ptl_first->ptl_peer; \ +} + + +#define MCA_PML_TEG_SEND_REQUEST_RETURN(sendreq) \ +{ \ + mca_ptl_base_module_t* ptl = (sendreq)->req_ptl; \ + mca_pml_base_ptl_t* ptl_base = ptl->ptl_base; \ + \ + /* Let the base handle the reference counts */ \ + MCA_PML_BASE_SEND_REQUEST_RETURN(sendreq); \ + \ + /* \ + * If there is a cache associated with the ptl - first attempt \ + * to return the send descriptor to the cache. \ + */ \ + if(NULL != ptl->ptl_base && (sendreq)->req_cached) { \ + OMPI_THREAD_LOCK(&ptl_base->ptl_cache_lock); \ + ompi_list_prepend(&ptl_base->ptl_cache, \ + (ompi_list_item_t*)sendreq); \ + OMPI_THREAD_UNLOCK(&ptl_base->ptl_cache_lock); \ + } else { \ + OMPI_FREE_LIST_RETURN( \ + &mca_pml_uniq.uniq_send_requests, (ompi_list_item_t*)sendreq); \ + } \ +} + + +/** + * Start a send request. 
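The MCA_PML_TEG_SEND_REQUEST_ALLOC / _RETURN pair above keeps a small per-PTL cache of send descriptors that have already been initialized for that PTL, and only falls back to the PML-wide free list on a miss; hot descriptors are prepended back onto the cache on return. A minimal sketch of that policy, not part of the patch (hypothetical toy_* names, the locking of the real macros omitted):

struct toy_desc { struct toy_desc *next; };

struct toy_cache { struct toy_desc *head; };     /* per-PTL descriptor cache */

/* global_get/global_put stand in for OMPI_FREE_LIST_WAIT / OMPI_FREE_LIST_RETURN */
struct toy_desc *toy_alloc(struct toy_cache *c,
                           struct toy_desc *(*global_get)(void))
{
    struct toy_desc *d = c->head;
    if (d != NULL) {                 /* cache hit: already set up for this PTL */
        c->head = d->next;
        return d;
    }
    return global_get();             /* cache miss: fall back to the global list */
}

void toy_return(struct toy_cache *c, struct toy_desc *d, int was_cached,
                void (*global_put)(struct toy_desc *))
{
    if (was_cached) {                /* keep hot descriptors with the owning PTL */
        d->next = c->head;
        c->head = d;
    } else {
        global_put(d);
    }
}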
+ */ +#define MCA_PML_TEG_SEND_REQUEST_START(req, rc) \ +{ \ + mca_ptl_base_module_t* ptl = req->req_ptl; \ + size_t first_fragment_size = ptl->ptl_first_frag_size; \ + int flags; \ + \ + req->req_offset = 0; \ + req->req_lock = 0; \ + req->req_bytes_sent = 0; \ + req->req_peer_match.lval = 0; \ + req->req_peer_addr.lval = 0; \ + req->req_peer_size = 0; \ + req->req_base.req_pml_complete = false; \ + req->req_base.req_ompi.req_complete = false; \ + req->req_base.req_ompi.req_state = OMPI_REQUEST_ACTIVE; \ + req->req_base.req_sequence = mca_pml_ptl_comm_send_sequence( \ + req->req_base.req_comm->c_pml_comm, req->req_base.req_peer); \ + \ + /* handle buffered send */ \ + if(req->req_send_mode == MCA_PML_BASE_SEND_BUFFERED) { \ + mca_pml_base_bsend_request_start(&req->req_base.req_ompi); \ + } \ + \ + /* start the first fragment */ \ + if (first_fragment_size == 0 || \ + req->req_bytes_packed <= first_fragment_size) { \ + first_fragment_size = req->req_bytes_packed; \ + flags = (req->req_send_mode == MCA_PML_BASE_SEND_SYNCHRONOUS) ? \ + MCA_PTL_FLAGS_ACK : 0; \ + } else { \ + /* require match for first fragment of a multi-fragment */ \ + flags = MCA_PTL_FLAGS_ACK; \ + } \ + rc = ptl->ptl_send(ptl, req->req_peer, req, 0, first_fragment_size, \ + flags); \ +} + + +/** + * Schedule any data that was not delivered in the first fragment + * across the available PTLs. + */ +int mca_pml_uniq_send_request_schedule(mca_pml_base_send_request_t* req); + + +/** + * Update the request to reflect the number of bytes delivered. If this + * was the first fragment - schedule the rest of the data. + */ +void mca_pml_uniq_send_request_progress( + struct mca_ptl_base_module_t* ptl, + mca_pml_base_send_request_t* send_request, + size_t bytes_sent +); + +#if defined(c_plusplus) || defined(__cplusplus) +} +#endif +#endif + diff --git a/src/mca/pml/uniq/pml_uniq_start.c b/src/mca/pml/uniq/pml_uniq_start.c new file mode 100644 index 0000000000..c2025e156a --- /dev/null +++ b/src/mca/pml/uniq/pml_uniq_start.c @@ -0,0 +1,122 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University. + * All rights reserved. + * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. + * All rights reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. 
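The first-fragment decision taken by MCA_PML_TEG_SEND_REQUEST_START reads, in isolation, as follows: when the whole message fits within the PTL's first-fragment (eager) size it is sent in one piece, with an ACK requested only for synchronous sends; otherwise the first fragment always requests an ACK so the remainder can be scheduled once the receiver has matched the message. A standalone sketch, not part of the patch, with hypothetical names:

#include <stdbool.h>
#include <stddef.h>

struct toy_first_frag {
    size_t size;       /* bytes carried by the first fragment       */
    bool   want_ack;   /* request an acknowledgement / match notice */
};

struct toy_first_frag toy_plan_first_frag(size_t msg_bytes,
                                          size_t eager_limit,
                                          bool synchronous)
{
    struct toy_first_frag f;
    if (eager_limit == 0 || msg_bytes <= eager_limit) {
        f.size = msg_bytes;          /* whole message goes out eagerly             */
        f.want_ack = synchronous;    /* ACK only for MPI_Ssend-style semantics     */
    } else {
        f.size = eager_limit;        /* multi-fragment send                        */
        f.want_ack = true;           /* need the match before scheduling the rest  */
    }
    return f;
}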
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#include "ompi_config.h"
+
+#include "pml_uniq.h"
+#include "pml_uniq_recvreq.h"
+#include "pml_uniq_sendreq.h"
+
+
+int mca_pml_uniq_start(size_t count, ompi_request_t** requests)
+{
+    int rc;
+    size_t i;
+    for(i = 0; i < count; i++) {
+        mca_pml_base_request_t *pml_request = (mca_pml_base_request_t*)requests[i];
+        if(NULL == pml_request)
+            continue;
+
+        switch(pml_request->req_ompi.req_state) {
+            case OMPI_REQUEST_INACTIVE:
+                if(pml_request->req_pml_complete == true)
+                    break;
+                /* otherwise fall through */
+            case OMPI_REQUEST_ACTIVE: {
+
+                ompi_request_t *request;
+                OMPI_THREAD_LOCK(&ompi_request_lock);
+                if (pml_request->req_pml_complete == false) {
+                    /* free request after it completes */
+                    pml_request->req_free_called = true;
+                } else {
+                    /* can reuse the existing request */
+                    OMPI_THREAD_UNLOCK(&ompi_request_lock);
+                    break;
+                }
+
+                /* allocate a new request */
+                switch(pml_request->req_type) {
+                    case MCA_PML_REQUEST_SEND: {
+                        mca_pml_base_send_mode_t sendmode =
+                            ((mca_pml_base_send_request_t*)pml_request)->req_send_mode;
+                        rc = mca_pml_uniq_isend_init(
+                            pml_request->req_addr,
+                            pml_request->req_count,
+                            pml_request->req_datatype,
+                            pml_request->req_peer,
+                            pml_request->req_tag,
+                            sendmode,
+                            pml_request->req_comm,
+                            &request);
+                        break;
+                    }
+                    case MCA_PML_REQUEST_RECV:
+                        rc = mca_pml_uniq_irecv_init(
+                            pml_request->req_addr,
+                            pml_request->req_count,
+                            pml_request->req_datatype,
+                            pml_request->req_peer,
+                            pml_request->req_tag,
+                            pml_request->req_comm,
+                            &request);
+                        break;
+                    default:
+                        rc = OMPI_ERR_REQUEST;
+                        break;
+                }
+                OMPI_THREAD_UNLOCK(&ompi_request_lock);
+                if(OMPI_SUCCESS != rc)
+                    return rc;
+                pml_request = (mca_pml_base_request_t*)request;
+                requests[i] = request;
+                break;
+            }
+            default:
+                return OMPI_ERR_REQUEST;
+        }
+
+        /* start the request */
+        switch(pml_request->req_type) {
+            case MCA_PML_REQUEST_SEND:
+            {
+                mca_pml_base_send_request_t* sendreq = (mca_pml_base_send_request_t*)pml_request;
+                MCA_PML_TEG_SEND_REQUEST_START(sendreq, rc);
+                if(rc != OMPI_SUCCESS)
+                    return rc;
+                break;
+            }
+            case MCA_PML_REQUEST_RECV:
+            {
+                mca_pml_base_recv_request_t* recvreq = (mca_pml_base_recv_request_t*)pml_request;
+                if((rc = mca_pml_uniq_recv_request_start(recvreq)) != OMPI_SUCCESS)
+                    return rc;
+                break;
+            }
+            default:
+                return OMPI_ERR_REQUEST;
+        }
+    }
+    return OMPI_SUCCESS;
+}
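Finally, a small MPI-level usage sketch, not part of the patch, of the persistent-request path that mca_pml_uniq_start() serves when this component is the selected PML: MPI_Start() on an inactive persistent request ends up in pml_start(), which either reuses the request or, if the previous instance has not yet completed at the PML level, marks it for deferred free and re-creates it as shown above.

#include <mpi.h>

void persistent_ping(int peer, int tag, double *buf, int n)
{
    MPI_Request req;
    int i;

    MPI_Send_init(buf, n, MPI_DOUBLE, peer, tag, MPI_COMM_WORLD, &req);
    for (i = 0; i < 100; i++) {
        MPI_Start(&req);                     /* dispatches into the PML start function */
        MPI_Wait(&req, MPI_STATUS_IGNORE);
    }
    MPI_Request_free(&req);
}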