diff --git a/src/communicator/comm_cid.c b/src/communicator/comm_cid.c index 8cd722ee7d..376eeb1454 100644 --- a/src/communicator/comm_cid.c +++ b/src/communicator/comm_cid.c @@ -68,6 +68,131 @@ static int ompi_comm_allreduce_intra_oob (int *inbuf, int* outbuf, int send_first ); +#ifdef MULTI_THREADS +static ompi_mutex_t ompi_cid_lock; +static volatile ompi_list_t registered_threads; +static volatile int running_cid=-1; + + +int ompi_comm_nextcid ( ompi_communicator_t* newcomm, + ompi_communicator_t* comm, + ompi_communicator_t* bridgecomm, + void* local_leader, + void* remote_leader, + int mode, int send_first ) +{ + + int nextlocal_cid; + int nextcid; + int done=0; + int response=0, glresponse=0; + int flag; + int start=ompi_mpi_communicators.lowest_free; + int i; + + ompi_comm_cid_allredfct* allredfnct; + + /** + * Determine which implementation of allreduce we have to use + * for the current scenario + */ + switch (mode) + { + case OMPI_COMM_CID_INTRA: + allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra; + break; + case OMPI_COMM_CID_INTER: + allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_inter; + break; + case OMPI_COMM_CID_INTRA_BRIDGE: + allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_bridge; + break; + case OMPI_COMM_CID_INTRA_OOB: + allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_oob; + break; + default: + return MPI_UNDEFINED; + break; + } + + + /* Lock the list + register threads in registered_threads + unlock + */ + + while (!done) { + /* lock + if (!lowest_cid ) { + unlock; + continue; + } + + unlock; + */ + + /** + * This is the real algorithm described in the doc + */ + + for (i=start; ic_contextid = nextcid; + newcomm->c_f_to_c_index = newcomm->c_contextid; + ompi_pointer_array_set_item (&ompi_mpi_communicators, nextcid, newcomm); + + /* + lock + unregister + unlock + */ + + return (MPI_SUCCESS); +} +#endif + int ompi_comm_nextcid ( ompi_communicator_t* newcomm, ompi_communicator_t* comm, ompi_communicator_t* bridgecomm, diff --git a/src/communicator/comm_dyn.c b/src/communicator/comm_dyn.c index 18c6ea63c6..d95b194431 100644 --- a/src/communicator/comm_dyn.c +++ b/src/communicator/comm_dyn.c @@ -46,7 +46,14 @@ int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root, information of the remote process. Therefore, we have to exchange that. */ - rport = ompi_comm_get_rport (port,send_first,group->grp_proc_pointers[rank], tag); + if ( OMPI_COMM_JOIN_TAG != tag ) { + rport = ompi_comm_get_rport (port,send_first, + group->grp_proc_pointers[rank], tag); + } + else { + rport = port; + } + /* Exchange number of processes and msg length on both sides */ ompi_buffer_init (&nbuf, size*sizeof(ompi_process_name_t)); diff --git a/src/communicator/communicator.h b/src/communicator/communicator.h index f3d7eb5fa7..69dff1aa8a 100644 --- a/src/communicator/communicator.h +++ b/src/communicator/communicator.h @@ -39,6 +39,10 @@ extern ompi_class_t ompi_communicator_t_class; #define OMPI_COMM_SET_HIDDEN(comm) ((comm)->c_flags |= OMPI_COMM_HIDDEN) +/* a special tag to recognize an MPI_Comm_join in the comm_connect_accept + routine. */ +#define OMPI_COMM_JOIN_TAG 32000 + /** * Modes reqquired for accquiring the new comm-id. * The first (INTER/INTRA) indicates whether the