From 035c2e2e2aefbd36a200a0a1c4262fde959de462 Mon Sep 17 00:00:00 2001 From: Nathan Hjelm Date: Fri, 8 Jul 2016 10:46:05 -0600 Subject: [PATCH] ompi/comm: refactor communicator cid code This commit simplifies the communicator context ID generation by removing the blocking code. The high level calls: ompi_comm_nextcid and ompi_comm_activate remain but now call the non-blocking variants and wait on the resulting request. This was done to remove the parallel paths for context ID generation in preperation for further improvements of the CID generation code. Signed-off-by: Nathan Hjelm --- ompi/communicator/comm.c | 158 +-- ompi/communicator/comm_cid.c | 1626 ++++++++++++------------------ ompi/communicator/comm_init.c | 8 - ompi/communicator/comm_request.c | 5 +- ompi/communicator/comm_request.h | 2 +- ompi/communicator/communicator.h | 71 +- ompi/dpm/dpm.c | 30 +- ompi/mpi/c/intercomm_create.c | 22 +- ompi/mpi/c/intercomm_merge.c | 23 +- 9 files changed, 735 insertions(+), 1210 deletions(-) diff --git a/ompi/communicator/comm.c b/ompi/communicator/comm.c index c1997d3841..342e5028af 100644 --- a/ompi/communicator/comm.c +++ b/ompi/communicator/comm.c @@ -358,13 +358,7 @@ int ompi_comm_create ( ompi_communicator_t *comm, ompi_group_t *group, } /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new communicator */ - comm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - mode, /* mode */ - -1 ); /* send first */ + rc = ompi_comm_nextcid (newcomp, comm, NULL, NULL, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { goto exit; } @@ -374,13 +368,7 @@ int ompi_comm_create ( ompi_communicator_t *comm, ompi_group_t *group, newcomp->c_contextid, comm->c_contextid ); /* Activate the communicator and init coll-component */ - rc = ompi_comm_activate( &newcomp, /* new communicator */ - comm, - NULL, - NULL, - NULL, - mode, - -1 ); + rc = ompi_comm_activate (&newcomp, comm, NULL, NULL, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { goto exit; } @@ -609,13 +597,7 @@ int ompi_comm_split( ompi_communicator_t* comm, int color, int key, } /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new communicator */ - comm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - mode, /* mode */ - -1 ); /* send first, doesn't matter */ + rc = ompi_comm_nextcid (newcomp, comm, NULL, NULL, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { goto exit; } @@ -634,36 +616,15 @@ int ompi_comm_split( ompi_communicator_t* comm, int color, int key, /* Activate the communicator and init coll-component */ - rc = ompi_comm_activate( &newcomp, /* new communicator */ - comm, - NULL, - NULL, - NULL, - mode, - -1 ); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } + rc = ompi_comm_activate (&newcomp, comm, NULL, NULL, NULL, false, mode); exit: - if ( NULL != results ) { - free ( results ); - } - if ( NULL != sorted ) { - free ( sorted ); - } - if ( NULL != rresults) { - free ( rresults ); - } - if ( NULL != rsorted ) { - free ( rsorted ); - } - if ( NULL != lranks ) { - free ( lranks ); - } - if ( NULL != rranks ) { - free ( rranks ); - } + free ( results ); + free ( sorted ); + free ( rresults ); + free ( rsorted ); + free ( lranks ); + free ( rranks ); /* Step 4: if we are not part of the comm, free the struct */ /* --------------------------------------------------------- */ @@ -675,7 +636,7 @@ int ompi_comm_split( ompi_communicator_t* comm, int color, int key, } *newcomm = newcomp; - return ( rc ); + return rc; } @@ -925,13 +886,7 @@ ompi_comm_split_type(ompi_communicator_t *comm, } /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new communicator */ - comm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - mode, /* mode */ - -1 ); /* send first, doesn't matter */ + rc = ompi_comm_nextcid (newcomp, comm, NULL, NULL, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { goto exit; } @@ -950,13 +905,7 @@ ompi_comm_split_type(ompi_communicator_t *comm, /* Activate the communicator and init coll-component */ - rc = ompi_comm_activate( &newcomp, /* new communicator */ - comm, - NULL, - NULL, - NULL, - mode, - -1 ); + rc = ompi_comm_activate (&newcomp, comm, NULL, NULL, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { goto exit; } @@ -1031,13 +980,7 @@ int ompi_comm_dup_with_info ( ompi_communicator_t * comm, ompi_info_t *info, omp } /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new communicator */ - comm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - mode, /* mode */ - -1 ); /* send_first */ + rc = ompi_comm_nextcid (newcomp, comm, NULL, NULL, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { return rc; } @@ -1047,13 +990,7 @@ int ompi_comm_dup_with_info ( ompi_communicator_t * comm, ompi_info_t *info, omp newcomp->c_contextid, comm->c_contextid ); /* activate communicator and init coll-module */ - rc = ompi_comm_activate( &newcomp, /* new communicator */ - comm, - NULL, - NULL, - NULL, - mode, - -1 ); + rc = ompi_comm_activate (&newcomp, comm, NULL, NULL, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { return rc; } @@ -1062,11 +999,15 @@ int ompi_comm_dup_with_info ( ompi_communicator_t * comm, ompi_info_t *info, omp return MPI_SUCCESS; } -struct ompi_comm_idup_with_info_context { +struct ompi_comm_idup_with_info_context_t { + opal_object_t super; ompi_communicator_t *comm; ompi_communicator_t *newcomp; }; +typedef struct ompi_comm_idup_with_info_context_t ompi_comm_idup_with_info_context_t; +OBJ_CLASS_INSTANCE(ompi_comm_idup_with_info_context_t, opal_object_t, NULL, NULL); + static int ompi_comm_idup_with_info_activate (ompi_comm_request_t *request); static int ompi_comm_idup_with_info_finish (ompi_comm_request_t *request); static int ompi_comm_idup_getcid (ompi_comm_request_t *request); @@ -1085,7 +1026,7 @@ int ompi_comm_idup_with_info (ompi_communicator_t *comm, ompi_info_t *info, ompi static int ompi_comm_idup_internal (ompi_communicator_t *comm, ompi_group_t *group, ompi_group_t *remote_group, ompi_info_t *info, ompi_communicator_t **newcomm, ompi_request_t **req) { - struct ompi_comm_idup_with_info_context *context; + ompi_comm_idup_with_info_context_t *context; ompi_comm_request_t *request; ompi_request_t *subreq[1]; int rc; @@ -1101,7 +1042,7 @@ static int ompi_comm_idup_internal (ompi_communicator_t *comm, ompi_group_t *gro return OMPI_ERR_OUT_OF_RESOURCE; } - context = calloc (1, sizeof (*context)); + context = OBJ_NEW(ompi_comm_idup_with_info_context_t); if (NULL == context) { ompi_comm_request_return (request); return OMPI_ERR_OUT_OF_RESOURCE; @@ -1109,7 +1050,7 @@ static int ompi_comm_idup_internal (ompi_communicator_t *comm, ompi_group_t *gro context->comm = comm; - request->context = context; + request->context = &context->super; rc = ompi_comm_set_nb (&context->newcomp, /* new comm */ comm, /* old comm */ @@ -1142,8 +1083,8 @@ static int ompi_comm_idup_internal (ompi_communicator_t *comm, ompi_group_t *gro static int ompi_comm_idup_getcid (ompi_comm_request_t *request) { - struct ompi_comm_idup_with_info_context *context = - (struct ompi_comm_idup_with_info_context *) request->context; + ompi_comm_idup_with_info_context_t *context = + (ompi_comm_idup_with_info_context_t *) request->context; ompi_request_t *subreq[1]; int rc, mode; @@ -1154,11 +1095,8 @@ static int ompi_comm_idup_getcid (ompi_comm_request_t *request) } /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid_nb (context->newcomp, /* new communicator */ - context->comm, /* old comm */ - NULL, /* bridge comm */ - mode, /* mode */ - subreq); /* new subrequest */ + rc = ompi_comm_nextcid_nb (context->newcomp, context->comm, NULL, NULL, + NULL, false, mode, subreq); if (OMPI_SUCCESS != rc) { ompi_comm_request_return (request); return rc; @@ -1171,8 +1109,8 @@ static int ompi_comm_idup_getcid (ompi_comm_request_t *request) static int ompi_comm_idup_with_info_activate (ompi_comm_request_t *request) { - struct ompi_comm_idup_with_info_context *context = - (struct ompi_comm_idup_with_info_context *) request->context; + ompi_comm_idup_with_info_context_t *context = + (ompi_comm_idup_with_info_context_t *) request->context; ompi_request_t *subreq[1]; int rc, mode; @@ -1187,7 +1125,7 @@ static int ompi_comm_idup_with_info_activate (ompi_comm_request_t *request) context->newcomp->c_contextid, context->comm->c_contextid ); /* activate communicator and init coll-module */ - rc = ompi_comm_activate_nb (&context->newcomp, context->comm, NULL, mode, subreq); + rc = ompi_comm_activate_nb (&context->newcomp, context->comm, NULL, NULL, NULL, false, mode, subreq); if ( OMPI_SUCCESS != rc ) { return rc; } @@ -1233,13 +1171,7 @@ int ompi_comm_create_group (ompi_communicator_t *comm, ompi_group_t *group, int } /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new communicator */ - comm, /* old comm */ - newcomp, /* bridge comm (used to pass the group into the group allreduce) */ - &tag, /* user defined tag */ - NULL, /* remote_leader */ - mode, /* mode */ - -1 ); /* send_first */ + rc = ompi_comm_nextcid (newcomp, comm, NULL, &tag, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { return rc; } @@ -1249,13 +1181,7 @@ int ompi_comm_create_group (ompi_communicator_t *comm, ompi_group_t *group, int newcomp->c_contextid, comm->c_contextid ); /* activate communicator and init coll-module */ - rc = ompi_comm_activate( &newcomp, /* new communicator */ - comm, - newcomp, - &tag, - NULL, - mode, - -1 ); + rc = ompi_comm_activate (&newcomp, comm, NULL, &tag, NULL, false, mode); if ( OMPI_SUCCESS != rc ) { return rc; } @@ -1924,13 +1850,8 @@ int ompi_comm_enable(ompi_communicator_t *old_comm, int ret = OMPI_SUCCESS; /* Determine context id. It is identical to f_2_c_handle */ - ret = ompi_comm_nextcid ( new_comm, /* new communicator */ - old_comm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - OMPI_COMM_CID_INTRA, /* mode */ - -1 ); /* send first, doesn't matter */ + ret = ompi_comm_nextcid (new_comm, old_comm, NULL, NULL, NULL, false, + OMPI_COMM_CID_INTRA); if (OMPI_SUCCESS != ret) { /* something wrong happened while setting the communicator */ goto complete_and_return; @@ -1953,15 +1874,8 @@ int ompi_comm_enable(ompi_communicator_t *old_comm, goto complete_and_return; } - ret = ompi_comm_activate( &new_comm, /* new communicator */ - old_comm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - OMPI_COMM_CID_INTRA, /* mode */ - -1 ); /* send first, doesn't matter */ - - + ret = ompi_comm_activate (&new_comm, old_comm, NULL, NULL, NULL, false, + OMPI_COMM_CID_INTRA); if (OMPI_SUCCESS != ret) { /* something wrong happened while setting the communicator */ goto complete_and_return; diff --git a/ompi/communicator/comm_cid.c b/ompi/communicator/comm_cid.c index ea53930f7c..ab9c996119 100644 --- a/ompi/communicator/comm_cid.c +++ b/ompi/communicator/comm_cid.c @@ -14,7 +14,7 @@ * Copyright (c) 2007 Voltaire All rights reserved. * Copyright (c) 2006-2010 University of Houston. All rights reserved. * Copyright (c) 2009 Sun Microsystems, Inc. All rights reserved. - * Copyright (c) 2012-2014 Los Alamos National Security, LLC. All rights + * Copyright (c) 2012-2016 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2012 Oak Ridge National Labs. All rights reserved. * Copyright (c) 2013-2015 Intel, Inc. All rights reserved. @@ -44,7 +44,90 @@ #include "ompi/request/request.h" #include "ompi/runtime/mpiruntime.h" -BEGIN_C_DECLS +struct ompi_comm_cid_context_t; + +typedef int (*ompi_comm_allreduce_impl_fn_t) (int *inbuf, int *outbuf, int count, struct ompi_op_t *op, + struct ompi_comm_cid_context_t *cid_context, + ompi_request_t **req); + + +struct ompi_comm_cid_context_t { + opal_object_t super; + + ompi_communicator_t *newcomm; + ompi_communicator_t **newcommp; + ompi_communicator_t *comm; + ompi_communicator_t *bridgecomm; + + ompi_comm_allreduce_impl_fn_t allreduce_fn; + + int nextcid; + int nextlocal_cid; + int start; + int flag, rflag; + int local_leader; + int remote_leader; + int iter; + /** storage for activate barrier */ + int ok; + char *port_string; + bool send_first; + int pml_tag; + char *pmix_tag; +}; + +typedef struct ompi_comm_cid_context_t ompi_comm_cid_context_t; + +static void mca_comm_cid_context_construct (ompi_comm_cid_context_t *context) +{ + memset ((void *) ((intptr_t) context + sizeof (context->super)), 0, sizeof (*context) - sizeof (context->super)); +} + +static void mca_comm_cid_context_destruct (ompi_comm_cid_context_t *context) +{ + free (context->port_string); + free (context->pmix_tag); +} + +OBJ_CLASS_INSTANCE (ompi_comm_cid_context_t, opal_object_t, + mca_comm_cid_context_construct, + mca_comm_cid_context_destruct); + +struct ompi_comm_allreduce_context_t { + opal_object_t super; + + int *inbuf; + int *outbuf; + int count; + struct ompi_op_t *op; + ompi_comm_cid_context_t *cid_context; + int *tmpbuf; + + /* for intercomm allreduce */ + int *rcounts; + int *rdisps; + + /* for group allreduce */ + int peers_comm[3]; +}; + +typedef struct ompi_comm_allreduce_context_t ompi_comm_allreduce_context_t; + +static void ompi_comm_allreduce_context_construct (ompi_comm_allreduce_context_t *context) +{ + memset ((void *) ((intptr_t) context + sizeof (context->super)), 0, sizeof (*context) - sizeof (context->super)); +} + +static void ompi_comm_allreduce_context_destruct (ompi_comm_allreduce_context_t *context) +{ + free (context->tmpbuf); + free (context->rcounts); + free (context->rdisps); +} + +OBJ_CLASS_INSTANCE (ompi_comm_allreduce_context_t, opal_object_t, + ompi_comm_allreduce_context_construct, + ompi_comm_allreduce_context_destruct); /** * These functions make sure, that we determine the global result over @@ -53,90 +136,29 @@ BEGIN_C_DECLS * and a bridge-comm (intercomm-create scenario). */ - -typedef int ompi_comm_cid_allredfct (int *inbuf, int* outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, - void* lleader, void* rleader, - int send_first, char *tag, int iter ); - -static int ompi_comm_allreduce_intra (int *inbuf, int* outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_ledaer, - int send_first, char *tag, int iter ); - -static int ompi_comm_allreduce_inter (int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_leader, - int send_first, char *tag, int iter ); - -static int ompi_comm_allreduce_intra_bridge(int *inbuf, int* outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_leader, - int send_first, char *tag, int iter); - -static int ompi_comm_allreduce_intra_pmix (int *inbuf, int* outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_leader, - int send_first, char *tag, int iter ); - -static int ompi_comm_allreduce_group (int *inbuf, int* outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_leader, - int send_first, char *tag, int iter); - /* non-blocking intracommunicator allreduce */ -static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, +static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf, int count, + struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context, ompi_request_t **req); /* non-blocking intercommunicator allreduce */ -static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, +static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, int count, + struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context, ompi_request_t **req); +static int ompi_comm_allreduce_group_nb (int *inbuf, int *outbuf, int count, + struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context, + ompi_request_t **req); -static int ompi_comm_register_cid (uint32_t contextid); -static int ompi_comm_unregister_cid (uint32_t contextid); -static uint32_t ompi_comm_lowest_cid ( void ); +static int ompi_comm_allreduce_intra_pmix_nb (int *inbuf, int *outbuf, int count, + struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context, + ompi_request_t **req); -struct ompi_comm_reg_t{ - opal_list_item_t super; - uint32_t cid; -}; -typedef struct ompi_comm_reg_t ompi_comm_reg_t; -OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_comm_reg_t); +static int ompi_comm_allreduce_intra_bridge_nb (int *inbuf, int *outbuf, int count, + struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context, + ompi_request_t **req); -static void ompi_comm_reg_constructor(ompi_comm_reg_t *regcom); -static void ompi_comm_reg_destructor(ompi_comm_reg_t *regcom); - -OBJ_CLASS_INSTANCE (ompi_comm_reg_t, - opal_list_item_t, - ompi_comm_reg_constructor, - ompi_comm_reg_destructor ); - -static opal_mutex_t ompi_cid_lock; -static opal_list_t ompi_registered_comms; +static opal_mutex_t ompi_cid_lock = OPAL_MUTEX_STATIC_INIT; int ompi_comm_cid_init (void) @@ -144,156 +166,80 @@ int ompi_comm_cid_init (void) return OMPI_SUCCESS; } -int ompi_comm_nextcid ( ompi_communicator_t* newcomm, - ompi_communicator_t* comm, - ompi_communicator_t* bridgecomm, - void* local_leader, - void* remote_leader, - int mode, int send_first ) +static ompi_comm_cid_context_t *mca_comm_cid_context_alloc (ompi_communicator_t *newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, + const void *arg1, const char *pmix_tag, bool send_first, + int mode) { + ompi_comm_cid_context_t *context; + ompi_comm_request_t *request; int ret; - int nextcid; - bool flag; - int nextlocal_cid; - int done=0; - int response, glresponse=0; - int start; - unsigned int i; - int iter=0; - ompi_comm_cid_allredfct* allredfnct; - /** - * Determine which implementation of allreduce we have to use - * for the current scenario - */ - - switch (mode) - { - case OMPI_COMM_CID_INTRA: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra; - break; - case OMPI_COMM_CID_INTER: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_inter; - break; - case OMPI_COMM_CID_INTRA_BRIDGE: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_bridge; - break; - case OMPI_COMM_CID_INTRA_PMIX: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_pmix; - break; - case OMPI_COMM_CID_GROUP: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_group; - break; - default: - return MPI_UNDEFINED; - break; - } - - ret = ompi_comm_register_cid (comm->c_contextid); - if (OMPI_SUCCESS != ret) { - return ret; - } - start = ompi_mpi_communicators.lowest_free; - - while (!done) { - /** - * This is the real algorithm described in the doc - */ - OPAL_THREAD_LOCK(&ompi_cid_lock); - if (comm->c_contextid != ompi_comm_lowest_cid() ) { - /* if not lowest cid, we do not continue, but sleep and try again */ - OPAL_THREAD_UNLOCK(&ompi_cid_lock); - continue; - } - OPAL_THREAD_UNLOCK(&ompi_cid_lock); - - nextlocal_cid = mca_pml.pml_max_contextid; - flag = false; - for (i=start; i < mca_pml.pml_max_contextid ; i++) { - flag = opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, - i, comm); - if (true == flag) { - nextlocal_cid = i; - break; - } - } - - ret = (allredfnct)(&nextlocal_cid, &nextcid, 1, MPI_MAX, comm, bridgecomm, - local_leader, remote_leader, send_first, "nextcid", iter ); - ++iter; - if( OMPI_SUCCESS != ret ) { - opal_pointer_array_set_item(&ompi_mpi_communicators, nextlocal_cid, NULL); - goto release_and_return; - } - - if (mca_pml.pml_max_contextid == (unsigned int) nextcid) { - /* at least one peer ran out of CIDs */ - if (flag) { - opal_pointer_array_set_item(&ompi_mpi_communicators, nextlocal_cid, NULL); - ret = OMPI_ERR_OUT_OF_RESOURCE; - goto release_and_return; - } - } - - if (nextcid == nextlocal_cid) { - response = 1; /* fine with me */ - } - else { - opal_pointer_array_set_item(&ompi_mpi_communicators, - nextlocal_cid, NULL); - - flag = opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, - nextcid, comm ); - if (true == flag) { - response = 1; /* works as well */ - } - else { - response = 0; /* nope, not acceptable */ - } - } - - ret = (allredfnct)(&response, &glresponse, 1, MPI_MIN, comm, bridgecomm, - local_leader, remote_leader, send_first, "nextcid", iter ); - ++iter; - if( OMPI_SUCCESS != ret ) { - opal_pointer_array_set_item(&ompi_mpi_communicators, nextcid, NULL); - goto release_and_return; - } - if (1 == glresponse) { - done = 1; /* we are done */ - break; - } - else if ( 0 == glresponse ) { - if ( 1 == response ) { - /* we could use that, but other don't agree */ - opal_pointer_array_set_item(&ompi_mpi_communicators, - nextcid, NULL); - } - start = nextcid+1; /* that's where we can start the next round */ - } + context = OBJ_NEW(ompi_comm_cid_context_t); + if (OPAL_UNLIKELY(NULL == context)) { + return NULL; } - /* set the according values to the newcomm */ - newcomm->c_contextid = nextcid; - opal_pointer_array_set_item (&ompi_mpi_communicators, nextcid, newcomm); + context->newcomm = newcomm; + context->comm = comm; + context->bridgecomm = bridgecomm; - release_and_return: - ompi_comm_unregister_cid (comm->c_contextid); + /* Determine which implementation of allreduce we have to use + * for the current mode. */ + switch (mode) { + case OMPI_COMM_CID_INTRA: + context->allreduce_fn = ompi_comm_allreduce_intra_nb; + break; + case OMPI_COMM_CID_INTER: + context->allreduce_fn = ompi_comm_allreduce_inter_nb; + break; + case OMPI_COMM_CID_GROUP: + context->allreduce_fn = ompi_comm_allreduce_group_nb; + context->pml_tag = ((int *) arg0)[0]; + break; + case OMPI_COMM_CID_INTRA_PMIX: + context->allreduce_fn = ompi_comm_allreduce_intra_pmix_nb; + context->local_leader = ((int *) arg0)[0]; + if (arg1) { + context->port_string = strdup ((char *) arg1); + } + context->pmix_tag = strdup ((char *) pmix_tag); + break; + case OMPI_COMM_CID_INTRA_BRIDGE: + context->allreduce_fn = ompi_comm_allreduce_intra_bridge_nb; + context->local_leader = ((int *) arg0)[0]; + context->remote_leader = ((int *) arg1)[0]; + break; + default: + OBJ_RELEASE(context); + return NULL; + } - return ret; + context->send_first = send_first; + context->iter = 0; + + return context; } -/* Non-blocking version of ompi_comm_nextcid */ -struct mca_comm_nextcid_context { - ompi_communicator_t* newcomm; - ompi_communicator_t* comm; - ompi_communicator_t* bridgecomm; - int mode; - int nextcid; - int nextlocal_cid; - int start; - int flag, rflag; -}; +static ompi_comm_allreduce_context_t *ompi_comm_allreduce_context_alloc (int *inbuf, int *outbuf, + int count, struct ompi_op_t *op, + ompi_comm_cid_context_t *cid_context) +{ + ompi_comm_allreduce_context_t *context; + + context = OBJ_NEW(ompi_comm_allreduce_context_t); + if (OPAL_UNLIKELY(NULL == context)) { + return NULL; + } + + context->inbuf = inbuf; + context->outbuf = outbuf; + context->count = count; + context->op = op; + context->cid_context = cid_context; + + return context; +} /* find the next available local cid and start an allreduce */ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request); @@ -301,82 +247,83 @@ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request); static int ompi_comm_checkcid (ompi_comm_request_t *request); /* verify that the cid was available globally */ static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request); +/* lock the cid generator */ +static int ompi_comm_cid_lock (ompi_comm_request_t *request); -int ompi_comm_nextcid_nb (ompi_communicator_t* newcomm, - ompi_communicator_t* comm, - ompi_communicator_t* bridgecomm, - int mode, ompi_request_t **req) +int ompi_comm_nextcid_nb (ompi_communicator_t *newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, const void *arg1, + bool send_first, int mode, ompi_request_t **req) { - struct mca_comm_nextcid_context *context; + ompi_comm_cid_context_t *context; ompi_comm_request_t *request; int ret; - /** - * Determine which implementation of allreduce we have to use - * for the current scenario - */ - if (OMPI_COMM_CID_INTRA != mode && OMPI_COMM_CID_INTER != mode) { - return MPI_UNDEFINED; - } - - ret = ompi_comm_register_cid (comm->c_contextid); - if (OMPI_SUCCESS != ret) { - return ret; - } - - context = calloc (1, sizeof (*context)); + context = mca_comm_cid_context_alloc (newcomm, comm, bridgecomm, arg0, arg1, + "nextcid", send_first, mode); if (NULL == context) { - ompi_comm_unregister_cid (comm->c_contextid); return OMPI_ERR_OUT_OF_RESOURCE; } + context->start = ompi_mpi_communicators.lowest_free; + request = ompi_comm_request_get (); if (NULL == request) { - ompi_comm_unregister_cid (comm->c_contextid); - free (context); + OBJ_RELEASE(context); return OMPI_ERR_OUT_OF_RESOURCE; } - context->newcomm = newcomm; - context->comm = comm; - context->bridgecomm = bridgecomm; - context->mode = mode; - context->start = ompi_mpi_communicators.lowest_free; + request->context = &context->super; - request->context = context; - - ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0); + ompi_comm_request_schedule_append (request, ompi_comm_cid_lock, NULL, 0); ompi_comm_request_start (request); *req = &request->super; + return OMPI_SUCCESS; } +int ompi_comm_nextcid (ompi_communicator_t *newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, const void *arg1, + bool send_first, int mode) +{ + ompi_request_t *req; + int rc; + + rc = ompi_comm_nextcid_nb (newcomm, comm, bridgecomm, arg0, arg1, send_first, mode, &req); + if (OMPI_SUCCESS != rc) { + return rc; + } + + ompi_request_wait_completion (req); + rc = req->req_status.MPI_ERROR; + ompi_comm_request_return ((ompi_comm_request_t *) req); + + return rc; +} + +static int ompi_comm_cid_lock (ompi_comm_request_t *request) +{ + if (!OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) { + return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0); + } + + return ompi_comm_request_schedule_append (request, ompi_comm_cid_lock, NULL, 0); +} + static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request) { - struct mca_comm_nextcid_context *context = request->context; + ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context; ompi_request_t *subreq; - unsigned int i; bool flag; int ret; /** * This is the real algorithm described in the doc */ - OPAL_THREAD_LOCK(&ompi_cid_lock); - if (context->comm->c_contextid != ompi_comm_lowest_cid() ) { - /* if not lowest cid, we do not continue, but sleep and try again */ - OPAL_THREAD_UNLOCK(&ompi_cid_lock); - ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0); - - return OMPI_SUCCESS; - } - OPAL_THREAD_UNLOCK(&ompi_cid_lock); - flag = false; context->nextlocal_cid = mca_pml.pml_max_contextid; - for (i = context->start ; i < mca_pml.pml_max_contextid ; ++i) { + for (unsigned int i = context->start ; i < mca_pml.pml_max_contextid ; ++i) { flag = opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, i, context->comm); if (true == flag) { @@ -385,15 +332,10 @@ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request) } } - if (context->mode == OMPI_COMM_CID_INTRA) { - ret = ompi_comm_allreduce_intra_nb (&context->nextlocal_cid, &context->nextcid, 1, MPI_MAX, - context->comm, context->bridgecomm, &subreq); - } else { - ret = ompi_comm_allreduce_inter_nb (&context->nextlocal_cid, &context->nextcid, 1, MPI_MAX, - context->comm, context->bridgecomm, &subreq); - } - + ret = context->allreduce_fn (&context->nextlocal_cid, &context->nextcid, 1, MPI_MAX, + context, &subreq); if (OMPI_SUCCESS != ret) { + OPAL_THREAD_UNLOCK(&ompi_cid_lock); return ret; } @@ -403,18 +345,17 @@ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request) opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, context->nextlocal_cid, NULL); } + OPAL_THREAD_UNLOCK(&ompi_cid_lock); return OMPI_ERR_OUT_OF_RESOURCE; } /* next we want to verify that the resulting commid is ok */ - ompi_comm_request_schedule_append (request, ompi_comm_checkcid, &subreq, 1); - - return OMPI_SUCCESS; + return ompi_comm_request_schedule_append (request, ompi_comm_checkcid, &subreq, 1); } static int ompi_comm_checkcid (ompi_comm_request_t *request) { - struct mca_comm_nextcid_context *context = request->context; + ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context; ompi_request_t *subreq; int ret; @@ -423,37 +364,31 @@ static int ompi_comm_checkcid (ompi_comm_request_t *request) if (!context->flag) { opal_pointer_array_set_item(&ompi_mpi_communicators, context->nextlocal_cid, NULL); - context->flag = opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, - context->nextcid, context->comm); + context->flag = opal_pointer_array_test_and_set_item (&ompi_mpi_communicators, + context->nextcid, context->comm); } - if (context->mode == OMPI_COMM_CID_INTRA) { - ret = ompi_comm_allreduce_intra_nb (&context->flag, &context->rflag, 1, MPI_MIN, context->comm, - context->bridgecomm, &subreq); - } else { - ret = ompi_comm_allreduce_inter_nb (&context->flag, &context->rflag, 1, MPI_MIN, context->comm, - context->bridgecomm, &subreq); + ++context->iter; + + ret = context->allreduce_fn (&context->flag, &context->rflag, 1, MPI_MAX, context, &subreq); + if (OMPI_SUCCESS == ret) { + ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, &subreq, 1); } - if (OMPI_SUCCESS != ret) { - return ret; - } - - ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, &subreq, 1); - - return OMPI_SUCCESS; + return ret; } static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request) { - struct mca_comm_nextcid_context *context = request->context; + ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context; if (1 == context->rflag) { /* set the according values to the newcomm */ context->newcomm->c_contextid = context->nextcid; opal_pointer_array_set_item (&ompi_mpi_communicators, context->nextcid, context->newcomm); - ompi_comm_unregister_cid (context->comm->c_contextid); + /* unlock the cid generator */ + OPAL_THREAD_UNLOCK(&ompi_cid_lock); /* done! */ return OMPI_SUCCESS; @@ -461,116 +396,16 @@ static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request) if (1 == context->flag) { /* we could use this cid, but other don't agree */ - opal_pointer_array_set_item(&ompi_mpi_communicators, context->nextcid, NULL); + opal_pointer_array_set_item (&ompi_mpi_communicators, context->nextcid, NULL); context->start = context->nextcid + 1; /* that's where we can start the next round */ } + ++context->iter; + /* try again */ return ompi_comm_allreduce_getnextcid (request); } -/**************************************************************************/ -/**************************************************************************/ -/**************************************************************************/ -static void ompi_comm_reg_constructor (ompi_comm_reg_t *regcom) -{ - regcom->cid=MPI_UNDEFINED; -} - -static void ompi_comm_reg_destructor (ompi_comm_reg_t *regcom) -{ -} - -void ompi_comm_reg_init (void) -{ - OBJ_CONSTRUCT(&ompi_registered_comms, opal_list_t); - OBJ_CONSTRUCT(&ompi_cid_lock, opal_mutex_t); -} - -void ompi_comm_reg_finalize (void) -{ - OBJ_DESTRUCT(&ompi_registered_comms); - OBJ_DESTRUCT(&ompi_cid_lock); -} - - -static int ompi_comm_register_cid (uint32_t cid) -{ - ompi_comm_reg_t *regcom; - ompi_comm_reg_t *newentry = OBJ_NEW(ompi_comm_reg_t); - bool registered = false; - - do { - /* Only one communicator function allowed in same time on the - * same communicator. - */ - OPAL_THREAD_LOCK(&ompi_cid_lock); - - newentry->cid = cid; - if ( !(opal_list_is_empty (&ompi_registered_comms)) ) { - bool ok = true; - - OPAL_LIST_FOREACH(regcom, &ompi_registered_comms, ompi_comm_reg_t) { - if ( regcom->cid > cid ) { - break; - } - if( regcom->cid == cid ) { - /** - * The MPI standard state that is the user responsability to - * schedule the global communications in order to avoid any - * kind of troubles. As, managing communicators involve several - * collective communications, we should enforce a sequential - * execution order. This test only allow one communicator - * creation function based on the same communicator. - */ - ok = false; - break; - } - } - if (ok) { - opal_list_insert_pos (&ompi_registered_comms, (opal_list_item_t *) regcom, - (opal_list_item_t *)newentry); - registered = true; - } - } else { - opal_list_append (&ompi_registered_comms, (opal_list_item_t *)newentry); - registered = true; - } - - /* drop the lock before trying again */ - OPAL_THREAD_UNLOCK(&ompi_cid_lock); - } while (!registered); - - return OMPI_SUCCESS; -} - -static int ompi_comm_unregister_cid (uint32_t cid) -{ - ompi_comm_reg_t *regcom; - - OPAL_THREAD_LOCK(&ompi_cid_lock); - - OPAL_LIST_FOREACH(regcom, &ompi_registered_comms, ompi_comm_reg_t) { - if(regcom->cid == cid) { - opal_list_remove_item(&ompi_registered_comms, (opal_list_item_t *) regcom); - OBJ_RELEASE(regcom); - break; - } - } - - OPAL_THREAD_UNLOCK(&ompi_cid_lock); - - return OMPI_SUCCESS; -} - -static uint32_t ompi_comm_lowest_cid (void) -{ - ompi_comm_reg_t *regcom=NULL; - opal_list_item_t *item=opal_list_get_first (&ompi_registered_comms); - - regcom = (ompi_comm_reg_t *)item; - return regcom->cid; -} /**************************************************************************/ /**************************************************************************/ /**************************************************************************/ @@ -588,174 +423,43 @@ static uint32_t ompi_comm_lowest_cid (void) * comm.c is, that this file contains the allreduce implementations * which are required, and thus we avoid having duplicate code... */ -int ompi_comm_activate ( ompi_communicator_t** newcomm, - ompi_communicator_t* comm, - ompi_communicator_t* bridgecomm, - void* local_leader, - void* remote_leader, - int mode, - int send_first ) -{ - int ret = 0; - - int ok=0, gok=0; - ompi_comm_cid_allredfct* allredfnct; - - /* Step 1: the barrier, after which it is allowed to - * send messages over the new communicator - */ - switch (mode) - { - case OMPI_COMM_CID_INTRA: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra; - break; - case OMPI_COMM_CID_INTER: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_inter; - break; - case OMPI_COMM_CID_INTRA_BRIDGE: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_bridge; - break; - case OMPI_COMM_CID_INTRA_PMIX: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_pmix; - break; - case OMPI_COMM_CID_GROUP: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_group; - break; - default: - return MPI_UNDEFINED; - break; - } - - if (MPI_UNDEFINED != (*newcomm)->c_local_group->grp_my_rank) { - /* Initialize the PML stuff in the newcomm */ - if ( OMPI_SUCCESS != (ret = MCA_PML_CALL(add_comm(*newcomm))) ) { - goto bail_on_error; - } - - OMPI_COMM_SET_PML_ADDED(*newcomm); - } - - - ret = (allredfnct)(&ok, &gok, 1, MPI_MIN, comm, bridgecomm, - local_leader, remote_leader, send_first, "activate", 0 ); - if( OMPI_SUCCESS != ret ) { - goto bail_on_error; - } - - - - /** - * Check to see if this process is in the new communicator. - * - * Specifically, this function is invoked by all proceses in the - * old communicator, regardless of whether they are in the new - * communicator or not. This is because it is far simpler to use - * MPI collective functions on the old communicator to determine - * some data for the new communicator (e.g., remote_leader) than - * to kludge up our own pseudo-collective routines over just the - * processes in the new communicator. Hence, *all* processes in - * the old communicator need to invoke this function. - * - * That being said, only processes in the new communicator need to - * select a coll module for the new communicator. More - * specifically, proceses who are not in the new communicator - * should *not* select a coll module -- for example, - * ompi_comm_rank(newcomm) returns MPI_UNDEFINED for processes who - * are not in the new communicator. This can cause errors in the - * selection / initialization of a coll module. Plus, it's - * wasteful -- processes in the new communicator will end up - * freeing the new communicator anyway, so we might as well leave - * the coll selection as NULL (the coll base comm unselect code - * handles that case properly). - */ - if (MPI_UNDEFINED == (*newcomm)->c_local_group->grp_my_rank) { - return OMPI_SUCCESS; - } - - /* Let the collectives components fight over who will do - collective on this new comm. */ - if (OMPI_SUCCESS != (ret = mca_coll_base_comm_select(*newcomm))) { - goto bail_on_error; - } - - /* For an inter communicator, we have to deal with the potential - * problem of what is happening if the local_comm that we created - * has a lower CID than the parent comm. This is not a problem - * as long as the user calls MPI_Comm_free on the inter communicator. - * However, if the communicators are not freed by the user but released - * by Open MPI in MPI_Finalize, we walk through the list of still available - * communicators and free them one by one. Thus, local_comm is freed before - * the actual inter-communicator. However, the local_comm pointer in the - * inter communicator will still contain the 'previous' address of the local_comm - * and thus this will lead to a segmentation violation. In order to prevent - * that from happening, we increase the reference counter local_comm - * by one if its CID is lower than the parent. We cannot increase however - * its reference counter if the CID of local_comm is larger than - * the CID of the inter communicators, since a regular MPI_Comm_free would - * leave in that the case the local_comm hanging around and thus we would not - * recycle CID's properly, which was the reason and the cause for this trouble. - */ - if ( OMPI_COMM_IS_INTER(*newcomm)) { - if ( OMPI_COMM_CID_IS_LOWER(*newcomm, comm)) { - OMPI_COMM_SET_EXTRA_RETAIN (*newcomm); - OBJ_RETAIN (*newcomm); - } - } - - - return OMPI_SUCCESS; - - bail_on_error: - OBJ_RELEASE(*newcomm); - *newcomm = MPI_COMM_NULL; - return ret; -} /* Non-blocking version of ompi_comm_activate */ -struct ompi_comm_activate_nb_context { - ompi_communicator_t **newcomm; - ompi_communicator_t *comm; - - /* storage for activate barrier */ - int ok; -}; - static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request); -int ompi_comm_activate_nb (ompi_communicator_t **newcomm, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, - int mode, ompi_request_t **req) +int ompi_comm_activate_nb (ompi_communicator_t **newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, + const void *arg1, bool send_first, int mode, ompi_request_t **req) { - struct ompi_comm_activate_nb_context *context; + ompi_comm_cid_context_t *context; ompi_comm_request_t *request; ompi_request_t *subreq; + int local_leader; + int remote_leader; int ret = 0; + context = mca_comm_cid_context_alloc (*newcomm, comm, bridgecomm, arg0, arg1, "activate", + send_first, mode); + if (NULL == context) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + /* keep track of the pointer so it can be set to MPI_COMM_NULL on failure */ + context->newcommp = newcomm; + request = ompi_comm_request_get (); if (NULL == request) { + OBJ_RELEASE(context); return OMPI_ERR_OUT_OF_RESOURCE; } - context = calloc (1, sizeof (*context)); - if (NULL == context) { - ompi_comm_request_return (request); - return OMPI_ERR_OUT_OF_RESOURCE; - } - - context->newcomm = newcomm; - context->comm = comm; - - request->context = context; - - if (OMPI_COMM_CID_INTRA != mode && OMPI_COMM_CID_INTER != mode) { - return MPI_UNDEFINED; - } + request->context = &context->super; if (MPI_UNDEFINED != (*newcomm)->c_local_group->grp_my_rank) { /* Initialize the PML stuff in the newcomm */ if ( OMPI_SUCCESS != (ret = MCA_PML_CALL(add_comm(*newcomm))) ) { OBJ_RELEASE(newcomm); + OBJ_RELEASE(context); *newcomm = MPI_COMM_NULL; return ret; } @@ -765,14 +469,8 @@ int ompi_comm_activate_nb (ompi_communicator_t **newcomm, /* Step 1: the barrier, after which it is allowed to * send messages over the new communicator */ - if (mode == OMPI_COMM_CID_INTRA) { - ret = ompi_comm_allreduce_intra_nb (&context->ok, &context->ok, 1, MPI_MIN, - context->comm, bridgecomm, &subreq); - } else { - ret = ompi_comm_allreduce_inter_nb (&context->ok, &context->ok, 1, MPI_MIN, - context->comm, bridgecomm, &subreq); - } - + ret = context->allreduce_fn (&context->ok, &context->ok, 1, MPI_MAX, context, + &subreq); if (OMPI_SUCCESS != ret) { ompi_comm_request_return (request); return ret; @@ -786,10 +484,28 @@ int ompi_comm_activate_nb (ompi_communicator_t **newcomm, return OMPI_SUCCESS; } +int ompi_comm_activate (ompi_communicator_t **newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, + const void *arg1, bool send_first, int mode) +{ + ompi_request_t *req; + int rc; + + rc = ompi_comm_activate_nb (newcomm, comm, bridgecomm, arg0, arg1, send_first, mode, &req); + if (OMPI_SUCCESS != rc) { + return rc; + } + + ompi_request_wait_completion (req); + rc = req->req_status.MPI_ERROR; + ompi_comm_request_return ((ompi_comm_request_t *) req); + + return rc; +} + static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request) { - struct ompi_comm_activate_nb_context *context = - (struct ompi_comm_activate_nb_context *) request->context; + ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context; int ret; /** @@ -816,15 +532,15 @@ static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request) * the coll selection as NULL (the coll base comm unselect code * handles that case properly). */ - if (MPI_UNDEFINED == (*context->newcomm)->c_local_group->grp_my_rank) { + if (MPI_UNDEFINED == (context->newcomm)->c_local_group->grp_my_rank) { return OMPI_SUCCESS; } /* Let the collectives components fight over who will do collective on this new comm. */ - if (OMPI_SUCCESS != (ret = mca_coll_base_comm_select(*context->newcomm))) { - OBJ_RELEASE(*context->newcomm); - *context->newcomm = MPI_COMM_NULL; + if (OMPI_SUCCESS != (ret = mca_coll_base_comm_select(context->newcomm))) { + OBJ_RELEASE(context->newcomm); + *context->newcommp = MPI_COMM_NULL; return ret; } @@ -845,10 +561,10 @@ static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request) * leave in that the case the local_comm hanging around and thus we would not * recycle CID's properly, which was the reason and the cause for this trouble. */ - if (OMPI_COMM_IS_INTER(*context->newcomm)) { - if (OMPI_COMM_CID_IS_LOWER(*context->newcomm, context->comm)) { - OMPI_COMM_SET_EXTRA_RETAIN (*context->newcomm); - OBJ_RETAIN (*context->newcomm); + if (OMPI_COMM_IS_INTER(context->newcomm)) { + if (OMPI_COMM_CID_IS_LOWER(context->newcomm, context->comm)) { + OMPI_COMM_SET_EXTRA_RETAIN (context->newcomm); + OBJ_RETAIN (context->newcomm); } } @@ -859,207 +575,47 @@ static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request) /**************************************************************************/ /**************************************************************************/ /**************************************************************************/ -/* Arguments not used in this implementation: - * - bridgecomm - * - local_leader - * - remote_leader - * - send_first - */ -static int ompi_comm_allreduce_intra ( int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_leader, - int send_first, char *tag, int iter ) +static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf, int count, struct ompi_op_t *op, + ompi_comm_cid_context_t *context, ompi_request_t **req) { - return comm->c_coll.coll_allreduce ( inbuf, outbuf, count, MPI_INT, op, comm, - comm->c_coll.coll_allreduce_module ); -} + ompi_communicator_t *comm = context->comm; -static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, - ompi_request_t **req) -{ return comm->c_coll.coll_iallreduce (inbuf, outbuf, count, MPI_INT, op, comm, req, comm->c_coll.coll_iallreduce_module); } - -/* Arguments not used in this implementation: - * - bridgecomm - * - local_leader - * - remote_leader - * - send_first - */ -static int ompi_comm_allreduce_inter ( int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, - void* local_leader, - void* remote_leader, - int send_first, char *tag, int iter ) -{ - int local_rank, rsize; - int rc; - int *sbuf; - int *tmpbuf=NULL; - int *rcounts=NULL, scount=0; - int *rdisps=NULL; - - if ( !OMPI_COMM_IS_INTER (intercomm)) { - return MPI_ERR_COMM; - } - - /* Allocate temporary arrays */ - rsize = ompi_comm_remote_size (intercomm); - local_rank = ompi_comm_rank ( intercomm ); - - tmpbuf = (int *) malloc ( count * sizeof(int)); - rdisps = (int *) calloc ( rsize, sizeof(int)); - rcounts = (int *) calloc ( rsize, sizeof(int) ); - if ( OPAL_UNLIKELY (NULL == tmpbuf || NULL == rdisps || NULL == rcounts)) { - rc = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; - } - - /* Execute the inter-allreduce: the result of our group will - be in the buffer of the remote group */ - rc = intercomm->c_coll.coll_allreduce ( inbuf, tmpbuf, count, MPI_INT, - op, intercomm, - intercomm->c_coll.coll_allreduce_module); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - - if ( 0 == local_rank ) { - MPI_Request req; - - /* for the allgatherv later */ - scount = count; - - /* local leader exchange their data and determine the overall result - for both groups */ - rc = MCA_PML_CALL(irecv (outbuf, count, MPI_INT, 0, - OMPI_COMM_ALLREDUCE_TAG, - intercomm, &req)); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - rc = MCA_PML_CALL(send (tmpbuf, count, MPI_INT, 0, - OMPI_COMM_ALLREDUCE_TAG, - MCA_PML_BASE_SEND_STANDARD, - intercomm)); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - rc = ompi_request_wait ( &req, MPI_STATUS_IGNORE ); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - - ompi_op_reduce (op, tmpbuf, outbuf, count, MPI_INT); - } - - /* distribute the overall result to all processes in the other group. - Instead of using bcast, we are using here allgatherv, to avoid the - possible deadlock. Else, we need an algorithm to determine, - which group sends first in the inter-bcast and which receives - the result first. - */ - rcounts[0] = count; - sbuf = outbuf; - rc = intercomm->c_coll.coll_allgatherv (sbuf, scount, MPI_INT, outbuf, - rcounts, rdisps, MPI_INT, - intercomm, - intercomm->c_coll.coll_allgatherv_module); - - exit: - if ( NULL != tmpbuf ) { - free ( tmpbuf ); - } - if ( NULL != rcounts ) { - free ( rcounts ); - } - if ( NULL != rdisps ) { - free ( rdisps ); - } - - return (rc); -} - /* Non-blocking version of ompi_comm_allreduce_inter */ -struct ompi_comm_allreduce_inter_context { - int *inbuf; - int *outbuf; - int count; - struct ompi_op_t *op; - ompi_communicator_t *intercomm; - ompi_communicator_t *bridgecomm; - int *tmpbuf; - int *rcounts; - int *rdisps; -}; - -static void ompi_comm_allreduce_inter_context_free (struct ompi_comm_allreduce_inter_context *context) -{ - if (context->tmpbuf) { - free (context->tmpbuf); - } - - if (context->rdisps) { - free (context->rdisps); - } - - if (context->rcounts) { - free (context->rcounts); - } - - free (context); -} - static int ompi_comm_allreduce_inter_leader_exchange (ompi_comm_request_t *request); static int ompi_comm_allreduce_inter_leader_reduce (ompi_comm_request_t *request); static int ompi_comm_allreduce_inter_allgather (ompi_comm_request_t *request); -static int ompi_comm_allreduce_inter_allgather_complete (ompi_comm_request_t *request); -/* Arguments not used in this implementation: - * - bridgecomm - */ static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, int count, struct ompi_op_t *op, - ompi_communicator_t *intercomm, - ompi_communicator_t *bridgecomm, + ompi_comm_cid_context_t *cid_context, ompi_request_t **req) { - struct ompi_comm_allreduce_inter_context *context = NULL; - ompi_comm_request_t *request = NULL; + ompi_communicator_t *intercomm = cid_context->comm; + ompi_comm_allreduce_context_t *context; + ompi_comm_request_t *request; ompi_request_t *subreq; int local_rank, rsize, rc; - if (!OMPI_COMM_IS_INTER (intercomm)) { + if (!OMPI_COMM_IS_INTER (cid_context->comm)) { return MPI_ERR_COMM; } request = ompi_comm_request_get (); - if (NULL == request) { + if (OPAL_UNLIKELY(NULL == request)) { return OMPI_ERR_OUT_OF_RESOURCE; } - context = calloc (1, sizeof (*context)); - if (NULL == context) { - rc = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; + context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context); + if (OPAL_UNLIKELY(NULL == context)) { + ompi_comm_request_return (request); + return OMPI_ERR_OUT_OF_RESOURCE; } - context->inbuf = inbuf; - context->outbuf = outbuf; - context->count = count; - context->op = op; - context->intercomm = intercomm; - context->bridgecomm = bridgecomm; + request->context = &context->super; /* Allocate temporary arrays */ rsize = ompi_comm_remote_size (intercomm); @@ -1069,18 +625,17 @@ static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, context->rdisps = (int *) calloc (rsize, sizeof(int)); context->rcounts = (int *) calloc (rsize, sizeof(int)); if (OPAL_UNLIKELY (NULL == context->tmpbuf || NULL == context->rdisps || NULL == context->rcounts)) { - rc = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; + ompi_comm_request_return (request); + return OMPI_ERR_OUT_OF_RESOURCE; } - request->context = context; - /* Execute the inter-allreduce: the result from the local will be in the buffer of the remote group * and vise-versa. */ rc = intercomm->c_coll.coll_iallreduce (inbuf, context->tmpbuf, count, MPI_INT, op, intercomm, &subreq, intercomm->c_coll.coll_iallreduce_module); - if (OMPI_SUCCESS != rc) { - goto exit; + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + ompi_comm_request_return (request); + return rc; } if (0 == local_rank) { @@ -1092,58 +647,37 @@ static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf, ompi_comm_request_start (request); *req = &request->super; -exit: - if (OMPI_SUCCESS != rc) { - if (context) { - ompi_comm_allreduce_inter_context_free (context); - } - - if (request) { - request->context = NULL; - ompi_comm_request_return (request); - } - } - - return rc; + return OMPI_SUCCESS; } static int ompi_comm_allreduce_inter_leader_exchange (ompi_comm_request_t *request) { - struct ompi_comm_allreduce_inter_context *context = - (struct ompi_comm_allreduce_inter_context *) request->context; + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_communicator_t *intercomm = context->cid_context->comm; ompi_request_t *subreqs[2]; int rc; /* local leader exchange their data and determine the overall result for both groups */ rc = MCA_PML_CALL(irecv (context->outbuf, context->count, MPI_INT, 0, OMPI_COMM_ALLREDUCE_TAG, - context->intercomm, subreqs)); - if ( OMPI_SUCCESS != rc ) { - goto exit; + intercomm, subreqs)); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; } rc = MCA_PML_CALL(isend (context->tmpbuf, context->count, MPI_INT, 0, OMPI_COMM_ALLREDUCE_TAG, - MCA_PML_BASE_SEND_STANDARD, context->intercomm, subreqs + 1)); - if ( OMPI_SUCCESS != rc ) { - goto exit; + MCA_PML_BASE_SEND_STANDARD, intercomm, subreqs + 1)); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; } - ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_leader_reduce, subreqs, 2); - -exit: - if (OMPI_SUCCESS != rc) { - ompi_comm_allreduce_inter_context_free (context); - request->context = NULL; - } - - return rc; + return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_leader_reduce, subreqs, 2); } static int ompi_comm_allreduce_inter_leader_reduce (ompi_comm_request_t *request) { - struct ompi_comm_allreduce_inter_context *context = - (struct ompi_comm_allreduce_inter_context *) request->context; + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; ompi_op_reduce (context->op, context->tmpbuf, context->outbuf, context->count, MPI_INT); @@ -1153,8 +687,8 @@ static int ompi_comm_allreduce_inter_leader_reduce (ompi_comm_request_t *request static int ompi_comm_allreduce_inter_allgather (ompi_comm_request_t *request) { - struct ompi_comm_allreduce_inter_context *context = - (struct ompi_comm_allreduce_inter_context *) request->context; + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_communicator_t *intercomm = context->cid_context->comm; ompi_request_t *subreq; int scount = 0, rc; @@ -1165,245 +699,369 @@ static int ompi_comm_allreduce_inter_allgather (ompi_comm_request_t *request) the result first. */ - if (0 != ompi_comm_rank (context->intercomm)) { + if (0 != ompi_comm_rank (intercomm)) { context->rcounts[0] = context->count; } else { scount = context->count; } - rc = context->intercomm->c_coll.coll_iallgatherv (context->outbuf, scount, MPI_INT, context->outbuf, - context->rcounts, context->rdisps, MPI_INT, - context->intercomm, &subreq, - context->intercomm->c_coll.coll_iallgatherv_module); + rc = intercomm->c_coll.coll_iallgatherv (context->outbuf, scount, MPI_INT, context->outbuf, + context->rcounts, context->rdisps, MPI_INT, intercomm, + &subreq, intercomm->c_coll.coll_iallgatherv_module); if (OMPI_SUCCESS != rc) { - ompi_comm_allreduce_inter_context_free (context); - request->context = NULL; return rc; } - ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_allgather_complete, &subreq, 1); - - return OMPI_SUCCESS; + return ompi_comm_request_schedule_append (request, NULL, &subreq, 1); } -static int ompi_comm_allreduce_inter_allgather_complete (ompi_comm_request_t *request) +static int ompi_comm_allreduce_bridged_schedule_bcast (ompi_comm_request_t *request) { - /* free this request's context */ - ompi_comm_allreduce_inter_context_free (request->context); - /* prevent a double-free from the progress engine */ - request->context = NULL; + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_communicator_t *comm = context->cid_context->comm; + ompi_request_t *subreq; + int rc; - /* done */ - return OMPI_SUCCESS; + rc = comm->c_coll.coll_ibcast (context->outbuf, context->count, MPI_INT, + context->cid_context->local_leader, comm, + &subreq, comm->c_coll.coll_ibcast_module); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; + } + + return ompi_comm_request_schedule_append (request, NULL, &subreq, 1); } -/* Arguments not used in this implementation: - * - send_first - */ -static int ompi_comm_allreduce_intra_bridge (int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *bcomm, - void* lleader, void* rleader, - int send_first, char *tag, int iter ) +static int ompi_comm_allreduce_bridged_xchng_complete (ompi_comm_request_t *request) { - int *tmpbuf=NULL; - int local_rank; + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + + /* step 3: reduce leader data */ + ompi_op_reduce (context->op, context->tmpbuf, context->outbuf, context->count, MPI_INT); + + /* schedule the broadcast to local peers */ + return ompi_comm_allreduce_bridged_schedule_bcast (request); +} + +static int ompi_comm_allreduce_bridged_reduce_complete (ompi_comm_request_t *request) +{ + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_communicator_t *bridgecomm = context->cid_context->bridgecomm; + ompi_request_t *subreq[2]; + int rc; + + /* step 2: leader exchange */ + rc = MCA_PML_CALL(irecv (context->outbuf, context->count, MPI_INT, context->cid_context->remote_leader, + OMPI_COMM_ALLREDUCE_TAG, bridgecomm, subreq + 1)); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; + } + + rc = MCA_PML_CALL(isend (context->tmpbuf, context->count, MPI_INT, context->cid_context->remote_leader, + OMPI_COMM_ALLREDUCE_TAG, MCA_PML_BASE_SEND_STANDARD, bridgecomm, + subreq)); + if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) { + return rc; + } + + return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_bridged_xchng_complete, subreq, 2); +} + +static int ompi_comm_allreduce_intra_bridge_nb (int *inbuf, int *outbuf, + int count, struct ompi_op_t *op, + ompi_comm_cid_context_t *cid_context, + ompi_request_t **req) +{ + ompi_communicator_t *comm = cid_context->comm; + ompi_comm_allreduce_context_t *context; + int local_rank = ompi_comm_rank (comm); + ompi_comm_request_t *request; + ompi_request_t *subreq; int i; int rc; - int local_leader, remote_leader; - local_leader = (*((int*)lleader)); - remote_leader = (*((int*)rleader)); - - if ( &ompi_mpi_op_sum.op != op && &ompi_mpi_op_prod.op != op && - &ompi_mpi_op_max.op != op && &ompi_mpi_op_min.op != op ) { - return MPI_ERR_OP; + context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context); + if (OPAL_UNLIKELY(NULL == context)) { + return OMPI_ERR_OUT_OF_RESOURCE; } - local_rank = ompi_comm_rank ( comm ); - tmpbuf = (int *) malloc ( count * sizeof(int)); - if ( NULL == tmpbuf ) { - rc = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; + if (local_rank == cid_context->local_leader) { + context->tmpbuf = (int *) calloc (count, sizeof (int)); + if (OPAL_UNLIKELY(NULL == context->tmpbuf)) { + OBJ_RELEASE(context); + return OMPI_ERR_OUT_OF_RESOURCE; + } } - /* Intercomm_create */ - rc = comm->c_coll.coll_allreduce ( inbuf, tmpbuf, count, MPI_INT, - op, comm, comm->c_coll.coll_allreduce_module ); + request = ompi_comm_request_get (); + if (OPAL_UNLIKELY(NULL == request)) { + OBJ_RELEASE(context); + return OMPI_ERR_OUT_OF_RESOURCE; + } + + request->context = &context->super; + + if (cid_context->local_leader == local_rank) { + memcpy (context->tmpbuf, inbuf, count * sizeof (int)); + } + + /* step 1: reduce to the local leader */ + rc = comm->c_coll.coll_ireduce (inbuf, context->tmpbuf, count, MPI_INT, op, + cid_context->local_leader, comm, &subreq, + comm->c_coll.coll_ireduce_module); if ( OMPI_SUCCESS != rc ) { - goto exit; + ompi_comm_request_return (request); + return rc; } - if (local_rank == local_leader ) { - MPI_Request req; + if (cid_context->local_leader == local_rank) { + rc = ompi_comm_request_schedule_append (request, ompi_comm_allreduce_bridged_reduce_complete, + &subreq, 1); + } else { + /* go ahead and schedule the broadcast */ + ompi_comm_request_schedule_append (request, NULL, &subreq, 1); - rc = MCA_PML_CALL(irecv ( outbuf, count, MPI_INT, remote_leader, - OMPI_COMM_ALLREDUCE_TAG, - bcomm, &req)); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - rc = MCA_PML_CALL(send (tmpbuf, count, MPI_INT, remote_leader, - OMPI_COMM_ALLREDUCE_TAG, - MCA_PML_BASE_SEND_STANDARD, bcomm)); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - rc = ompi_request_wait( &req, MPI_STATUS_IGNORE); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - - if ( &ompi_mpi_op_max.op == op ) { - for ( i = 0 ; i < count; i++ ) { - if (tmpbuf[i] > outbuf[i]) { - outbuf[i] = tmpbuf[i]; - } - } - } - else if ( &ompi_mpi_op_min.op == op ) { - for ( i = 0 ; i < count; i++ ) { - if (tmpbuf[i] < outbuf[i]) { - outbuf[i] = tmpbuf[i]; - } - } - } - else if ( &ompi_mpi_op_sum.op == op ) { - for ( i = 0 ; i < count; i++ ) { - outbuf[i] += tmpbuf[i]; - } - } - else if ( &ompi_mpi_op_prod.op == op ) { - for ( i = 0 ; i < count; i++ ) { - outbuf[i] *= tmpbuf[i]; - } - } + rc = ompi_comm_allreduce_bridged_schedule_bcast (request); } - rc = comm->c_coll.coll_bcast ( outbuf, count, MPI_INT, local_leader, - comm, comm->c_coll.coll_bcast_module ); - - exit: - if (NULL != tmpbuf ) { - free (tmpbuf); + if (OMPI_SUCCESS != rc) { + ompi_comm_request_return (request); + return rc; } - return (rc); + ompi_comm_request_start (request); + + *req = &request->super; + + return OMPI_SUCCESS; } -/* Arguments not used in this implementation: - * - bridgecomm - * - * lleader is the local rank of root in comm - * rleader is the port_string - */ -static int ompi_comm_allreduce_intra_pmix (int *inbuf, int *outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, - void* lleader, void* rleader, - int send_first, char *tag, int iter ) +static int ompi_comm_allreduce_pmix_reduce_complete (ompi_comm_request_t *request) { - int *tmpbuf=NULL; - int rc; - int local_leader, local_rank; - char *port_string; + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_comm_cid_context_t *cid_context = context->cid_context; + int32_t size_count = context->count; opal_value_t info; opal_pmix_pdata_t pdat; opal_buffer_t sbuf; - int32_t size_count; + int rc; - local_leader = (*((int*)lleader)); - port_string = (char*)rleader; - size_count = count; + fprintf (stderr, "reduce complete\n"); - local_rank = ompi_comm_rank ( comm ); - tmpbuf = (int *) malloc ( count * sizeof(int)); - if ( NULL == tmpbuf ) { - rc = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; - } + OBJ_CONSTRUCT(&sbuf, opal_buffer_t); - /* comm is an intra-communicator */ - rc = comm->c_coll.coll_allreduce(inbuf,tmpbuf,count,MPI_INT,op, comm, - comm->c_coll.coll_allreduce_module); - if ( OMPI_SUCCESS != rc ) { - goto exit; - } - - if (local_rank == local_leader ) { - OBJ_CONSTRUCT(&sbuf, opal_buffer_t); - - if (OPAL_SUCCESS != (rc = opal_dss.pack(&sbuf, tmpbuf, (int32_t)count, OPAL_INT))) { - goto exit; - } - OBJ_CONSTRUCT(&info, opal_value_t); - OBJ_CONSTRUCT(&pdat, opal_pmix_pdata_t); - - info.type = OPAL_BYTE_OBJECT; - pdat.value.type = OPAL_BYTE_OBJECT; - - opal_dss.unload(&sbuf, (void**)&info.data.bo.bytes, &info.data.bo.size); + if (OPAL_SUCCESS != (rc = opal_dss.pack(&sbuf, context->tmpbuf, (int32_t)context->count, OPAL_INT))) { OBJ_DESTRUCT(&sbuf); + fprintf (stderr, "pack failed. rc %d\n", rc); + return rc; + } - if (send_first) { - (void)asprintf(&info.key, "%s:%s:send:%d", port_string, tag, iter); - (void)asprintf(&pdat.value.key, "%s:%s:recv:%d", port_string, tag, iter); - } else { - (void)asprintf(&info.key, "%s:%s:recv:%d", port_string, tag, iter); - (void)asprintf(&pdat.value.key, "%s:%s:send:%d", port_string, tag, iter); - } + OBJ_CONSTRUCT(&info, opal_value_t); + OBJ_CONSTRUCT(&pdat, opal_pmix_pdata_t); - OPAL_PMIX_EXCHANGE(rc, &info, &pdat, 60); - OBJ_DESTRUCT(&info); + info.type = OPAL_BYTE_OBJECT; + pdat.value.type = OPAL_BYTE_OBJECT; - if (OPAL_SUCCESS != rc) { - OBJ_DESTRUCT(&pdat); - goto exit; - } - OBJ_CONSTRUCT(&sbuf, opal_buffer_t); - opal_dss.load(&sbuf, pdat.value.data.bo.bytes, pdat.value.data.bo.size); - pdat.value.data.bo.bytes = NULL; - pdat.value.data.bo.size = 0; + opal_dss.unload(&sbuf, (void**)&info.data.bo.bytes, &info.data.bo.size); + OBJ_DESTRUCT(&sbuf); + + if (cid_context->send_first) { + (void)asprintf(&info.key, "%s:%s:send:%d", cid_context->port_string, cid_context->pmix_tag, + cid_context->iter); + (void)asprintf(&pdat.value.key, "%s:%s:recv:%d", cid_context->port_string, cid_context->pmix_tag, + cid_context->iter); + } else { + (void)asprintf(&info.key, "%s:%s:recv:%d", cid_context->port_string, cid_context->pmix_tag, + cid_context->iter); + (void)asprintf(&pdat.value.key, "%s:%s:send:%d", cid_context->port_string, cid_context->pmix_tag, + cid_context->iter); + } + + fprintf (stderr, "%s, %s\n", info.key, pdat.value.key); + + /* this macro is not actually non-blocking. if a non-blocking version becomes available this function + * needs to be reworked to take advantage of it. */ + OPAL_PMIX_EXCHANGE(rc, &info, &pdat, 60); + OBJ_DESTRUCT(&info); + fprintf (stderr, "OPAL_PMIX_EXCHANGE returned %d\n", rc); + if (OPAL_SUCCESS != rc) { OBJ_DESTRUCT(&pdat); - - if (OPAL_SUCCESS != (rc = opal_dss.unpack(&sbuf, outbuf, &size_count, OPAL_INT))) { - OBJ_DESTRUCT(&sbuf); - goto exit; - } - OBJ_DESTRUCT(&sbuf); - count = (int)size_count; - ompi_op_reduce (op, tmpbuf, outbuf, count, MPI_INT); + return rc; } - rc = comm->c_coll.coll_bcast (outbuf, count, MPI_INT, - local_leader, comm, - comm->c_coll.coll_bcast_module); + OBJ_CONSTRUCT(&sbuf, opal_buffer_t); + opal_dss.load(&sbuf, pdat.value.data.bo.bytes, pdat.value.data.bo.size); + pdat.value.data.bo.bytes = NULL; + pdat.value.data.bo.size = 0; + OBJ_DESTRUCT(&pdat); - exit: - if (NULL != tmpbuf ) { - free (tmpbuf); + rc = opal_dss.unpack (&sbuf, context->outbuf, &size_count, OPAL_INT); + OBJ_DESTRUCT(&sbuf); + if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) { + return rc; } - return (rc); + ompi_op_reduce (context->op, context->tmpbuf, context->outbuf, size_count, MPI_INT); + + return ompi_comm_allreduce_bridged_schedule_bcast (request); } -static int ompi_comm_allreduce_group (int *inbuf, int* outbuf, - int count, struct ompi_op_t *op, - ompi_communicator_t *comm, - ompi_communicator_t *newcomm, - void* local_leader, - void* remote_leader, - int send_first, char *intag, int iter) +static int ompi_comm_allreduce_intra_pmix_nb (int *inbuf, int *outbuf, + int count, struct ompi_op_t *op, + ompi_comm_cid_context_t *cid_context, + ompi_request_t **req) { - ompi_group_t *group = newcomm->c_local_group; - int peers_group[3], peers_comm[3]; + ompi_communicator_t *comm = cid_context->comm; + ompi_comm_allreduce_context_t *context; + int local_rank = ompi_comm_rank (comm); + ompi_comm_request_t *request; + ompi_request_t *subreq; + int rc; + + fprintf (stderr, "Calling ompi_comm_allreduce_intra_pmix_nb. rank %d\n", local_rank); + + context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context); + if (OPAL_UNLIKELY(NULL == context)) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + if (cid_context->local_leader == local_rank) { + context->tmpbuf = (int *) calloc (count, sizeof(int)); + if (OPAL_UNLIKELY(NULL == context->tmpbuf)) { + OBJ_RELEASE(context); + return OMPI_ERR_OUT_OF_RESOURCE; + } + } + + request = ompi_comm_request_get (); + if (NULL == request) { + OBJ_RELEASE(context); + return OMPI_ERR_OUT_OF_RESOURCE; + } + + request->context = &context->super; + + /* comm is an intra-communicator */ + rc = comm->c_coll.coll_ireduce (inbuf, context->tmpbuf, count, MPI_INT, op, + cid_context->local_leader, comm, + &subreq, comm->c_coll.coll_ireduce_module); + if ( OMPI_SUCCESS != rc ) { + ompi_comm_request_return (request); + return rc; + } + + if (cid_context->local_leader == local_rank) { + rc = ompi_comm_request_schedule_append (request, ompi_comm_allreduce_pmix_reduce_complete, + &subreq, 1); + } else { + /* go ahead and schedule the broadcast */ + rc = ompi_comm_request_schedule_append (request, NULL, &subreq, 1); + + rc = ompi_comm_allreduce_bridged_schedule_bcast (request); + } + + if (OMPI_SUCCESS != rc) { + ompi_comm_request_return (request); + return rc; + } + + ompi_comm_request_start (request); + *req = (ompi_request_t *) request; + + /* use the same function as bridged to schedule the broadcast */ + return OMPI_SUCCESS; +} + +static int ompi_comm_allreduce_group_broadcast (ompi_comm_request_t *request) +{ + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_comm_cid_context_t *cid_context = context->cid_context; + ompi_request_t *subreq[2]; + int subreq_count = 0; + int rc; + + for (int i = 0 ; i < 2 ; ++i) { + if (MPI_PROC_NULL != context->peers_comm[i + 1]) { + rc = MCA_PML_CALL(isend(context->outbuf, context->count, MPI_INT, context->peers_comm[i+1], + cid_context->pml_tag, MCA_PML_BASE_SEND_STANDARD, + cid_context->comm, subreq + subreq_count++)); + if (OMPI_SUCCESS != rc) { + return rc; + } + } + } + + return ompi_comm_request_schedule_append (request, NULL, subreq, subreq_count); +} + +static int ompi_comm_allreduce_group_recv_complete (ompi_comm_request_t *request) +{ + ompi_comm_allreduce_context_t *context = (ompi_comm_allreduce_context_t *) request->context; + ompi_comm_cid_context_t *cid_context = context->cid_context; + int *tmp = context->tmpbuf; + ompi_request_t *subreq[2]; + int rc; + + for (int i = 0 ; i < 2 ; ++i) { + if (MPI_PROC_NULL != context->peers_comm[i + 1]) { + ompi_op_reduce (context->op, tmp, context->outbuf, context->count, MPI_INT); + tmp += context->count; + } + } + + if (MPI_PROC_NULL != context->peers_comm[0]) { + /* interior node */ + rc = MCA_PML_CALL(isend(context->outbuf, context->count, MPI_INT, context->peers_comm[0], + cid_context->pml_tag, MCA_PML_BASE_SEND_STANDARD, + cid_context->comm, subreq)); + if (OMPI_SUCCESS != rc) { + return rc; + } + + rc = MCA_PML_CALL(irecv(context->outbuf, context->count, MPI_INT, context->peers_comm[0], + cid_context->pml_tag, cid_context->comm, subreq + 1)); + if (OMPI_SUCCESS != rc) { + return rc; + } + + return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_group_broadcast, subreq, 2); + } + + /* root */ + return ompi_comm_allreduce_group_broadcast (request); +} + +static int ompi_comm_allreduce_group_nb (int *inbuf, int *outbuf, int count, + struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context, + ompi_request_t **req) +{ + ompi_group_t *group = cid_context->newcomm->c_local_group; const int group_size = ompi_group_size (group); const int group_rank = ompi_group_rank (group); - int tag = *((int *) local_leader); - int *tmp1; - int i, rc=OMPI_SUCCESS; + ompi_communicator_t *comm = cid_context->comm; + int peers_group[3], *tmp, subreq_count = 0; + ompi_comm_allreduce_context_t *context; + ompi_comm_request_t *request; + ompi_request_t *subreq[3]; + + context = ompi_comm_allreduce_context_alloc (inbuf, outbuf, count, op, cid_context); + if (NULL == context) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + tmp = context->tmpbuf = calloc (sizeof (int), count * 3); + if (NULL == context->tmpbuf) { + OBJ_RELEASE(context); + return OMPI_ERR_OUT_OF_RESOURCE; + } + + request = ompi_comm_request_get (); + if (NULL == request) { + OBJ_RELEASE(context); + return OMPI_ERR_OUT_OF_RESOURCE; + } + + request->context = &context->super; /* basic recursive doubling allreduce on the group */ peers_group[0] = group_rank ? ((group_rank - 1) >> 1) : MPI_PROC_NULL; @@ -1411,54 +1069,28 @@ static int ompi_comm_allreduce_group (int *inbuf, int* outbuf, peers_group[2] = (group_rank * 2 + 2) < group_size ? group_rank * 2 + 2 : MPI_PROC_NULL; /* translate the ranks into the ranks of the parent communicator */ - ompi_group_translate_ranks (group, 3, peers_group, comm->c_local_group, peers_comm); - - tmp1 = malloc (sizeof (int) * count); + ompi_group_translate_ranks (group, 3, peers_group, comm->c_local_group, context->peers_comm); /* reduce */ memmove (outbuf, inbuf, sizeof (int) * count); - for (i = 1 ; i < 3 ; ++i) { - if (MPI_PROC_NULL != peers_comm[i]) { - rc = MCA_PML_CALL(recv(tmp1, count, MPI_INT, peers_comm[i], tag, comm, - MPI_STATUS_IGNORE)); + for (int i = 0 ; i < 2 ; ++i) { + if (MPI_PROC_NULL != context->peers_comm[i + 1]) { + int rc = MCA_PML_CALL(irecv(tmp, count, MPI_INT, context->peers_comm[i + 1], + cid_context->pml_tag, comm, subreq + subreq_count++)); if (OMPI_SUCCESS != rc) { - goto out; + ompi_comm_request_return (request); + return rc; } - /* this is integer reduction so we do not care about ordering */ - ompi_op_reduce (op, tmp1, outbuf, count, MPI_INT); + + tmp += count; } } - if (MPI_PROC_NULL != peers_comm[0]) { - rc = MCA_PML_CALL(send(outbuf, count, MPI_INT, peers_comm[0], - tag, MCA_PML_BASE_SEND_STANDARD, comm)); - if (OMPI_SUCCESS != rc) { - goto out; - } + ompi_comm_request_schedule_append (request, ompi_comm_allreduce_group_recv_complete, subreq, subreq_count); - rc = MCA_PML_CALL(recv(outbuf, count, MPI_INT, peers_comm[0], - tag, comm, MPI_STATUS_IGNORE)); - if (OMPI_SUCCESS != rc) { - goto out; - } - } + ompi_comm_request_start (request); + *req = &request->super; - /* broadcast */ - for (i = 1 ; i < 3 ; ++i) { - if (MPI_PROC_NULL != peers_comm[i]) { - rc = MCA_PML_CALL(send(outbuf, count, MPI_INT, peers_comm[i], tag, - MCA_PML_BASE_SEND_STANDARD, comm)); - if (OMPI_SUCCESS != rc) { - goto out; - } - } - } - - out: - free (tmp1); - - return rc; + return OMPI_SUCCESS; } - -END_C_DECLS diff --git a/ompi/communicator/comm_init.c b/ompi/communicator/comm_init.c index f2acab5bbe..f453ca1e8e 100644 --- a/ompi/communicator/comm_init.c +++ b/ompi/communicator/comm_init.c @@ -206,10 +206,6 @@ int ompi_comm_init(void) OBJ_RETAIN(&ompi_mpi_group_null.group); OBJ_RETAIN(&ompi_mpi_errors_are_fatal.eh); - /* initialize the comm_reg stuff for multi-threaded comm_cid - allocation */ - ompi_comm_reg_init(); - /* initialize communicator requests (for ompi_comm_idup) */ ompi_comm_request_init (); @@ -328,13 +324,9 @@ int ompi_comm_finalize(void) } } - OBJ_DESTRUCT (&ompi_mpi_communicators); OBJ_DESTRUCT (&ompi_comm_f_to_c_table); - /* finalize the comm_reg stuff */ - ompi_comm_reg_finalize(); - /* finalize communicator requests */ ompi_comm_request_fini (); diff --git a/ompi/communicator/comm_request.c b/ompi/communicator/comm_request.c index 9c7b8770e1..ebc9edee8b 100644 --- a/ompi/communicator/comm_request.c +++ b/ompi/communicator/comm_request.c @@ -235,6 +235,7 @@ static void ompi_comm_request_destruct (ompi_comm_request_t *request) { OBJ_DESTRUCT(&request->schedule); } + OBJ_CLASS_INSTANCE(ompi_comm_request_t, ompi_request_t, ompi_comm_request_construct, ompi_comm_request_destruct); @@ -258,10 +259,10 @@ ompi_comm_request_t *ompi_comm_request_get (void) void ompi_comm_request_return (ompi_comm_request_t *request) { if (request->context) { - free (request->context); - request->context = NULL; + OBJ_RELEASE (request->context); } + OMPI_REQUEST_FINI(&request->super); opal_free_list_return (&ompi_comm_requests, (opal_free_list_item_t *) request); } diff --git a/ompi/communicator/comm_request.h b/ompi/communicator/comm_request.h index 2c5a3e3fce..43082d6093 100644 --- a/ompi/communicator/comm_request.h +++ b/ompi/communicator/comm_request.h @@ -21,7 +21,7 @@ typedef struct ompi_comm_request_t { ompi_request_t super; - void *context; + opal_object_t *context; opal_list_t schedule; } ompi_comm_request_t; OBJ_CLASS_DECLARATION(ompi_comm_request_t); diff --git a/ompi/communicator/communicator.h b/ompi/communicator/communicator.h index c7dada0509..a72e29027c 100644 --- a/ompi/communicator/communicator.h +++ b/ompi/communicator/communicator.h @@ -496,24 +496,27 @@ ompi_communicator_t* ompi_comm_allocate (int local_group_size, * @param mode: combination of input * OMPI_COMM_CID_INTRA: intra-comm * OMPI_COMM_CID_INTER: inter-comm + * OMPI_COMM_CID_GROUP: only decide CID within the ompi_group_t + * associated with the communicator. arg0 + * must point to an int which will be used + * as the pml tag for communication. * OMPI_COMM_CID_INTRA_BRIDGE: 2 intracomms connected by - * a bridge comm. local_leader - * and remote leader are in this - * case an int (rank in bridge-comm). + * a bridge comm. arg0 and arg1 must point + * to integers representing the local and + * remote leader ranks. the remote leader rank + * is a rank in the bridgecomm. * OMPI_COMM_CID_INTRA_PMIX: 2 intracomms, leaders talk - * through PMIx. lleader and rleader - * are the required contact information. + * through PMIx. arg0 must point to an integer + * representing the local leader rank. arg1 + * must point to a string representing the + * port of the remote leader. * @param send_first: to avoid a potential deadlock for * the OOB version. * This routine has to be thread safe in the final version. */ -OMPI_DECLSPEC int ompi_comm_nextcid ( ompi_communicator_t* newcomm, - ompi_communicator_t* oldcomm, - ompi_communicator_t* bridgecomm, - void* local_leader, - void* remote_leader, - int mode, - int send_first); +OMPI_DECLSPEC int ompi_comm_nextcid (ompi_communicator_t *newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, const void *arg1, + bool send_first, int mode); /** * allocate new communicator ID (non-blocking) @@ -525,10 +528,9 @@ OMPI_DECLSPEC int ompi_comm_nextcid ( ompi_communicator_t* newcomm, * OMPI_COMM_CID_INTER: inter-comm * This routine has to be thread safe in the final version. */ -OMPI_DECLSPEC int ompi_comm_nextcid_nb (ompi_communicator_t* newcomm, - ompi_communicator_t* comm, - ompi_communicator_t* bridgecomm, - int mode, ompi_request_t **req); +OMPI_DECLSPEC int ompi_comm_nextcid_nb (ompi_communicator_t *newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, const void *arg1, + bool send_first, int mode, ompi_request_t **req); /** * shut down the communicator infrastructure. @@ -621,18 +623,25 @@ int ompi_comm_determine_first ( ompi_communicator_t *intercomm, int high ); -OMPI_DECLSPEC int ompi_comm_activate ( ompi_communicator_t** newcomm, - ompi_communicator_t* comm, - ompi_communicator_t* bridgecomm, - void* local_leader, - void* remote_leader, - int mode, - int send_first ); +OMPI_DECLSPEC int ompi_comm_activate (ompi_communicator_t **newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, + const void *arg1, bool send_first, int mode); -OMPI_DECLSPEC int ompi_comm_activate_nb (ompi_communicator_t **newcomm, - ompi_communicator_t *comm, - ompi_communicator_t *bridgecomm, - int mode, ompi_request_t **req); +/** + * Non-blocking variant of comm_activate. + * + * @param[inout] newcomm New communicator + * @param[in] comm Parent communicator + * @param[in] bridgecomm Bridge communicator (used for PMIX and bridge modes) + * @param[in] arg0 Mode argument 0 + * @param[in] arg1 Mode argument 1 + * @param[in] send_first Send first from this process (PMIX mode only) + * @param[in] mode Collective mode + * @param[out] req New request object to track this operation + */ +OMPI_DECLSPEC int ompi_comm_activate_nb (ompi_communicator_t **newcomm, ompi_communicator_t *comm, + ompi_communicator_t *bridgecomm, const void *arg0, + const void *arg1, bool send_first, int mode, ompi_request_t **req); /** * a simple function to dump the structure @@ -642,14 +651,6 @@ int ompi_comm_dump ( ompi_communicator_t *comm ); /* setting name */ int ompi_comm_set_name (ompi_communicator_t *comm, const char *name ); -/* - * these are the init and finalize functions for the comm_reg - * stuff. These routines are necessary for handling multi-threading - * scenarious in the communicator_cid allocation - */ -void ompi_comm_reg_init(void); -void ompi_comm_reg_finalize(void); - /* global variable to save the number od dynamic communicators */ extern int ompi_comm_num_dyncomm; diff --git a/ompi/dpm/dpm.c b/ompi/dpm/dpm.c index 7b97f90209..a211cec4c1 100644 --- a/ompi/dpm/dpm.c +++ b/ompi/dpm/dpm.c @@ -469,25 +469,25 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root, new_group_pointer = MPI_GROUP_NULL; /* allocate comm_cid */ - rc = ompi_comm_nextcid ( newcomp, /* new communicator */ - comm, /* old communicator */ - NULL, /* bridge comm */ - &root, /* local leader */ - (void*)port_string, /* rendezvous point */ - OMPI_COMM_CID_INTRA_PMIX, /* mode */ - send_first ); /* send or recv first */ + rc = ompi_comm_nextcid ( newcomp, /* new communicator */ + comm, /* old communicator */ + NULL, /* bridge comm */ + &root, /* local leader */ + (void*)port_string, /* rendezvous point */ + send_first, /* send or recv first */ + OMPI_COMM_CID_INTRA_PMIX); /* mode */ if (OMPI_SUCCESS != rc) { goto exit; } /* activate comm and init coll-component */ - rc = ompi_comm_activate ( &newcomp, /* new communicator */ - comm, /* old communicator */ - NULL, /* bridge comm */ - &root, /* local leader */ - (void*)port_string, /* rendezvous point */ - OMPI_COMM_CID_INTRA_PMIX, /* mode */ - send_first ); /* send or recv first */ + rc = ompi_comm_activate ( &newcomp, /* new communicator */ + comm, /* old communicator */ + NULL, /* bridge comm */ + &root, /* local leader */ + (void*)port_string, /* rendezvous point */ + send_first, /* send or recv first */ + OMPI_COMM_CID_INTRA_PMIX); /* mode */ if (OMPI_SUCCESS != rc) { goto exit; } @@ -500,7 +500,7 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root, exit: if (OMPI_SUCCESS != rc) { if (MPI_COMM_NULL != newcomp && NULL != newcomp) { - OBJ_RETAIN(newcomp); + OBJ_RELEASE(newcomp); newcomp = MPI_COMM_NULL; } } diff --git a/ompi/mpi/c/intercomm_create.c b/ompi/mpi/c/intercomm_create.c index 8346a75ec1..bd4872ba4d 100644 --- a/ompi/mpi/c/intercomm_create.c +++ b/ompi/mpi/c/intercomm_create.c @@ -1,3 +1,4 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana * University Research and Technology @@ -14,6 +15,8 @@ * Copyright (c) 2012-2013 Inria. All rights reserved. * Copyright (c) 2014-2015 Research Organization for Information Science * and Technology (RIST). All rights reserved. + * Copyright (c) 2016 Los Alamos National Security, LLC. All rights + * reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -199,26 +202,15 @@ int MPI_Intercomm_create(MPI_Comm local_comm, int local_leader, new_group_pointer = MPI_GROUP_NULL; /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new comm */ - local_comm, /* old comm */ - bridge_comm, /* bridge comm */ - &lleader, /* local leader */ - &rleader, /* remote_leader */ - OMPI_COMM_CID_INTRA_BRIDGE, /* mode */ - -1 ); /* send_first */ - + rc = ompi_comm_nextcid (newcomp, local_comm, bridge_comm, &lleader, + &rleader, false, OMPI_COMM_CID_INTRA_BRIDGE); if ( MPI_SUCCESS != rc ) { goto err_exit; } /* activate comm and init coll-module */ - rc = ompi_comm_activate ( &newcomp, - local_comm, /* old comm */ - bridge_comm, /* bridge comm */ - &lleader, /* local leader */ - &rleader, /* remote_leader */ - OMPI_COMM_CID_INTRA_BRIDGE, /* mode */ - -1 ); /* send_first */ + rc = ompi_comm_activate (&newcomp, local_comm, bridge_comm, &lleader, &rleader, + false, OMPI_COMM_CID_INTRA_BRIDGE); if ( MPI_SUCCESS != rc ) { goto err_exit; } diff --git a/ompi/mpi/c/intercomm_merge.c b/ompi/mpi/c/intercomm_merge.c index b0cfb2dcde..ed6ff727ce 100644 --- a/ompi/mpi/c/intercomm_merge.c +++ b/ompi/mpi/c/intercomm_merge.c @@ -1,3 +1,4 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana * University Research and Technology @@ -14,6 +15,8 @@ * Copyright (c) 2012-2013 Inria. All rights reserved. * Copyright (c) 2015 Research Organization for Information Science * and Technology (RIST). All rights reserved. + * Copyright (c) 2016 Los Alamos National Security, LLC. All rights + * reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -117,26 +120,16 @@ int MPI_Intercomm_merge(MPI_Comm intercomm, int high, OBJ_RELEASE(new_group_pointer); new_group_pointer = MPI_GROUP_NULL; - /* Determine context id. It is identical to f_2_c_handle */ - rc = ompi_comm_nextcid ( newcomp, /* new comm */ - intercomm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - OMPI_COMM_CID_INTER, /* mode */ - -1 ); /* send_first */ + /* Determine context id */ + rc = ompi_comm_nextcid (newcomp, intercomm, NULL, NULL, NULL, false, + OMPI_COMM_CID_INTER); if ( OMPI_SUCCESS != rc ) { goto exit; } /* activate communicator and init coll-module */ - rc = ompi_comm_activate( &newcomp, /* new comm */ - intercomm, /* old comm */ - NULL, /* bridge comm */ - NULL, /* local leader */ - NULL, /* remote_leader */ - OMPI_COMM_CID_INTER, /* mode */ - -1 ); /* send_first */ + rc = ompi_comm_activate (&newcomp, intercomm, NULL, NULL, NULL, false, + OMPI_COMM_CID_INTER); if ( OMPI_SUCCESS != rc ) { goto exit; }