1
1

First cut of the basic collective module. Much work still needs to be

done -- still need:
- safe collective tags on user communicators (negative tags)
- PML entry points for all MPI point-to-point calls
- datatype buffer and copy functions
As such, there are lots of warnings in this code right now (although it does
compile).  It will not run properly at all.

This commit was SVN r268.
Этот коммит содержится в:
Jeff Squyres 2004-01-11 21:26:55 +00:00
родитель 34048ae22d
Коммит 03bd284527
27 изменённых файлов: 2466 добавлений и 0 удалений

14
src/mca/mpi/coll/basic/.cvsignore Обычный файл
Просмотреть файл

@ -0,0 +1,14 @@
Makefile
Makefile.in
acinclude.m4
aclocal.m4
configure
configure.ac
config.log
config.status
libtool
autom4te.cache
.libs
.deps
*.la
.lam*

39
src/mca/mpi/coll/basic/Makefile.am Обычный файл
Просмотреть файл

@ -0,0 +1,39 @@
# -*- makefile -*-
#
# $HEADER$
#
# Use the top-level LAM Makefile.options
include $(top_lam_srcdir)/config/Makefile.options
SUBDIRS = src
DIST_SUBDIRS = config $(SUBDIRS)
EXTRA_DIST = VERSION
# According to the MCA spec, we have to make the output library here
# in the top-level directory, and it has to be named
# liblam_ssi_coll_lam_basic.la
if LAM_BUILD_LOADABLE_MODULE
module_noinst =
module_install = mca_coll_basic.la
else
module_noinst = libmca_mpi_coll_basic.la
module_install =
endif
lamssiexecdir = $(libdir)/lam
lamssiexec_LTLIBRARIES = $(module_install)
mca_coll_basic_la_SOURCES =
mca_coll_basic_la_LIBADD = \
src/libmca_coll_basic.la \
$(top_lam_builddir)/src/mpi/libmpi.la \
$(top_lam_builddir)/src/lam/liblam.la
mca_coll_basic_la_LDFLAGS = -module -avoid-version
noinst_LTLIBRARIES = $(module_noinst)
libmca_mpi_coll_basic_la_SOURCES =
libmca_mpi_coll_basic_la_LIBADD = src/libmca_coll_basic.la
libmca_mpi_coll_basic_la_LDFLAGS = -module -avoid-version

6
src/mca/mpi/coll/basic/VERSION Обычный файл
Просмотреть файл

@ -0,0 +1,6 @@
major=8
minor=0
release=0
alpha=0
beta=0
cvs=1

9
src/mca/mpi/coll/basic/config/.cvsignore Обычный файл
Просмотреть файл

@ -0,0 +1,9 @@
config.guess
config.sub
depcomp
install-sh
ltmain.sh
Makefile
Makefile.in
missing
mkinstalldirs

15
src/mca/mpi/coll/basic/config/Makefile.am Обычный файл
Просмотреть файл

@ -0,0 +1,15 @@
# -*- makefile -*-
#
# $HEADER$
#
include $(top_lam_srcdir)/config/Makefile.options
# This file is only here so that "make dist" grabs all the extra
# config-level files that are necessary to build a LAM distribution
# tarball. Nothing gets built in this directory.
# Seems to be an automake bug -- depcomp is not automatically included
# in distribution tarballs.
EXTRA_DIST = depcomp

9
src/mca/mpi/coll/basic/configure.params Обычный файл
Просмотреть файл

@ -0,0 +1,9 @@
# -*- shell-script -*-
#
# $HEADER$
#
# Specific to this module
PARAM_INIT_FILE=src/coll_basic.c
PARAM_CONFIG_FILES="Makefile config/Makefile src/Makefile"

12
src/mca/mpi/coll/basic/src/.cvsignore Обычный файл
Просмотреть файл

@ -0,0 +1,12 @@
Makefile
Makefile.in
.libs
*.o
*.lo
*.la
.deps
stamp-h
stamp-h1
stamp-h.in
coll_basic_config.h
coll_basic_config.h.in

33
src/mca/mpi/coll/basic/src/Makefile.am Обычный файл
Просмотреть файл

@ -0,0 +1,33 @@
# -*- makefile -*-
#
# $HEADER$
#
include $(top_lam_srcdir)/config/Makefile.options
AM_CPPFLAGS = \
-I$(top_lam_builddir)/src/include \
-I$(top_lam_srcdir)/src \
-I$(top_lam_srcdir)/src/include
noinst_LTLIBRARIES = libmca_coll_basic.la
libmca_coll_basic_la_SOURCES = \
coll_basic.h \
coll_basic.c \
coll_basic_allgather.c \
coll_basic_allgatherv.c \
coll_basic_allreduce.c \
coll_basic_alltoall.c \
coll_basic_alltoallv.c \
coll_basic_alltoallw.c \
coll_basic_barrier.c \
coll_basic_bcast.c \
coll_basic_gather.c \
coll_basic_gatherv.c \
coll_basic_module.c \
coll_basic_reduce.c \
coll_basic_reduce_scatter.c \
coll_basic_scan.c \
coll_basic_exscan.c \
coll_basic_scatter.c \
coll_basic_scatterv.c

226
src/mca/mpi/coll/basic/src/coll_basic.c Обычный файл
Просмотреть файл

@ -0,0 +1,226 @@
/*
* $HEADER$
*/
#include "lam_config.h"
#include "coll_basic_config.h"
#include <stdio.h>
#include "mpi.h"
#include "mca/mpi/coll/coll.h"
#include "mca/mpi/coll/base/base.h"
#include "coll_basic.h"
/*
* Linear set of collective algorithms
*/
static const mca_coll_1_0_0_t linear = {
/* Per-communicator initialization and finalization functions */
mca_coll_basic_init,
mca_coll_basic_finalize,
/* Checkpoint / restart functions */
NULL,
NULL,
NULL,
NULL,
/* Memory allocation / freeing */
NULL,
NULL,
/* Collective function pointers */
mca_coll_basic_allgather,
NULL,
mca_coll_basic_allgatherv,
NULL,
mca_coll_basic_allreduce,
NULL,
mca_coll_basic_alltoall,
NULL,
mca_coll_basic_alltoallv,
NULL,
mca_coll_basic_alltoallw,
NULL,
mca_coll_basic_barrier_lin,
NULL,
true,
mca_coll_basic_bcast_lin,
NULL,
mca_coll_basic_exscan,
NULL,
mca_coll_basic_gather,
NULL,
mca_coll_basic_gatherv,
NULL,
true,
mca_coll_basic_reduce_lin,
NULL,
mca_coll_basic_reduce_scatter,
NULL,
mca_coll_basic_scan,
NULL,
mca_coll_basic_scatter,
NULL,
mca_coll_basic_scatterv,
NULL
};
/*
* Lograthmic set of collective algorithms. Note that not all
* collectives have lograthmic algorithms. For example, scan will use
* the same algorithm as in the linear set.
*/
static const mca_coll_1_0_0_t log = {
/* Per-communicator initialization and finalization functions */
mca_coll_basic_init,
mca_coll_basic_finalize,
/* Checkpoint / restart functions */
NULL,
NULL,
NULL,
NULL,
/* Memory allocation / freeing */
NULL,
NULL,
/* Collective function pointers */
mca_coll_basic_allgather,
NULL,
mca_coll_basic_allgatherv,
NULL,
mca_coll_basic_allreduce,
NULL,
mca_coll_basic_alltoall,
NULL,
mca_coll_basic_alltoallv,
NULL,
mca_coll_basic_alltoallw,
NULL,
mca_coll_basic_barrier_log,
NULL,
true,
mca_coll_basic_bcast_log,
NULL,
mca_coll_basic_exscan,
NULL,
mca_coll_basic_gather,
NULL,
mca_coll_basic_gatherv,
NULL,
true,
mca_coll_basic_reduce_log,
NULL,
mca_coll_basic_reduce_scatter,
NULL,
mca_coll_basic_scan,
NULL,
mca_coll_basic_scatter,
NULL,
mca_coll_basic_scatterv,
NULL
};
/*
 * Initial query function, invoked during MPI_INIT, that reports the
 * range of thread levels this module can support.
 */
int mca_coll_basic_thread_query(int *thread_min, int *thread_max)
{
  /* The basic algorithms place no restriction on the thread level. */

  *thread_max = MPI_THREAD_MULTIPLE;
  *thread_min = MPI_THREAD_SINGLE;

  return LAM_SUCCESS;
}
/*
 * Invoked when a new communicator has been created.  Examine the
 * communicator and decide which set of functions and which priority
 * to offer for it.
 */
const mca_coll_1_0_0_t *mca_coll_basic_query(MPI_Comm comm, int *priority)
{
  int csize;

  /* This module is the fallback: always bid the lowest priority. */

  *priority = 0;

  /* Communicators larger than the crossover point get the log-based
     algorithms; everything else gets the linear ones. */

  MPI_Comm_size(comm, &csize);
  return (csize > mca_coll_base_crossover) ? &log : &linear;
}
/*
 * Per-communicator initialization.  The basic module keeps no
 * per-communicator state, so this is a no-op; *new_coll is left
 * untouched.
 */
int mca_coll_basic_init(MPI_Comm comm, const mca_coll_1_0_0_t **new_coll)
{
  /* Nothing to init on the communicator */

  return LAM_SUCCESS;
}
/*
 * Per-communicator finalization.  Counterpart of
 * mca_coll_basic_init(); nothing was allocated there, so nothing to
 * release here.
 */
int mca_coll_basic_finalize(MPI_Comm comm)
{
  /* Nothing to finalize on the communicator */

  return LAM_SUCCESS;
}

109
src/mca/mpi/coll/basic/src/coll_basic.h Обычный файл
Просмотреть файл

@ -0,0 +1,109 @@
/*
 * $HEADER$
 *
 * Public interface of the "basic" collective module: the exported
 * module descriptor plus prototypes for every collective entry point.
 */
#ifndef MCA_COLL_BASIC_EXPORT_H
#define MCA_COLL_BASIC_EXPORT_H

#include "lam_config.h"
#include "mca/mca.h"
#include "mca/mpi/coll/coll.h"

/*
 * Globally exported variable: this module's descriptor.
 */
extern const mca_coll_base_module_1_0_0_t mca_coll_basic_module;

/*
 * coll API functions
 */
#ifdef __cplusplus
extern "C" {
#endif

  /* basic's query function is prototyped in <mca/mpi/coll/coll.h>
     because other modules may invoke it. */

  /* Module / communicator lifecycle */
  int mca_coll_basic_thread_query(int *thread_min, int *thread_max);
  int mca_coll_basic_init(MPI_Comm comm, const mca_coll_1_0_0_t **new_coll);
  int mca_coll_basic_finalize(MPI_Comm comm);

  /* Collective implementations.  Each takes the same arguments as the
     corresponding MPI function.  Barrier, bcast, and reduce come in
     linear (_lin, O(N)) and logarithmic (_log, O(log N)) variants. */
  int mca_coll_basic_allgather(void *sbuf, int scount,
                               MPI_Datatype sdtype, void *rbuf,
                               int rcount, MPI_Datatype rdtype,
                               MPI_Comm comm);
  int mca_coll_basic_allgatherv(void *sbuf, int scount,
                                MPI_Datatype sdtype, void * rbuf,
                                int *rcounts, int *disps,
                                MPI_Datatype rdtype,
                                MPI_Comm comm);
  int mca_coll_basic_allreduce(void *sbuf, void *rbuf, int count,
                               MPI_Datatype dtype, MPI_Op op,
                               MPI_Comm comm);
  int mca_coll_basic_alltoall(void *sbuf, int scount,
                              MPI_Datatype sdtype, void* rbuf,
                              int rcount, MPI_Datatype rdtype,
                              MPI_Comm comm);
  int mca_coll_basic_alltoallv(void *sbuf, int *scounts,
                               int *sdisps, MPI_Datatype sdtype,
                               void *rbuf, int *rcounts,
                               int *rdisps, MPI_Datatype rdtype,
                               MPI_Comm comm);
  int mca_coll_basic_alltoallw(void *sbuf, int *scounts,
                               int *sdisps, MPI_Datatype *sdtypes,
                               void *rbuf, int *rcounts,
                               int *rdisps, MPI_Datatype *rdtypes,
                               MPI_Comm comm);
  int mca_coll_basic_barrier_lin(MPI_Comm comm);
  int mca_coll_basic_barrier_log(MPI_Comm comm);
  int mca_coll_basic_bcast_lin(void *buff, int count,
                               MPI_Datatype datatype, int root,
                               MPI_Comm comm);
  int mca_coll_basic_bcast_log(void *buff, int count,
                               MPI_Datatype datatype, int root,
                               MPI_Comm comm);
  int mca_coll_basic_exscan(void *sbuf, void *rbuf, int count,
                            MPI_Datatype dtype, MPI_Op op,
                            MPI_Comm comm);
  int mca_coll_basic_gather(void *sbuf, int scount,
                            MPI_Datatype sdtype, void *rbuf,
                            int rcount, MPI_Datatype rdtype,
                            int root, MPI_Comm comm);
  int mca_coll_basic_gatherv(void *sbuf, int scount,
                             MPI_Datatype sdtype, void *rbuf,
                             int *rcounts, int *disps,
                             MPI_Datatype rdtype, int root,
                             MPI_Comm comm);
  int mca_coll_basic_reduce_lin(void *sbuf, void* rbuf, int count,
                                MPI_Datatype dtype, MPI_Op op,
                                int root, MPI_Comm comm);
  int mca_coll_basic_reduce_log(void *sbuf, void* rbuf, int count,
                                MPI_Datatype dtype, MPI_Op op,
                                int root, MPI_Comm comm);
  int mca_coll_basic_reduce_scatter(void *sbuf, void *rbuf,
                                    int *rcounts,
                                    MPI_Datatype dtype,
                                    MPI_Op op, MPI_Comm comm);
  int mca_coll_basic_scan(void *sbuf, void *rbuf, int count,
                          MPI_Datatype dtype, MPI_Op op,
                          MPI_Comm comm);
  int mca_coll_basic_scatter(void *sbuf, int scount,
                             MPI_Datatype sdtype, void *rbuf,
                             int rcount, MPI_Datatype rdtype,
                             int root, MPI_Comm comm);
  int mca_coll_basic_scatterv(void *sbuf, int *scounts,
                              int *disps, MPI_Datatype sdtype,
                              void* rbuf, int rcount,
                              MPI_Datatype rdtype, int root,
                              MPI_Comm comm);
#ifdef __cplusplus
}
#endif
#endif /* MCA_COLL_BASIC_EXPORT_H */

Просмотреть файл

@ -0,0 +1,42 @@
/*
* $HEADER$
*/
#include "lam_config.h"
#include "coll_basic_config.h"
#include "lam/constants.h"
#include "mpi.h"
#include "mpi/communicator/communicator.h"
#include "mca/mpi/coll/coll.h"
#include "coll_basic.h"
/*
 * allgather
 *
 * Function: - allgather implemented as gather-to-root plus broadcast
 * Accepts:  - same as MPI_Allgather()
 * Returns:  - MPI_SUCCESS or error code
 */
int mca_coll_basic_allgather(void *sbuf, int scount,
                             MPI_Datatype sdtype, void *rbuf,
                             int rcount, MPI_Datatype rdtype,
                             MPI_Comm comm)
{
  int csize;
  int ret;

  /* Gather everyone's contribution to rank 0 ... */

  MPI_Comm_size(comm, &csize);
  ret = comm->c_coll.coll_gather_intra(sbuf, scount, sdtype, rbuf,
                                       rcount, rdtype, 0, comm);
  if (MPI_SUCCESS != ret) {
    return ret;
  }

  /* ... then broadcast the assembled buffer from rank 0. */

  return comm->c_coll.coll_bcast_intra(rbuf, rcount * csize, rdtype,
                                       0, comm);
}

Просмотреть файл

@ -0,0 +1,42 @@
/*
* $HEADER$
*/
#include "lam_config.h"
#include "coll_basic_config.h"
#include "lam/constants.h"
#include "mpi.h"
#include "mpi/communicator/communicator.h"
#include "mca/mpi/coll/coll.h"
#include "coll_basic.h"
/*
 * allgatherv
 *
 * Function: - allgatherv built from repeated gatherv calls
 * Accepts:  - same as MPI_Allgatherv()
 * Returns:  - MPI_SUCCESS or error code
 */
int mca_coll_basic_allgatherv(void *sbuf, int scount,
                              MPI_Datatype sdtype, void * rbuf,
                              int *rcounts, int *disps,
                              MPI_Datatype rdtype,
                              MPI_Comm comm)
{
  int root;
  int csize;
  int ret;

  MPI_Comm_size(comm, &csize);

  /* Run one gatherv per process, each rooted at a different rank, so
     that every process ends up holding the complete result. */

  for (root = 0; root < csize; ++root) {
    ret = comm->c_coll.coll_gatherv_intra(sbuf, scount, sdtype, rbuf,
                                          rcounts, disps, rdtype,
                                          root, comm);
    if (MPI_SUCCESS != ret) {
      return ret;
    }
  }

  return MPI_SUCCESS;
}

Просмотреть файл

@ -0,0 +1,36 @@
/*
* $HEADER$
*/
#include "lam_config.h"
#include "coll_basic_config.h"
#include "lam/constants.h"
#include "mpi.h"
#include "mpi/communicator/communicator.h"
#include "mca/mpi/coll/coll.h"
#include "coll_basic.h"
/*
 * allreduce
 *
 * Function: - allreduce implemented as reduce-to-root plus broadcast
 * Accepts:  - same as MPI_Allreduce()
 * Returns:  - MPI_SUCCESS or error code
 */
int mca_coll_basic_allreduce(void *sbuf, void *rbuf, int count,
                             MPI_Datatype dtype, MPI_Op op,
                             MPI_Comm comm)
{
  int ret;

  /* Reduce everything onto rank 0 ... */

  ret = comm->c_coll.coll_reduce_intra(sbuf, rbuf, count, dtype,
                                       op, 0, comm);
  if (MPI_SUCCESS != ret) {
    return ret;
  }

  /* ... then broadcast the result back out to everyone. */

  return comm->c_coll.coll_bcast_intra(rbuf, count, dtype, 0, comm);
}

Просмотреть файл

@ -0,0 +1,148 @@
/*
* $HEADER$
*/
#include "lam_config.h"
#include "coll_basic_config.h"
#include <stdio.h>
#include <errno.h>
#include "lam/constants.h"
#include "mpi.h"
#include "lam/util/malloc.h"
#include "mpi/datatype/datatype.h"
#include "mca/mpi/coll/coll.h"
#include "coll_basic.h"
/*
 * alltoall
 *
 * Function: - MPI_Alltoall for non-lamd RPI's
 * Accepts:  - same as MPI_Alltoall()
 * Returns:  - MPI_SUCCESS or an MPI error code
 *
 * Strategy: post a persistent receive and send for every peer, start
 * them all, wait, then free them.  The actual point-to-point calls
 * are currently #if 0'd out pending negative-tag / direct PML
 * support, so this routine does not yet move any data.
 */
int mca_coll_basic_alltoall(void *sbuf, int scount,
                            MPI_Datatype sdtype, void *rbuf,
                            int rcount, MPI_Datatype rdtype,
                            MPI_Comm comm)
{
  int i;
  int rank;
  int size;
  int nreqs;
  int err;
  char *psnd;
  char *prcv;
  MPI_Aint sndinc;
  MPI_Aint rcvinc;
  MPI_Request *req;
  MPI_Request *preq;
  MPI_Request *qreq;

  /* Initialize.  Scale the extents by the counts so that
     (buffer + peer * inc) addresses each peer's slot directly. */

  MPI_Comm_size(comm, &size);
  MPI_Comm_rank(comm, &rank);
  MPI_Type_extent(sdtype, &sndinc);
  MPI_Type_extent(rdtype, &rcvinc);
  sndinc *= scount;
  rcvinc *= rcount;

  /* Allocate the array of requests: one receive and one send per
     peer (self excluded).  BUGFIX: the failure branch used to call
     LAM_FREE(req) on the NULL pointer it had just tested -- dead
     code, removed. */

  nreqs = 2 * (size - 1);
  if (nreqs > 0) {
    req = (MPI_Request *) LAM_MALLOC(nreqs * sizeof(MPI_Request));
    if (NULL == req) {
      return ENOMEM;
    }
  } else {
    req = NULL;
  }

  /* simple optimization: copy our own contribution locally */

  psnd = ((char *) sbuf) + (rank * sndinc);
  prcv = ((char *) rbuf) + (rank * rcvinc);
#if 0
  /* JMS: Need a lam_datatype_something() here that allows two
     different datatypes */
  err = lam_dtsndrcv(psnd, scount, sdtype,
                     prcv, rcount, rdtype, BLKMPIALLTOALL, comm);
  if (MPI_SUCCESS != err) {
    if (NULL != req)
      LAM_FREE(req);
    lam_mkpt(comm);
    return err;
  }
#endif

  /* If only one process, we're done (req is NULL here, nothing to
     free). */

  if (1 == size) {
    return MPI_SUCCESS;
  }

  /* Initiate all send/recv to/from others.  Receives fill the first
     half of req[] (via preq), sends the second half (via qreq). */

  preq = req;
  qreq = req + size - 1;
  prcv = (char*) rbuf;
  psnd = (char*) sbuf;
  for (i = (rank + 1) % size; i != rank;
       i = (i + 1) % size, ++preq, ++qreq) {
#if 0
    /* JMS: Need to replace this with negative tags and direct PML
       calls */
    err = MPI_Recv_init(prcv + (i * rcvinc), rcount, rdtype, i,
                        BLKMPIALLTOALL, comm, preq);
    if (MPI_SUCCESS != err) {
      LAM_FREE(req);
      return err;
    }
#endif
#if 0
    /* JMS: Need to replace this with negative tags and direct PML
       calls */
    err = MPI_Send_init(psnd + (i * sndinc), scount, sdtype, i,
                        BLKMPIALLTOALL, comm, qreq);
    if (MPI_SUCCESS != err) {
      LAM_FREE(req);
      return err;
    }
#endif
  }

  /* Start all the requests. */

  err = MPI_Startall(nreqs, req);
  if (MPI_SUCCESS != err) {
    LAM_FREE(req);
    return err;
  }

  /* Wait for them all. */

  err = MPI_Waitall(nreqs, req, MPI_STATUSES_IGNORE);
  if (MPI_SUCCESS != err) {
    LAM_FREE(req);
    return err;
  }

  /* Free the persistent requests. */

  for (i = 0, preq = req; i < nreqs; ++i, ++preq) {
    err = MPI_Request_free(preq);
    if (MPI_SUCCESS != err) {
      LAM_FREE(req);
      return err;
    }
  }

  /* All done */

  LAM_FREE(req);
  return MPI_SUCCESS;
}

Просмотреть файл

@ -0,0 +1,153 @@
/*
* $HEADER$
*/
#include "lam_config.h"
#include "coll_basic_config.h"
#include <stdio.h>
#include <errno.h>
#include "lam/constants.h"
#include "mpi.h"
#include "lam/util/malloc.h"
#include "mpi/datatype/datatype.h"
#include "mca/mpi/coll/coll.h"
#include "coll_basic.h"
/*
 * alltoallv
 *
 * Function: - MPI_Alltoallv for non-lamd RPIs
 * Accepts:  - same as MPI_Alltoallv()
 * Returns:  - MPI_SUCCESS or an MPI error code
 *
 * Posts one persistent receive and one persistent send per peer,
 * starts them all, waits, then frees them.  The point-to-point calls
 * are currently #if 0'd out pending negative-tag / direct PML
 * support, so no data is moved yet.
 */
int
mca_coll_basic_alltoallv(void *sbuf, int *scounts, int *sdisps,
                         MPI_Datatype sdtype, void *rbuf,
                         int *rcounts, int *rdisps,
                         MPI_Datatype rdtype, MPI_Comm comm)
{
  int i;
  int size;
  int rank;
  int nreqs;
  int err;
  char *psnd;
  char *prcv;
  MPI_Aint sndextent;
  MPI_Aint rcvextent;
  MPI_Request *req;
  MPI_Request *preq;

  /* Initialize. */

  MPI_Comm_size(comm, &size);
  MPI_Comm_rank(comm, &rank);
  MPI_Type_extent(sdtype, &sndextent);
  MPI_Type_extent(rdtype, &rcvextent);

  /* Allocate arrays of requests: one recv + one send per peer. */

  nreqs = 2 * (size - 1);
  if (nreqs > 0) {
    req = (MPI_Request *) LAM_MALLOC(nreqs * sizeof(MPI_Request));
    if (NULL == req) {
      /* NOTE(review): req is NULL here, so this LAM_FREE is
         presumably a no-op (like free(NULL)) and redundant. */
      LAM_FREE(req);
      return ENOMEM;
    }
  } else {
    req = NULL;
  }

  /* simple optimization: handle the send-to-self slot locally */

  psnd = ((char *) sbuf) + (sdisps[rank] * sndextent);
  prcv = ((char *) rbuf) + (rdisps[rank] * rcvextent);
#if 0
  /* JMS: Need a lam_datatype_something() here that allows two
     different datatypes */
  err = lam_dtsndrcv(psnd, scounts[rank], sdtype,
                     prcv, rcounts[rank], rdtype, BLKMPIALLTOALLV, comm);
  if (MPI_SUCCESS != err) {
    if (NULL != req)
      LAM_FREE(req);
    return err;
  }
#endif

  /* If only one process, we're done (req is NULL -- nothing leaks). */

  if (1 == size) {
    return MPI_SUCCESS;
  }

  /* Initiate all send/recv to/from others: first all receives ... */

  preq = req;
  for (i = 0; i < size; ++i) {
    if (i == rank)
      continue;
    prcv = ((char *) rbuf) + (rdisps[i] * rcvextent);
#if 0
    /* JMS: Need to replace this with negative tags and direct PML
       calls */
    err = MPI_Recv_init(prcv, rcounts[i], rdtype,
                        i, BLKMPIALLTOALLV, comm, preq++);
    if (MPI_SUCCESS != err) {
      LAM_FREE(req);
      return err;
    }
#endif
  }

  /* ... then all sends. */

  for (i = 0; i < size; ++i) {
    if (i == rank)
      continue;
    psnd = ((char *) sbuf) + (sdisps[i] * sndextent);
#if 0
    /* JMS: Need to replace this with negative tags and direct PML
       calls */
    err = MPI_Send_init(psnd, scounts[i], sdtype,
                        i, BLKMPIALLTOALLV, comm, preq++);
    if (MPI_SUCCESS != err) {
      LAM_FREE(req);
      return err;
    }
#endif
  }

  /* Start all requests. */

  err = MPI_Startall(nreqs, req);
  if (MPI_SUCCESS != err) {
    LAM_FREE(req);
    return err;
  }

  /* Wait for them all. */

  err = MPI_Waitall(nreqs, req, MPI_STATUSES_IGNORE);
  if (MPI_SUCCESS != err) {
    LAM_FREE(req);
    return err;
  }

  /* Free the requests. */

  for (i = 0, preq = req; i < nreqs; ++i, ++preq) {
    err = MPI_Request_free(preq);
    if (err != MPI_SUCCESS) {
      LAM_FREE(req);
      return err;
    }
  }

  /* All done */

  LAM_FREE(req);
  return MPI_SUCCESS;
}

Просмотреть файл

@ -0,0 +1,148 @@
/*
* $HEADER$
*/
#include "lam_config.h"
#include "coll_basic_config.h"
#include <stdio.h>
#include <errno.h>
#include "lam/constants.h"
#include "mpi.h"
#include "lam/util/malloc.h"
#include "mpi/datatype/datatype.h"
#include "mca/mpi/coll/coll.h"
#include "coll_basic.h"
/*
 * alltoallw
 *
 * Function: - MPI_Alltoallw for non-lamd RPIs
 * Accepts:  - same as MPI_Alltoallw()
 * Returns:  - MPI_SUCCESS or an MPI error code
 *
 * Same persistent-request pattern as alltoallv, except that each peer
 * has its own datatype and the displacements are in bytes (per the
 * MPI_Alltoallw contract), so no extent scaling is done.  The
 * point-to-point calls are currently #if 0'd out pending
 * negative-tag / direct PML support.
 */
int mca_coll_basic_alltoallw(void *sbuf, int *scounts, int *sdisps,
                             MPI_Datatype *sdtypes, void *rbuf,
                             int *rcounts, int *rdisps,
                             MPI_Datatype *rdtypes, MPI_Comm comm)
{
  int i;
  int size;
  int rank;
  int nreqs;
  int err;
  char *psnd;
  char *prcv;
  MPI_Request *req;
  MPI_Request *preq;

  /* Initialize. */

  MPI_Comm_size(comm, &size);
  MPI_Comm_rank(comm, &rank);

  /* Allocate arrays of requests: one recv + one send per peer. */

  nreqs = 2 * (size - 1);
  if (nreqs > 0) {
    req = (MPI_Request *) LAM_MALLOC(nreqs * sizeof(MPI_Request));
    if (NULL == req) {
      /* NOTE(review): req is NULL here, so this LAM_FREE is
         presumably a no-op (like free(NULL)) and redundant. */
      LAM_FREE(req);
      return ENOMEM;
    }
  } else {
    req = NULL;
  }

  /* simple optimization: handle the send-to-self slot locally */

  psnd = ((char *) sbuf) + sdisps[rank];
  prcv = ((char *) rbuf) + rdisps[rank];
#if 0
  /* JMS: Need a lam_datatype_something() here that allows two
     different datatypes.  NOTE(review): the guard below compares a
     request pointer against MPI_REQUEST_NULL rather than NULL --
     looks like a typo; the alltoall/alltoallv versions test NULL. */
  err = lam_dtsndrcv(psnd, scounts[rank], sdtypes[rank],
                     prcv, rcounts[rank], rdtypes[rank], BLKMPIALLTOALLW, comm);
  if (MPI_SUCCESS != err) {
    if (MPI_REQUEST_NULL != req)
      LAM_FREE(req);
    return err;
  }
#endif

  /* If only one process, we're done (req is NULL -- nothing leaks). */

  if (1 == size) {
    return MPI_SUCCESS;
  }

  /* Initiate all send/recv to/from others: first all receives ... */

  preq = req;
  for (i = 0; i < size; ++i) {
    if (i == rank)
      continue;
    prcv = ((char *) rbuf) + rdisps[i];
#if 0
    /* JMS: Need to replace this with negative tags and direct PML
       calls */
    err = MPI_Recv_init(prcv, rcounts[i], rdtypes[i],
                        i, BLKMPIALLTOALLW, comm, preq++);
    if (MPI_SUCCESS != err) {
      LAM_FREE(req);
      return err;
    }
#endif
  }

  /* ... then all sends. */

  for (i = 0; i < size; ++i) {
    if (i == rank)
      continue;
    psnd = ((char *) sbuf) + sdisps[i];
#if 0
    /* JMS: Need to replace this with negative tags and direct PML
       calls */
    err = MPI_Send_init(psnd, scounts[i], sdtypes[i],
                        i, BLKMPIALLTOALLW, comm, preq++);
    if (MPI_SUCCESS != err) {
      LAM_FREE(req);
      return err;
    }
#endif
  }

  /* Start all requests. */

  err = MPI_Startall(nreqs, req);
  if (MPI_SUCCESS != err) {
    LAM_FREE(req);
    return err;
  }

  /* Wait for them all. */

  err = MPI_Waitall(nreqs, req, MPI_STATUSES_IGNORE);
  if (MPI_SUCCESS != err) {
    LAM_FREE(req);
    return err;
  }

  /* Free the requests. */

  for (i = 0, preq = req; i < nreqs; ++i, ++preq) {
    err = MPI_Request_free(preq);
    if (MPI_SUCCESS != err) {
      LAM_FREE(req);
      return err;
    }
  }

  /* All done */

  LAM_FREE(req);
  return MPI_SUCCESS;
}

Просмотреть файл

@ -0,0 +1,178 @@
/*
* $HEADER$
*/
#include "lam_config.h"
#include "coll_basic_config.h"
#include "lam/constants.h"
#include "mpi.h"
#include "mca/mpi/coll/coll.h"
#include "coll_basic.h"
/*
 * barrier_lin
 *
 * Function: - barrier using O(N) algorithm
 * Accepts:  - same as MPI_Barrier()
 * Returns:  - MPI_SUCCESS or error code
 *
 * Rank 0 collects a zero-byte message from every other rank, then
 * sends one back to each.  All point-to-point calls are currently
 * #if 0'd out pending negative tags / direct PML entry points, so
 * this is a no-op for now ("err" and "i" are only used by the
 * disabled code, hence the unused-variable warnings).
 */
int mca_coll_basic_barrier_lin(MPI_Comm comm)
{
  int size;
  int rank;
  int err;
  int i;

  MPI_Comm_size(comm, &size);
  MPI_Comm_rank(comm, &rank);

  /* All non-root send & receive zero-length message. */

  if (rank > 0) {
#if 0
    /* JMS: Need to replace this with negative tags and direct PML
       calls */
    err = MPI_Send((void *) 0, 0, MPI_BYTE, 0, BLKMPIBARRIER, comm);
    if (MPI_SUCCESS != err) {
      return err;
    }
#endif
#if 0
    /* JMS: Need to replace this with negative tags and direct PML
       calls */
    err = MPI_Recv((void *) 0, 0, MPI_BYTE, 0, BLKMPIBARRIER, comm,
                   MPI_STATUS_IGNORE);
    if (MPI_SUCCESS != err) {
      return err;
    }
#endif
  }

  /* The root collects and broadcasts the messages. */

  else {
    for (i = 1; i < size; ++i) {
#if 0
      /* JMS: Need to replace this with negative tags and direct PML
         calls */
      err = MPI_Recv((void *) 0, 0, MPI_BYTE, MPI_ANY_SOURCE,
                     BLKMPIBARRIER, comm, MPI_STATUS_IGNORE);
      if (MPI_SUCCESS != err) {
        return err;
      }
#endif
    }
    for (i = 1; i < size; ++i) {
#if 0
      /* JMS: Need to replace this with negative tags and direct PML
         calls */
      err = MPI_Send((void *) 0, 0, MPI_BYTE, i, BLKMPIBARRIER, comm);
      if (MPI_SUCCESS != err) {
        return err;
      }
#endif
    }
  }

  /* All done */

  return MPI_SUCCESS;
}
/*
 * barrier_log
 *
 * Function: - barrier using O(log(N)) algorithm
 * Accepts:  - same as MPI_Barrier()
 * Returns:  - MPI_SUCCESS or error code
 *
 * Hypercube-style tree: each rank receives from its children, then
 * synchronizes with its parent, then releases its children.
 *
 * NOTE(review): "dim" and "hibit" are only assigned inside the #if 0
 * block below, so "--dim" and the loops read uninitialized values
 * (undefined behavior) until the JMS caching work lands.  All
 * point-to-point calls are likewise #if 0'd out pending negative
 * tags / direct PML entry points.
 */
int
mca_coll_basic_barrier_log(MPI_Comm comm)
{
  int size;
  int rank;
  int peer;
  int dim;
  int hibit;
  int mask;
  int err;
  int i;

  /* Send null-messages up and down the tree.  Synchronization at the
     root (rank 0). */

  MPI_Comm_rank(comm, &rank);
  MPI_Comm_size(comm, &size);
#if 0
  /* JMS Need to cache this info somewhere */
  dim = comm->c_cube_dim;
  hibit = lam_hibit(rank, dim);
#endif
  --dim;

  /* Receive from children. */

  for (i = dim, mask = 1 << i; i > hibit; --i, mask >>= 1) {
    peer = rank | mask;
    if (peer < size) {
#if 0
      /* JMS: Need to replace this with negative tags and direct PML
         calls */
      err = MPI_Recv((void *) 0, 0, MPI_BYTE,
                     peer, BLKMPIBARRIER, comm, MPI_STATUS_IGNORE);
      if (MPI_SUCCESS != err) {
        return err;
      }
#endif
    }
  }

  /* Send to and receive from parent. */

  if (rank > 0) {
    peer = rank & ~(1 << hibit);
#if 0
    /* JMS: Need to replace this with negative tags and direct PML
       calls */
    err = MPI_Send((void *) 0, 0, MPI_BYTE, peer, BLKMPIBARRIER, comm);
    if (MPI_SUCCESS != err) {
      return err;
    }
#endif
#if 0
    /* JMS: Need to replace this with negative tags and direct PML
       calls */
    err = MPI_Recv((void *) 0, 0, MPI_BYTE, peer,
                   BLKMPIBARRIER, comm, MPI_STATUS_IGNORE);
    if (MPI_SUCCESS != err) {
      return err;
    }
#endif
  }

  /* Send to children. */

  for (i = hibit + 1, mask = 1 << i; i <= dim; ++i, mask <<= 1) {
    peer = rank | mask;
    if (peer < size) {
#if 0
      /* JMS: Need to replace this with negative tags and direct PML
         calls */
      err = MPI_Send((void *) 0, 0, MPI_BYTE, peer, BLKMPIBARRIER, comm);
      if (MPI_SUCCESS != err) {
        return err;
      }
#endif
    }
  }

  /* All done */

  return MPI_SUCCESS;
}

193
src/mca/mpi/coll/basic/src/coll_basic_bcast.c Обычный файл
Просмотреть файл

@ -0,0 +1,193 @@
/*
* $HEADER$
*/
#include "lam_config.h"
#include "coll_basic_config.h"
#include "lam/constants.h"
#include "mpi.h"
#include "mca/mpi/coll/coll.h"
#include "coll_basic.h"
/*
 * bcast_lin
 *
 * Function: - broadcast using O(N) algorithm
 * Accepts:  - same arguments as MPI_Bcast()
 * Returns:  - MPI_SUCCESS or error code
 *
 * Root posts one persistent send per peer, starts them all, waits,
 * and frees them.  NOTE(review): reqs[] has only LAM_COLLMAXLIN (4)
 * slots but up to size-1 requests are posted and MPI_Startall(size-1)
 * is called -- a buffer overflow for communicators with more than 5
 * processes.  Also, the non-root early return is #if 0'd out, so
 * non-roots currently fall through into the root path.  Both are
 * pending the negative-tag / direct PML rework.
 */
int mca_coll_basic_bcast_lin(void *buff, int count,
                             MPI_Datatype datatype, int root,
                             MPI_Comm comm)
{
  int i;
  int size;
  int rank;
  int err;
  MPI_Request *preq;
/* JMS: Need to define this somewhere */
#define LAM_COLLMAXLIN 4
  MPI_Request reqs[LAM_COLLMAXLIN];

  MPI_Comm_size(comm, &size);
  MPI_Comm_rank(comm, &rank);

  /* Non-root receive the data. */

  if (rank != root) {
#if 0
    /* JMS: Need to replace this with negative tags and direct PML
       calls */
    return MPI_Recv(buff, count, datatype, root,
                    BLKMPIBCAST, comm, MPI_STATUS_IGNORE);
#endif
  }

  /* Root sends data to all others. */

  for (i = 0, preq = reqs; i < size; ++i) {
    if (i == rank)
      continue;
#if 0
    /* JMS: Need to replace this with negative tags and direct PML
       calls */
    err = MPI_Send_init(buff, count, datatype, i, BLKMPIBCAST,
                        comm, preq++);
    if (MPI_SUCCESS != err) {
      return err;
    }
#endif
  }

  /* Start and wait on all requests. */

  err = MPI_Startall(size - 1, reqs);
  if (MPI_SUCCESS != err) {
    return err;
  }
  err = MPI_Waitall(size - 1, reqs, MPI_STATUSES_IGNORE);
  if (MPI_SUCCESS != err) {
    return err;
  }

  /* Free the requests. */

  for (i = 0, preq = reqs; i < size; ++i) {
    if (i == rank)
      continue;
    err = MPI_Request_free(preq);
    if (MPI_SUCCESS != err)
      return err;
    ++preq;
  }

  /* All done */

  return MPI_SUCCESS;
}
/*
 * bcast_log
 *
 * Function: - broadcast using O(log(N)) algorithm
 * Accepts:  - same arguments as MPI_Bcast()
 * Returns:  - MPI_SUCCESS or error code
 *
 * Hypercube tree rooted (virtually) at "root": receive from parent,
 * then relay to children with persistent sends.
 *
 * NOTE(review): "dim" and "hibit" are only assigned inside the #if 0
 * block below, so "--dim" and the child loop read uninitialized
 * values (undefined behavior) until the JMS caching work lands.  All
 * point-to-point calls are likewise #if 0'd out pending negative
 * tags / direct PML entry points.
 */
int mca_coll_basic_bcast_log(void *buff, int count,
                             MPI_Datatype datatype, int root,
                             MPI_Comm comm)
{
  int i;
  int size;
  int rank;
  int vrank;
  int peer;
  int dim;
  int hibit;
  int mask;
  int err;
  int nreqs;
  MPI_Request *preq;
/* JMS: Need to define this somewhere */
#define LAM_COLLMAXDIM 64
  MPI_Request reqs[LAM_COLLMAXDIM];

  MPI_Comm_rank(comm, &rank);
  MPI_Comm_size(comm, &size);

  /* vrank re-labels the ranks so the tree is rooted at "root". */

  vrank = (rank + size - root) % size;
#if 0
  /* JMS Need to cache this somewhere */
  dim = comm->c_cube_dim;
  hibit = lam_hibit(vrank, dim);
#endif
  --dim;

  /* Receive data from parent in the tree. */

  if (vrank > 0) {
    peer = ((vrank & ~(1 << hibit)) + root) % size;
#if 0
    /* JMS: Need to replace this with negative tags and direct PML
       calls */
    err = MPI_Recv(buff, count, datatype, peer,
                   BLKMPIBCAST, comm, MPI_STATUS_IGNORE);
    if (MPI_SUCCESS != err) {
      return err;
    }
#endif
  }

  /* Send data to the children. */

  preq = reqs;
  nreqs = 0;
  for (i = hibit + 1, mask = 1 << i; i <= dim; ++i, mask <<= 1) {
    peer = vrank | mask;
    if (peer < size) {
      peer = (peer + root) % size;
      ++nreqs;
#if 0
      /* JMS: Need to replace this with negative tags and direct PML
         calls */
      err = MPI_Send_init(buff, count, datatype, peer, BLKMPIBCAST,
                          comm, preq++);
      if (MPI_SUCCESS != err) {
        return err;
      }
#endif
    }
  }

  /* Start and wait on all requests. */

  if (nreqs > 0) {
    err = MPI_Startall(nreqs, reqs);
    if (MPI_SUCCESS != err) {
      return err;
    }
    err = MPI_Waitall(nreqs, reqs, MPI_STATUSES_IGNORE);
    if (MPI_SUCCESS != err) {
      return err;
    }
    for (i = 0, preq = reqs; i < nreqs; ++i, ++preq) {
      err = MPI_Request_free(preq);
      if (MPI_SUCCESS != err) {
        return err;
      }
    }
  }

  /* All done */

  return MPI_SUCCESS;
}

Просмотреть файл

@ -0,0 +1,170 @@
/*
* $HEADER$
*/
#include "lam_config.h"
#include "coll_basic_config.h"
#include <stdio.h>
#include "lam/constants.h"
#include "lam/util/malloc.h"
#include "mpi.h"
#include "mca/mpi/coll/coll.h"
#include "coll_basic.h"
/*
 * exscan
 *
 * Function: - basic exscan operation
 * Accepts:  - same arguments as MPI_Exscan()
 * Returns:  - MPI_SUCCESS or error code
 *
 * Pipeline algorithm: each rank receives the prefix result from
 * rank-1, combines in its own value, and forwards to rank+1.  The
 * entire working body is currently #if 0'd out (lam_dt* datatype
 * helpers and negative-tag/PML sends are not available yet), so the
 * function is effectively a no-op: both temp buffers stay NULL and it
 * returns MPI_SUCCESS.
 */
int mca_coll_basic_exscan(void *sbuf, void *rbuf, int count,
                          MPI_Datatype dtype, MPI_Op op, MPI_Comm comm)
{
  int size;
  int rank;
  int err;
  char *origin, *tmpbuf = NULL;
  char *gathered_buffer = NULL, *gathered_origin;

  /* Initialize. */

  MPI_Comm_rank(comm, &rank);
  MPI_Comm_size(comm, &size);
#if 0
  /* JMS: Need to replace lots things in this file: lam_dt* stuff with
     lam_datatype_*() functions.  Also need to replace lots of
     MPI_Send/MPI_Recv with negative tags and PML entry points. */

  /* Otherwise receive previous buffer and reduce.  Store the received
     buffer in a different array and then send the reduced array to
     the next process. */

  /* JMS Need to replace this with some lam_datatype_*() function */
  err = lam_dtbuffer(dtype, count, &gathered_buffer, &gathered_origin);
  if (MPI_SUCCESS != err) {
    return err;
  }

  if (0 != rank) {
    /* Non-commutative ops need a separate receive buffer so that the
       operands can be combined in rank order. */
    if (!op->op_commute) {
      /* JMS Need to replace this with some lam_datatype_*() function */
      err = lam_dtbuffer(dtype, count, &tmpbuf, &origin);
      if (MPI_SUCCESS != err) {
        if (NULL != gathered_buffer) {
          LAM_FREE(gathered_buffer);
        }
        return err;
      }

      /* Copy the send buffer into the receive buffer. */

      /* JMS Need to replace this with some lam_datatype_*() function */
      err = lam_dtsndrcv(sbuf, count, dtype, rbuf,
                         count, dtype, BLKMPIEXSCAN, comm);
      if (MPI_SUCCESS != err) {
        if (NULL != gathered_buffer) {
          LAM_FREE(gathered_buffer);
        }
        if (NULL != tmpbuf) {
          LAM_FREE(tmpbuf);
        }
        return err;
      }

      /* JMS Need to replace this with negative tags and PML entry
         point */
      err = MPI_Recv(origin, count, dtype,
                     rank - 1, BLKMPIEXSCAN, comm, MPI_STATUS_IGNORE);
      /* JMS Need to add error checking here */

      /* JMS Need to replace this with some lam_datatype_*() function */
      err = lam_dtsndrcv(origin, count, dtype, gathered_origin,
                         count, dtype, BLKMPIEXSCAN, comm);
    } else {
      origin = sbuf;

      /* JMS Need to replace this with negative tags and PML entry
         point */
      err = MPI_Recv(rbuf, count, dtype,
                     rank - 1, BLKMPIEXSCAN, comm, MPI_STATUS_IGNORE);
      if (MPI_SUCCESS != err) {
        if (NULL != gathered_buffer) {
          LAM_FREE(gathered_buffer);
        }
        if (NULL != tmpbuf) {
          LAM_FREE(tmpbuf);
        }
        return err;
      }

      /* JMS Need to replace this with some lam_datatype_*() function */
      err = lam_dtsndrcv(rbuf, count, dtype, gathered_origin,
                         count, dtype, BLKMPIEXSCAN, comm);
    }
    if (err != MPI_SUCCESS) {
      if (NULL != gathered_buffer) {
        LAM_FREE(gathered_buffer);
      }
      if (NULL != tmpbuf) {
        LAM_FREE(tmpbuf);
      }
      return err;
    }

    /* Apply the reduction operator (F77 ops take the Fortran datatype
       handle). */
    if (op->op_flags & LAM_LANGF77) {
      (op->op_func)(origin, rbuf, &count, &dtype->dt_f77handle);
    } else {
      (op->op_func)(origin, rbuf, &count, &dtype);
    }
  }

  /* Send the result to next process. */

  if (rank < (size - 1)) {
    if (0 == rank)
      err = MPI_Send(sbuf, count, dtype, rank + 1, BLKMPIEXSCAN, comm);
    else
      err = MPI_Send(rbuf, count, dtype, rank + 1, BLKMPIEXSCAN, comm);
    if (MPI_SUCCESS != err) {
      if (NULL != gathered_buffer) {
        LAM_FREE(gathered_buffer);
      }
      if (NULL != tmpbuf) {
        LAM_FREE(tmpbuf);
      }
      return err;
    }
  }

  /* Copy the unreduced prefix into rbuf (exscan excludes our own
     contribution from the result). */

  if (rank != 0) {
    err = lam_dtsndrcv(gathered_origin, count, dtype, rbuf,
                       count, dtype, BLKMPIEXSCAN, comm);
    if (MPI_SUCCESS != err) {
      if (NULL != gathered_buffer) {
        LAM_FREE(gathered_buffer);
      }
      if (NULL != tmpbuf) {
        LAM_FREE(tmpbuf);
      }
      return err;
    }
  }
#endif

  if (NULL != gathered_buffer)
    LAM_FREE(gathered_buffer);
  if (NULL != tmpbuf)
    LAM_FREE(tmpbuf);

  /* All done */

  return MPI_SUCCESS;
}

Просмотреть файл

@ -0,0 +1,72 @@
/*
* $HEADER$
*/
#include "lam_config.h"
#include "coll_basic_config.h"
#include "lam/constants.h"
#include "mpi.h"
#include "mca/mpi/coll/coll.h"
#include "coll_basic.h"
/*
 * gather
 *
 * Function: - basic gather operation
 * Accepts:  - same arguments as MPI_Gather()
 * Returns:  - MPI_SUCCESS or error code
 *
 * Non-roots send their buffer to root; root receives from each rank
 * in order into consecutive slots of rbuf.  The whole body is
 * currently #if 0'd out pending lam_datatype_*() helpers and
 * negative-tag / PML entry points, so this is a no-op for now (hence
 * the unused-variable warnings for the locals below).
 */
int mca_coll_basic_gather(void *sbuf, int scount, MPI_Datatype sdtype,
                          void *rbuf, int rcount, MPI_Datatype rdtype,
                          int root, MPI_Comm comm)
{
  int i;
  int err;
  int rank;
  int size;
  char *ptmp;
  MPI_Aint incr;
  MPI_Aint extent;
#if 0
  /* JMS: Need to replace lots things in this file: lam_dt* stuff with
     lam_datatype_*() functions.  Also need to replace lots of
     MPI_Send/MPI_Recv with negative tags and PML entry points. */
  MPI_Comm_rank(comm, &rank);
  MPI_Comm_size(comm, &size);

  /* Everyone but root sends data and returns. */

  if (rank != root) {
    err = MPI_Send(sbuf, scount, sdtype, root, BLKMPIGATHER, comm);
    return err;
  }

  /* I am the root, loop receiving the data. */

  MPI_Type_extent(rdtype, &extent);
  incr = extent * rcount;
  for (i = 0, ptmp = (char *) rbuf; i < size; ++i, ptmp += incr) {
    /* simple optimization: local copy for the root's own slot */
    if (i == rank) {
      err = lam_dtsndrcv(sbuf, scount, sdtype, ptmp,
                         rcount, rdtype, BLKMPIGATHER, comm);
    } else {
      err = MPI_Recv(ptmp, rcount, rdtype, i,
                     BLKMPIGATHER, comm, MPI_STATUS_IGNORE);
    }
    if (MPI_SUCCESS != err) {
      return err;
    }
  }
#endif

  /* All done */

  return MPI_SUCCESS;
}

Просмотреть файл

@ -0,0 +1,73 @@
/*
* $HEADER$
*/
#include "lam_config.h"
#include "coll_basic_config.h"
#include "lam/constants.h"
#include "mpi.h"
#include "mca/mpi/coll/coll.h"
#include "coll_basic.h"
/*
* gatherv
*
* Function: - basic gatherv operation
 *	Accepts:	- same arguments as MPI_Gatherv()
* Returns: - MPI_SUCCESS or error code
*/
int mca_coll_basic_gatherv(void *sbuf, int scount, MPI_Datatype sdtype,
			   void *rbuf, int *rcounts, int *disps,
			   MPI_Datatype rdtype, int root,
			   MPI_Comm comm)
{
  int i;
  int rank;
  int size;
  int err;                      /* status of each point-to-point call */
  char *ptmp;                   /* target address for rank i's chunk */
  MPI_Aint extent;
  /* NOTE(review): the entire algorithm below is compiled out pending the
     PML entry points and lam_datatype_*() work noted in the commit; until
     then this function is a stub that returns MPI_SUCCESS unconditionally. */
#if 0
  /* JMS: Need to replace lots things in this file: lam_dt* stuff with
     lam_datatype_*() functions.  Also need to replace lots of
     MPI_Send/MPI_Recv with negative tags and PML entry points. */
  MPI_Comm_size(comm, &size);
  MPI_Comm_rank(comm, &rank);
  /* Everyone but root sends data and returns. */
  if (rank != root) {
    err = MPI_Send(sbuf, scount, sdtype, root, BLKMPIGATHERV, comm);
    return err;
  }
  /* I am the root, loop receiving data.  Each rank's chunk lands at
     rbuf + extent * disps[i] and may have its own count. */
  MPI_Type_extent(rdtype, &extent);
  for (i = 0; i < size; ++i) {
    ptmp = ((char *) rbuf) + (extent * disps[i]);
    /* simple optimization: local copy for the root's own contribution */
    if (i == rank) {
      err = lam_dtsndrcv(sbuf, scount, sdtype,
			 ptmp, rcounts[i], rdtype, BLKMPIGATHERV, comm);
    } else {
      err = MPI_Recv(ptmp, rcounts[i], rdtype, i,
		     BLKMPIGATHERV, comm, MPI_STATUS_IGNORE);
    }
    if (MPI_SUCCESS != err) {
      return err;
    }
  }
#endif
  /* All done */
  return MPI_SUCCESS;
}

Просмотреть файл

@ -0,0 +1,65 @@
/*
* $HEADER$
*
* These symbols are in a file by themselves to provide nice linker
* semantics. Since linkers generally pull in symbols by object
* files, keeping these symbols as the only symbols in this file
* prevents utility programs such as "laminfo" from having to import
* entire modules just to query their version and parameters.
*/
#include "lam_config.h"
#include "coll_basic_config.h"
#include "mpi.h"
#include "mca/mpi/coll/coll.h"
#include "coll_basic.h"
/*
* Public string showing the coll lam_basic module version number
*/
const char *mca_coll_basic_module_version_string =
  "LAM/MPI basic collective MCA module version " MCA_COLL_BASIC_VERSION;

/*
 * Instantiate the public struct with all of our public information
 * and pointers to our public functions in it.  This is the symbol that
 * the MCA framework (and utilities such as laminfo) looks up to learn
 * about this module.
 */
const mca_coll_base_module_1_0_0_t mca_coll_basic_module = {

  /* First, the mca_module_t struct containing meta information
     about the module itself */

  {
    /* Indicate that we are a coll v1.0.0 module (which also implies a
       specific MCA version) */

    MCA_COLL_BASE_VERSION_1_0_0,

    /* Module name and version */

    "basic",
    MCA_COLL_BASIC_MAJOR_VERSION,
    MCA_COLL_BASIC_MINOR_VERSION,
    MCA_COLL_BASIC_RELEASE_VERSION,

    /* Module open and close functions (none needed for this module) */

    NULL,
    NULL
  },

  /* Next the MCA v1.0.0 module meta data */

  {
    /* Whether the module is checkpointable or not */

    true
  },

  /* Initialization / querying functions */

  mca_coll_basic_thread_query,
  mca_coll_basic_query
};

Просмотреть файл

@ -0,0 +1,297 @@
/*
* $HEADER$
*/
#include "lam_config.h"
#include "coll_basic_config.h"
#include <stdio.h>
#include "lam/constants.h"
#include "lam/util/malloc.h"
#include "mpi.h"
#include "mca/mpi/coll/coll.h"
#include "coll_basic.h"
/*
* reduce_lin
*
* Function: - reduction using O(N) algorithm
* Accepts: - same as MPI_Reduce()
* Returns: - MPI_SUCCESS or error code
*/
/*
 * Linear (O(N)) reduction: the root receives every rank's contribution in
 * descending rank order and folds each one into rbuf with the reduction op.
 * Fix: `err` was only ever assigned inside #if 0 scaffolding, so the
 * non-root `return err` and the later MPI_SUCCESS checks read an
 * uninitialized variable (undefined behavior).  It is now initialized to
 * MPI_SUCCESS so the stub has well-defined behavior until the PML /
 * lam_datatype_*() work lands.
 */
int mca_coll_basic_reduce_lin(void *sbuf, void *rbuf, int count,
			      MPI_Datatype dtype, MPI_Op op,
			      int root, MPI_Comm comm)
{
  int i;
  int size;
  int rank;
  int err = MPI_SUCCESS;        /* defined even while the calls are #if 0'd */
  char *buffer = NULL;          /* receive staging buffer (root only) */
  char *origin = NULL;          /* payload start within buffer */
  char *inbuf;                  /* source operand for the reduction op */

  MPI_Comm_rank(comm, &rank);
  MPI_Comm_size(comm, &size);

  /* If not root, send data to the root. */
  if (rank != root) {
#if 0
    /* JMS This needs to be replaced with negative tags and direct
       calls into the PML */
    err = MPI_Send(sbuf, count, dtype, root, BLKMPIREDUCE, comm);
#endif
    return err;
  }

  /* Root receives and reduces messages.  Allocate buffer to receive
     messages. */
  if (size > 1) {
#if 0
    /* JMS Needs to be replaced with lam_datatype_*() functions */
    err = lam_dtbuffer(dtype, count, &buffer, &origin);
    if (MPI_SUCCESS != err)
      return err;
#endif
  }

  /* Initialize the receive buffer with the highest rank's contribution
     (local copy if that is us, otherwise a receive). */
  if (rank == (size - 1)) {
#if 0
    /* JMS Needs to be replaced with lam_datatype_*() functions */
    err = lam_dtsndrcv(sbuf, count, dtype, rbuf, count,
		       dtype, BLKMPIREDUCE, comm);
#endif
  } else {
#if 0
    /* JMS This needs to be replaced with negative tags and direct
       calls into the PML */
    err = MPI_Recv(rbuf, count, dtype, size - 1,
		   BLKMPIREDUCE, comm, MPI_STATUS_IGNORE);
#endif
  }
  if (MPI_SUCCESS != err) {
    if (NULL != buffer)
      LAM_FREE(buffer);
    return err;
  }

  /* Loop receiving and calling reduction function (C or Fortran),
     folding contributions in descending rank order. */
  for (i = size - 2; i >= 0; --i) {
    if (rank == i) {
      inbuf = sbuf;             /* our own contribution: no transfer needed */
    } else {
#if 0
      /* JMS This needs to be replaced with negative tags and direct
	 calls into the PML */
      err = MPI_Recv(origin, count, dtype, i, BLKMPIREDUCE, comm,
		     MPI_STATUS_IGNORE);
#endif
      if (MPI_SUCCESS != err) {
	if (NULL != buffer)
	  LAM_FREE(buffer);
	return err;
      }
      inbuf = origin;
    }

    /* Call reduction function. */
#if 0
    /* JMS Need MPI_Op */
    if (op->op_flags & LAM_LANGF77) {
      (op->op_func)(inbuf, rbuf, &count, &dtype->dt_f77handle);
    } else {
      (op->op_func)(inbuf, rbuf, &count, &dtype);
    }
#endif
  }

  if (NULL != buffer)
    LAM_FREE(buffer);

  /* All done */
  return (MPI_SUCCESS);
}
/*
* reduce_log
*
* Function: - reduction using O(log N) algorithm
* Accepts: - same as MPI_Reduce()
* Returns: - MPI_SUCCESS or error code
*/
/*
 * Logarithmic (O(log N)) hypercube reduction.  On each cube dimension the
 * "high" process of a pair sends its partial result to the "low" process,
 * which reduces it and continues; the result ends at virtual rank 0 and is
 * forwarded to the real root if necessary.
 *
 * Fix: `err`, `buf1`, `buf2`, `origin1`, `origin2`, `vrank` and `dim` were
 * only assigned inside #if 0 scaffolding, so the dimension loop, the NULL
 * checks and the final `return err` all read uninitialized variables
 * (undefined behavior).  They are now initialized to defined placeholder
 * values so the stub behaves deterministically until the PML / MPI_Op /
 * lam_datatype_*() work lands.
 */
int mca_coll_basic_reduce_log(void *sbuf, void *rbuf, int count,
			      MPI_Datatype dtype, MPI_Op op,
			      int root, MPI_Comm comm)
{
  int i;
  int size;
  int rank;
  int vrank;
  int err = MPI_SUCCESS;        /* defined even while the calls are #if 0'd */
  int peer;
  int dim = 0;                  /* placeholder: real value needs the cached
				   comm cube dimension (see #if 0 below) */
  int mask;
  int fl_recv;                  /* have we received at least one message? */
  char *buf1 = NULL;            /* incoming message buffer */
  char *buf2 = NULL;            /* partial-result buffer */
  char *origin1 = NULL;         /* payload start within buf1 */
  char *origin2 = NULL;         /* payload start within buf2 */
  void *inmsg;
  void *resmsg;

  /* Allocate the incoming and resulting message buffers. */
#if 0
  /* JMS Needs to be replaced with lam_datatype_*() functions */
  err = lam_dtbuffer(dtype, count, &buf1, &origin1);
  if (MPI_SUCCESS != err)
    return err;
  err = lam_dtbuffer(dtype, count, &buf2, &origin2);
  if (MPI_SUCCESS != err) {
    if (NULL != buf1)
      LAM_FREE(buf1);
    return err;
  }
#endif

  MPI_Comm_size(comm, &size);
  MPI_Comm_rank(comm, &rank);
  /* Placeholder virtual rank until op->op_commute is available: with a
     commutative op the roles are rotated so the root lands on vrank 0. */
  vrank = rank;
#if 0
  /* JMS Need MPI_Op */
  vrank = (op->op_commute) ? (rank - root + size) % size : rank;
#endif
#if 0
  /* JMS Need to cache this somewhere */
  dim = comm->c_cube_dim;
#endif

  /* Loop over cube dimensions.  High processes send to low ones in the
     dimension. */
  inmsg = origin1;
  resmsg = origin2;
  fl_recv = 0;
  for (i = 0, mask = 1; i < dim; ++i, mask <<= 1) {

    /* A high-proc sends to low-proc and stops. */
    if (vrank & mask) {
      peer = vrank & ~mask;
#if 0
      /* JMS Need MPI_Op */
      if (op->op_commute)
	peer = (peer + root) % size;
#endif
#if 0
      /* JMS This needs to be replaced with negative tags and direct
	 calls into the PML */
      err = MPI_Send((fl_recv) ? resmsg : sbuf, count,
		     dtype, peer, BLKMPIREDUCE, comm);
#endif
      if (MPI_SUCCESS != err) {
	if (NULL != buf1)
	  LAM_FREE(buf1);
	if (NULL != buf2)
	  LAM_FREE(buf2);
	return err;
      }
      break;
    }

    /* A low-proc receives, reduces, and moves to a higher
       dimension. */
    else {
      peer = vrank | mask;
      if (peer >= size)
	continue;
#if 0
      /* JMS Need MPI_Op */
      if (op->op_commute)
	peer = (peer + root) % size;
#endif
      fl_recv = 1;
#if 0
      /* JMS This needs to be replaced with negative tags and direct
	 calls into the PML */
      err = MPI_Recv(inmsg, count, dtype, peer,
		     BLKMPIREDUCE, comm, MPI_STATUS_IGNORE);
#endif
      if (MPI_SUCCESS != err) {
	if (NULL != buf1)
	  LAM_FREE(buf1);
	if (NULL != buf2)
	  LAM_FREE(buf2);
	return err;
      }
#if 0
      /* JMS Need MPI_Op */
      if (op->op_flags & LAM_LANGF77) {
	(*op->op_func)((i > 0) ? resmsg : sbuf,
		       inmsg, &count, &dtype->dt_f77handle);
      } else {
	(*op->op_func)((i > 0) ? resmsg : sbuf, inmsg, &count, &dtype);
      }
#endif
      /* Ping-pong the two buffers: the freshly-reduced message becomes
	 the partial result for the next dimension. */
      if (inmsg == origin1) {
	resmsg = origin1;
	inmsg = origin2;
      } else {
	resmsg = origin2;
	inmsg = origin1;
      }
    }
  }

  /* Get the result to the root if needed. */
  err = MPI_SUCCESS;
  if (0 == vrank) {
    if (root == rank) {
#if 0
      /* JMS Needs to be replaced with lam_datatype_*() functions */
      lam_dtcpy(rbuf, (i > 0) ? resmsg : sbuf, count, dtype);
#endif
    } else {
#if 0
      /* JMS This needs to be replaced with negative tags and direct
	 calls into the PML */
      err = MPI_Send((i > 0) ? resmsg : sbuf, count,
		     dtype, root, BLKMPIREDUCE, comm);
#endif
    }
  } else if (rank == root) {
#if 0
    /* JMS This needs to be replaced with negative tags and direct
       calls into the PML */
    err = MPI_Recv(rbuf, count, dtype, 0, BLKMPIREDUCE, comm,
		   MPI_STATUS_IGNORE);
#endif
  }

  if (NULL != buf1)
    LAM_FREE(buf1);
  if (NULL != buf2)
    LAM_FREE(buf2);

  /* All done */
  return err;
}

Просмотреть файл

@ -0,0 +1,92 @@
/*
* $HEADER$
*/
#include "lam_config.h"
#include "coll_basic_config.h"
#include <stdio.h>
#include <errno.h>
#include "lam/constants.h"
#include "lam/util/malloc.h"
#include "mpi.h"
#include "mca/mpi/coll/coll.h"
#include "coll_basic.h"
/*
* reduce_scatter
*
* Function: - reduce then scatter
* Accepts: - same as MPI_Reduce_scatter()
* Returns: - MPI_SUCCESS or error code
*/
/*
 * Reduce-then-scatter: reduce everything to rank 0, then scatterv the
 * result so each rank receives its rcounts[rank] elements.
 * Fix: on allocation failure the code called LAM_FREE(disps) on the very
 * pointer it had just determined to be NULL; the pointless free is removed.
 */
int mca_coll_basic_reduce_scatter(void *sbuf, void *rbuf, int *rcounts,
				  MPI_Datatype dtype, MPI_Op op,
				  MPI_Comm comm)
{
  int i;
  int err;
  int rank;
  int size;
  int count;                    /* total element count over all ranks */
  int *disps = NULL;            /* scatterv displacements (rank 0 only) */
  char *buffer = NULL;          /* reduce staging buffer (rank 0 only) */
  char *origin = NULL;          /* payload start within buffer */

  MPI_Comm_size(comm, &size);
  MPI_Comm_rank(comm, &rank);

  /* Initialize reduce & scatterv info at the root (rank 0). */
  for (i = 0, count = 0; i < size; ++i) {
    if (rcounts[i] < 0) {
      return EINVAL;
    }
    count += rcounts[i];
  }

  if (0 == rank) {
    disps = (int *) LAM_MALLOC((unsigned) size * sizeof(int));
    if (NULL == disps) {
      /* NOTE(review): returning raw errno here does not match MPI error
	 code conventions -- confirm against the error handling work noted
	 in the commit message. */
      return errno;
    }
#if 0
    /* JMS Need to replace this with lam_datatype_*() functions */
    err = lam_dtbuffer(dtype, count, &buffer, &origin);
    if (MPI_SUCCESS != err) {
      LAM_FREE(disps);
      return err;
    }
#endif
    /* Displacements are a running prefix sum of the receive counts. */
    disps[0] = 0;
    for (i = 0; i < (size - 1); ++i)
      disps[i + 1] = disps[i] + rcounts[i];
  }

  /* reduction -- NOTE(review): origin is NULL until the lam_dtbuffer()
     call above is enabled, so rank 0's receive buffer is not yet valid. */
  err = MPI_Reduce(sbuf, origin, count, dtype, op, 0, comm);
  if (MPI_SUCCESS != err) {
    if (NULL != disps)
      LAM_FREE(disps);
    if (NULL != buffer)
      LAM_FREE(buffer);
    return err;
  }

  /* scatter */
  err = MPI_Scatterv(origin, rcounts, disps, dtype,
		     rbuf, rcounts[rank], dtype, 0, comm);

  if (NULL != disps)
    LAM_FREE(disps);
  if (NULL != buffer)
    LAM_FREE(buffer);
  return err;
}

135
src/mca/mpi/coll/basic/src/coll_basic_scan.c Обычный файл
Просмотреть файл

@ -0,0 +1,135 @@
/*
* $HEADER$
*/
#include "lam_config.h"
#include "coll_basic_config.h"
#include <stdio.h>
#include "lam/constants.h"
#include "lam/util/malloc.h"
#include "mpi.h"
#include "mca/mpi/coll/coll.h"
#include "coll_basic.h"
/*
* scan
*
* Function: - basic scan operation
* Accepts: - same arguments as MPI_Scan()
* Returns: - MPI_SUCCESS or error code
*/
/*
 * Basic inclusive scan: each rank receives the prefix result from rank-1,
 * folds in its own contribution, and forwards the result to rank+1.
 * Fix: `err` was only assigned inside #if 0 scaffolding, so the
 * MPI_SUCCESS checks below read an uninitialized variable (undefined
 * behavior); it is now initialized to MPI_SUCCESS.  `origin` is likewise
 * initialized to NULL for defined behavior in the stubbed-out paths.
 */
int mca_coll_basic_scan(void *sbuf, void *rbuf, int count,
			MPI_Datatype dtype, MPI_Op op, MPI_Comm comm)
{
  int size;
  int rank;
  int err = MPI_SUCCESS;        /* defined even while the calls are #if 0'd */
  char *tmpbuf = NULL;          /* temp buffer for the non-commutative case */
  char *origin = NULL;          /* source operand for the reduction op */

  /* Initialize. */
  MPI_Comm_rank(comm, &rank);
  MPI_Comm_size(comm, &size);

  /* If I'm rank 0, initialize the recv buffer with my own contribution. */
  if (0 == rank) {
#if 0
    /* JMS Need to replace this with lam_datatype_*() functions */
    err = lam_dtsndrcv(sbuf, count, dtype,
		       rbuf, count, dtype, BLKMPISCAN, comm);
    if (MPI_SUCCESS != err) {
      return err;
    }
#endif
  }

  /* Otherwise receive previous buffer and reduce. */
  else {
    /* The #if 0/#else keeps the non-commutative branch (which preserves
       operand order) always taken until MPI_Op internals are available. */
#if 0
    /* JMS Need MPI_Op */
    if (!op->op_commute) {
#else
    if (1) {
#endif

      /* Allocate a temporary buffer. */
#if 0
      /* JMS Need to replace this with lam_datatype_*() functions */
      err = lam_dtbuffer(dtype, count, &tmpbuf, &origin);
      if (MPI_SUCCESS != err) {
	return err;
      }
#endif

      /* Copy the send buffer into the receive buffer. */
#if 0
      /* JMS Need to replace this with lam_datatype_*() functions */
      err = lam_dtsndrcv(sbuf, count, dtype, rbuf,
			 count, dtype, BLKMPISCAN, comm);
      if (MPI_SUCCESS != err) {
	if (NULL != tmpbuf)
	  LAM_FREE(tmpbuf);
	return err;
      }
#endif

#if 0
      /* JMS: Need to replace this with negative tags and and direct PML
	 calls */
      err = MPI_Recv(origin, count, dtype,
		     rank - 1, BLKMPISCAN, comm, MPI_STATUS_IGNORE);
#endif
    } else {
      /* Commutative case: receive directly into rbuf and reduce sbuf in. */
      origin = sbuf;
#if 0
      /* JMS: Need to replace this with negative tags and and direct PML
	 calls */
      err = MPI_Recv(rbuf, count, dtype,
		     rank - 1, BLKMPISCAN, comm, MPI_STATUS_IGNORE);
#endif
    }
    if (MPI_SUCCESS != err) {
      if (NULL != tmpbuf)
	LAM_FREE(tmpbuf);
      return err;
    }

#if 0
    /* JMS Need MPI_Op */
    if (op->op_flags & LAM_LANGF77) {
      (op->op_func)(origin, rbuf, &count, &dtype->dt_f77handle);
    } else {
      (op->op_func)(origin, rbuf, &count, &dtype);
    }
#endif

    if (NULL != tmpbuf)
      LAM_FREE(tmpbuf);
  }

  /* Send result to next process. */
  if (rank < (size - 1)) {
#if 0
    /* JMS: Need to replace this with negative tags and and direct PML
       calls */
    err = MPI_Send(rbuf, count, dtype, rank + 1, BLKMPISCAN, comm);
#endif
    if (MPI_SUCCESS != err) {
      return err;
    }
  }

  /* All done */
  return MPI_SUCCESS;
}

Просмотреть файл

@ -0,0 +1,74 @@
/*
* $HEADER$
*/
#include "lam_config.h"
#include "coll_basic_config.h"
#include "lam/constants.h"
#include "mpi.h"
#include "mca/mpi/coll/coll.h"
#include "coll_basic.h"
/*
* scatter
*
* Function: - scatter operation
* Accepts: - same arguments as MPI_Scatter()
* Returns: - MPI_SUCCESS or error code
*/
/*
 * Basic linear scatter: the root sends each rank its scount-element chunk
 * of sbuf; every other rank posts a single receive.
 * Fix: `err` was only assigned inside #if 0 scaffolding, so the non-root
 * `return err` and the loop's MPI_SUCCESS check read an uninitialized
 * variable (undefined behavior); it is now initialized to MPI_SUCCESS.
 */
int mca_coll_basic_scatter(void *sbuf, int scount, MPI_Datatype sdtype,
			   void *rbuf, int rcount, MPI_Datatype rdtype,
			   int root, MPI_Comm comm)
{
  int i;
  int rank;
  int size;
  int err = MPI_SUCCESS;        /* defined even while the calls are #if 0'd */
  char *ptmp;                   /* cursor into the root's send buffer */
  MPI_Aint incr;                /* per-rank stride in sbuf: extent * scount */

  MPI_Comm_rank(comm, &rank);
  MPI_Comm_size(comm, &size);

  /* If not root, receive data. */
  if (rank != root) {
#if 0
    /* JMS Need to replace with negative tags and direct PML calls */
    err = MPI_Recv(rbuf, rcount, rdtype,
		   root, BLKMPISCATTER, comm, MPI_STATUS_IGNORE);
#endif
    return err;
  }

  /* I am the root, loop sending data. */
  MPI_Type_extent(sdtype, &incr);
  incr *= scount;
  for (i = 0, ptmp = (char *) sbuf; i < size; ++i, ptmp += incr) {

    /* simple optimization: local copy for the root's own chunk */
    if (i == rank) {
#if 0
      /* JMS Need to replace this with lam_datatype_*() functions */
      err = lam_dtsndrcv(ptmp, scount, sdtype, rbuf,
			 rcount, rdtype, BLKMPISCATTER, comm);
#endif
    } else {
#if 0
      /* JMS Need to replace this with negative tags and direct PML calls */
      err = MPI_Send(ptmp, scount, sdtype, i, BLKMPISCATTER, comm);
#endif
    }
    if (MPI_SUCCESS != err) {
      return err;
    }
  }

  /* All done */
  return MPI_SUCCESS;
}

Просмотреть файл

@ -0,0 +1,76 @@
/*
* $HEADER$
*/
#include "lam_config.h"
#include "coll_basic_config.h"
#include "lam/constants.h"
#include "mpi.h"
#include "mca/mpi/coll/coll.h"
#include "coll_basic.h"
/*
* scatterv
*
* Function: - scatterv operation
 *	Accepts:	- same arguments as MPI_Scatterv()
* Returns: - MPI_SUCCESS or error code
*/
/*
 * Basic linear scatterv: the root sends rank i the scounts[i]-element chunk
 * starting at sbuf + extent * disps[i]; every other rank posts one receive.
 * Fix: `err` was only assigned inside #if 0 scaffolding, so the non-root
 * `return err` and the loop's MPI_SUCCESS check read an uninitialized
 * variable (undefined behavior); it is now initialized to MPI_SUCCESS.
 */
int mca_coll_basic_scatterv(void *sbuf, int *scounts,
			    int *disps, MPI_Datatype sdtype,
			    void *rbuf, int rcount,
			    MPI_Datatype rdtype, int root,
			    MPI_Comm comm)
{
  int i;
  int rank;
  int size;
  int err = MPI_SUCCESS;        /* defined even while the calls are #if 0'd */
  char *ptmp;                   /* source address for rank i's chunk */
  MPI_Aint extent;

  MPI_Comm_rank(comm, &rank);
  MPI_Comm_size(comm, &size);

  /* If not root, receive data. */
  if (rank != root) {
#if 0
    /* JMS Need to replace this with negative tags and direct PML calls */
    err = MPI_Recv(rbuf, rcount, rdtype,
		   root, BLKMPISCATTERV, comm, MPI_STATUS_IGNORE);
#endif
    return err;
  }

  /* I am the root, loop sending data. */
  MPI_Type_extent(sdtype, &extent);
  for (i = 0; i < size; ++i) {
    ptmp = ((char *) sbuf) + (extent * disps[i]);

    /* simple optimization: local copy for the root's own chunk */
    if (i == rank) {
#if 0
      /* JMS Need to replace this with lam_datatype_*() functions */
      err = lam_dtsndrcv(ptmp, scounts[i], sdtype, rbuf,
			 rcount, rdtype, BLKMPISCATTERV, comm);
#endif
    } else {
#if 0
      /* JMS Need to replace this with negative tags and direct PML calls */
      err = MPI_Send(ptmp, scounts[i], sdtype, i, BLKMPISCATTERV, comm);
#endif
    }
    if (MPI_SUCCESS != err) {
      return err;
    }
  }

  /* All done */
  return MPI_SUCCESS;
}