1
1
openmpi/src/communicator/comm_cid.c
Edgar Gabriel 65e4b61ec2 fixes to make comm_join work
This commit was SVN r2745.
2004-09-17 16:28:58 +00:00

689 строки
22 KiB
C

/*
* $HEADER$
*/
#include "ompi_config.h"
#include <string.h>
#include <stdio.h>
#include "mpi.h"
#include "communicator/communicator.h"
#include "op/op.h"
#include "proc/proc.h"
#include "include/constants.h"
#include "class/ompi_pointer_array.h"
#include "mca/pcm/pcm.h"
#include "mca/pml/pml.h"
#include "mca/coll/coll.h"
#include "mca/coll/base/base.h"
#include "mca/oob/oob.h"
#define OMPI_COLL_TAG_ALLREDUCE 31000
#define OMPI_MAX_COMM 32768
/**
* These functions make sure, that we determine the global result over
* an intra communicators (simple), an inter-communicator and a
* pseudo inter-communicator described by two separate intra-comms
* and a bridge-comm (intercomm-create scenario).
*/
typedef int ompi_comm_cid_allredfct (int *inbuf, int* outbuf,
int count, ompi_op_t *op,
ompi_communicator_t *comm,
ompi_communicator_t *bridgecomm,
void* lleader, void* rleader,
int send_first );
static int ompi_comm_allreduce_intra (int *inbuf, int* outbuf,
int count, ompi_op_t *op,
ompi_communicator_t *intercomm,
ompi_communicator_t *bridgecomm,
void* local_leader,
void* remote_ledaer,
int send_first );
static int ompi_comm_allreduce_inter (int *inbuf, int *outbuf,
int count, ompi_op_t *op,
ompi_communicator_t *intercomm,
ompi_communicator_t *bridgecomm,
void* local_leader,
void* remote_leader,
int send_first );
static int ompi_comm_allreduce_intra_bridge(int *inbuf, int* outbuf,
int count, ompi_op_t *op,
ompi_communicator_t *intercomm,
ompi_communicator_t *bridgecomm,
void* local_leader,
void* remote_leader,
int send_first);
static int ompi_comm_allreduce_intra_oob (int *inbuf, int* outbuf,
int count, ompi_op_t *op,
ompi_communicator_t *intercomm,
ompi_communicator_t *bridgecomm,
void* local_leader,
void* remote_leader,
int send_first );
#ifdef MULTI_THREADS
static ompi_mutex_t ompi_cid_lock;
static volatile ompi_list_t registered_threads;
static volatile int running_cid=-1;
int ompi_comm_nextcid ( ompi_communicator_t* newcomm,
ompi_communicator_t* comm,
ompi_communicator_t* bridgecomm,
void* local_leader,
void* remote_leader,
int mode, int send_first )
{
int nextlocal_cid;
int nextcid;
int done=0;
int response=0, glresponse=0;
int flag;
int start=ompi_mpi_communicators.lowest_free;
int i;
ompi_comm_cid_allredfct* allredfnct;
/**
* Determine which implementation of allreduce we have to use
* for the current scenario
*/
switch (mode)
{
case OMPI_COMM_CID_INTRA:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra;
break;
case OMPI_COMM_CID_INTER:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_inter;
break;
case OMPI_COMM_CID_INTRA_BRIDGE:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_bridge;
break;
case OMPI_COMM_CID_INTRA_OOB:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_oob;
break;
default:
return MPI_UNDEFINED;
break;
}
/* Lock the list
register threads in registered_threads
unlock
*/
while (!done) {
/* lock
if (!lowest_cid ) {
unlock;
continue;
}
unlock;
*/
/**
* This is the real algorithm described in the doc
*/
for (i=start; i<OMPI_MAX_COMM ;i++) {
flag=ompi_pointer_array_test_and_set_item(&ompi_mpi_communicators, i, comm);
if (true == flag) {
nextlocal_cid = i;
break;
}
}
(allredfnct)(&nextlocal_cid, &nextcid, 1, MPI_MAX, comm, bridgecomm,
local_leader, remote_leader, send_first );
if (nextcid == nextlocal_cid) {
response = 1; /* fine with me */
}
else {
ompi_pointer_array_set_item(&ompi_mpi_communicators,
nextlocal_cid, NULL);
flag = ompi_pointer_array_test_and_set_item(&ompi_mpi_communicators,
nextcid, comm );
if (true == flag) {
response = 1; /* works as well */
}
else {
response = 0; /* nope, not acceptable */
}
}
(allredfnct)(&response, &glresponse, 1, MPI_MIN, comm, bridgecomm,
local_leader, remote_leader, send_first );
if (1 == glresponse) {
done = 1; /* we are done */
break;
}
else if ( 0 == glresponse ) {
if ( 1 == response ) {
/* we could use that, but other don't agree */
ompi_pointer_array_set_item(&ompi_mpi_communicators,
nextcid, NULL);
}
start = nextcid+1; /* that's where we can start the next round */
}
}
/* set the according values to the newcomm */
newcomm->c_contextid = nextcid;
newcomm->c_f_to_c_index = newcomm->c_contextid;
ompi_pointer_array_set_item (&ompi_mpi_communicators, nextcid, newcomm);
/*
lock
unregister
unlock
*/
return (MPI_SUCCESS);
}
#endif
int ompi_comm_nextcid ( ompi_communicator_t* newcomm,
ompi_communicator_t* comm,
ompi_communicator_t* bridgecomm,
void* local_leader,
void* remote_leader,
int mode, int send_first )
{
int nextlocal_cid;
int nextcid;
int done=0;
int response=0, glresponse=0;
int flag;
int start=ompi_mpi_communicators.lowest_free;
int i;
ompi_comm_cid_allredfct* allredfnct;
/**
* Determine which implementation of allreduce we have to use
* for the current scenario
*/
switch (mode)
{
case OMPI_COMM_CID_INTRA:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra;
break;
case OMPI_COMM_CID_INTER:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_inter;
break;
case OMPI_COMM_CID_INTRA_BRIDGE:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_bridge;
break;
case OMPI_COMM_CID_INTRA_OOB:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_oob;
break;
default:
return MPI_UNDEFINED;
break;
}
/**
* This is the real algorithm described in the doc
*/
while ( !done ){
for (i=start; i<OMPI_MAX_COMM ;i++) {
flag=ompi_pointer_array_test_and_set_item(&ompi_mpi_communicators, i, comm);
if (true == flag) {
nextlocal_cid = i;
break;
}
}
(allredfnct)(&nextlocal_cid, &nextcid, 1, MPI_MAX, comm, bridgecomm,
local_leader, remote_leader, send_first );
if (nextcid == nextlocal_cid) {
response = 1; /* fine with me */
}
else {
ompi_pointer_array_set_item(&ompi_mpi_communicators,
nextlocal_cid, NULL);
flag = ompi_pointer_array_test_and_set_item(&ompi_mpi_communicators,
nextcid, comm );
if (true == flag) {
response = 1; /* works as well */
}
else {
response = 0; /* nope, not acceptable */
}
}
(allredfnct)(&response, &glresponse, 1, MPI_MIN, comm, bridgecomm,
local_leader, remote_leader, send_first );
if (1 == glresponse) {
done = 1; /* we are done */
break;
}
else if ( 0 == glresponse ) {
if ( 1 == response ) {
/* we could use that, but other don't agree */
ompi_pointer_array_set_item(&ompi_mpi_communicators,
nextcid, NULL);
}
start = nextcid+1; /* that's where we can start the next round */
}
}
/* set the according values to the newcomm */
newcomm->c_contextid = nextcid;
newcomm->c_f_to_c_index = newcomm->c_contextid;
ompi_pointer_array_set_item (&ompi_mpi_communicators, nextcid, newcomm);
return (MPI_SUCCESS);
}
/**************************************************************************/
/**************************************************************************/
/**************************************************************************/
/* This routine serves two purposes:
* - the allreduce acts as a kind of Barrier,
* which avoids, that we have incoming fragments
* on the new communicator before everybody has set
* up the comm structure.
* - some components (e.g. the collective MagPIe component
* might want to generate new communicators and communicate
* using the new comm. Thus, it can just be called after
* the 'barrier'.
*
* The reason that this routine is in comm_cid and not in
* comm.c is, that this file contains the allreduce implementations
* which are required, and thus we avoid having duplicate code...
*/
int ompi_comm_activate ( ompi_communicator_t* newcomm,
ompi_communicator_t* comm,
ompi_communicator_t* bridgecomm,
void* local_leader,
void* remote_leader,
int mode,
int send_first,
mca_base_component_t *collcomponent )
{
int ok=0, gok=0;
ompi_comm_cid_allredfct* allredfnct;
/* Step 1: the barrier, after which it is allowed to
* send messages over the new communicator
*/
switch (mode)
{
case OMPI_COMM_CID_INTRA:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra;
break;
case OMPI_COMM_CID_INTER:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_inter;
break;
case OMPI_COMM_CID_INTRA_BRIDGE:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_bridge;
break;
case OMPI_COMM_CID_INTRA_OOB:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_oob;
break;
default:
return MPI_UNDEFINED;
break;
}
(allredfnct)(&ok, &gok, 1, MPI_MIN, comm, bridgecomm,
local_leader, remote_leader, send_first );
/* Step 2: call all functions, which might use the new communicator
* already.
*/
/* Initialize the coll components */
/* Let the collectives components fight over who will do
collective on this new comm. */
if (OMPI_ERROR == mca_coll_base_comm_select(newcomm, collcomponent)) {
return OMPI_ERROR;
}
return OMPI_SUCCESS;
}
/**************************************************************************/
/**************************************************************************/
/**************************************************************************/
/* Arguments not used in this implementation:
* - bridgecomm
* - local_leader
* - remote_leader
* - send_first
*/
static int ompi_comm_allreduce_intra ( int *inbuf, int *outbuf,
int count, ompi_op_t *op,
ompi_communicator_t *comm,
ompi_communicator_t *bridgecomm,
void* local_leader,
void* remote_leader,
int send_first )
{
return comm->c_coll.coll_allreduce ( inbuf, outbuf, count, MPI_INT,
op,comm );
}
/* Arguments not used in this implementation:
* - bridgecomm
* - local_leader
* - remote_leader
* - send_first
*/
static int ompi_comm_allreduce_inter ( int *inbuf, int *outbuf,
int count, ompi_op_t *op,
ompi_communicator_t *intercomm,
ompi_communicator_t *bridgecomm,
void* local_leader,
void* remote_leader,
int send_first )
{
int local_rank, rsize;
int i, rc;
int *sbuf;
int *tmpbuf=NULL;
int *rcounts=NULL, scount=0;
int *rdisps=NULL;
if ( &ompi_mpi_op_sum != op && &ompi_mpi_op_prod != op &&
&ompi_mpi_op_max != op && &ompi_mpi_op_min != op ) {
return MPI_ERR_OP;
}
if ( !OMPI_COMM_IS_INTER (intercomm)) {
return MPI_ERR_COMM;
}
/* Allocate temporary arrays */
rsize = ompi_comm_remote_size (intercomm);
local_rank = ompi_comm_rank ( intercomm );
tmpbuf = (int *) malloc ( count * sizeof(int));
rdisps = (int *) calloc ( rsize, sizeof(int));
rcounts = (int *) calloc ( rsize, sizeof(int) );
if ( NULL == tmpbuf || NULL == rdisps || NULL == rcounts ) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* Execute the inter-allreduce: the result of our group will
be in the buffer of the remote group */
rc = intercomm->c_coll.coll_allreduce ( inbuf, tmpbuf, count, MPI_INT,
op, intercomm );
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
if ( 0 == local_rank ) {
MPI_Request req;
/* for the allgatherv later */
scount = count;
/* local leader exchange their data and determine the overall result
for both groups */
rc = mca_pml.pml_irecv (outbuf, count, MPI_INT, 0,
OMPI_COLL_TAG_ALLREDUCE
, intercomm, &req );
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
rc = mca_pml.pml_send (tmpbuf, count, MPI_INT, 0,
OMPI_COLL_TAG_ALLREDUCE,
MCA_PML_BASE_SEND_STANDARD, intercomm );
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
rc = mca_pml.pml_wait_all ( 1, &req, MPI_STATUS_IGNORE );
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
if ( &ompi_mpi_op_max == op ) {
for ( i = 0 ; i < count; i++ ) {
if (tmpbuf[i] > outbuf[i]) outbuf[i] = tmpbuf[i];
}
}
else if ( &ompi_mpi_op_min == op ) {
for ( i = 0 ; i < count; i++ ) {
if (tmpbuf[i] < outbuf[i]) outbuf[i] = tmpbuf[i];
}
}
else if ( &ompi_mpi_op_sum == op ) {
for ( i = 0 ; i < count; i++ ) {
outbuf[i] += tmpbuf[i];
}
}
else if ( &ompi_mpi_op_prod == op ) {
for ( i = 0 ; i < count; i++ ) {
outbuf[i] *= tmpbuf[i];
}
}
}
/* distribute the overall result to all processes in the other group.
Instead of using bcast, we are using here allgatherv, to avoid the
possible deadlock. Else, we need an algorithm to determine,
which group sends first in the inter-bcast and which receives
the result first.
*/
rcounts[0] = count;
sbuf = outbuf;
rc = intercomm->c_coll.coll_allgatherv (sbuf, scount, MPI_INT, outbuf,
rcounts, rdisps, MPI_INT,
intercomm);
exit:
if ( NULL != tmpbuf ) {
free ( tmpbuf );
}
if ( NULL != rcounts ) {
free ( rcounts );
}
if ( NULL != rdisps ) {
free ( rdisps );
}
return (rc);
}
/* Arguments not used in this implementation:
* - send_first
*/
static int ompi_comm_allreduce_intra_bridge (int *inbuf, int *outbuf,
int count, ompi_op_t *op,
ompi_communicator_t *comm,
ompi_communicator_t *bcomm,
void* lleader, void* rleader,
int send_first )
{
int *tmpbuf=NULL;
int local_rank;
int i;
int rc;
int local_leader, remote_leader;
local_leader = (*((int*)lleader));
remote_leader = (*((int*)rleader));
if ( &ompi_mpi_op_sum != op && &ompi_mpi_op_prod != op &&
&ompi_mpi_op_max != op && &ompi_mpi_op_min != op ) {
return MPI_ERR_OP;
}
local_rank = ompi_comm_rank ( comm );
tmpbuf = (int *) malloc ( count * sizeof(int));
if ( NULL == tmpbuf ) {
return MPI_ERR_INTERN;
}
/* Intercomm_create */
rc = comm->c_coll.coll_allreduce ( inbuf, tmpbuf, count, MPI_INT,
op, comm );
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
if (local_rank == local_leader ) {
MPI_Request req;
rc = mca_pml.pml_irecv ( outbuf, count, MPI_INT, remote_leader,
OMPI_COLL_TAG_ALLREDUCE,
bcomm, &req );
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
rc = mca_pml.pml_send (tmpbuf, count, MPI_INT, remote_leader,
OMPI_COLL_TAG_ALLREDUCE,
MCA_PML_BASE_SEND_STANDARD, bcomm );
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
rc = mca_pml.pml_wait_all ( 1, &req, MPI_STATUS_IGNORE);
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
if ( &ompi_mpi_op_max == op ) {
for ( i = 0 ; i < count; i++ ) {
if (tmpbuf[i] > outbuf[i]) outbuf[i] = tmpbuf[i];
}
}
else if ( &ompi_mpi_op_min == op ) {
for ( i = 0 ; i < count; i++ ) {
if (tmpbuf[i] < outbuf[i]) outbuf[i] = tmpbuf[i];
}
}
else if ( &ompi_mpi_op_sum == op ) {
for ( i = 0 ; i < count; i++ ) {
outbuf[i] += tmpbuf[i];
}
}
else if ( &ompi_mpi_op_prod == op ) {
for ( i = 0 ; i < count; i++ ) {
outbuf[i] *= tmpbuf[i];
}
}
}
rc = comm->c_coll.coll_bcast ( outbuf, count, MPI_INT, local_leader,
comm);
exit:
if (NULL != tmpbuf ) {
free (tmpbuf);
}
return (rc);
}
/* Arguments not used in this implementation:
* - bridgecomm
*
* lleader is the local rank of root in comm
* rleader is the OOB contact information of the
* root processes in the other world.
*/
static int ompi_comm_allreduce_intra_oob (int *inbuf, int *outbuf,
int count, ompi_op_t *op,
ompi_communicator_t *comm,
ompi_communicator_t *bridgecomm,
void* lleader, void* rleader,
int send_first )
{
int *tmpbuf=NULL;
int i;
int rc;
int local_leader, local_rank;
ompi_process_name_t *remote_leader=NULL;
local_leader = (*((int*)lleader));
remote_leader = (ompi_process_name_t*)rleader;
if ( &ompi_mpi_op_sum != op && &ompi_mpi_op_prod != op &&
&ompi_mpi_op_max != op && &ompi_mpi_op_min != op ) {
return MPI_ERR_OP;
}
local_rank = ompi_comm_rank ( comm );
tmpbuf = (int *) malloc ( count * sizeof(int));
if ( NULL == tmpbuf ) {
return MPI_ERR_INTERN;
}
/* comm is an intra-communicator */
rc = comm->c_coll.coll_allreduce(inbuf,tmpbuf,count,MPI_INT,op, comm );
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
if (local_rank == local_leader ) {
ompi_buffer_t sbuf;
ompi_buffer_t rbuf;
ompi_buffer_init(&sbuf, count * sizeof(int));
ompi_pack(sbuf, tmpbuf, count, OMPI_INT32);
if ( send_first ) {
rc = mca_oob_send_packed(remote_leader, sbuf, 0, 0);
rc = mca_oob_recv_packed (remote_leader, &rbuf, NULL);
}
else {
rc = mca_oob_recv_packed(remote_leader, &rbuf, NULL);
rc = mca_oob_send_packed(remote_leader, sbuf, 0, 0);
}
ompi_unpack(rbuf, outbuf, count, OMPI_INT32);
ompi_buffer_free(sbuf);
ompi_buffer_free(rbuf);
if ( &ompi_mpi_op_max == op ) {
for ( i = 0 ; i < count; i++ ) {
if (tmpbuf[i] > outbuf[i]) outbuf[i] = tmpbuf[i];
}
}
else if ( &ompi_mpi_op_min == op ) {
for ( i = 0 ; i < count; i++ ) {
if (tmpbuf[i] < outbuf[i]) outbuf[i] = tmpbuf[i];
}
}
else if ( &ompi_mpi_op_sum == op ) {
for ( i = 0 ; i < count; i++ ) {
outbuf[i] += tmpbuf[i];
}
}
else if ( &ompi_mpi_op_prod == op ) {
for ( i = 0 ; i < count; i++ ) {
outbuf[i] *= tmpbuf[i];
}
}
}
rc = comm->c_coll.coll_bcast (outbuf, count, MPI_INT,
local_leader, comm);
exit:
if (NULL != tmpbuf ) {
free (tmpbuf);
}
return (rc);
}