diff --git a/ompi/communicator/Makefile.am b/ompi/communicator/Makefile.am index 7eb0ddb8d3..f428ae6e25 100644 --- a/ompi/communicator/Makefile.am +++ b/ompi/communicator/Makefile.am @@ -10,6 +10,8 @@ # University of Stuttgart. All rights reserved. # Copyright (c) 2004-2005 The Regents of the University of California. # All rights reserved. +# Copyright (c) 2013 Los Alamos National Security, LLC. All rights +# reserved. # $COPYRIGHT$ # # Additional copyrights may follow @@ -20,9 +22,11 @@ # This makefile.am does not stand on its own - it is included from ompi/Makefile.am headers += \ - communicator/communicator.h + communicator/communicator.h \ + communicator/comm_request.h libmpi_la_SOURCES += \ communicator/comm_init.c \ communicator/comm.c \ - communicator/comm_cid.c + communicator/comm_cid.c \ + communicator/comm_request.c diff --git a/ompi/communicator/comm.c b/ompi/communicator/comm.c index 079528deac..d86d50b454 100644 --- a/ompi/communicator/comm.c +++ b/ompi/communicator/comm.c @@ -99,10 +99,41 @@ int ompi_comm_set ( ompi_communicator_t **ncomm, bool copy_topocomponent, ompi_group_t *local_group, ompi_group_t *remote_group ) +{ + ompi_request_t *req; + int rc; + + rc = ompi_comm_set_nb (ncomm, oldcomm, local_size, local_ranks, remote_size, remote_ranks, + attr, errh, copy_topocomponent, local_group, remote_group, &req); + if (OMPI_SUCCESS != rc) { + return rc; + } + + if (NULL != req) { + ompi_request_wait (&req, MPI_STATUS_IGNORE); + } + + return OMPI_SUCCESS; +} + +int ompi_comm_set_nb ( ompi_communicator_t **ncomm, + ompi_communicator_t *oldcomm, + int local_size, + int *local_ranks, + int remote_size, + int *remote_ranks, + opal_hash_table_t *attr, + ompi_errhandler_t *errh, + bool copy_topocomponent, + ompi_group_t *local_group, + ompi_group_t *remote_group, + ompi_request_t **req ) { ompi_communicator_t *newcomm = NULL; int ret; - + + *req = NULL; + /* ompi_comm_allocate */ newcomm = OBJ_NEW(ompi_communicator_t); /* fill in the inscribing hyper-cube dimensions */ @@ -134,9 +165,9 @@ int ompi_comm_set ( ompi_communicator_t **ncomm, } newcomm->c_flags |= OMPI_COMM_INTER; if ( OMPI_COMM_IS_INTRA(oldcomm) ) { - ompi_comm_dup(oldcomm, &newcomm->c_local_comm); + ompi_comm_idup(oldcomm, &newcomm->c_local_comm, req); } else { - ompi_comm_dup(oldcomm->c_local_comm, &newcomm->c_local_comm); + ompi_comm_idup(oldcomm->c_local_comm, &newcomm->c_local_comm, req); } } else { newcomm->c_remote_group = newcomm->c_local_group; @@ -845,6 +876,14 @@ ompi_comm_split_type(ompi_communicator_t *comm, /**********************************************************************/ /**********************************************************************/ int ompi_comm_dup ( ompi_communicator_t * comm, ompi_communicator_t **newcomm ) +{ + return ompi_comm_dup_with_info (comm, NULL, newcomm); +} + +/**********************************************************************/ +/**********************************************************************/ +/**********************************************************************/ +int ompi_comm_dup_with_info ( ompi_communicator_t * comm, ompi_info_t *info, ompi_communicator_t **newcomm ) { ompi_communicator_t *newcomp = NULL; int rsize = 0, mode = OMPI_COMM_CID_INTRA, rc = OMPI_SUCCESS; @@ -906,6 +945,202 @@ int ompi_comm_dup ( ompi_communicator_t * comm, ompi_communicator_t **newcomm ) *newcomm = newcomp; return MPI_SUCCESS; } + +struct ompi_comm_idup_with_info_context { + ompi_communicator_t *comm; + ompi_communicator_t *newcomp; + ompi_communicator_t **newcomm; +}; + 
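The ompi_comm_idup path that follows is the first consumer of the new ompi_comm_request machinery added in comm_request.c/comm_request.h later in this patch. Below is a minimal sketch of that pattern, not part of the patch itself: the operation name ompi_comm_ifoo, its context struct, and the single iallreduce step are invented for illustration, while the ompi_comm_request_* calls and the coll_iallreduce argument order follow what this diff introduces.

```c
/* Sketch only, not part of the patch: how a non-blocking communicator
 * operation is composed from chained steps with ompi_comm_request.
 * ompi_comm_ifoo and its context are hypothetical. */
#include <stdlib.h>

#include "ompi/communicator/communicator.h"
#include "ompi/communicator/comm_request.h"
#include "ompi/constants.h"

struct ompi_comm_ifoo_context {
    ompi_communicator_t *comm;
    int inbuf, outbuf;
};

static int ompi_comm_ifoo_complete (ompi_comm_request_t *request)
{
    struct ompi_comm_ifoo_context *context =
        (struct ompi_comm_ifoo_context *) request->context;

    /* the scheduled subrequest has completed; consume its result here */
    (void) context->outbuf;

    /* returning with an empty schedule lets the progress function
     * complete request->super */
    return OMPI_SUCCESS;
}

int ompi_comm_ifoo (ompi_communicator_t *comm, ompi_request_t **req)
{
    struct ompi_comm_ifoo_context *context;
    ompi_comm_request_t *request;
    ompi_request_t *subreq;
    int rc;

    request = ompi_comm_request_get ();
    if (NULL == request) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    context = calloc (1, sizeof (*context));
    if (NULL == context) {
        ompi_comm_request_return (request);
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    context->comm = comm;
    request->context = context;   /* freed by ompi_comm_request_return */

    /* step 1: a non-blocking collective whose completion triggers step 2 */
    rc = comm->c_coll.coll_iallreduce (&context->inbuf, &context->outbuf, 1, MPI_INT,
                                       MPI_MAX, comm, &subreq,
                                       comm->c_coll.coll_iallreduce_module);
    if (OMPI_SUCCESS != rc) {
        ompi_comm_request_return (request);
        return rc;
    }

    /* step 2 runs once subreq completes; the registered progress
     * function drives the chain */
    ompi_comm_request_schedule_append (request, ompi_comm_ifoo_complete, &subreq, 1);
    ompi_comm_request_start (request);

    *req = &request->super;
    return OMPI_SUCCESS;
}
```

ompi_comm_idup below follows the same shape with a longer chain (ompi_comm_set_nb, then ompi_comm_nextcid_nb, then ompi_comm_activate_nb), each callback scheduling the next step once its subrequest completes.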
+static int ompi_comm_idup_with_info_activate (ompi_comm_request_t *request); +static int ompi_comm_idup_with_info_finish (ompi_comm_request_t *request); +static int ompi_comm_idup_getcid (ompi_comm_request_t *request); + +int ompi_comm_idup (ompi_communicator_t *comm, ompi_communicator_t **newcomm, ompi_request_t **req) +{ + return ompi_comm_idup_with_info (comm, NULL, newcomm, req); +} + +int ompi_comm_idup_with_info (ompi_communicator_t *comm, ompi_info_t *info, ompi_communicator_t **newcomm, ompi_request_t **req) +{ + struct ompi_comm_idup_with_info_context *context; + ompi_comm_request_t *request; + ompi_request_t *subreq; + int rsize = 0, rc; + + *newcomm = MPI_COMM_NULL; + + request = ompi_comm_request_get (); + if (NULL == request) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + context = calloc (1, sizeof (*context)); + if (NULL == context) { + ompi_comm_request_return (request); + return OMPI_ERR_OUT_OF_RESOURCE; + } + + context->comm = comm; + context->newcomm = newcomm; + + request->context = context; + + rc = ompi_comm_set_nb (&context->newcomp, /* new comm */ + comm, /* old comm */ + comm->c_local_group->grp_proc_count, /* local_size */ + NULL, /* local_procs */ + rsize, /* remote_size */ + NULL, /* remote_procs */ + comm->c_keyhash, /* attrs */ + comm->error_handler, /* error handler */ + true, /* copy the topo */ + comm->c_local_group, /* local group */ + comm->c_remote_group, /* remote group */ + &subreq); /* new subrequest */ + if (NULL == context->newcomp) { + ompi_comm_request_return (request); + return rc; + } + + ompi_comm_request_schedule_append (request, ompi_comm_idup_getcid, &subreq, subreq ? 1 : 0); + + /* kick off the request */ + ompi_comm_request_start (request); + *req = &request->super; + + return OMPI_SUCCESS; +} + +static int ompi_comm_idup_getcid (ompi_comm_request_t *request) +{ + struct ompi_comm_idup_with_info_context *context = + (struct ompi_comm_idup_with_info_context *) request->context; + ompi_request_t *subreq; + int rc, mode; + + if (OMPI_COMM_IS_INTER(context->comm)){ + mode = OMPI_COMM_CID_INTER; + } else { + mode = OMPI_COMM_CID_INTRA; + } + + /* Determine context id. 
It is identical to f_2_c_handle */ + rc = ompi_comm_nextcid_nb (context->newcomp, /* new communicator */ + context->comm, /* old comm */ + NULL, /* bridge comm */ + mode, /* mode */ + &subreq); /* new subrequest */ + if (OMPI_SUCCESS != rc) { + ompi_comm_request_return (request); + return rc; + } + + ompi_comm_request_schedule_append (request, ompi_comm_idup_with_info_activate, &subreq, 1); + + return OMPI_SUCCESS; +} + +static int ompi_comm_idup_with_info_activate (ompi_comm_request_t *request) +{ + struct ompi_comm_idup_with_info_context *context = + (struct ompi_comm_idup_with_info_context *) request->context; + ompi_request_t *subreq; + int rc, mode; + + if (OMPI_COMM_IS_INTER(context->comm)){ + mode = OMPI_COMM_CID_INTER; + } else { + mode = OMPI_COMM_CID_INTRA; + } + + /* Set name for debugging purposes */ + snprintf(context->newcomp->c_name, MPI_MAX_OBJECT_NAME, "MPI COMMUNICATOR %d DUP FROM %d", + context->newcomp->c_contextid, context->comm->c_contextid ); + + /* activate communicator and init coll-module */ + rc = ompi_comm_activate_nb (&context->newcomp, context->comm, NULL, mode, &subreq); + if ( OMPI_SUCCESS != rc ) { + return rc; + } + + ompi_comm_request_schedule_append (request, ompi_comm_idup_with_info_finish, &subreq, 1); + + return OMPI_SUCCESS; +} + +static int ompi_comm_idup_with_info_finish (ompi_comm_request_t *request) +{ + struct ompi_comm_idup_with_info_context *context = + (struct ompi_comm_idup_with_info_context *) request->context; + + *context->newcomm = context->newcomp; + + /* done */ + return MPI_SUCCESS; +} + +/**********************************************************************/ +/**********************************************************************/ +/**********************************************************************/ +int ompi_comm_create_group (ompi_communicator_t *comm, ompi_group_t *group, int tag, ompi_communicator_t **newcomm) +{ + ompi_communicator_t *newcomp = NULL; + int mode = OMPI_COMM_CID_GROUP, rc = OMPI_SUCCESS; + + *newcomm = MPI_COMM_NULL; + + rc = ompi_comm_set ( &newcomp, /* new comm */ + comm, /* old comm */ + group->grp_proc_count, /* local_size */ + NULL, /* local_procs*/ + 0, /* remote_size */ + NULL, /* remote_procs */ + comm->c_keyhash, /* attrs */ + comm->error_handler, /* error handler */ + true, /* copy the topo */ + group, /* local group */ + NULL); /* remote group */ + if ( NULL == newcomp ) { + rc = MPI_ERR_INTERN; + return rc; + } + if ( MPI_SUCCESS != rc) { + return rc; + } + + /* Determine context id. 
It is identical to f_2_c_handle */ + rc = ompi_comm_nextcid ( newcomp, /* new communicator */ + comm, /* old comm */ + newcomp, /* bridge comm (used to pass the group into the group allreduce) */ + &tag, /* user defined tag */ + NULL, /* remote_leader */ + mode, /* mode */ + -1 ); /* send_first */ + if ( OMPI_SUCCESS != rc ) { + return rc; + } + + /* Set name for debugging purposes */ + snprintf(newcomp->c_name, MPI_MAX_OBJECT_NAME, "MPI COMMUNICATOR %d GROUP FROM %d", + newcomp->c_contextid, comm->c_contextid ); + + /* activate communicator and init coll-module */ + rc = ompi_comm_activate( &newcomp, /* new communicator */ + comm, + newcomp, + &tag, + NULL, + mode, + -1 ); + if ( OMPI_SUCCESS != rc ) { + return rc; + } + + *newcomm = newcomp; + return MPI_SUCCESS; +} + /**********************************************************************/ /**********************************************************************/ /**********************************************************************/ diff --git a/ompi/communicator/comm_cid.c b/ompi/communicator/comm_cid.c index f317f26ca5..b03a26d2a9 100644 --- a/ompi/communicator/comm_cid.c +++ b/ompi/communicator/comm_cid.c @@ -6,7 +6,7 @@ * Copyright (c) 2004-2011 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. - * Copyright (c) 2004-2008 High Performance Computing Center Stuttgart, + * Copyright (c) 2004-2008 High Performance Computing Center Stuttgart, * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. @@ -15,20 +15,20 @@ * Copyright (c) 2006-2010 University of Houston. All rights reserved. * Copyright (c) 2009 Sun Microsystems, Inc. All rights reserved. * Copyright (c) 2012-2013 Los Alamos National Security, LLC. All rights - * reserved. + * reserved. * Copyright (c) 2012 Oak Ridge National Labs. All rights reserved. * Copyright (c) 2013 Intel, Inc. All rights reserved. * $COPYRIGHT$ - * + * * Additional copyrights may follow - * + * * $HEADER$ */ #include "ompi_config.h" #include "opal/dss/dss.h" -#include "ompi/proc/proc.h" +#include "ompi/proc/proc.h" #include "ompi/communicator/communicator.h" #include "ompi/op/op.h" #include "ompi/constants.h" @@ -38,58 +38,81 @@ #include "ompi/mca/rte/rte.h" #include "ompi/mca/coll/base/base.h" #include "ompi/request/request.h" -#include "ompi/runtime/ompi_module_exchange.h" +#include "ompi/runtime/ompi_module_exchange.h" #include "ompi/runtime/mpiruntime.h" BEGIN_C_DECLS /** - * These functions make sure, that we determine the global result over + * These functions make sure, that we determine the global result over * an intra communicators (simple), an inter-communicator and a * pseudo inter-communicator described by two separate intra-comms * and a bridge-comm (intercomm-create scenario). 
*/ -typedef int ompi_comm_cid_allredfct (int *inbuf, int* outbuf, - int count, struct ompi_op_t *op, +typedef int ompi_comm_cid_allredfct (int *inbuf, int* outbuf, + int count, struct ompi_op_t *op, ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, - void* lleader, void* rleader, + ompi_communicator_t *bridgecomm, + void* lleader, void* rleader, int send_first ); -static int ompi_comm_allreduce_intra (int *inbuf, int* outbuf, - int count, struct ompi_op_t *op, +static int ompi_comm_allreduce_intra (int *inbuf, int* outbuf, + int count, struct ompi_op_t *op, ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, + ompi_communicator_t *bridgecomm, + void* local_leader, void* remote_ledaer, int send_first ); -static int ompi_comm_allreduce_inter (int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, +static int ompi_comm_allreduce_inter (int *inbuf, int *outbuf, + int count, struct ompi_op_t *op, ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, + ompi_communicator_t *bridgecomm, + void* local_leader, void* remote_leader, int send_first ); -static int ompi_comm_allreduce_intra_bridge(int *inbuf, int* outbuf, - int count, struct ompi_op_t *op, +static int ompi_comm_allreduce_intra_bridge(int *inbuf, int* outbuf, + int count, struct ompi_op_t *op, ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, + ompi_communicator_t *bridgecomm, + void* local_leader, void* remote_leader, int send_first); -static int ompi_comm_allreduce_intra_oob (int *inbuf, int* outbuf, - int count, struct ompi_op_t *op, +static int ompi_comm_allreduce_intra_oob (int *inbuf, int* outbuf, + int count, struct ompi_op_t *op, ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_leader, + ompi_communicator_t *bridgecomm, + void* local_leader, + void* remote_leader, int send_first ); +static int ompi_comm_allreduce_group (int *inbuf, int* outbuf, + int count, struct ompi_op_t *op, + ompi_communicator_t *intercomm, + ompi_communicator_t *bridgecomm, + void* local_leader, + void* remote_leader, + int send_first); + +/* non-blocking intracommunicator allreduce */ +static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf, + int count, struct ompi_op_t *op, + ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, + ompi_request_t **req); + +/* non-blocking intercommunicator allreduce */ +static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, + int count, struct ompi_op_t *op, + ompi_communicator_t *intercomm, + ompi_communicator_t *bridgecomm, + ompi_request_t **req); + + static int ompi_comm_register_cid (uint32_t contextid); static int ompi_comm_unregister_cid (uint32_t contextid); static uint32_t ompi_comm_lowest_cid ( void ); @@ -127,17 +150,17 @@ int ompi_comm_cid_init (void) void *tlpointer; int ret; size_t i, size, numprocs; - + /** Note that the following call only returns processes - * with the same jobid. This is on purpose, since - * we switch for the dynamic communicators anyway + * with the same jobid. This is on purpose, since + * we switch for the dynamic communicators anyway * to the original (slower) cid allocation algorithm. 
- */ + */ procs = ompi_proc_world ( &numprocs ); for ( i=0; ic_contextid); - OPAL_THREAD_UNLOCK(&ompi_cid_lock); - } while (OMPI_SUCCESS != response ); + + ret = ompi_comm_register_cid (comm->c_contextid); + if (OMPI_SUCCESS != ret) { + return ret; + } start = ompi_mpi_communicators.lowest_free; - + while (!done) { /** - * This is the real algorithm described in the doc + * This is the real algorithm described in the doc */ OPAL_THREAD_LOCK(&ompi_cid_lock); if (comm->c_contextid != ompi_comm_lowest_cid() ) { @@ -227,7 +249,7 @@ int ompi_comm_nextcid ( ompi_communicator_t* newcomm, OPAL_THREAD_UNLOCK(&ompi_cid_lock); for (i=start; i < mca_pml.pml_max_contextid ; i++) { - flag = opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, + flag = opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, i, comm); if (true == flag) { nextlocal_cid = i; @@ -245,10 +267,10 @@ int ompi_comm_nextcid ( ompi_communicator_t* newcomm, response = 1; /* fine with me */ } else { - opal_pointer_array_set_item(&ompi_mpi_communicators, + opal_pointer_array_set_item(&ompi_mpi_communicators, nextlocal_cid, NULL); - flag = opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, + flag = opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, nextcid, comm ); if (true == flag) { response = 1; /* works as well */ @@ -271,7 +293,7 @@ int ompi_comm_nextcid ( ompi_communicator_t* newcomm, else if ( 0 == glresponse ) { if ( 1 == response ) { /* we could use that, but other don't agree */ - opal_pointer_array_set_item(&ompi_mpi_communicators, + opal_pointer_array_set_item(&ompi_mpi_communicators, nextcid, NULL); } start = nextcid+1; /* that's where we can start the next round */ @@ -284,13 +306,187 @@ int ompi_comm_nextcid ( ompi_communicator_t* newcomm, opal_pointer_array_set_item (&ompi_mpi_communicators, nextcid, newcomm); release_and_return: - OPAL_THREAD_LOCK(&ompi_cid_lock); ompi_comm_unregister_cid (comm->c_contextid); - OPAL_THREAD_UNLOCK(&ompi_cid_lock); return ret; } +/* Non-blocking version of ompi_comm_nextcid */ +struct mca_comm_nextcid_context { + ompi_communicator_t* newcomm; + ompi_communicator_t* comm; + ompi_communicator_t* bridgecomm; + int mode; + int nextcid; + int nextlocal_cid; + int start; + int flag, rflag; +}; + +/* find the next available local cid and start an allreduce */ +static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request); +/* verify that the maximum cid is locally available and start an allreduce */ +static int ompi_comm_checkcid (ompi_comm_request_t *request); +/* verify that the cid was available globally */ +static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request); + +int ompi_comm_nextcid_nb (ompi_communicator_t* newcomm, + ompi_communicator_t* comm, + ompi_communicator_t* bridgecomm, + int mode, ompi_request_t **req) +{ + struct mca_comm_nextcid_context *context; + ompi_comm_request_t *request; + int ret; + + /** + * Determine which implementation of allreduce we have to use + * for the current scenario + */ + if (OMPI_COMM_CID_INTRA != mode && OMPI_COMM_CID_INTER != mode) { + return MPI_UNDEFINED; + } + + ret = ompi_comm_register_cid (comm->c_contextid); + if (OMPI_SUCCESS != ret) { + return ret; + } + + context = calloc (1, sizeof (*context)); + if (NULL == context) { + ompi_comm_unregister_cid (comm->c_contextid); + return OMPI_ERR_OUT_OF_RESOURCE; + } + + request = ompi_comm_request_get (); + if (NULL == request) { + ompi_comm_unregister_cid (comm->c_contextid); + free (context); + return OMPI_ERR_OUT_OF_RESOURCE; + } + + 
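+    /* Fill in the context consumed by the callback chain scheduled below:
+     * ompi_comm_allreduce_getnextcid -> ompi_comm_checkcid ->
+     * ompi_comm_nextcid_check_flag. The chain re-queues itself until all
+     * participants agree on the same available CID. */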
context->newcomm = newcomm; + context->comm = comm; + context->bridgecomm = bridgecomm; + context->mode = mode; + context->start = ompi_mpi_communicators.lowest_free; + + request->context = context; + + ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0); + ompi_comm_request_start (request); + + *req = &request->super; + + return OMPI_SUCCESS; +} + +static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request) +{ + struct mca_comm_nextcid_context *context = request->context; + ompi_request_t *subreq; + unsigned int i; + bool flag; + int ret; + + /** + * This is the real algorithm described in the doc + */ + OPAL_THREAD_LOCK(&ompi_cid_lock); + if (context->comm->c_contextid != ompi_comm_lowest_cid() ) { + /* if not lowest cid, we do not continue, but sleep and try again */ + OPAL_THREAD_UNLOCK(&ompi_cid_lock); + ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0); + + return OMPI_SUCCESS; + } + OPAL_THREAD_UNLOCK(&ompi_cid_lock); + + for (i = context->start ; i < mca_pml.pml_max_contextid ; ++i) { + flag = opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, + i, context->comm); + if (true == flag) { + context->nextlocal_cid = i; + break; + } + } + + if (context->mode == OMPI_COMM_CID_INTRA) { + ret = ompi_comm_allreduce_intra_nb (&context->nextlocal_cid, &context->nextcid, 1, MPI_MAX, + context->comm, context->bridgecomm, &subreq); + } else { + ret = ompi_comm_allreduce_inter_nb (&context->nextlocal_cid, &context->nextcid, 1, MPI_MAX, + context->comm, context->bridgecomm, &subreq); + } + + if (OMPI_SUCCESS != ret) { + return ret; + } + + /* next we want to verify that the resulting commid is ok */ + ompi_comm_request_schedule_append (request, ompi_comm_checkcid, &subreq, 1); + + return OMPI_SUCCESS; +} + +static int ompi_comm_checkcid (ompi_comm_request_t *request) +{ + struct mca_comm_nextcid_context *context = request->context; + ompi_request_t *subreq; + int ret; + + context->flag = (context->nextcid == context->nextlocal_cid); + + if (!context->flag) { + opal_pointer_array_set_item(&ompi_mpi_communicators, context->nextlocal_cid, NULL); + + context->flag = opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, + context->nextcid, context->comm); + } + + if (context->mode == OMPI_COMM_CID_INTRA) { + ret = ompi_comm_allreduce_intra_nb (&context->flag, &context->rflag, 1, MPI_MIN, context->comm, + context->bridgecomm, &subreq); + } else { + ret = ompi_comm_allreduce_inter_nb (&context->flag, &context->rflag, 1, MPI_MIN, context->comm, + context->bridgecomm, &subreq); + } + + if (OMPI_SUCCESS != ret) { + return ret; + } + + ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, &subreq, 1); + + return OMPI_SUCCESS; +} + +static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request) +{ + struct mca_comm_nextcid_context *context = request->context; + + if (1 == context->rflag) { + /* set the according values to the newcomm */ + context->newcomm->c_contextid = context->nextcid; + context->newcomm->c_f_to_c_index = context->newcomm->c_contextid; + opal_pointer_array_set_item (&ompi_mpi_communicators, context->nextcid, context->newcomm); + + ompi_comm_unregister_cid (context->comm->c_contextid); + + /* done! 
*/ + return OMPI_SUCCESS; + } + + if (1 == context->flag) { + /* we could use this cid, but other don't agree */ + opal_pointer_array_set_item(&ompi_mpi_communicators, context->nextcid, NULL); + context->start = context->nextcid + 1; /* that's where we can start the next round */ + } + + /* try again */ + return ompi_comm_allreduce_getnextcid (request); +} + /**************************************************************************/ /**************************************************************************/ /**************************************************************************/ @@ -316,42 +512,54 @@ void ompi_comm_reg_finalize (void) } -static int ompi_comm_register_cid (uint32_t cid ) +static int ompi_comm_register_cid (uint32_t cid) { - opal_list_item_t *item; ompi_comm_reg_t *regcom; ompi_comm_reg_t *newentry = OBJ_NEW(ompi_comm_reg_t); + bool registered = false; - newentry->cid = cid; - if ( !(opal_list_is_empty (&ompi_registered_comms)) ) { - for (item = opal_list_get_first(&ompi_registered_comms); - item != opal_list_get_end(&ompi_registered_comms); - item = opal_list_get_next(item)) { - regcom = (ompi_comm_reg_t *)item; - if ( regcom->cid > cid ) { - break; - } + do { + /* Only one communicator function allowed in same time on the + * same communicator. + */ + OPAL_THREAD_LOCK(&ompi_cid_lock); + + newentry->cid = cid; + if ( !(opal_list_is_empty (&ompi_registered_comms)) ) { + bool ok = true; + + OPAL_LIST_FOREACH(regcom, &ompi_registered_comms, ompi_comm_reg_t) { + if ( regcom->cid > cid ) { + break; + } #if OMPI_ENABLE_THREAD_MULTIPLE - if( regcom->cid == cid ) { - /** - * The MPI standard state that is the user responsability to - * schedule the global communications in order to avoid any - * kind of troubles. As, managing communicators involve several - * collective communications, we should enforce a sequential - * execution order. This test only allow one communicator - * creation function based on the same communicator. - */ - OBJ_RELEASE(newentry); - return OMPI_ERROR; - } + if( regcom->cid == cid ) { + /** + * The MPI standard state that is the user responsability to + * schedule the global communications in order to avoid any + * kind of troubles. As, managing communicators involve several + * collective communications, we should enforce a sequential + * execution order. This test only allow one communicator + * creation function based on the same communicator. 
+ */ + ok = false; + break; + } #endif /* OMPI_ENABLE_THREAD_MULTIPLE */ + } + if (ok) { + opal_list_insert_pos (&ompi_registered_comms, (opal_list_item_t *) regcom, + (opal_list_item_t *)newentry); + registered = true; + } + } else { + opal_list_append (&ompi_registered_comms, (opal_list_item_t *)newentry); + registered = true; } - opal_list_insert_pos (&ompi_registered_comms, item, - (opal_list_item_t *)newentry); - } - else { - opal_list_append (&ompi_registered_comms, (opal_list_item_t *)newentry); - } + + /* drop the lock before trying again */ + OPAL_THREAD_UNLOCK(&ompi_cid_lock); + } while (!registered); return OMPI_SUCCESS; } @@ -359,18 +567,19 @@ static int ompi_comm_register_cid (uint32_t cid ) static int ompi_comm_unregister_cid (uint32_t cid) { ompi_comm_reg_t *regcom; - opal_list_item_t *item; - for (item = opal_list_get_first(&ompi_registered_comms); - item != opal_list_get_end(&ompi_registered_comms); - item = opal_list_get_next(item)) { - regcom = (ompi_comm_reg_t *)item; + OPAL_THREAD_LOCK(&ompi_cid_lock); + + OPAL_LIST_FOREACH(regcom, &ompi_registered_comms, ompi_comm_reg_t) { if(regcom->cid == cid) { - opal_list_remove_item(&ompi_registered_comms, item); + opal_list_remove_item(&ompi_registered_comms, (opal_list_item_t *) regcom); OBJ_RELEASE(regcom); break; } } + + OPAL_THREAD_UNLOCK(&ompi_cid_lock); + return OMPI_SUCCESS; } @@ -387,7 +596,7 @@ static uint32_t ompi_comm_lowest_cid (void) /**************************************************************************/ /* This routine serves two purposes: * - the allreduce acts as a kind of Barrier, - * which avoids, that we have incoming fragments + * which avoids, that we have incoming fragments * on the new communicator before everybody has set * up the comm structure. * - some components (e.g. the collective MagPIe component @@ -399,7 +608,7 @@ static uint32_t ompi_comm_lowest_cid (void) * comm.c is, that this file contains the allreduce implementations * which are required, and thus we avoid having duplicate code... 
*/ -int ompi_comm_activate ( ompi_communicator_t** newcomm, +int ompi_comm_activate ( ompi_communicator_t** newcomm, ompi_communicator_t* comm, ompi_communicator_t* bridgecomm, void* local_leader, @@ -429,17 +638,20 @@ int ompi_comm_activate ( ompi_communicator_t** newcomm, case OMPI_COMM_CID_INTRA_OOB: allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_oob; break; + case OMPI_COMM_CID_GROUP: + allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_group; + break; default: return MPI_UNDEFINED; break; } if (MPI_UNDEFINED != (*newcomm)->c_local_group->grp_my_rank) { - /* Initialize the PML stuff in the newcomm */ if ( OMPI_SUCCESS != (ret = MCA_PML_CALL(add_comm(*newcomm))) ) { goto bail_on_error; } + OMPI_COMM_SET_PML_ADDED(*newcomm); } @@ -517,7 +729,152 @@ int ompi_comm_activate ( ompi_communicator_t** newcomm, OBJ_RELEASE(*newcomm); *newcomm = MPI_COMM_NULL; return ret; -} +} + +/* Non-blocking version of ompi_comm_activate */ +struct ompi_comm_activate_nb_context { + ompi_communicator_t **newcomm; + ompi_communicator_t *comm; + + /* storage for activate barrier */ + int ok; +}; + +static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request); + +int ompi_comm_activate_nb (ompi_communicator_t **newcomm, + ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, + int mode, ompi_request_t **req) +{ + struct ompi_comm_activate_nb_context *context; + ompi_comm_request_t *request; + ompi_request_t *subreq; + int ret = 0; + + request = ompi_comm_request_get (); + if (NULL == request) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + context = calloc (1, sizeof (*context)); + if (NULL == context) { + ompi_comm_request_return (request); + return OMPI_ERR_OUT_OF_RESOURCE; + } + + context->newcomm = newcomm; + context->comm = comm; + + request->context = context; + + if (OMPI_COMM_CID_INTRA != mode && OMPI_COMM_CID_INTER != mode) { + return MPI_UNDEFINED; + } + + if (MPI_UNDEFINED != (*newcomm)->c_local_group->grp_my_rank) { + /* Initialize the PML stuff in the newcomm */ + if ( OMPI_SUCCESS != (ret = MCA_PML_CALL(add_comm(*newcomm))) ) { + OBJ_RELEASE(newcomm); + *newcomm = MPI_COMM_NULL; + return ret; + } + OMPI_COMM_SET_PML_ADDED(*newcomm); + } + + /* Step 1: the barrier, after which it is allowed to + * send messages over the new communicator + */ + if (mode == OMPI_COMM_CID_INTRA) { + ret = ompi_comm_allreduce_intra_nb (&context->ok, &context->ok, 1, MPI_MIN, + context->comm, bridgecomm, &subreq); + } else { + ret = ompi_comm_allreduce_inter_nb (&context->ok, &context->ok, 1, MPI_MIN, + context->comm, bridgecomm, &subreq); + } + + if (OMPI_SUCCESS != ret) { + ompi_comm_request_return (request); + return ret; + } + + ompi_comm_request_schedule_append (request, ompi_comm_activate_nb_complete, &subreq, 1); + ompi_comm_request_start (request); + + *req = &request->super; + + return OMPI_SUCCESS; +} + +static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request) +{ + struct ompi_comm_activate_nb_context *context = + (struct ompi_comm_activate_nb_context *) request->context; + int ret; + + /** + * Check to see if this process is in the new communicator. + * + * Specifically, this function is invoked by all proceses in the + * old communicator, regardless of whether they are in the new + * communicator or not. 
This is because it is far simpler to use + * MPI collective functions on the old communicator to determine + * some data for the new communicator (e.g., remote_leader) than + * to kludge up our own pseudo-collective routines over just the + * processes in the new communicator. Hence, *all* processes in + * the old communicator need to invoke this function. + * + * That being said, only processes in the new communicator need to + * select a coll module for the new communicator. More + * specifically, proceses who are not in the new communicator + * should *not* select a coll module -- for example, + * ompi_comm_rank(newcomm) returns MPI_UNDEFINED for processes who + * are not in the new communicator. This can cause errors in the + * selection / initialization of a coll module. Plus, it's + * wasteful -- processes in the new communicator will end up + * freeing the new communicator anyway, so we might as well leave + * the coll selection as NULL (the coll base comm unselect code + * handles that case properly). + */ + if (MPI_UNDEFINED == (*context->newcomm)->c_local_group->grp_my_rank) { + return OMPI_SUCCESS; + } + + /* Let the collectives components fight over who will do + collective on this new comm. */ + if (OMPI_SUCCESS != (ret = mca_coll_base_comm_select(*context->newcomm))) { + OBJ_RELEASE(*context->newcomm); + *context->newcomm = MPI_COMM_NULL; + return ret; + } + + /* For an inter communicator, we have to deal with the potential + * problem of what is happening if the local_comm that we created + * has a lower CID than the parent comm. This is not a problem + * as long as the user calls MPI_Comm_free on the inter communicator. + * However, if the communicators are not freed by the user but released + * by Open MPI in MPI_Finalize, we walk through the list of still available + * communicators and free them one by one. Thus, local_comm is freed before + * the actual inter-communicator. However, the local_comm pointer in the + * inter communicator will still contain the 'previous' address of the local_comm + * and thus this will lead to a segmentation violation. In order to prevent + * that from happening, we increase the reference counter local_comm + * by one if its CID is lower than the parent. We cannot increase however + * its reference counter if the CID of local_comm is larger than + * the CID of the inter communicators, since a regular MPI_Comm_free would + * leave in that the case the local_comm hanging around and thus we would not + * recycle CID's properly, which was the reason and the cause for this trouble. 
+ */ + if (OMPI_COMM_IS_INTER(*context->newcomm)) { + if (OMPI_COMM_CID_IS_LOWER(*context->newcomm, context->comm)) { + OMPI_COMM_SET_EXTRA_RETAIN (*context->newcomm); + OBJ_RETAIN (*context->newcomm); + } + } + + /* done */ + return OMPI_SUCCESS; +} /**************************************************************************/ /**************************************************************************/ @@ -528,45 +885,50 @@ int ompi_comm_activate ( ompi_communicator_t** newcomm, * - remote_leader * - send_first */ -static int ompi_comm_allreduce_intra ( int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, +static int ompi_comm_allreduce_intra ( int *inbuf, int *outbuf, + int count, struct ompi_op_t *op, ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_leader, + ompi_communicator_t *bridgecomm, + void* local_leader, + void* remote_leader, int send_first ) { - return comm->c_coll.coll_allreduce ( inbuf, outbuf, count, MPI_INT, - op,comm, + return comm->c_coll.coll_allreduce ( inbuf, outbuf, count, MPI_INT, op, comm, comm->c_coll.coll_allreduce_module ); } +static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf, + int count, struct ompi_op_t *op, + ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, + ompi_request_t **req) +{ + return comm->c_coll.coll_iallreduce (inbuf, outbuf, count, MPI_INT, op, comm, + req, comm->c_coll.coll_iallreduce_module); +} + + /* Arguments not used in this implementation: * - bridgecomm * - local_leader * - remote_leader * - send_first */ -static int ompi_comm_allreduce_inter ( int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, +static int ompi_comm_allreduce_inter ( int *inbuf, int *outbuf, + int count, struct ompi_op_t *op, ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_leader, + ompi_communicator_t *bridgecomm, + void* local_leader, + void* remote_leader, int send_first ) { int local_rank, rsize; - int i, rc; + int rc; int *sbuf; int *tmpbuf=NULL; int *rcounts=NULL, scount=0; int *rdisps=NULL; - if ( &ompi_mpi_op_sum.op != op && &ompi_mpi_op_prod.op != op && - &ompi_mpi_op_max.op != op && &ompi_mpi_op_min.op != op ) { - return MPI_ERR_OP; - } - if ( !OMPI_COMM_IS_INTER (intercomm)) { return MPI_ERR_COMM; } @@ -600,7 +962,7 @@ static int ompi_comm_allreduce_inter ( int *inbuf, int *outbuf, /* local leader exchange their data and determine the overall result for both groups */ - rc = MCA_PML_CALL(irecv (outbuf, count, MPI_INT, 0, + rc = MCA_PML_CALL(irecv (outbuf, count, MPI_INT, 0, OMPI_COMM_ALLREDUCE_TAG, intercomm, &req)); if ( OMPI_SUCCESS != rc ) { @@ -618,42 +980,19 @@ static int ompi_comm_allreduce_inter ( int *inbuf, int *outbuf, goto exit; } - if ( &ompi_mpi_op_max.op == op ) { - for ( i = 0 ; i < count; i++ ) { - if (tmpbuf[i] > outbuf[i]) { - outbuf[i] = tmpbuf[i]; - } - } - } - else if ( &ompi_mpi_op_min.op == op ) { - for ( i = 0 ; i < count; i++ ) { - if (tmpbuf[i] < outbuf[i]) { - outbuf[i] = tmpbuf[i]; - } - } - } - else if ( &ompi_mpi_op_sum.op == op ) { - for ( i = 0 ; i < count; i++ ) { - outbuf[i] += tmpbuf[i]; - } - } - else if ( &ompi_mpi_op_prod.op == op ) { - for ( i = 0 ; i < count; i++ ) { - outbuf[i] *= tmpbuf[i]; - } - } + ompi_op_reduce (op, tmpbuf, outbuf, count, MPI_INT); } /* distribute the overall result to all processes in the other group. Instead of using bcast, we are using here allgatherv, to avoid the - possible deadlock. 
Else, we need an algorithm to determine, - which group sends first in the inter-bcast and which receives + possible deadlock. Else, we need an algorithm to determine, + which group sends first in the inter-bcast and which receives the result first. */ rcounts[0] = count; sbuf = outbuf; rc = intercomm->c_coll.coll_allgatherv (sbuf, scount, MPI_INT, outbuf, - rcounts, rdisps, MPI_INT, + rcounts, rdisps, MPI_INT, intercomm, intercomm->c_coll.coll_allgatherv_module); @@ -667,17 +1006,224 @@ static int ompi_comm_allreduce_inter ( int *inbuf, int *outbuf, if ( NULL != rdisps ) { free ( rdisps ); } - + return (rc); } +/* Non-blocking version of ompi_comm_allreduce_inter */ +struct ompi_comm_allreduce_inter_context { + int *inbuf; + int *outbuf; + int count; + struct ompi_op_t *op; + ompi_communicator_t *intercomm; + ompi_communicator_t *bridgecomm; + int *tmpbuf; + int *rcounts; + int *rdisps; +}; + +static void ompi_comm_allreduce_inter_context_free (struct ompi_comm_allreduce_inter_context *context) +{ + if (context->tmpbuf) { + free (context->tmpbuf); + } + + if (context->rdisps) { + free (context->rdisps); + } + + if (context->rcounts) { + free (context->rcounts); + } + + free (context); +} + +static int ompi_comm_allreduce_inter_leader_exchange (ompi_comm_request_t *request); +static int ompi_comm_allreduce_inter_leader_reduce (ompi_comm_request_t *request); +static int ompi_comm_allreduce_inter_allgather (ompi_comm_request_t *request); +static int ompi_comm_allreduce_inter_allgather_complete (ompi_comm_request_t *request); + +/* Arguments not used in this implementation: + * - bridgecomm + */ +static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, + int count, struct ompi_op_t *op, + ompi_communicator_t *intercomm, + ompi_communicator_t *bridgecomm, + ompi_request_t **req) +{ + struct ompi_comm_allreduce_inter_context *context = NULL; + ompi_comm_request_t *request = NULL; + ompi_request_t *subreq; + int local_rank, rsize, rc; + + if (!OMPI_COMM_IS_INTER (intercomm)) { + return MPI_ERR_COMM; + } + + request = ompi_comm_request_get (); + if (NULL == request) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + context = calloc (1, sizeof (*context)); + if (NULL == context) { + rc = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + context->inbuf = inbuf; + context->outbuf = outbuf; + context->count = count; + context->op = op; + context->intercomm = intercomm; + context->bridgecomm = bridgecomm; + + /* Allocate temporary arrays */ + rsize = ompi_comm_remote_size (intercomm); + local_rank = ompi_comm_rank (intercomm); + + context->tmpbuf = (int *) calloc (count, sizeof(int)); + context->rdisps = (int *) calloc (rsize, sizeof(int)); + context->rcounts = (int *) calloc (rsize, sizeof(int)); + if (OPAL_UNLIKELY (NULL == context->tmpbuf || NULL == context->rdisps || NULL == context->rcounts)) { + rc = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + request->context = context; + + /* Execute the inter-allreduce: the result from the local will be in the buffer of the remote group + * and vise-versa. 
*/ + rc = intercomm->c_coll.coll_iallreduce (inbuf, context->tmpbuf, count, MPI_INT, op, intercomm, + &subreq, intercomm->c_coll.coll_iallreduce_module); + if (OMPI_SUCCESS != rc) { + goto exit; + } + + if (0 == local_rank) { + ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_leader_exchange, &subreq, 1); + } else { + ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_allgather, &subreq, 1); + } + + ompi_comm_request_start (request); + *req = &request->super; + +exit: + if (OMPI_SUCCESS != rc) { + if (context) { + ompi_comm_allreduce_inter_context_free (context); + } + + if (request) { + request->context = NULL; + ompi_comm_request_return (request); + } + } + + return rc; +} + + +static int ompi_comm_allreduce_inter_leader_exchange (ompi_comm_request_t *request) +{ + struct ompi_comm_allreduce_inter_context *context = + (struct ompi_comm_allreduce_inter_context *) request->context; + ompi_request_t *subreqs[2]; + int rc; + + /* local leader exchange their data and determine the overall result + for both groups */ + rc = MCA_PML_CALL(irecv (context->outbuf, context->count, MPI_INT, 0, OMPI_COMM_ALLREDUCE_TAG, + context->intercomm, subreqs)); + if ( OMPI_SUCCESS != rc ) { + goto exit; + } + + rc = MCA_PML_CALL(isend (context->tmpbuf, context->count, MPI_INT, 0, OMPI_COMM_ALLREDUCE_TAG, + MCA_PML_BASE_SEND_STANDARD, context->intercomm, subreqs + 1)); + if ( OMPI_SUCCESS != rc ) { + goto exit; + } + + ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_leader_reduce, subreqs, 2); + +exit: + if (OMPI_SUCCESS != rc) { + ompi_comm_allreduce_inter_context_free (context); + request->context = NULL; + } + + return rc; +} + +static int ompi_comm_allreduce_inter_leader_reduce (ompi_comm_request_t *request) +{ + struct ompi_comm_allreduce_inter_context *context = + (struct ompi_comm_allreduce_inter_context *) request->context; + + ompi_op_reduce (context->op, context->tmpbuf, context->outbuf, context->count, MPI_INT); + + return ompi_comm_allreduce_inter_allgather (request); +} + + +static int ompi_comm_allreduce_inter_allgather (ompi_comm_request_t *request) +{ + struct ompi_comm_allreduce_inter_context *context = + (struct ompi_comm_allreduce_inter_context *) request->context; + ompi_request_t *subreq; + int scount = 0, rc; + + /* distribute the overall result to all processes in the other group. + Instead of using bcast, we are using here allgatherv, to avoid the + possible deadlock. Else, we need an algorithm to determine, + which group sends first in the inter-bcast and which receives + the result first. 
+ */ + + if (0 != ompi_comm_rank (context->intercomm)) { + context->rcounts[0] = context->count; + } else { + scount = context->count; + } + + rc = context->intercomm->c_coll.coll_iallgatherv (context->outbuf, scount, MPI_INT, context->outbuf, + context->rcounts, context->rdisps, MPI_INT, + context->intercomm, &subreq, + context->intercomm->c_coll.coll_iallgatherv_module); + if (OMPI_SUCCESS != rc) { + ompi_comm_allreduce_inter_context_free (context); + request->context = NULL; + return rc; + } + + ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_allgather_complete, &subreq, 1); + + return OMPI_SUCCESS; +} + +static int ompi_comm_allreduce_inter_allgather_complete (ompi_comm_request_t *request) +{ + /* free this request's context */ + ompi_comm_allreduce_inter_context_free (request->context); + /* prevent a double-free from the progress engine */ + request->context = NULL; + + /* done */ + return OMPI_SUCCESS; +} + /* Arguments not used in this implementation: * - send_first */ -static int ompi_comm_allreduce_intra_bridge (int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, +static int ompi_comm_allreduce_intra_bridge (int *inbuf, int *outbuf, + int count, struct ompi_op_t *op, ompi_communicator_t *comm, - ompi_communicator_t *bcomm, + ompi_communicator_t *bcomm, void* lleader, void* rleader, int send_first ) { @@ -694,7 +1240,7 @@ static int ompi_comm_allreduce_intra_bridge (int *inbuf, int *outbuf, &ompi_mpi_op_max.op != op && &ompi_mpi_op_min.op != op ) { return MPI_ERR_OP; } - + local_rank = ompi_comm_rank ( comm ); tmpbuf = (int *) malloc ( count * sizeof(int)); if ( NULL == tmpbuf ) { @@ -711,14 +1257,14 @@ static int ompi_comm_allreduce_intra_bridge (int *inbuf, int *outbuf, if (local_rank == local_leader ) { MPI_Request req; - + rc = MCA_PML_CALL(irecv ( outbuf, count, MPI_INT, remote_leader, - OMPI_COMM_ALLREDUCE_TAG, + OMPI_COMM_ALLREDUCE_TAG, bcomm, &req)); if ( OMPI_SUCCESS != rc ) { - goto exit; + goto exit; } - rc = MCA_PML_CALL(send (tmpbuf, count, MPI_INT, remote_leader, + rc = MCA_PML_CALL(send (tmpbuf, count, MPI_INT, remote_leader, OMPI_COMM_ALLREDUCE_TAG, MCA_PML_BASE_SEND_STANDARD, bcomm)); if ( OMPI_SUCCESS != rc ) { @@ -755,7 +1301,7 @@ static int ompi_comm_allreduce_intra_bridge (int *inbuf, int *outbuf, } } - rc = comm->c_coll.coll_bcast ( outbuf, count, MPI_INT, local_leader, + rc = comm->c_coll.coll_bcast ( outbuf, count, MPI_INT, local_leader, comm, comm->c_coll.coll_bcast_module ); exit: @@ -790,15 +1336,14 @@ static void comm_cid_recv(int status, * rleader is the OOB contact information of the * root processes in the other world. 
*/ -static int ompi_comm_allreduce_intra_oob (int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, +static int ompi_comm_allreduce_intra_oob (int *inbuf, int *outbuf, + int count, struct ompi_op_t *op, ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, + ompi_communicator_t *bridgecomm, void* lleader, void* rleader, int send_first ) { int *tmpbuf=NULL; - int i; int rc; int local_leader, local_rank; ompi_process_name_t *remote_leader=NULL; @@ -809,12 +1354,6 @@ static int ompi_comm_allreduce_intra_oob (int *inbuf, int *outbuf, remote_leader = (ompi_process_name_t*)rleader; size_count = count; - if ( &ompi_mpi_op_sum.op != op && &ompi_mpi_op_prod.op != op && - &ompi_mpi_op_max.op != op && &ompi_mpi_op_min.op != op ) { - return MPI_ERR_OP; - } - - local_rank = ompi_comm_rank ( comm ); tmpbuf = (int *) malloc ( count * sizeof(int)); if ( NULL == tmpbuf ) { @@ -828,7 +1367,7 @@ static int ompi_comm_allreduce_intra_oob (int *inbuf, int *outbuf, if ( OMPI_SUCCESS != rc ) { goto exit; } - + if (local_rank == local_leader ) { opal_buffer_t *sbuf; @@ -873,33 +1412,10 @@ static int ompi_comm_allreduce_intra_oob (int *inbuf, int *outbuf, OBJ_DESTRUCT(&rcid.buf); count = (int)size_count; - if ( &ompi_mpi_op_max.op == op ) { - for ( i = 0 ; i < count; i++ ) { - if (tmpbuf[i] > outbuf[i]) { - outbuf[i] = tmpbuf[i]; - } - } - } - else if ( &ompi_mpi_op_min.op == op ) { - for ( i = 0 ; i < count; i++ ) { - if (tmpbuf[i] < outbuf[i]) { - outbuf[i] = tmpbuf[i]; - } - } - } - else if ( &ompi_mpi_op_sum.op == op ) { - for ( i = 0 ; i < count; i++ ) { - outbuf[i] += tmpbuf[i]; - } - } - else if ( &ompi_mpi_op_prod.op == op ) { - for ( i = 0 ; i < count; i++ ) { - outbuf[i] *= tmpbuf[i]; - } - } + ompi_op_reduce (op, tmpbuf, outbuf, count, MPI_INT); } - rc = comm->c_coll.coll_bcast (outbuf, count, MPI_INT, + rc = comm->c_coll.coll_bcast (outbuf, count, MPI_INT, local_leader, comm, comm->c_coll.coll_bcast_module); @@ -911,4 +1427,76 @@ static int ompi_comm_allreduce_intra_oob (int *inbuf, int *outbuf, return (rc); } +static int ompi_comm_allreduce_group (int *inbuf, int* outbuf, + int count, struct ompi_op_t *op, + ompi_communicator_t *comm, + ompi_communicator_t *newcomm, + void* local_leader, + void* remote_leader, + int send_first) +{ + ompi_group_t *group = newcomm->c_local_group; + int peers_group[3], peers_comm[3]; + const int group_size = ompi_group_size (group); + const int group_rank = ompi_group_rank (group); + int tag = *((int *) local_leader); + int *tmp1; + int i, rc; + + /* basic recursive doubling allreduce on the group */ + peers_group[0] = group_rank ? ((group_rank - 1) >> 1) : MPI_PROC_NULL; + peers_group[1] = (group_rank * 2 + 1) < group_size ? group_rank * 2 + 1: MPI_PROC_NULL; + peers_group[2] = (group_rank * 2 + 2) < group_size ? 
group_rank * 2 + 2 : MPI_PROC_NULL; + + /* translate the ranks into the ranks of the parent communicator */ + ompi_group_translate_ranks (group, 3, peers_group, comm->c_local_group, peers_comm); + + tmp1 = malloc (sizeof (int) * count); + + /* reduce */ + memmove (outbuf, inbuf, sizeof (int) * count); + + for (i = 1 ; i < 3 ; ++i) { + if (MPI_PROC_NULL != peers_comm[i]) { + rc = MCA_PML_CALL(recv(tmp1, count, MPI_INT, peers_comm[i], tag, comm, + MPI_STATUS_IGNORE)); + if (OMPI_SUCCESS != rc) { + goto out; + } + /* this is integer reduction so we do not care about ordering */ + ompi_op_reduce (op, tmp1, outbuf, count, MPI_INT); + } + } + + if (MPI_PROC_NULL != peers_comm[0]) { + rc = MCA_PML_CALL(send(outbuf, count, MPI_INT, peers_comm[0], + tag, MCA_PML_BASE_SEND_STANDARD, comm)); + if (OMPI_SUCCESS != rc) { + goto out; + } + + rc = MCA_PML_CALL(recv(outbuf, count, MPI_INT, peers_comm[0], + tag, comm, MPI_STATUS_IGNORE)); + if (OMPI_SUCCESS != rc) { + goto out; + } + } + + /* broadcast */ + for (i = 1 ; i < 3 ; ++i) { + if (MPI_PROC_NULL != peers_comm[i]) { + rc = MCA_PML_CALL(send(outbuf, count, MPI_INT, peers_comm[i], tag, + MCA_PML_BASE_SEND_STANDARD, comm)); + if (OMPI_SUCCESS != rc) { + goto out; + } + } + } + + out: + free (tmp1); + + return rc; +} + END_C_DECLS diff --git a/ompi/communicator/comm_request.c b/ompi/communicator/comm_request.c new file mode 100644 index 0000000000..c691a728cc --- /dev/null +++ b/ompi/communicator/comm_request.c @@ -0,0 +1,248 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2013 Los Alamos National Security, LLC. All rights + * reseved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "comm_request.h" + +opal_free_list_t ompi_comm_requests; +opal_list_t ompi_comm_requests_active; +opal_mutex_t ompi_comm_request_mutex; +bool ompi_comm_request_progress_active; +bool ompi_comm_request_initialized = false; + +typedef struct ompi_comm_request_item_t { + opal_list_item_t super; + ompi_comm_request_callback_fn_t callback; + ompi_request_t *subreqs[OMPI_COMM_REQUEST_MAX_SUBREQ]; + int subreq_count; +} ompi_comm_request_item_t; +OBJ_CLASS_DECLARATION(ompi_comm_request_item_t); + +static int ompi_comm_request_progress (void); + +void ompi_comm_request_init (void) +{ + OBJ_CONSTRUCT(&ompi_comm_requests, opal_free_list_t); + (void) opal_free_list_init (&ompi_comm_requests, sizeof (ompi_comm_request_t), + OBJ_CLASS(ompi_comm_request_t), 0, -1, 8); + + OBJ_CONSTRUCT(&ompi_comm_requests_active, opal_list_t); + ompi_comm_request_progress_active = false; + OBJ_CONSTRUCT(&ompi_comm_request_mutex, opal_mutex_t); + ompi_comm_request_initialized = true; +} + +void ompi_comm_request_fini (void) +{ + if (!ompi_comm_request_initialized) { + return; + } + + ompi_comm_request_initialized = false; + + opal_mutex_lock (&ompi_comm_request_mutex); + if (ompi_comm_request_progress_active) { + opal_progress_unregister (ompi_comm_request_progress); + } + opal_mutex_unlock (&ompi_comm_request_mutex); + OBJ_DESTRUCT(&ompi_comm_request_mutex); + OBJ_DESTRUCT(&ompi_comm_requests_active); + OBJ_DESTRUCT(&ompi_comm_requests); +} + + +int ompi_comm_request_schedule_append (ompi_comm_request_t *request, ompi_comm_request_callback_fn_t callback, + ompi_request_t *subreqs[], int subreq_count) +{ + ompi_comm_request_item_t *request_item; + int i; + + if (subreq_count > OMPI_COMM_REQUEST_MAX_SUBREQ) { + return OMPI_ERR_BAD_PARAM; + } + + request_item = OBJ_NEW(ompi_comm_request_item_t); + if (NULL == request_item) { 
+ return OMPI_ERR_OUT_OF_RESOURCE; + } + + request_item->callback = callback; + + for (i = 0 ; i < subreq_count ; ++i) { + request_item->subreqs[i] = subreqs[i]; + } + + request_item->subreq_count = subreq_count; + + opal_list_append (&request->schedule, &request_item->super); + + return OMPI_SUCCESS; +} + +static int ompi_comm_request_progress (void) +{ + ompi_comm_request_t *request, *next; + + if (opal_mutex_trylock (&ompi_comm_request_mutex)) { + return 0; + } + + OPAL_LIST_FOREACH_SAFE(request, next, &ompi_comm_requests_active, ompi_comm_request_t) { + int rc = OMPI_SUCCESS; + + if (opal_list_get_size (&request->schedule)) { + ompi_comm_request_item_t *request_item = (ompi_comm_request_item_t *) opal_list_remove_first (&request->schedule); + int item_complete = true; + + /* don't call ompi_request_test_all as it causes a recursive call into opal_progress */ + while (request_item->subreq_count) { + ompi_request_t *subreq = request_item->subreqs[request_item->subreq_count-1]; + if (true == subreq->req_complete) { + ompi_request_free (&subreq); + request_item->subreq_count--; + } else { + item_complete = false; + break; + } + } + + if (item_complete) { + if (request_item->callback) { + opal_mutex_unlock (&ompi_comm_request_mutex); + rc = request_item->callback (request); + opal_mutex_lock (&ompi_comm_request_mutex); + } + OBJ_RELEASE(request_item); + } else { + opal_list_prepend (&request->schedule, &request_item->super); + } + } + + /* if the request schedule is empty then the request is complete */ + if (0 == opal_list_get_size (&request->schedule)) { + opal_list_remove_item (&ompi_comm_requests_active, (opal_list_item_t *) request); + request->super.req_status.MPI_ERROR = (OMPI_SUCCESS == rc) ? MPI_SUCCESS : MPI_ERR_INTERN; + ompi_request_complete (&request->super, true); + } + } + + if (0 == opal_list_get_size (&ompi_comm_requests_active)) { + /* no more active requests. 
disable this progress function */ + ompi_comm_request_progress_active = false; + opal_progress_unregister (ompi_comm_request_progress); + } + + opal_mutex_unlock (&ompi_comm_request_mutex); + + return 1; +} + +void ompi_comm_request_start (ompi_comm_request_t *request) +{ + opal_mutex_lock (&ompi_comm_request_mutex); + opal_list_append (&ompi_comm_requests_active, (opal_list_item_t *) request); + + /* check if we need to start the communicator request progress function */ + if (!ompi_comm_request_progress_active) { + opal_progress_register (ompi_comm_request_progress); + ompi_comm_request_progress_active = true; + } + + opal_mutex_unlock (&ompi_comm_request_mutex); +} + +static int ompi_comm_request_cancel (struct ompi_request_t *ompi_req, int complete) +{ + ompi_comm_request_t *tmp, *request = (ompi_comm_request_t *) ompi_req; + ompi_comm_request_item_t *item, *next; + + opal_mutex_lock (&ompi_comm_request_mutex); + + OPAL_LIST_FOREACH_SAFE(item, next, &request->schedule, ompi_comm_request_item_t) { + for (int i = 0 ; i < item->subreq_count ; ++i) { + ompi_request_cancel (item->subreqs[i]); + } + + opal_list_remove_item (&request->schedule, &item->super); + OBJ_RELEASE(item); + } + + /* remove the request for the list of active requests */ + OPAL_LIST_FOREACH(tmp, &ompi_comm_requests_active, ompi_comm_request_t) { + if (tmp == request) { + opal_list_remove_item (&ompi_comm_requests_active, (opal_list_item_t *) request); + break; + } + } + + opal_mutex_unlock (&ompi_comm_request_mutex); + + return MPI_ERR_REQUEST; +} + +static int ompi_comm_request_free (struct ompi_request_t **ompi_req) +{ + ompi_comm_request_t *request = (ompi_comm_request_t *) *ompi_req; + + if (!(*ompi_req)->req_complete) { + return MPI_ERR_REQUEST; + } + + (*ompi_req)->req_complete = false; + ompi_comm_request_return (request); + + *ompi_req = MPI_REQUEST_NULL; + + return OMPI_SUCCESS; +} + +static void ompi_comm_request_construct (ompi_comm_request_t *request) +{ + request->context = NULL; + + request->super.req_type = OMPI_REQUEST_COMM; + request->super.req_status._cancelled = 0; + request->super.req_free = ompi_comm_request_free; + request->super.req_cancel = ompi_comm_request_cancel; + + OBJ_CONSTRUCT(&request->schedule, opal_list_t); +} + +static void ompi_comm_request_destruct (ompi_comm_request_t *request) +{ + OBJ_DESTRUCT(&request->schedule); +} +OBJ_CLASS_INSTANCE(ompi_comm_request_t, ompi_request_t, + ompi_comm_request_construct, + ompi_comm_request_destruct); + +OBJ_CLASS_INSTANCE(ompi_comm_request_item_t, opal_list_item_t, NULL, NULL); + +ompi_comm_request_t *ompi_comm_request_get (void) +{ + opal_free_list_item_t *item; + int rc; + + OPAL_FREE_LIST_GET(&ompi_comm_requests, item, rc); + (void) rc; + + return (ompi_comm_request_t *) item; +} + +void ompi_comm_request_return (ompi_comm_request_t *request) +{ + if (request->context) { + free (request->context); + request->context = NULL; + } + + OPAL_FREE_LIST_RETURN(&ompi_comm_requests, (opal_free_list_item_t *) request); +} + diff --git a/ompi/communicator/comm_request.h b/ompi/communicator/comm_request.h new file mode 100644 index 0000000000..246a3010b0 --- /dev/null +++ b/ompi/communicator/comm_request.h @@ -0,0 +1,40 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2013 Los Alamos National Security, LLC. All rights + * reseved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#if !defined(OMPI_COMM_REQ_H) +#define OMPI_COMM_REQ_H + +#include "opal/class/opal_list.h" +#include "opal/class/opal_free_list.h" +#include "ompi/request/request.h" + +/* increase this number if more subrequests are needed */ +#define OMPI_COMM_REQUEST_MAX_SUBREQ 2 + +typedef struct ompi_comm_request_t { + ompi_request_t super; + + void *context; + opal_list_t schedule; +} ompi_comm_request_t; +OBJ_CLASS_DECLARATION(ompi_comm_request_t); + +typedef int (*ompi_comm_request_callback_fn_t) (ompi_comm_request_t *); + +void ompi_comm_request_init (void); +void ompi_comm_request_fini (void); +int ompi_comm_request_schedule_append (ompi_comm_request_t *request, ompi_comm_request_callback_fn_t callback, + ompi_request_t *subreqs[], int subreq_count); +void ompi_comm_request_start (ompi_comm_request_t *request); +ompi_comm_request_t *ompi_comm_request_get (void); +void ompi_comm_request_return (ompi_comm_request_t *request); + +#endif /* OMPI_COMM_REQ_H */ diff --git a/ompi/communicator/communicator.h b/ompi/communicator/communicator.h index 4d54187658..fbd1ae2d47 100644 --- a/ompi/communicator/communicator.h +++ b/ompi/communicator/communicator.h @@ -31,6 +31,7 @@ #include "opal/class/opal_object.h" #include "ompi/errhandler/errhandler.h" #include "opal/threads/mutex.h" +#include "ompi/communicator/comm_request.h" #include "mpi.h" #include "ompi/group/group.h" @@ -91,6 +92,7 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_communicator_t); #define OMPI_COMM_CID_INTER 0x00000040 #define OMPI_COMM_CID_INTRA_BRIDGE 0x00000080 #define OMPI_COMM_CID_INTRA_OOB 0x00000100 +#define OMPI_COMM_CID_GROUP 0x00000200 /** * The block of CIDs allocated for MPI_COMM_WORLD @@ -359,6 +361,13 @@ OMPI_DECLSPEC int ompi_comm_group (ompi_communicator_t *comm, ompi_group_t **gro int ompi_comm_create (ompi_communicator_t* comm, ompi_group_t *group, ompi_communicator_t** newcomm); + +/** + * Non-collective create communicator based on a group + */ +int ompi_comm_create_group (ompi_communicator_t *comm, ompi_group_t *group, int tag, + ompi_communicator_t **newcomm); + /** * Take an almost complete communicator and reserve the CID as well * as activate it (initialize the collective and the topologies). @@ -416,6 +425,37 @@ OMPI_DECLSPEC int ompi_comm_split_type(ompi_communicator_t *comm, * @param newcomm: the new communicator or MPI_COMM_NULL if any error is detected. */ OMPI_DECLSPEC int ompi_comm_dup (ompi_communicator_t *comm, ompi_communicator_t **newcomm); + +/** + * dup a communicator (non-blocking). Parameter are identical to the MPI-counterpart + * of the function. It has been extracted, since we need to be able + * to dup a communicator internally as well. + * + * @param comm: input communicator + * @param newcomm: the new communicator or MPI_COMM_NULL if any error is detected. + */ +OMPI_DECLSPEC int ompi_comm_idup (ompi_communicator_t *comm, ompi_communicator_t **newcomm, ompi_request_t **request); + +/** + * dup a communicator with info. Parameter are identical to the MPI-counterpart + * of the function. It has been extracted, since we need to be able + * to dup a communicator internally as well. + * + * @param comm: input communicator + * @param newcomm: the new communicator or MPI_COMM_NULL if any error is detected. + */ +OMPI_DECLSPEC int ompi_comm_dup_with_info (ompi_communicator_t *comm, ompi_info_t *info, ompi_communicator_t **newcomm); + +/** + * dup a communicator (non-blocking) with info. + * of the function. 
It has been extracted, since we need to be able + * to dup a communicator internally as well. + * + * @param comm: input communicator + * @param newcomm: the new communicator or MPI_COMM_NULL if any error is detected. + */ +OMPI_DECLSPEC int ompi_comm_idup_with_info (ompi_communicator_t *comm, ompi_info_t *info, ompi_communicator_t **newcomm, ompi_request_t **req); + /** * compare two communicators. * @@ -469,6 +509,21 @@ OMPI_DECLSPEC int ompi_comm_nextcid ( ompi_communicator_t* newcomm, int mode, int send_first); +/** + * allocate new communicator ID (non-blocking) + * @param newcomm: pointer to the new communicator + * @param oldcomm: original comm + * @param bridgecomm: bridge comm for intercomm_create + * @param mode: combination of input + * OMPI_COMM_CID_INTRA: intra-comm + * OMPI_COMM_CID_INTER: inter-comm + * This routine has to be thread safe in the final version. + */ +OMPI_DECLSPEC int ompi_comm_nextcid_nb (ompi_communicator_t* newcomm, + ompi_communicator_t* comm, + ompi_communicator_t* bridgecomm, + int mode, ompi_request_t **req); + /** * shut down the communicator infrastructure. */ @@ -489,6 +544,20 @@ OMPI_DECLSPEC int ompi_comm_set ( ompi_communicator_t** newcomm, bool copy_topocomponent, ompi_group_t *local_group, ompi_group_t *remote_group ); + +OMPI_DECLSPEC int ompi_comm_set_nb ( ompi_communicator_t **ncomm, + ompi_communicator_t *oldcomm, + int local_size, + int *local_ranks, + int remote_size, + int *remote_ranks, + opal_hash_table_t *attr, + ompi_errhandler_t *errh, + bool copy_topocomponent, + ompi_group_t *local_group, + ompi_group_t *remote_group, + ompi_request_t **req ); + /** * This is a short-hand routine used in intercomm_create. * The routine makes sure, that all processes have afterwards @@ -525,6 +594,11 @@ OMPI_DECLSPEC int ompi_comm_activate ( ompi_communicator_t** newcomm, int mode, int send_first ); +OMPI_DECLSPEC int ompi_comm_activate_nb (ompi_communicator_t **newcomm, + ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, + int mode, ompi_request_t **req); + /** * a simple function to dump the structure */ diff --git a/ompi/include/mpi.h.in b/ompi/include/mpi.h.in index e29bb4c8ca..c2c6a56e94 100644 --- a/ompi/include/mpi.h.in +++ b/ompi/include/mpi.h.in @@ -1240,10 +1240,13 @@ OMPI_DECLSPEC int MPI_Comm_create_errhandler(MPI_Comm_errhandler_function *func OMPI_DECLSPEC int MPI_Comm_create_keyval(MPI_Comm_copy_attr_function *comm_copy_attr_fn, MPI_Comm_delete_attr_function *comm_delete_attr_fn, int *comm_keyval, void *extra_state); +OMPI_DECLSPEC int MPI_Comm_create_group(MPI_Comm comm, MPI_Group group, int tag, MPI_Comm *newcomm); OMPI_DECLSPEC int MPI_Comm_create(MPI_Comm comm, MPI_Group group, MPI_Comm *newcomm); OMPI_DECLSPEC int MPI_Comm_delete_attr(MPI_Comm comm, int comm_keyval); OMPI_DECLSPEC int MPI_Comm_disconnect(MPI_Comm *comm); OMPI_DECLSPEC int MPI_Comm_dup(MPI_Comm comm, MPI_Comm *newcomm); +OMPI_DECLSPEC int MPI_Comm_idup(MPI_Comm comm, MPI_Comm *newcomm, MPI_Request *request); +OMPI_DECLSPEC int MPI_Comm_dup_with_info(MPI_Comm comm, MPI_Info info, MPI_Comm *newcomm); OMPI_DECLSPEC MPI_Comm MPI_Comm_f2c(MPI_Fint comm); OMPI_DECLSPEC int MPI_Comm_free_keyval(int *comm_keyval); OMPI_DECLSPEC int MPI_Comm_free(MPI_Comm *comm); @@ -1909,10 +1912,13 @@ OMPI_DECLSPEC int PMPI_Comm_create_errhandler(MPI_Comm_errhandler_function *fun OMPI_DECLSPEC int PMPI_Comm_create_keyval(MPI_Comm_copy_attr_function *comm_copy_attr_fn, MPI_Comm_delete_attr_function *comm_delete_attr_fn, int *comm_keyval, void *extra_state); +OMPI_DECLSPEC int 
PMPI_Comm_create_group(MPI_Comm comm, MPI_Group group, int tag, MPI_Comm *newcomm); OMPI_DECLSPEC int PMPI_Comm_create(MPI_Comm comm, MPI_Group group, MPI_Comm *newcomm); OMPI_DECLSPEC int PMPI_Comm_delete_attr(MPI_Comm comm, int comm_keyval); OMPI_DECLSPEC int PMPI_Comm_disconnect(MPI_Comm *comm); OMPI_DECLSPEC int PMPI_Comm_dup(MPI_Comm comm, MPI_Comm *newcomm); +OMPI_DECLSPEC int PMPI_Comm_idup(MPI_Comm comm, MPI_Comm *newcomm, MPI_Request *request); +OMPI_DECLSPEC int PMPI_Comm_dup_with_info(MPI_Comm comm, MPI_Info info, MPI_Comm *newcomm); OMPI_DECLSPEC MPI_Comm PMPI_Comm_f2c(MPI_Fint comm); OMPI_DECLSPEC int PMPI_Comm_free_keyval(int *comm_keyval); OMPI_DECLSPEC int PMPI_Comm_free(MPI_Comm *comm); diff --git a/ompi/mpi/c/Makefile.am b/ompi/mpi/c/Makefile.am index a4c868aa14..cd63a7b5d2 100644 --- a/ompi/mpi/c/Makefile.am +++ b/ompi/mpi/c/Makefile.am @@ -11,7 +11,9 @@ # All rights reserved. # Copyright (c) 2009-2012 Cisco Systems, Inc. All rights reserved. # Copyright (c) 2011 Sandia National Laboratories. All rights reserved. -# Copyright (c) 2012 Oak Rigde National Laboratory. All rights reserved. +# Copyright (c) 2012 Oak Ridge National Laboratory. All rights reserved. +# Copyright (c) 2013 Los Alamos National Security, LLC. All rights +# reserved. # $COPYRIGHT$ # # Additional copyrights may follow @@ -106,10 +108,13 @@ libmpi_c_mpi_la_SOURCES = \ comm_connect.c \ comm_create.c \ comm_create_errhandler.c \ + comm_create_group.c \ comm_create_keyval.c \ comm_delete_attr.c \ comm_disconnect.c \ comm_dup.c \ + comm_dup_with_info.c \ + comm_idup.c \ comm_f2c.c \ comm_free.c \ comm_free_keyval.c \ diff --git a/ompi/mpi/c/comm_create_group.c b/ompi/mpi/c/comm_create_group.c new file mode 100644 index 0000000000..a09c22b4a7 --- /dev/null +++ b/ompi/mpi/c/comm_create_group.c @@ -0,0 +1,80 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2008 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2006-2007 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2013 Los Alamos National Security, LLC. All rights + * reserved. 
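To make the intent of the non-blocking internals declared above (ompi_comm_nextcid_nb, ompi_comm_set_nb, ompi_comm_activate_nb) concrete, here is a sketch of how such an _nb entry point can be driven synchronously by waiting on the request it returns. The wrapper name is hypothetical and error handling is abbreviated.

    #include "ompi/communicator/communicator.h"
    #include "ompi/request/request.h"

    /* hypothetical blocking wrapper over the non-blocking CID allocation */
    static int example_nextcid_blocking (ompi_communicator_t *newcomm,
                                         ompi_communicator_t *comm, int mode)
    {
        ompi_request_t *req = NULL;
        int rc;

        rc = ompi_comm_nextcid_nb (newcomm, comm, NULL /* no bridge comm */,
                                   mode, &req);
        if (OMPI_SUCCESS != rc) {
            return rc;
        }

        /* complete the pending CID negotiation, if any was started */
        if (NULL != req) {
            ompi_request_wait (&req, MPI_STATUS_IGNORE);
        }

        return OMPI_SUCCESS;
    }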
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ +#include "ompi_config.h" +#include + +#include "ompi/mpi/c/bindings.h" +#include "ompi/runtime/params.h" +#include "ompi/communicator/communicator.h" +#include "ompi/errhandler/errhandler.h" +#include "ompi/mca/pml/pml.h" +#include "ompi/memchecker.h" + +#if OPAL_HAVE_WEAK_SYMBOLS && OMPI_PROFILING_DEFINES +#pragma weak MPI_Comm_create_group = PMPI_Comm_create_group +#endif + +#if OMPI_PROFILING_DEFINES +#include "ompi/mpi/c/profile/defines.h" +#endif + +static const char FUNC_NAME[] = "MPI_Comm_create_group"; + + +int MPI_Comm_create_group (MPI_Comm comm, MPI_Group group, int tag, MPI_Comm *newcomm) { + int rc; + + MEMCHECKER( + memchecker_comm(comm); + ); + + if ( MPI_PARAM_CHECK ) { + OMPI_ERR_INIT_FINALIZE(FUNC_NAME); + + if (ompi_comm_invalid (comm)) + return OMPI_ERRHANDLER_INVOKE(MPI_COMM_WORLD, MPI_ERR_COMM, + FUNC_NAME); + + if (tag < 0 || tag > mca_pml.pml_max_tag) + return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_TAG, + FUNC_NAME); + + if ( NULL == group ) + return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_GROUP, + FUNC_NAME); + + if ( NULL == newcomm ) + return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG, + FUNC_NAME); + } + + if (MPI_GROUP_NULL == group || MPI_UNDEFINED == ompi_group_rank (group)) { + *newcomm = MPI_COMM_NULL; + return MPI_SUCCESS; + } + + OPAL_CR_ENTER_LIBRARY(); + + rc = ompi_comm_create_group ((ompi_communicator_t *) comm, (ompi_group_t *) group, + tag, (ompi_communicator_t **) newcomm); + OMPI_ERRHANDLER_RETURN (rc, comm, rc, FUNC_NAME); +} diff --git a/ompi/mpi/c/comm_dup_with_info.c b/ompi/mpi/c/comm_dup_with_info.c new file mode 100644 index 0000000000..ec765a5870 --- /dev/null +++ b/ompi/mpi/c/comm_dup_with_info.c @@ -0,0 +1,72 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2008 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2008 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2006 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2006-2008 University of Houston. All rights reserved. + * Copyright (c) 2013 Los Alamos National Security, LLC. All rights + * reserved. 
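Since MPI_Comm_create_group is non-collective over the parent communicator, only the processes named in the group call it; everyone else gets MPI_COMM_NULL. A sketch of application-level usage follows; the helper name and the choice of even ranks are hypothetical.

    #include <mpi.h>

    /* hypothetical helper: build a communicator over the even ranks of comm */
    static int create_even_comm (MPI_Comm comm, MPI_Comm *even_comm)
    {
        MPI_Group world_group, even_group;
        int rank, size;
        int ranges[1][3];

        MPI_Comm_rank (comm, &rank);
        MPI_Comm_size (comm, &size);
        MPI_Comm_group (comm, &world_group);

        /* ranks 0, 2, 4, ... of the parent communicator */
        ranges[0][0] = 0;
        ranges[0][1] = size - 1;
        ranges[0][2] = 2;
        MPI_Group_range_incl (world_group, 1, ranges, &even_group);

        /* only members of the group make the call; the tag separates
         * concurrent creations on the same parent communicator */
        *even_comm = MPI_COMM_NULL;
        if (0 == (rank % 2)) {
            MPI_Comm_create_group (comm, even_group, 0, even_comm);
        }

        MPI_Group_free (&even_group);
        MPI_Group_free (&world_group);

        return MPI_SUCCESS;
    }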
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ +#include "ompi_config.h" +#include + +#include "ompi/mpi/c/bindings.h" +#include "ompi/runtime/params.h" +#include "ompi/communicator/communicator.h" +#include "ompi/errhandler/errhandler.h" +#include "ompi/memchecker.h" + +#if OPAL_HAVE_WEAK_SYMBOLS && OMPI_PROFILING_DEFINES +#pragma weak MPI_Comm_dup_with_info = PMPI_Comm_dup_with_info +#endif + +#if OMPI_PROFILING_DEFINES +#include "ompi/mpi/c/profile/defines.h" +#endif + +static const char FUNC_NAME[] = "MPI_Comm_dup_with_info"; + +int MPI_Comm_dup_with_info(MPI_Comm comm, MPI_Info info, MPI_Comm *newcomm) +{ + int rc; + + MEMCHECKER( + memchecker_comm(comm); + ); + + /* argument checking */ + if ( MPI_PARAM_CHECK ) { + OMPI_ERR_INIT_FINALIZE(FUNC_NAME); + + if (ompi_comm_invalid (comm)) + return OMPI_ERRHANDLER_INVOKE(MPI_COMM_WORLD, MPI_ERR_COMM, + FUNC_NAME); + if (NULL == info || ompi_info_is_freed(info)) { + return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_INFO, + FUNC_NAME); + } + + if ( NULL == newcomm ) + return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG, + FUNC_NAME); + } + + OPAL_CR_ENTER_LIBRARY(); + + rc = ompi_comm_dup_with_info (comm, info, newcomm); + OMPI_ERRHANDLER_RETURN(rc, comm, rc, FUNC_NAME); +} + diff --git a/ompi/mpi/c/comm_idup.c b/ompi/mpi/c/comm_idup.c new file mode 100644 index 0000000000..9edaa8f0be --- /dev/null +++ b/ompi/mpi/c/comm_idup.c @@ -0,0 +1,68 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2008 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2008 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2006 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2006-2008 University of Houston. All rights reserved. + * Copyright (c) 2013 Los Alamos National Security, LLC. All rights + * reserved. 
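The MPI_Comm_dup_with_info binding above only requires a valid, un-freed info object; a sketch of application-level usage follows. The info key shown is a placeholder, not a hint interpreted by this patch.

    #include <mpi.h>

    /* hypothetical usage of MPI_Comm_dup_with_info with a placeholder hint */
    static int dup_with_hint (MPI_Comm comm, MPI_Comm *newcomm)
    {
        MPI_Info info;

        MPI_Info_create (&info);
        MPI_Info_set (info, "example_hint", "true");

        MPI_Comm_dup_with_info (comm, info, newcomm);

        /* the info object can be released once the call returns */
        MPI_Info_free (&info);

        return MPI_SUCCESS;
    }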
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ +#include "ompi_config.h" +#include + +#include "ompi/mpi/c/bindings.h" +#include "ompi/runtime/params.h" +#include "ompi/communicator/communicator.h" +#include "ompi/errhandler/errhandler.h" +#include "ompi/memchecker.h" + +#if OPAL_HAVE_WEAK_SYMBOLS && OMPI_PROFILING_DEFINES +#pragma weak MPI_Comm_idup = PMPI_Comm_idup +#endif + +#if OMPI_PROFILING_DEFINES +#include "ompi/mpi/c/profile/defines.h" +#endif + +static const char FUNC_NAME[] = "MPI_Comm_idup"; + +int MPI_Comm_idup(MPI_Comm comm, MPI_Comm *newcomm, MPI_Request *request) +{ + int rc; + + MEMCHECKER( + memchecker_comm(comm); + ); + + /* argument checking */ + if ( MPI_PARAM_CHECK ) { + OMPI_ERR_INIT_FINALIZE(FUNC_NAME); + + if (ompi_comm_invalid (comm)) + return OMPI_ERRHANDLER_INVOKE(MPI_COMM_WORLD, MPI_ERR_COMM, + FUNC_NAME); + + if ( NULL == newcomm ) + return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG, + FUNC_NAME); + } + + OPAL_CR_ENTER_LIBRARY(); + + rc = ompi_comm_idup (comm, newcomm, request); + OMPI_ERRHANDLER_RETURN(rc, comm, rc, FUNC_NAME); +} + diff --git a/ompi/mpi/c/profile/Makefile.am b/ompi/mpi/c/profile/Makefile.am index 1e5a5551a0..b70666b76b 100644 --- a/ompi/mpi/c/profile/Makefile.am +++ b/ompi/mpi/c/profile/Makefile.am @@ -12,7 +12,9 @@ # All rights reserved. # Copyright (c) 2009-2012 Cisco Systems, Inc. All rights reserved. # Copyright (c) 2011 Sandia National Laboratories. All rights reserved. -# Copyright (c) 2012 Oak Rigde National Laboratory. All rights reserved. +# Copyright (c) 2012 Oak Ridge National Laboratory. All rights reserved. +# Copyright (c) 2013 Los Alamos National Security, LLC. All rights +# reserved. # $COPYRIGHT$ # # Additional copyrights may follow @@ -88,10 +90,13 @@ nodist_libmpi_c_pmpi_la_SOURCES = \ pcomm_connect.c \ pcomm_create.c \ pcomm_create_errhandler.c \ + pcomm_create_group.c \ pcomm_create_keyval.c \ pcomm_delete_attr.c \ pcomm_disconnect.c \ pcomm_dup.c \ + pcomm_dup_with_info.c \ + pcomm_idup.c \ pcomm_f2c.c \ pcomm_free.c \ pcomm_free_keyval.c \ diff --git a/ompi/mpi/c/profile/defines.h b/ompi/mpi/c/profile/defines.h index 208e061f98..17d512eae8 100644 --- a/ompi/mpi/c/profile/defines.h +++ b/ompi/mpi/c/profile/defines.h @@ -76,10 +76,13 @@ #define MPI_Comm_connect PMPI_Comm_connect #define MPI_Comm_create_errhandler PMPI_Comm_create_errhandler #define MPI_Comm_create_keyval PMPI_Comm_create_keyval +#define MPI_Comm_create_group PMPI_Comm_create_group #define MPI_Comm_create PMPI_Comm_create #define MPI_Comm_delete_attr PMPI_Comm_delete_attr #define MPI_Comm_disconnect PMPI_Comm_disconnect #define MPI_Comm_dup PMPI_Comm_dup +#define MPI_Comm_dup_with_info PMPI_Comm_dup_with_info +#define MPI_Comm_idup PMPI_Comm_idup #define MPI_Comm_f2c PMPI_Comm_f2c #define MPI_Comm_free_keyval PMPI_Comm_free_keyval #define MPI_Comm_free PMPI_Comm_free diff --git a/ompi/request/request_dbg.h b/ompi/request/request_dbg.h index e4421f4e0b..c25ae29f83 100644 --- a/ompi/request/request_dbg.h +++ b/ompi/request/request_dbg.h @@ -27,6 +27,7 @@ typedef enum { OMPI_REQUEST_COLL, /**< MPI-3 non-blocking collectives request */ OMPI_REQUEST_NULL, /**< NULL request */ OMPI_REQUEST_NOOP, /**< A request that does nothing (e.g., to PROC_NULL) */ + OMPI_REQUEST_COMM, /**< MPI-3 non-blocking communicator duplication */ OMPI_REQUEST_MAX /**< Maximum request type */ } ompi_request_type_t; diff --git a/ompi/runtime/ompi_mpi_finalize.c b/ompi/runtime/ompi_mpi_finalize.c index ebdfd5cc5b..55ac43ecca 100644 --- 
a/ompi/runtime/ompi_mpi_finalize.c +++ b/ompi/runtime/ompi_mpi_finalize.c @@ -283,6 +283,9 @@ int ompi_mpi_finalize(void) return ret; } + /* release resources held by comm requests */ + ompi_comm_request_fini (); + if (OMPI_SUCCESS != (ret = ompi_message_finalize())) { return ret; } diff --git a/ompi/runtime/ompi_mpi_init.c b/ompi/runtime/ompi_mpi_init.c index 7b2bc9ffb4..29b1bf9f54 100644 --- a/ompi/runtime/ompi_mpi_init.c +++ b/ompi/runtime/ompi_mpi_init.c @@ -871,6 +871,9 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided) goto error; } + /* Prepare communicator requests */ + ompi_comm_request_init (); + /* Init coll for the comms. This has to be after dpm_base_select, (since dpm.mark_dyncomm is not set in the communicator creation function else), but before dpm.dyncom_init, since this function
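Rounding out the new API surface, a sketch of the usage pattern MPI_Comm_idup enables: start the duplication, overlap it with local work, and complete it with MPI_Wait. The compute_something placeholder stands in for arbitrary application work.

    #include <mpi.h>

    void compute_something (void);   /* placeholder for local work */

    /* hypothetical overlap pattern for the new MPI_Comm_idup binding */
    static int dup_with_overlap (MPI_Comm comm, MPI_Comm *newcomm)
    {
        MPI_Request req;

        MPI_Comm_idup (comm, newcomm, &req);

        /* the duplication progresses while local work proceeds */
        compute_something ();

        MPI_Wait (&req, MPI_STATUS_IGNORE);

        /* *newcomm is now ready for use */
        return MPI_SUCCESS;
    }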