diff --git a/src/communicator/comm_cid.c b/src/communicator/comm_cid.c index 376eeb1454..51f3fce5b5 100644 --- a/src/communicator/comm_cid.c +++ b/src/communicator/comm_cid.c @@ -12,6 +12,7 @@ #include "proc/proc.h" #include "include/constants.h" #include "class/ompi_pointer_array.h" +#include "class/ompi_list.h" #include "mca/pcm/pcm.h" #include "mca/pml/pml.h" #include "mca/coll/coll.h" @@ -67,11 +68,27 @@ static int ompi_comm_allreduce_intra_oob (int *inbuf, int* outbuf, void* remote_leader, int send_first ); +static int ompi_comm_register_cid (uint32_t contextid); +static int ompi_comm_unregister_cid (uint32_t contextid); +static uint32_t ompi_comm_lowest_cid ( void ); + +struct ompi_comm_reg_t{ + ompi_list_item_t super; + uint32_t cid; +}; +typedef struct ompi_comm_reg_t ompi_comm_reg_t; +OBJ_CLASS_DECLARATION(ompi_comm_reg_t); + +static void ompi_comm_reg_constructor(ompi_comm_reg_t *regcom); +static void ompi_comm_reg_destructor(ompi_comm_reg_t *regcom); + +OBJ_CLASS_INSTANCE (ompi_comm_reg_t, + ompi_list_item_t, + ompi_comm_reg_constructor, + ompi_comm_reg_destructor ); -#ifdef MULTI_THREADS static ompi_mutex_t ompi_cid_lock; -static volatile ompi_list_t registered_threads; -static volatile int running_cid=-1; +static ompi_list_t ompi_registered_comms; int ompi_comm_nextcid ( ompi_communicator_t* newcomm, @@ -116,25 +133,24 @@ int ompi_comm_nextcid ( ompi_communicator_t* newcomm, } - /* Lock the list - register threads in registered_threads - unlock - */ + OMPI_THREAD_LOCK(&ompi_cid_lock); + ompi_comm_register_cid (comm->c_contextid); + OMPI_THREAD_UNLOCK(&ompi_cid_lock); while (!done) { - /* lock - if (!lowest_cid ) { - unlock; - continue; - } - - unlock; - */ - /** * This is the real algorithm described in the doc */ + OMPI_THREAD_LOCK(&ompi_cid_lock); + if (comm->c_contextid != ompi_comm_lowest_cid() ) { + /* if not lowest cid, we do not continue, but sleep and try again */ + OMPI_THREAD_UNLOCK(&ompi_cid_lock); + continue; + } + OMPI_THREAD_UNLOCK(&ompi_cid_lock); + + for (i=start; ic_f_to_c_index = newcomm->c_contextid; ompi_pointer_array_set_item (&ompi_mpi_communicators, nextcid, newcomm); - /* - lock - unregister - unlock - */ - + OMPI_THREAD_LOCK(&ompi_cid_lock); + ompi_comm_unregister_cid (comm->c_contextid); + OMPI_THREAD_UNLOCK(&ompi_cid_lock); + return (MPI_SUCCESS); } -#endif - -int ompi_comm_nextcid ( ompi_communicator_t* newcomm, - ompi_communicator_t* comm, - ompi_communicator_t* bridgecomm, - void* local_leader, - void* remote_leader, - int mode, int send_first ) +/**************************************************************************/ +/**************************************************************************/ +/**************************************************************************/ +static void ompi_comm_reg_constructor (ompi_comm_reg_t *regcom) { - - int nextlocal_cid; - int nextcid; - int done=0; - int response=0, glresponse=0; - int flag; - int start=ompi_mpi_communicators.lowest_free; - int i; - - ompi_comm_cid_allredfct* allredfnct; - - /** - * Determine which implementation of allreduce we have to use - * for the current scenario - */ - switch (mode) - { - case OMPI_COMM_CID_INTRA: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra; - break; - case OMPI_COMM_CID_INTER: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_inter; - break; - case OMPI_COMM_CID_INTRA_BRIDGE: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_bridge; - break; - case OMPI_COMM_CID_INTRA_OOB: - allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_oob; - break; - default: - return MPI_UNDEFINED; - break; - } - - /** - * This is the real algorithm described in the doc - */ - while ( !done ){ - for (i=start; ic_contextid = nextcid; - newcomm->c_f_to_c_index = newcomm->c_contextid; - ompi_pointer_array_set_item (&ompi_mpi_communicators, nextcid, newcomm); - - return (MPI_SUCCESS); + regcom->cid=MPI_UNDEFINED; } +static void ompi_comm_reg_destructor (ompi_comm_reg_t *regcom) +{ +} + +void ompi_comm_reg_init (void) +{ + OBJ_CONSTRUCT(&ompi_registered_comms, ompi_list_t); +} + +void ompi_comm_reg_finalize (void) +{ + OBJ_DESTRUCT(&ompi_registered_comms); +} + + +static int ompi_comm_register_cid (uint32_t cid ) +{ + ompi_list_item_t *item=NULL; + ompi_comm_reg_t *regcom=NULL; + ompi_comm_reg_t *newentry = OBJ_NEW(ompi_comm_reg_t); + + newentry->cid = cid; + if ( !(ompi_list_is_empty (&ompi_registered_comms)) ) { + for (item = ompi_list_get_first(&ompi_registered_comms); + item != ompi_list_get_end(&ompi_registered_comms); + item = ompi_list_get_next(item)) { + regcom = (ompi_comm_reg_t *)item; + if ( regcom->cid > cid ) { + break; + } + } + ompi_list_insert_pos (&ompi_registered_comms, (ompi_list_item_t *)regcom, + (ompi_list_item_t *)newentry); + } + else { + ompi_list_append (&ompi_registered_comms, (ompi_list_item_t *)newentry); + } + + return OMPI_SUCCESS; +} + +static int ompi_comm_unregister_cid (uint32_t cid) +{ + ompi_comm_reg_t *regcom=NULL; + ompi_list_item_t *item=ompi_list_remove_first(&ompi_registered_comms); + + regcom = (ompi_comm_reg_t *) item; + OBJ_RELEASE(regcom); + + return OMPI_SUCCESS; +} + +static uint32_t ompi_comm_lowest_cid (void) +{ + ompi_comm_reg_t *regcom=NULL; + ompi_list_item_t *item=ompi_list_get_first (&ompi_registered_comms); + + regcom = (ompi_comm_reg_t *)item; + return regcom->cid; +} /**************************************************************************/ /**************************************************************************/ /**************************************************************************/ diff --git a/src/communicator/comm_init.c b/src/communicator/comm_init.c index 4f29ef8031..62434d8c82 100644 --- a/src/communicator/comm_init.c +++ b/src/communicator/comm_init.c @@ -127,6 +127,10 @@ int ompi_comm_init(void) OBJ_RETAIN(&ompi_mpi_group_null); OBJ_RETAIN(&ompi_mpi_errors_are_fatal); + /* initialize the comm_reg stuff for multi-threaded comm_cid + allocation */ + ompi_comm_reg_init(); + return OMPI_SUCCESS; } @@ -208,6 +212,9 @@ int ompi_comm_finalize(void) OBJ_DESTRUCT (&ompi_mpi_communicators); + /* finalize the comm_reg stuff */ + ompi_comm_reg_finalize(); + return OMPI_SUCCESS; } diff --git a/src/communicator/communicator.h b/src/communicator/communicator.h index 69dff1aa8a..945984098a 100644 --- a/src/communicator/communicator.h +++ b/src/communicator/communicator.h @@ -382,6 +382,15 @@ extern "C" { + /* + * these are the init and finalize functions for the comm_reg + * stuff. These routines are necessary for handling multi-threading + * scenarious in the communicator_cid allocation + */ + void ompi_comm_reg_init(void); + void ompi_comm_reg_finalize(void); + + #if defined(c_plusplus) || defined(__cplusplus) } #endif