
restructuring some of the communicator code.

with a couple of internal tricks, intercomm_create works now.

This commit was SVN r1855.
This commit is contained in:
Edgar Gabriel 2004-08-03 22:07:45 +00:00
parent 435ce5f5e6
commit 20a512a9b7
3 changed files with 308 additions and 254 deletions
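
As context for the commit message above, here is a minimal, hedged sketch of the kind of MPI_Intercomm_create call this change is meant to support: two disjoint intra-communicators joined through a peer (bridge) communicator. The even/odd split, tag value, and leader ranks are illustrative only and are not taken from the Open MPI sources.

#include <mpi.h>

/* Illustrative only: split MPI_COMM_WORLD into even and odd ranks and
 * connect the two halves with an inter-communicator. */
int main (int argc, char **argv)
{
    int rank;
    MPI_Comm local_comm, intercomm;

    MPI_Init (&argc, &argv);
    MPI_Comm_rank (MPI_COMM_WORLD, &rank);

    /* two disjoint groups: even world ranks and odd world ranks */
    MPI_Comm_split (MPI_COMM_WORLD, rank % 2, rank, &local_comm);

    /* local leader is rank 0 of each half; the remote leader is the
     * lowest world rank of the other half (0 or 1); MPI_COMM_WORLD
     * acts as the bridge/peer communicator */
    MPI_Intercomm_create (local_comm, 0, MPI_COMM_WORLD,
                          (rank % 2) ? 0 : 1, 12345, &intercomm);

    MPI_Comm_free (&intercomm);
    MPI_Comm_free (&local_comm);
    MPI_Finalize ();
    return 0;
}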

View file

@ -8,6 +8,7 @@
#include "mpi.h"
#include "communicator/communicator.h"
#include "datatype/datatype.h"
#include "proc/proc.h"
#include "util/bit_ops.h"
#include "include/constants.h"
@ -66,23 +67,20 @@ static int ompi_comm_allgather_emulate_intra (void* inbuf, int incount, MPI_Data
* All other routines are just used to determine these elements.
*/
ompi_communicator_t * ompi_comm_set ( ompi_communicator_t* oldcomm,
int local_size,
ompi_proc_t **local_procs,
int remote_size,
ompi_proc_t **remote_procs,
ompi_hash_table_t *attr,
ompi_errhandler_t *errh,
mca_base_component_t *collcomponent,
mca_base_component_t *topocomponent )
int ompi_comm_set ( ompi_communicator_t *newcomm,
ompi_communicator_t* oldcomm,
int local_size,
ompi_proc_t **local_procs,
int remote_size,
ompi_proc_t **remote_procs,
ompi_hash_table_t *attr,
ompi_errhandler_t *errh,
mca_base_component_t *collcomponent,
mca_base_component_t *topocomponent )
{
ompi_communicator_t *newcomm;
ompi_proc_t *my_gpointer;
int my_grank;
/* Allocate comm structure */
newcomm = ompi_comm_allocate ( local_size, remote_size );
/* Set local_group information */
memcpy ( newcomm->c_local_group->grp_proc_pointers,
local_procs, local_size * sizeof(ompi_proc_t *));
@ -139,21 +137,17 @@ ompi_communicator_t * ompi_comm_set ( ompi_communicator_t* oldcomm,
/* Initialize the PML stuff in the newcomm */
if ( OMPI_ERROR == mca_pml.pml_add_comm(newcomm) ) {
goto err_exit;
return OMPI_ERROR;
}
/* Initialize the coll components */
/* Let the collectives components fight over who will do
collective on this new comm. */
if (OMPI_ERROR == mca_coll_base_comm_select(newcomm, collcomponent)) {
goto err_exit;
return OMPI_ERROR;
}
err_exit:
/* Free whatever has been allocated, print an appropriate
error message and return a null pointer */
return ( newcomm );
return (OMPI_SUCCESS);
}
@ -241,21 +235,11 @@ int ompi_comm_create ( ompi_communicator_t *comm, ompi_group_t *group,
else {
rsize = 0;
rprocs = NULL;
mode = OMPI_COMM_CID_INTRA;
mode = OMPI_COMM_CID_INTRA;
}
newcomp = ompi_comm_set ( comm, /* old comm */
group->grp_proc_count, /* local_size */
group->grp_proc_pointers, /* local_procs*/
rsize, /* remote_size */
rprocs, /* remote_procs */
NULL, /* attrs */
comm->error_handler, /* error handler */
NULL, /* coll component */
NULL /* topo component */
);
if ( MPI_COMM_NULL == newcomp ) {
newcomp = ompi_comm_allocate (group->grp_proc_count, rsize );
if ( NULL == newcomp ) {
rc = MPI_ERR_INTERN;
goto exit;
}
@ -271,6 +255,22 @@ int ompi_comm_create ( ompi_communicator_t *comm, ompi_group_t *group,
goto exit;
}
rc = ompi_comm_set ( newcomp, /* new comm */
comm, /* old comm */
group->grp_proc_count, /* local_size */
group->grp_proc_pointers, /* local_procs*/
rsize, /* remote_size */
rprocs, /* remote_procs */
NULL, /* attrs */
comm->error_handler, /* error handler */
NULL, /* coll component */
NULL /* topo component */
);
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
/* Check whether we are part of the new comm.
If not, we have to free the structure again.
However, we could not avoid the comm_nextcid step, since
@ -445,17 +445,8 @@ int ompi_comm_split ( ompi_communicator_t* comm, int color, int key,
/* Step 3: set up the communicator */
/* --------------------------------------------------------- */
/* Create the communicator finally */
newcomp = ompi_comm_set ( comm, /* old comm */
my_size, /* local_size */
procs, /* local_procs*/
my_rsize, /* remote_size */
rprocs, /* remote_procs */
NULL, /* attrs */
comm->error_handler,/* error handler */
NULL, /* coll component */
NULL /* topo component */
);
if ( MPI_COMM_NULL == newcomp ) {
newcomp = ompi_comm_allocate (my_size, my_rsize );
if ( NULL == newcomp ) {
rc = MPI_ERR_INTERN;
goto exit;
}
@ -471,6 +462,22 @@ int ompi_comm_split ( ompi_communicator_t* comm, int color, int key,
goto exit;
}
rc = ompi_comm_set ( newcomp, /* new comm */
comm, /* old comm */
my_size, /* local_size */
procs, /* local_procs*/
my_rsize, /* remote_size */
rprocs, /* remote_procs */
NULL, /* attrs */
comm->error_handler,/* error handler */
NULL, /* coll component */
NULL /* topo component */
);
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
exit:
if ( NULL != results ) {
free ( results );
@ -521,7 +528,6 @@ static int ompi_comm_allgather_emulate_intra( void *inbuf, int incount,
int rank, rsize, i, rc;
int *tmpbuf=NULL;
MPI_Request *req=NULL, sendreq;
MPI_Status *stats=NULL, status;
rsize = ompi_comm_remote_size(comm);
rank = ompi_comm_rank(comm);
@ -530,8 +536,7 @@ static int ompi_comm_allgather_emulate_intra( void *inbuf, int incount,
if ( 0 == rank ) {
tmpbuf = (int *) malloc (rsize*outcount*sizeof(int));
req = (MPI_Request *)malloc (rsize*outcount*sizeof(MPI_Request));
stats = (MPI_Status *)malloc (rsize*outcount*sizeof(MPI_Status));
if ( NULL == tmpbuf || NULL == req || NULL == stats) {
if ( NULL == tmpbuf || NULL == req ) {
return (OMPI_ERR_OUT_OF_RESOURCE);
}
@ -550,13 +555,13 @@ static int ompi_comm_allgather_emulate_intra( void *inbuf, int incount,
}
if ( 0 == rank ) {
rc = mca_pml.pml_wait (rsize, req, NULL, stats);
rc = mca_pml.pml_wait_all (rsize, req, MPI_STATUSES_IGNORE);
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
}
rc = mca_pml.pml_wait (1, &sendreq, NULL, &status );
rc = mca_pml.pml_wait_all (1, &sendreq, MPI_STATUS_IGNORE);
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
@ -579,15 +584,12 @@ static int ompi_comm_allgather_emulate_intra( void *inbuf, int incount,
}
}
rc = mca_pml.pml_wait (1, &sendreq, NULL, &status );
rc = mca_pml.pml_wait_all (1, &sendreq, MPI_STATUS_IGNORE );
exit:
if ( NULL != req ) {
free ( req );
}
if ( NULL != stats ) {
free ( stats );
}
if ( NULL != tmpbuf ) {
free ( tmpbuf );
}
@ -619,77 +621,137 @@ int ompi_comm_free ( ompi_communicator_t **comm )
/**********************************************************************/
/**********************************************************************/
/**********************************************************************/
ompi_proc_t **ompi_comm_get_rprocs ( ompi_communicator_t *local_comm,
ompi_communicator_t *bridge_comm,
int local_leader,
int remote_leader,
int tag,
int rsize)
{
ompi_proc_t **rprocs = NULL;
#if 0 /* TSW - fix this */
int local_rank, local_size;
ompi_process_id_t jobid;
uint32_t *rvpids=NULL, *vpids=NULL;
int rc, i;
local_rank = ompi_comm_rank ( local_comm );
/* This routine will be very soon cleaned up. We agreed on the
required interface with Brian, Rich and Tim W., as soon
as the functions are implemented, this function
will be simplified significantly.
*/
#define PROC local_comm->c_local_group->grp_proc_pointers
vpids = (uint32_t *) malloc ( local_size * sizeof(uint32_t));
rvpids = (uint32_t *) malloc ( rsize * sizeof(uint32_t));
rprocs = (ompi_proc_t **) malloc ( rsize * sizeof(ompi_proc_t *));
if ( NULL == vpids || NULL == rvpids || NULL == rprocs ) {
ompi_proc_t **ompi_comm_get_rprocs ( ompi_communicator_t *local_comm,
ompi_communicator_t *bridge_comm,
int local_leader,
int remote_leader,
int tag,
int rsize)
{
int rc, i, count[2];
int local_rank, local_size;
ompi_proc_t **rprocs=NULL;
typedef struct _tmp_pname {
uint32_t vpid;
uint32_t jobid;
uint32_t cellid;
} tmp_pname;
tmp_pname *lnames=NULL, *rnames=NULL;
ompi_process_name_t rprocname;
MPI_Datatype ntype=MPI_DATATYPE_NULL, btype, intype[2];
MPI_Aint extent, addr[2];
local_rank = ompi_comm_rank (local_comm);
local_size = ompi_comm_size (local_comm);
rnames = (tmp_pname *) malloc(rsize * sizeof (tmp_pname));
rprocs = (ompi_proc_t **) malloc(rsize * sizeof(ompi_proc_t *));
if (NULL == rprocs || NULL == rnames) {
rc = OMPI_ERR_OUT_OF_RESOURCE;
goto err_exit;
}
if ( local_rank == local_leader ) {
/* generate a derived datatype describing tmp_pname */
btype = MPI_UNSIGNED;
ompi_ddt_type_extent (btype, &extent);
if ( extent != 4 ) {
btype = MPI_UNSIGNED_SHORT;
ompi_ddt_type_extent ( btype, &extent);
if ( 4 != extent ) {
btype = MPI_UNSIGNED_LONG;
ompi_ddt_type_extent ( btype, &extent);
}
}
addr[0] = 0;
addr[1] = ( ((char *)&rnames[1]) - ((char *)&rnames[0]));
intype[0] = btype;
intype[1] = MPI_UB;
count[0] = 3;
count[1] = 1;
ompi_ddt_create_struct (2, count, addr, intype, &ntype );
ompi_ddt_commit (&ntype);
if (local_rank == local_leader) {
MPI_Request req;
MPI_Status status;
/* generate vpid list */
lnames=(tmp_pname *) malloc (local_size*sizeof(tmp_pname));
if ( NULL == lnames ) {
rc = OMPI_ERR_OUT_OF_RESOURCE;
goto err_exit;
}
/* generate name list */
for ( i = 0; i < local_size; i++ ){
vpids[i] = (uint32_t) local_comm->c_local_group->grp_proc_pointers[i]->proc_name;
lnames[i].vpid = PROC[i]->proc_name.vpid;
lnames[i].jobid = PROC[i]->proc_name.jobid;
lnames[i].cellid = PROC[i]->proc_name.cellid;
}
/* local leader exchange group sizes and vpid lists */
rc = mca_pml.pml_irecv (rvpids, rsize, MPI_UNSIGNED, remote_leader, tag,
rc = mca_pml.pml_irecv (rnames, rsize, ntype, remote_leader, tag,
bridge_comm, &req );
if ( rc != MPI_SUCCESS ) {
goto err_exit;
}
rc = mca_pml.pml_send (vpids, local_size, MPI_UNSIGNED, remote_leader, tag,
rc = mca_pml.pml_send (lnames, local_size, ntype, remote_leader, tag,
MCA_PML_BASE_SEND_STANDARD, bridge_comm );
if ( rc != MPI_SUCCESS ) {
goto err_exit;
}
rc = mca_pml.pml_wait ( 1, &req, NULL, &status);
rc = mca_pml.pml_wait_all ( 1, &req, MPI_STATUS_IGNORE );
if ( rc != MPI_SUCCESS ) {
goto err_exit;
}
}
rc = local_comm->c_coll.coll_bcast( rvpids, rsize, MPI_UNSIGNED,
rc = local_comm->c_coll.coll_bcast( rnames, rsize, ntype,
local_leader, local_comm );
if ( rc != MPI_SUCCESS ) {
goto err_exit;
}
for ( i = 0; i < rsize; i++ ) {
rprocs[i] = ompi_proc_find ( job, rvpids[i] );
rprocname.vpid = rnames[i].vpid;
rprocname.cellid = rnames[i].cellid;
rprocname.jobid = rnames[i].jobid;
/* ompi_proc_find should be used here, it
seems however not to work at the moment.
Will have to be fixed later.
*/
rprocs[i] = ompi_proc_find (&rprocname );
}
err_exit:
if ( NULL != vpids ) {
free ( vpids );
if ( NULL != lnames ) {
free ( lnames );
}
if ( NULL != rvpids) {
free ( rvpids );
if ( NULL != rnames) {
free ( rnames );
}
/* rprocs has to be freed in the level above (i.e. intercomm_create ) */
#endif
return rprocs;
if ( ntype != MPI_DATATYPE_NULL ) {
ompi_ddt_destroy ( &ntype );
}
if ( OMPI_SUCCESS !=rc ) {
printf("%d: Error in ompi_get_rprocs\n", local_rank);
if ( NULL != rprocs ) {
free ( rprocs);
rprocs=NULL;
}
}
return rprocs;
}
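
The ompi_comm_get_rprocs rewrite above builds a derived datatype for the three-field tmp_pname struct by probing MPI_UNSIGNED / MPI_UNSIGNED_SHORT / MPI_UNSIGNED_LONG for a 4-byte extent and closing the struct with MPI_UB. A rough standalone equivalent against the public MPI API (using MPI_UINT32_T and MPI_Type_create_resized, which did not yet exist when this commit was written) might look like the sketch below; the names are illustrative, not part of the Open MPI source.

#include <mpi.h>
#include <stdint.h>
#include <stddef.h>

typedef struct {
    uint32_t vpid;
    uint32_t jobid;
    uint32_t cellid;
} tmp_pname;

/* Build an MPI datatype matching tmp_pname so an array of process names
 * can be sent in a single message (illustrative equivalent of the
 * ompi_ddt_create_struct code in the diff). */
static int create_pname_type (MPI_Datatype *ntype)
{
    int rc;
    MPI_Datatype contig;

    /* three consecutive 32-bit unsigned integers */
    rc = MPI_Type_contiguous (3, MPI_UINT32_T, &contig);
    if (MPI_SUCCESS != rc) return rc;

    /* resize to the true extent of the C struct in case the compiler
     * adds trailing padding (the role MPI_UB plays in the original) */
    rc = MPI_Type_create_resized (contig, 0, sizeof(tmp_pname), ntype);
    MPI_Type_free (&contig);
    if (MPI_SUCCESS != rc) return rc;

    return MPI_Type_commit (ntype);
}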
/**********************************************************************/
/**********************************************************************/
@ -697,57 +759,41 @@ return rprocs;
int ompi_comm_determine_first ( ompi_communicator_t *intercomm, int high )
{
int flag, rhigh;
int local_rank, rc;
ompi_proc_t *lvpid, *rvpid;
ompi_ns_cmp_bitmask_t mask;
int rank, rsize;
int *rcounts;
MPI_Aint *rdisps;
int scount=0;
int *sbuf;
int rc;
lvpid = intercomm->c_local_group->grp_proc_pointers[0];
rvpid = intercomm->c_remote_group->grp_proc_pointers[0];
local_rank = ompi_comm_rank ( intercomm );
/*
* determine maximal high value over the intercomm
*/
mask = OMPI_NS_CMP_CELLID | OMPI_NS_CMP_JOBID | OMPI_NS_CMP_VPID;
if ( ompi_name_server.compare(mask, &lvpid->proc_name,
&rvpid->proc_name) < 0 ) {
if ( 0 == local_rank ) {
rc = intercomm->c_coll.coll_bcast(&high, 1, MPI_INT, MPI_ROOT,
intercomm );
}
else {
rc = intercomm->c_coll.coll_bcast(&high, 1, MPI_INT, MPI_PROC_NULL,
intercomm );
}
if ( rc != MPI_SUCCESS ) {
return rc;
}
rc = intercomm->c_coll.coll_bcast ( &rhigh, 1, MPI_INT, 0, intercomm );
if ( rc != MPI_SUCCESS ) {
return rc;
}
rank = ompi_comm_rank (intercomm);
rsize= ompi_comm_remote_size (intercomm);
rdisps = (MPI_Aint *) calloc ( rsize, sizeof(MPI_Aint));
rcounts = (int *) calloc ( rsize, sizeof(int));
if ( NULL == rdisps || NULL == rcounts ){
return OMPI_ERR_OUT_OF_RESOURCE;
}
else {
rc = intercomm->c_coll.coll_bcast ( &rhigh, 1, MPI_INT, 0, intercomm );
if ( rc != MPI_SUCCESS ) {
return rc;
}
if ( 0 == local_rank ) {
rc = intercomm->c_coll.coll_bcast ( &high, 1, MPI_INT, MPI_ROOT,
intercomm );
}
else {
rc = intercomm->c_coll.coll_bcast(&high, 1, MPI_INT, MPI_PROC_NULL,
intercomm);
}
if ( rc != MPI_SUCCESS ) {
return rc;
}
rcounts[0] = 1;
sbuf = &high;
if ( 0 == rank ) {
scount = 1;
}
rc = intercomm->c_coll.coll_allgatherv(sbuf, scount, MPI_INT,
&rhigh, rcounts, rdisps,
MPI_INT, intercomm);
if ( rc != OMPI_SUCCESS ) {
flag = -1;
}
if ( NULL != rdisps ) {
free ( rdisps );
}
if ( NULL != rcounts ) {
free ( rcounts );
}
/* This is the logic for determining who is first, who is second */
if ( high && !rhigh ) {
@ -756,8 +802,8 @@ int ompi_comm_determine_first ( ompi_communicator_t *intercomm, int high )
else if ( !high && rhigh ) {
flag = false;
}
#if 0
else {
if ( lvpid > rvpid ) {
flag = true;
}
@ -765,6 +811,7 @@ int ompi_comm_determine_first ( ompi_communicator_t *intercomm, int high )
flag = false;
}
}
#endif
return flag;
}
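
The reworked ompi_comm_determine_first above avoids an ordering problem between the two groups by replacing the pair of broadcasts with a single allgatherv in which only rank 0 of each group contributes its high value. A hedged, standalone sketch of that exchange using the public MPI API (function and variable names are illustrative):

#include <mpi.h>
#include <stdlib.h>

/* Each process learns the remote group leader's "high" value; only rank 0
 * of either group contributes data, so neither side has to know who
 * broadcasts first. */
int exchange_high (MPI_Comm intercomm, int high, int *rhigh)
{
    int rank, rsize, scount = 0, rc;
    int *rcounts, *rdisps;

    MPI_Comm_rank (intercomm, &rank);
    MPI_Comm_remote_size (intercomm, &rsize);

    rcounts = (int *) calloc (rsize, sizeof(int));
    rdisps  = (int *) calloc (rsize, sizeof(int));
    if (NULL == rcounts || NULL == rdisps) {
        free (rcounts);
        free (rdisps);
        return MPI_ERR_NO_MEM;
    }

    /* only the remote group's rank 0 sends one int; everyone receives it */
    rcounts[0] = 1;
    if (0 == rank) {
        scount = 1;
    }
    rc = MPI_Allgatherv (&high, scount, MPI_INT,
                         rhigh, rcounts, rdisps, MPI_INT, intercomm);
    free (rcounts);
    free (rdisps);
    return rc;
}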

View file

@ -28,30 +28,39 @@
* and a bridge-comm (intercomm-create scenario).
*/
typedef int ompi_comm_cid_allredfct (int *inbuf, int* outbuf, int count,
ompi_op_t *op, ompi_communicator_t *comm,
typedef int ompi_comm_cid_allredfct (int *inbuf, int* outbuf,
int count, ompi_op_t *op,
ompi_communicator_t *comm,
ompi_communicator_t *bridgecomm,
void* lleader, void* rleader );
static int ompi_comm_allreduce_intra ( int *inbuf, int* outbuf, int count,
ompi_op_t *op, ompi_communicator_t *intercomm,
static int ompi_comm_allreduce_intra (int *inbuf, int* outbuf,
int count, ompi_op_t *op,
ompi_communicator_t *intercomm,
ompi_communicator_t *bridgecomm,
void* local_leader, void* remote_ledaer );
void* local_leader,
void* remote_ledaer );
static int ompi_comm_allreduce_inter (int *inbuf, int *outbuf, int count,
ompi_op_t *op, ompi_communicator_t *intercomm,
ompi_communicator_t *bridgecomm,
void* local_leader, void* remote_leader );
static int ompi_comm_allreduce_inter (int *inbuf, int *outbuf,
int count, ompi_op_t *op,
ompi_communicator_t *intercomm,
ompi_communicator_t *bridgecomm,
void* local_leader,
void* remote_leader);
static int ompi_comm_allreduce_intra_bridge(int *inbuf, int* outbuf, int count,
ompi_op_t *op, ompi_communicator_t *intercomm,
static int ompi_comm_allreduce_intra_bridge(int *inbuf, int* outbuf,
int count, ompi_op_t *op,
ompi_communicator_t *intercomm,
ompi_communicator_t *bridgecomm,
void* local_leader, void* remote_leader );
void* local_leader,
void* remote_leader);
static int ompi_comm_allreduce_intra_oob (int *inbuf, int* outbuf, int count,
ompi_op_t *op, ompi_communicator_t *intercomm,
static int ompi_comm_allreduce_intra_oob (int *inbuf, int* outbuf,
int count, ompi_op_t *op,
ompi_communicator_t *intercomm,
ompi_communicator_t *bridgecomm,
void* local_leader, void* remote_leader );
void* local_leader,
void* remote_leader);
int ompi_comm_nextcid ( ompi_communicator_t* newcomm,
@ -79,19 +88,17 @@ int ompi_comm_nextcid ( ompi_communicator_t* newcomm,
switch (mode)
{
case OMPI_COMM_CID_INTRA:
allredfnct = (ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra;
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra;
break;
#if 0
case OMPI_COMM_CID_INTER:
allredfnct = (ompi_comm_cid_allredfct*)ompi_comm_allreduce_inter;
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_inter;
break;
#endif
case OMPI_COMM_CID_INTRA_BRIDGE:
allredfnct = (ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_bridge;
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_bridge;
break;
#if 0
case OMPI_COMM_CID_INTRA_OOB:
allredfnct = (ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_oob;
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_oob;
break;
#endif
default:
@ -104,7 +111,7 @@ int ompi_comm_nextcid ( ompi_communicator_t* newcomm,
*/
while ( !done ){
for (i=start; i<OMPI_MAX_COMM ;i++) {
flag = ompi_pointer_array_test_and_set_item (&ompi_mpi_communicators, i, comm);
flag=ompi_pointer_array_test_and_set_item(&ompi_mpi_communicators, i, comm);
if (true == flag) {
nextlocal_cid = i;
break;
@ -117,8 +124,9 @@ int ompi_comm_nextcid ( ompi_communicator_t* newcomm,
response = 1; /* fine with me */
}
else {
ompi_pointer_array_set_item ( &ompi_mpi_communicators, nextlocal_cid, NULL);
flag = ompi_pointer_array_test_and_set_item ( &ompi_mpi_communicators,
ompi_pointer_array_set_item(&ompi_mpi_communicators,
nextlocal_cid, NULL);
flag = ompi_pointer_array_test_and_set_item(&ompi_mpi_communicators,
nextcid, comm );
if (true == flag) {
response = 1; /* works as well */
@ -145,39 +153,43 @@ int ompi_comm_nextcid ( ompi_communicator_t* newcomm,
return (MPI_SUCCESS);
}
/********************************************************************************/
/********************************************************************************/
/********************************************************************************/
/**************************************************************************/
/**************************************************************************/
/**************************************************************************/
/* Arguments not used in this implementation:
* - bridgecomm
* - local_leader
* - remote_leader
*/
static int ompi_comm_allreduce_intra ( int *inbuf, int *outbuf, int count,
ompi_op_t *op, ompi_communicator_t *comm,
ompi_communicator_t *bridgecomm,
void* local_leader, void* remote_leader )
static int ompi_comm_allreduce_intra ( int *inbuf, int *outbuf,
int count, ompi_op_t *op,
ompi_communicator_t *comm,
ompi_communicator_t *bridgecomm,
void* local_leader,
void* remote_leader )
{
return comm->c_coll.coll_allreduce ( inbuf, outbuf, count, MPI_INT, op,
comm );
return comm->c_coll.coll_allreduce ( inbuf, outbuf, count, MPI_INT,
op,comm );
}
#if 0
/* Arguments not used in this implementation:
* - bridgecomm
* - local_leader
* - remote_leader
*/
static int ompi_comm_allreduce_inter (int *inbuf, int *outbuf, int count,
ompi_op_t *op, ompi_communicator_t *intercomm,
ompi_communicator_t *bridgecomm,
void* local_leader, void* remote_leader )
static int ompi_comm_allreduce_inter ( int *inbuf, int *outbuf,
int count, ompi_op_t *op,
ompi_communicator_t *intercomm,
ompi_communicator_t *bridgecomm,
void* local_leader,
void* remote_leader )
{
int local_rank;
ompi_proc_t *lvpid, *rvpid;
int i;
int *tmpbuf;
int rc;
int local_rank, rsize;
int i, rc;
int *sbuf;
int *tmpbuf=NULL;
int *rcounts=NULL, scount=0;
int *rdisps=NULL;
if ( &ompi_mpi_op_sum != op && &ompi_mpi_op_prod != op &&
&ompi_mpi_op_max != op && &ompi_mpi_op_min != op ) {
@ -188,37 +200,44 @@ static int ompi_comm_allreduce_inter (int *inbuf, int *outbuf, int count,
return MPI_ERR_COMM;
}
/* Allocate temporary arrays */
rsize = ompi_comm_remote_size (intercomm);
local_rank = ompi_comm_rank ( intercomm );
tmpbuf = (int *) malloc ( count * sizeof(int));
if ( NULL == tmpbuf ) {
return MPI_ERR_INTERN;
}
tmpbuf = (int *) malloc ( count * sizeof(int));
rdisps = (int *) calloc ( rsize, sizeof(int));
rcounts = (int *) calloc ( rsize, sizeof(int) );
if ( NULL == tmpbuf || NULL == rdisps || NULL == rcounts ) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
/* Execute the inter-allreduce: the result of our group will
be in the buffer of the remote group */
rc = intercomm->c_coll.coll_allreduce ( inbuf, tmpbuf, count, MPI_INT,
op, intercomm );
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
lvpid = intercomm->c_local_group->grp_proc_pointers[0];
rvpid = intercomm->c_remote_group->grp_proc_pointers[0];
if ( 0 == local_rank ) {
MPI_Request req;
MPI_Status status;
/* local leader exchange data */
rc = mca_pml.pml_irecv (outbuf, count, MPI_INT, 0, OMPI_COMM_CID_TAG,
intercomm, &req );
/* for the allgatherv later */
scount = count;
/* local leader exchange their data and determine the overall result
for both groups */
rc = mca_pml.pml_irecv (outbuf, count, MPI_INT, 0,
OMPI_COMM_CID_TAG, intercomm, &req );
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
rc = mca_pml.pml_send (tmpbuf, count, MPI_INT, 0, OMPI_COMM_CID_TAG,
rc = mca_pml.pml_send (tmpbuf,count,MPI_INT,0,OMPI_COMM_CID_TAG,
MCA_PML_BASE_SEND_STANDARD, intercomm );
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
rc = mca_pml.pml_wait ( 1, &req, NULL, &status);
rc = mca_pml.pml_wait_all ( 1, &req, MPI_STATUS_IGNORE );
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
@ -244,57 +263,41 @@ static int ompi_comm_allreduce_inter (int *inbuf, int *outbuf, int count,
}
}
}
/* Bcast result to both groups */
if ( lvpid->proc_vpid > rvpid->proc_vpid ) {
if ( 0 == local_rank ) {
rc = intercomm->c_coll.coll_bcast ( &outbuf, count, MPI_INT,
MPI_ROOT, intercomm );
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
}
else {
rc = intercomm->c_coll.coll_bcast ( &outbuf, count, MPI_INT,
MPI_PROC_NULL, intercomm );
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
}
rc = intercomm->c_coll.coll_bcast ( &outbuf, count, MPI_INT, 0,
intercomm );
}
else {
rc = intercomm->c_coll.coll_bcast ( &outbuf, count, MPI_INT, 0,
intercomm );
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
if ( 0 == local_rank )
rc = intercomm->c_coll.coll_bcast ( &outbuf, count, MPI_INT,
MPI_ROOT, intercomm );
else
intercomm->c_coll.coll_bcast ( &outbuf, count, MPI_INT,
MPI_PROC_NULL, intercomm );
}
/* distribute the overall result to all processes in the other group.
Instead of using bcast, we are using here allgatherv, to avoid the
possible deadlock. Else, we need an algorithm to determine,
which group sends first in the inter-bcast and which receives
the result first.
*/
rcounts[0] = count;
sbuf = outbuf;
rc = intercomm->c_coll.coll_allgatherv (sbuf, scount, MPI_INT, outbuf,
rcounts, rdisps, MPI_INT,
intercomm);
exit:
if ( NULL != tmpbuf ) {
free ( tmpbuf );
}
if ( NULL != rcounts ) {
free ( rcounts );
}
if ( NULL != rdisps ) {
free ( rdisps );
}
return (rc);
}
#endif
/* Arguments not used in this implementation:
all arguments are in use.
*/
static int ompi_comm_allreduce_intra_bridge (int *inbuf, int *outbuf, int count,
ompi_op_t *op, ompi_communicator_t *comm,
ompi_communicator_t *bridgecomm,
void* lleader, void* rleader )
static int ompi_comm_allreduce_intra_bridge (int *inbuf, int *outbuf,
int count, ompi_op_t *op,
ompi_communicator_t *comm,
ompi_communicator_t *bcomm,
void* lleader, void* rleader )
{
int *tmpbuf=NULL;
int local_rank;
@ -325,20 +328,19 @@ static int ompi_comm_allreduce_intra_bridge (int *inbuf, int *outbuf, int count,
if (local_rank == local_leader ) {
MPI_Request req;
MPI_Status status;
rc = mca_pml.pml_irecv ( outbuf, count, MPI_INT, remote_leader,
OMPI_COMM_CID_TAG, bridgecomm, &req );
OMPI_COMM_CID_TAG, bcomm, &req );
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
rc = mca_pml.pml_send (tmpbuf, count, MPI_INT, remote_leader,
OMPI_COMM_CID_TAG, MCA_PML_BASE_SEND_STANDARD,
bridgecomm );
OMPI_COMM_CID_TAG,
MCA_PML_BASE_SEND_STANDARD, bcomm );
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
rc = mca_pml.pml_wait ( 1, &req, NULL, &status);
rc = mca_pml.pml_wait_all ( 1, &req, MPI_STATUS_IGNORE);
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
@ -366,7 +368,8 @@ static int ompi_comm_allreduce_intra_bridge (int *inbuf, int *outbuf, int count,
}
rc = comm->c_coll.coll_bcast ( outbuf, count, MPI_INT, local_leader, comm);
rc = comm->c_coll.coll_bcast ( outbuf, count, MPI_INT, local_leader,
comm);
exit:
if (NULL != tmpbuf ) {
@ -383,8 +386,9 @@ static int ompi_comm_allreduce_intra_bridge (int *inbuf, int *outbuf, int count,
* lleader and rleader are the OOB contact information of the
* root processes.
*/
static int ompi_comm_allreduce_intra_oob (int *inbuf, int *outbuf, int count,
ompi_op_t *op, ompi_communicator_t *comm,
static int ompi_comm_allreduce_intra_oob (int *inbuf, int *outbuf,
int count, ompi_op_t *op,
ompi_communicator_t *comm,
ompi_communicator_t *bridgecomm,
void* lleader, void* rleader )
{
@ -424,7 +428,8 @@ static int ompi_comm_allreduce_intra_oob (int *inbuf, int *outbuf, int count,
MPI_Status status;
/*
* To be done:: OOB sendrecv between the two leaders the local result.
* To be done:: OOB sendrecv between the two leaders the local
* result.
*/
if ( &ompi_mpi_op_max == op ) {
for ( i = 0 ; i < count; i++ ) {
@ -449,7 +454,8 @@ static int ompi_comm_allreduce_intra_oob (int *inbuf, int *outbuf, int count,
}
rc = comm->c_coll.coll_bcast_intra ( outbuf, count, MPI_INT, local_leader, comm);
rc = comm->c_coll.coll_bcast_intra ( outbuf, count, MPI_INT,
local_leader, comm);
exit:
if (NULL != tmpbuf ) {

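For readers of the hunks above, this is roughly the pattern implemented by ompi_comm_allreduce_intra_bridge, rewritten as a hedged, self-contained sketch against the public MPI API: reduce within each group, let the two leaders swap partial results over the bridge communicator, combine them, and broadcast locally. Only MPI_MAX is shown; the function name, leader ranks, and tag are illustrative.

#include <mpi.h>
#include <stdlib.h>

int bridge_allreduce_max (int *inbuf, int *outbuf, int count,
                          MPI_Comm comm, MPI_Comm bridgecomm,
                          int local_leader, int remote_leader, int tag)
{
    int i, rc, rank;
    int *tmpbuf = (int *) malloc (count * sizeof(int));
    if (NULL == tmpbuf) return MPI_ERR_NO_MEM;

    MPI_Comm_rank (comm, &rank);

    /* step 1: reduce within the local group onto the local leader */
    rc = MPI_Reduce (inbuf, tmpbuf, count, MPI_INT, MPI_MAX,
                     local_leader, comm);
    if (MPI_SUCCESS != rc) goto exit;

    if (rank == local_leader) {
        /* step 2: leaders swap partial results over the bridge comm */
        rc = MPI_Sendrecv (tmpbuf, count, MPI_INT, remote_leader, tag,
                           outbuf, count, MPI_INT, remote_leader, tag,
                           bridgecomm, MPI_STATUS_IGNORE);
        if (MPI_SUCCESS != rc) goto exit;

        /* step 3: combine local and remote partial results (max) */
        for (i = 0; i < count; i++) {
            if (tmpbuf[i] > outbuf[i]) outbuf[i] = tmpbuf[i];
        }
    }

    /* step 4: everyone in the local group gets the combined result */
    rc = MPI_Bcast (outbuf, count, MPI_INT, local_leader, comm);

exit:
    free (tmpbuf);
    return rc;
}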
View file

@ -239,7 +239,7 @@ extern "C" {
* and sets all other elements to zero.
*/
ompi_communicator_t* ompi_comm_allocate ( int local_group_size,
int remote_group_size );
int remote_group_size );
/**
* allocate new communicator ID
@ -276,15 +276,16 @@ extern "C" {
* This is THE routine, where all the communicator stuff
* is really set.
*/
ompi_communicator_t* ompi_comm_set ( ompi_communicator_t* oldcomm,
int local_size,
ompi_proc_t **local_procs,
int remote_size,
ompi_proc_t **remote_procs,
ompi_hash_table_t *attr,
ompi_errhandler_t *errh,
mca_base_component_t *collcomponent,
mca_base_component_t *topocomponent );
int ompi_comm_set ( ompi_communicator_t* newcomm,
ompi_communicator_t* oldcomm,
int local_size,
ompi_proc_t **local_procs,
int remote_size,
ompi_proc_t **remote_procs,
ompi_hash_table_t *attr,
ompi_errhandler_t *errh,
mca_base_component_t *collcomponent,
mca_base_component_t *topocomponent );
/**
* This is a short-hand routine used in intercomm_create.
* The routine makes sure, that all processes have afterwards