fixes to make comm_join work
This commit was SVN r2745.
Этот коммит содержится в:
родитель
6f3ebd184a
Коммит
65e4b61ec2
@ -68,6 +68,131 @@ static int ompi_comm_allreduce_intra_oob (int *inbuf, int* outbuf,
|
|||||||
int send_first );
|
int send_first );
|
||||||
|
|
||||||
|
|
||||||
|
#ifdef MULTI_THREADS
|
||||||
|
static ompi_mutex_t ompi_cid_lock;
|
||||||
|
static volatile ompi_list_t registered_threads;
|
||||||
|
static volatile int running_cid=-1;
|
||||||
|
|
||||||
|
|
||||||
|
/**
 * Agree with all participating processes on a context id (cid) for
 * newcomm and record it in the communicator table.
 *
 * Each round works as follows:
 *   1. reserve the first free local slot at or above 'start' in
 *      ompi_mpi_communicators, using comm as a placeholder entry;
 *   2. allreduce the local candidates with MPI_MAX to obtain nextcid;
 *   3. if nextcid is not the local candidate, release the local
 *      candidate and try to reserve slot nextcid instead;
 *   4. allreduce the per-process verdicts with MPI_MIN; a result of 1
 *      ends the search, a result of 0 releases the reservation and
 *      restarts the search at nextcid+1.
 *
 * @param newcomm       communicator receiving the agreed id
 * @param comm          communicator handed to the allreduce routine and
 *                      used as the placeholder while a slot is reserved
 * @param bridgecomm    forwarded unchanged to the allreduce routine
 * @param local_leader  forwarded unchanged to the allreduce routine
 * @param remote_leader forwarded unchanged to the allreduce routine
 * @param mode          one of OMPI_COMM_CID_INTRA, OMPI_COMM_CID_INTER,
 *                      OMPI_COMM_CID_INTRA_BRIDGE, OMPI_COMM_CID_INTRA_OOB;
 *                      selects the allreduce implementation
 * @param send_first    forwarded unchanged to the allreduce routine
 *
 * @return MPI_SUCCESS once an id has been agreed on and stored,
 *         MPI_UNDEFINED for an unknown mode, MPI_ERR_INTERN when no
 *         slot below OMPI_MAX_COMM is available.
 *
 * NOTE(review): the return values of the allreduce calls are ignored,
 * as in the original code; a failed reduction would leave nextcid or
 * glresponse without a meaningful value.
 */
int ompi_comm_nextcid ( ompi_communicator_t* newcomm,
                        ompi_communicator_t* comm,
                        ompi_communicator_t* bridgecomm,
                        void* local_leader,
                        void* remote_leader,
                        int mode, int send_first )
{
    int nextlocal_cid;
    int nextcid;
    int done=0;
    int response=0, glresponse=0;
    int flag;
    int start=ompi_mpi_communicators.lowest_free;
    int i;

    ompi_comm_cid_allredfct* allredfnct;

    /**
     * Determine which implementation of allreduce we have to use
     * for the current scenario
     */
    switch (mode)
    {
        case OMPI_COMM_CID_INTRA:
            allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra;
            break;
        case OMPI_COMM_CID_INTER:
            allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_inter;
            break;
        case OMPI_COMM_CID_INTRA_BRIDGE:
            allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_bridge;
            break;
        case OMPI_COMM_CID_INTRA_OOB:
            allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_oob;
            break;
        default:
            return MPI_UNDEFINED;
    }

    /* TODO (multi-threading, not implemented yet):
         lock the list
         register this thread in registered_threads
         unlock
    */

    while (!done) {
        /* TODO (multi-threading, not implemented yet):
             lock
             if (!lowest_cid ) {
                 unlock;
                 continue;
             }
             unlock;
        */

        /**
         * This is the real algorithm described in the doc
         */

        /* OMPI_MAX_COMM is never a valid slot index in the search below,
           so it serves as the "no free slot found" marker. It has to be
           reset every round: otherwise an unsuccessful search would leave
           the variable uninitialized (first round) or pointing at a slot
           that was already released (later rounds). */
        nextlocal_cid = OMPI_MAX_COMM;
        for (i=start; i<OMPI_MAX_COMM ;i++) {
            flag=ompi_pointer_array_test_and_set_item(&ompi_mpi_communicators, i, comm);
            if (true == flag) {
                nextlocal_cid = i;
                break;
            }
        }

        (allredfnct)(&nextlocal_cid, &nextcid, 1, MPI_MAX, comm, bridgecomm,
                     local_leader, remote_leader, send_first );

        if (nextcid >= OMPI_MAX_COMM) {
            /* At least one participant has no free slot left. This check
               assumes the reduction hands every participant the same
               result, which the agreement test below relies on as well;
               under that assumption everybody gives up in the same round
               and nobody is left waiting in the second reduction. */
            if (nextlocal_cid < OMPI_MAX_COMM) {
                ompi_pointer_array_set_item(&ompi_mpi_communicators,
                                            nextlocal_cid, NULL);
            }
            return MPI_ERR_INTERN;
        }

        if (nextcid == nextlocal_cid) {
            response = 1; /* fine with me */
        }
        else {
            /* give the local candidate back and try the agreed value */
            ompi_pointer_array_set_item(&ompi_mpi_communicators,
                                        nextlocal_cid, NULL);

            flag = ompi_pointer_array_test_and_set_item(&ompi_mpi_communicators,
                                                        nextcid, comm );
            if (true == flag) {
                response = 1; /* works as well */
            }
            else {
                response = 0; /* nope, not acceptable */
            }
        }

        (allredfnct)(&response, &glresponse, 1, MPI_MIN, comm, bridgecomm,
                     local_leader, remote_leader, send_first );
        if (1 == glresponse) {
            done = 1; /* we are done */
        }
        else if ( 0 == glresponse ) {
            if ( 1 == response ) {
                /* we could use that, but others don't agree */
                ompi_pointer_array_set_item(&ompi_mpi_communicators,
                                            nextcid, NULL);
            }
            start = nextcid+1; /* that's where we can start the next round */
        }
    }

    /* set the according values to the newcomm; the placeholder entry in
       the table is replaced by newcomm itself */
    newcomm->c_contextid = nextcid;
    newcomm->c_f_to_c_index = newcomm->c_contextid;
    ompi_pointer_array_set_item (&ompi_mpi_communicators, nextcid, newcomm);

    /* TODO (multi-threading, not implemented yet):
         lock
         unregister
         unlock
    */

    return (MPI_SUCCESS);
}
|
||||||
|
#endif
|
||||||
|
|
||||||
int ompi_comm_nextcid ( ompi_communicator_t* newcomm,
|
int ompi_comm_nextcid ( ompi_communicator_t* newcomm,
|
||||||
ompi_communicator_t* comm,
|
ompi_communicator_t* comm,
|
||||||
ompi_communicator_t* bridgecomm,
|
ompi_communicator_t* bridgecomm,
|
||||||
|
@ -46,7 +46,14 @@ int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root,
|
|||||||
information of the remote process. Therefore, we have to
|
information of the remote process. Therefore, we have to
|
||||||
exchange that.
|
exchange that.
|
||||||
*/
|
*/
|
||||||
rport = ompi_comm_get_rport (port,send_first,group->grp_proc_pointers[rank], tag);
|
if ( OMPI_COMM_JOIN_TAG != tag ) {
|
||||||
|
rport = ompi_comm_get_rport (port,send_first,
|
||||||
|
group->grp_proc_pointers[rank], tag);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
rport = port;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/* Exchange number of processes and msg length on both sides */
|
/* Exchange number of processes and msg length on both sides */
|
||||||
ompi_buffer_init (&nbuf, size*sizeof(ompi_process_name_t));
|
ompi_buffer_init (&nbuf, size*sizeof(ompi_process_name_t));
|
||||||
|
@ -39,6 +39,10 @@ extern ompi_class_t ompi_communicator_t_class;
|
|||||||
|
|
||||||
#define OMPI_COMM_SET_HIDDEN(comm) ((comm)->c_flags |= OMPI_COMM_HIDDEN)
|
#define OMPI_COMM_SET_HIDDEN(comm) ((comm)->c_flags |= OMPI_COMM_HIDDEN)
|
||||||
|
|
||||||
|
/* a special tag to recognize an MPI_Comm_join in the comm_connect_accept
|
||||||
|
routine. */
|
||||||
|
#define OMPI_COMM_JOIN_TAG 32000
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Modes required for acquiring the new comm-id.
|
* Modes required for acquiring the new comm-id.
|
||||||
* The first (INTER/INTRA) indicates whether the
|
* The first (INTER/INTRA) indicates whether the
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user