Add the code to manage communicator cid allocation in multithreaded scenarios. The code seems to work in the single-threaded scenario (and thus hopefully should not generate any errors); the multithreaded case is still to be tested.
This commit was SVN r2792.
Этот коммит содержится в:
родитель
479f6c5e6b
Коммит
e1406a1d5d
@ -12,6 +12,7 @@
|
|||||||
#include "proc/proc.h"
|
#include "proc/proc.h"
|
||||||
#include "include/constants.h"
|
#include "include/constants.h"
|
||||||
#include "class/ompi_pointer_array.h"
|
#include "class/ompi_pointer_array.h"
|
||||||
|
#include "class/ompi_list.h"
|
||||||
#include "mca/pcm/pcm.h"
|
#include "mca/pcm/pcm.h"
|
||||||
#include "mca/pml/pml.h"
|
#include "mca/pml/pml.h"
|
||||||
#include "mca/coll/coll.h"
|
#include "mca/coll/coll.h"
|
||||||
@ -67,11 +68,27 @@ static int ompi_comm_allreduce_intra_oob (int *inbuf, int* outbuf,
|
|||||||
void* remote_leader,
|
void* remote_leader,
|
||||||
int send_first );
|
int send_first );
|
||||||
|
|
||||||
|
static int ompi_comm_register_cid (uint32_t contextid);
|
||||||
|
static int ompi_comm_unregister_cid (uint32_t contextid);
|
||||||
|
static uint32_t ompi_comm_lowest_cid ( void );
|
||||||
|
|
||||||
|
struct ompi_comm_reg_t{
|
||||||
|
ompi_list_item_t super;
|
||||||
|
uint32_t cid;
|
||||||
|
};
|
||||||
|
typedef struct ompi_comm_reg_t ompi_comm_reg_t;
|
||||||
|
OBJ_CLASS_DECLARATION(ompi_comm_reg_t);
|
||||||
|
|
||||||
|
static void ompi_comm_reg_constructor(ompi_comm_reg_t *regcom);
|
||||||
|
static void ompi_comm_reg_destructor(ompi_comm_reg_t *regcom);
|
||||||
|
|
||||||
|
OBJ_CLASS_INSTANCE (ompi_comm_reg_t,
|
||||||
|
ompi_list_item_t,
|
||||||
|
ompi_comm_reg_constructor,
|
||||||
|
ompi_comm_reg_destructor );
|
||||||
|
|
||||||
#ifdef MULTI_THREADS
|
|
||||||
static ompi_mutex_t ompi_cid_lock;
|
static ompi_mutex_t ompi_cid_lock;
|
||||||
static volatile ompi_list_t registered_threads;
|
static ompi_list_t ompi_registered_comms;
|
||||||
static volatile int running_cid=-1;
|
|
||||||
|
|
||||||
|
|
||||||
int ompi_comm_nextcid ( ompi_communicator_t* newcomm,
|
int ompi_comm_nextcid ( ompi_communicator_t* newcomm,
|
||||||
@ -116,24 +133,23 @@ int ompi_comm_nextcid ( ompi_communicator_t* newcomm,
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/* Lock the list
|
OMPI_THREAD_LOCK(&ompi_cid_lock);
|
||||||
register threads in registered_threads
|
ompi_comm_register_cid (comm->c_contextid);
|
||||||
unlock
|
OMPI_THREAD_UNLOCK(&ompi_cid_lock);
|
||||||
*/
|
|
||||||
|
|
||||||
while (!done) {
|
while (!done) {
|
||||||
/* lock
|
/**
|
||||||
if (!lowest_cid ) {
|
* This is the real algorithm described in the doc
|
||||||
unlock;
|
*/
|
||||||
|
|
||||||
|
OMPI_THREAD_LOCK(&ompi_cid_lock);
|
||||||
|
if (comm->c_contextid != ompi_comm_lowest_cid() ) {
|
||||||
|
/* if not lowest cid, we do not continue, but sleep and try again */
|
||||||
|
OMPI_THREAD_UNLOCK(&ompi_cid_lock);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
OMPI_THREAD_UNLOCK(&ompi_cid_lock);
|
||||||
|
|
||||||
unlock;
|
|
||||||
*/
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This is the real algorithm described in the doc
|
|
||||||
*/
|
|
||||||
|
|
||||||
for (i=start; i<OMPI_MAX_COMM ;i++) {
|
for (i=start; i<OMPI_MAX_COMM ;i++) {
|
||||||
flag=ompi_pointer_array_test_and_set_item(&ompi_mpi_communicators, i, comm);
|
flag=ompi_pointer_array_test_and_set_item(&ompi_mpi_communicators, i, comm);
|
||||||
@ -183,112 +199,80 @@ int ompi_comm_nextcid ( ompi_communicator_t* newcomm,
|
|||||||
newcomm->c_f_to_c_index = newcomm->c_contextid;
|
newcomm->c_f_to_c_index = newcomm->c_contextid;
|
||||||
ompi_pointer_array_set_item (&ompi_mpi_communicators, nextcid, newcomm);
|
ompi_pointer_array_set_item (&ompi_mpi_communicators, nextcid, newcomm);
|
||||||
|
|
||||||
/*
|
OMPI_THREAD_LOCK(&ompi_cid_lock);
|
||||||
lock
|
ompi_comm_unregister_cid (comm->c_contextid);
|
||||||
unregister
|
OMPI_THREAD_UNLOCK(&ompi_cid_lock);
|
||||||
unlock
|
|
||||||
*/
|
|
||||||
|
|
||||||
return (MPI_SUCCESS);
|
return (MPI_SUCCESS);
|
||||||
}
|
}
|
||||||
#endif
|
/**************************************************************************/
|
||||||
|
/**************************************************************************/
|
||||||
int ompi_comm_nextcid ( ompi_communicator_t* newcomm,
|
/**************************************************************************/
|
||||||
ompi_communicator_t* comm,
|
static void ompi_comm_reg_constructor (ompi_comm_reg_t *regcom)
|
||||||
ompi_communicator_t* bridgecomm,
|
|
||||||
void* local_leader,
|
|
||||||
void* remote_leader,
|
|
||||||
int mode, int send_first )
|
|
||||||
{
|
{
|
||||||
|
regcom->cid=MPI_UNDEFINED;
|
||||||
int nextlocal_cid;
|
|
||||||
int nextcid;
|
|
||||||
int done=0;
|
|
||||||
int response=0, glresponse=0;
|
|
||||||
int flag;
|
|
||||||
int start=ompi_mpi_communicators.lowest_free;
|
|
||||||
int i;
|
|
||||||
|
|
||||||
ompi_comm_cid_allredfct* allredfnct;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Determine which implementation of allreduce we have to use
|
|
||||||
* for the current scenario
|
|
||||||
*/
|
|
||||||
switch (mode)
|
|
||||||
{
|
|
||||||
case OMPI_COMM_CID_INTRA:
|
|
||||||
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra;
|
|
||||||
break;
|
|
||||||
case OMPI_COMM_CID_INTER:
|
|
||||||
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_inter;
|
|
||||||
break;
|
|
||||||
case OMPI_COMM_CID_INTRA_BRIDGE:
|
|
||||||
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_bridge;
|
|
||||||
break;
|
|
||||||
case OMPI_COMM_CID_INTRA_OOB:
|
|
||||||
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_oob;
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
return MPI_UNDEFINED;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This is the real algorithm described in the doc
|
|
||||||
*/
|
|
||||||
while ( !done ){
|
|
||||||
for (i=start; i<OMPI_MAX_COMM ;i++) {
|
|
||||||
flag=ompi_pointer_array_test_and_set_item(&ompi_mpi_communicators, i, comm);
|
|
||||||
if (true == flag) {
|
|
||||||
nextlocal_cid = i;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
(allredfnct)(&nextlocal_cid, &nextcid, 1, MPI_MAX, comm, bridgecomm,
|
|
||||||
local_leader, remote_leader, send_first );
|
|
||||||
if (nextcid == nextlocal_cid) {
|
|
||||||
response = 1; /* fine with me */
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
ompi_pointer_array_set_item(&ompi_mpi_communicators,
|
|
||||||
nextlocal_cid, NULL);
|
|
||||||
|
|
||||||
flag = ompi_pointer_array_test_and_set_item(&ompi_mpi_communicators,
|
|
||||||
nextcid, comm );
|
|
||||||
if (true == flag) {
|
|
||||||
response = 1; /* works as well */
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
response = 0; /* nope, not acceptable */
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
(allredfnct)(&response, &glresponse, 1, MPI_MIN, comm, bridgecomm,
|
|
||||||
local_leader, remote_leader, send_first );
|
|
||||||
if (1 == glresponse) {
|
|
||||||
done = 1; /* we are done */
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
else if ( 0 == glresponse ) {
|
|
||||||
if ( 1 == response ) {
|
|
||||||
/* we could use that, but other don't agree */
|
|
||||||
ompi_pointer_array_set_item(&ompi_mpi_communicators,
|
|
||||||
nextcid, NULL);
|
|
||||||
}
|
|
||||||
start = nextcid+1; /* that's where we can start the next round */
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* set the according values to the newcomm */
|
|
||||||
newcomm->c_contextid = nextcid;
|
|
||||||
newcomm->c_f_to_c_index = newcomm->c_contextid;
|
|
||||||
ompi_pointer_array_set_item (&ompi_mpi_communicators, nextcid, newcomm);
|
|
||||||
|
|
||||||
return (MPI_SUCCESS);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Destructor for ompi_comm_reg_t.
 *
 * A registry entry only carries a cid and owns no further resources,
 * so there is nothing to tear down here.
 */
static void ompi_comm_reg_destructor (ompi_comm_reg_t *regcom)
{
    (void) regcom;  /* intentionally unused */
}
|
||||||
|
|
||||||
|
void ompi_comm_reg_init (void)
|
||||||
|
{
|
||||||
|
OBJ_CONSTRUCT(&ompi_registered_comms, ompi_list_t);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ompi_comm_reg_finalize (void)
|
||||||
|
{
|
||||||
|
OBJ_DESTRUCT(&ompi_registered_comms);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int ompi_comm_register_cid (uint32_t cid )
|
||||||
|
{
|
||||||
|
ompi_list_item_t *item=NULL;
|
||||||
|
ompi_comm_reg_t *regcom=NULL;
|
||||||
|
ompi_comm_reg_t *newentry = OBJ_NEW(ompi_comm_reg_t);
|
||||||
|
|
||||||
|
newentry->cid = cid;
|
||||||
|
if ( !(ompi_list_is_empty (&ompi_registered_comms)) ) {
|
||||||
|
for (item = ompi_list_get_first(&ompi_registered_comms);
|
||||||
|
item != ompi_list_get_end(&ompi_registered_comms);
|
||||||
|
item = ompi_list_get_next(item)) {
|
||||||
|
regcom = (ompi_comm_reg_t *)item;
|
||||||
|
if ( regcom->cid > cid ) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ompi_list_insert_pos (&ompi_registered_comms, (ompi_list_item_t *)regcom,
|
||||||
|
(ompi_list_item_t *)newentry);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
ompi_list_append (&ompi_registered_comms, (ompi_list_item_t *)newentry);
|
||||||
|
}
|
||||||
|
|
||||||
|
return OMPI_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int ompi_comm_unregister_cid (uint32_t cid)
|
||||||
|
{
|
||||||
|
ompi_comm_reg_t *regcom=NULL;
|
||||||
|
ompi_list_item_t *item=ompi_list_remove_first(&ompi_registered_comms);
|
||||||
|
|
||||||
|
regcom = (ompi_comm_reg_t *) item;
|
||||||
|
OBJ_RELEASE(regcom);
|
||||||
|
|
||||||
|
return OMPI_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static uint32_t ompi_comm_lowest_cid (void)
|
||||||
|
{
|
||||||
|
ompi_comm_reg_t *regcom=NULL;
|
||||||
|
ompi_list_item_t *item=ompi_list_get_first (&ompi_registered_comms);
|
||||||
|
|
||||||
|
regcom = (ompi_comm_reg_t *)item;
|
||||||
|
return regcom->cid;
|
||||||
|
}
|
||||||
/**************************************************************************/
|
/**************************************************************************/
|
||||||
/**************************************************************************/
|
/**************************************************************************/
|
||||||
/**************************************************************************/
|
/**************************************************************************/
|
||||||
|
@ -127,6 +127,10 @@ int ompi_comm_init(void)
|
|||||||
OBJ_RETAIN(&ompi_mpi_group_null);
|
OBJ_RETAIN(&ompi_mpi_group_null);
|
||||||
OBJ_RETAIN(&ompi_mpi_errors_are_fatal);
|
OBJ_RETAIN(&ompi_mpi_errors_are_fatal);
|
||||||
|
|
||||||
|
/* initialize the comm_reg stuff for multi-threaded comm_cid
|
||||||
|
allocation */
|
||||||
|
ompi_comm_reg_init();
|
||||||
|
|
||||||
return OMPI_SUCCESS;
|
return OMPI_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -208,6 +212,9 @@ int ompi_comm_finalize(void)
|
|||||||
|
|
||||||
OBJ_DESTRUCT (&ompi_mpi_communicators);
|
OBJ_DESTRUCT (&ompi_mpi_communicators);
|
||||||
|
|
||||||
|
/* finalize the comm_reg stuff */
|
||||||
|
ompi_comm_reg_finalize();
|
||||||
|
|
||||||
return OMPI_SUCCESS;
|
return OMPI_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -382,6 +382,15 @@ extern "C" {
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* these are the init and finalize functions for the comm_reg
|
||||||
|
* stuff. These routines are necessary for handling multi-threading
|
||||||
|
* scenarios in the communicator_cid allocation
|
||||||
|
*/
|
||||||
|
void ompi_comm_reg_init(void);
|
||||||
|
void ompi_comm_reg_finalize(void);
|
||||||
|
|
||||||
|
|
||||||
#if defined(c_plusplus) || defined(__cplusplus)
|
#if defined(c_plusplus) || defined(__cplusplus)
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user