diff --git a/src/communicator/communicator.h b/src/communicator/communicator.h index 7b1ca743ba..6fe0678cc6 100644 --- a/src/communicator/communicator.h +++ b/src/communicator/communicator.h @@ -15,6 +15,7 @@ #include "mca/topo/topo.h" #include "lfc/lam_hash_table.h" #include "attribute/attribute.h" +#include "request/request.h" extern lam_class_t lam_communicator_t_class; @@ -74,6 +75,11 @@ struct lam_communicator_t { mca_coll_1_0_0_t c_coll; struct mca_coll_comm_t* c_coll_comm; + + /* VPS: This will be moved in the coll module later on */ + lam_request_t **bcast_lin_reqs; + lam_request_t **bcast_log_reqs; + }; typedef struct lam_communicator_t lam_communicator_t; diff --git a/src/communicator/lam_comm_init.c b/src/communicator/lam_comm_init.c index 110db10439..4cb8934df1 100644 --- a/src/communicator/lam_comm_init.c +++ b/src/communicator/lam_comm_init.c @@ -10,6 +10,8 @@ #include "communicator/communicator.h" #include "include/constants.h" #include "mca/pml/pml.h" +#include "mca/coll/coll.h" +#include "mca/coll/base/base.h" /* @@ -67,7 +69,10 @@ int lam_comm_init(void) strncpy (lam_mpi_comm_null.c_name, "MPI_COMM_NULL", strlen("MPI_COMM_NULL")+1 ); lam_mpi_comm_null.c_flags |= LAM_COMM_NAMEISSET; - + + /* VPS: Remove this later */ + lam_mpi_comm_null.bcast_lin_reqs = NULL; + lam_mpi_comm_null.bcast_log_reqs = NULL; /* Setup MPI_COMM_WORLD */ OBJ_CONSTRUCT(&lam_mpi_comm_world, lam_communicator_t); @@ -90,6 +95,18 @@ int lam_comm_init(void) strncpy (lam_mpi_comm_world.c_name, "MPI_COMM_WORLD", strlen("MPI_COMM_WORLD")+1 ); lam_mpi_comm_world.c_flags |= LAM_COMM_NAMEISSET; + + /* VPS: Remove this later */ + lam_mpi_comm_world.bcast_lin_reqs = + malloc (mca_coll_base_bcast_collmaxlin * sizeof(lam_request_t*)); + if (NULL == lam_mpi_comm_world.bcast_lin_reqs) { + return LAM_ERR_OUT_OF_RESOURCE; + } + lam_mpi_comm_world.bcast_log_reqs = + malloc (mca_coll_base_bcast_collmaxdim * sizeof(lam_request_t*)); + if (NULL == lam_mpi_comm_world.bcast_log_reqs) { + return 
LAM_ERR_OUT_OF_RESOURCE; + } /* Setup MPI_COMM_SELF */ OBJ_CONSTRUCT(&lam_mpi_comm_self, lam_communicator_t); @@ -112,6 +129,22 @@ int lam_comm_init(void) strlen("MPI_COMM_SELF")+1 ); lam_mpi_comm_self.c_flags |= LAM_COMM_NAMEISSET; + /* VPS: Remove this later */ + lam_mpi_comm_self.bcast_lin_reqs = + malloc (mca_coll_base_bcast_collmaxlin * sizeof(lam_request_t*)); + if (NULL == lam_mpi_comm_self.bcast_lin_reqs) { + return LAM_ERR_OUT_OF_RESOURCE; + } + lam_mpi_comm_self.bcast_log_reqs = + malloc (mca_coll_base_bcast_collmaxdim * sizeof(lam_request_t*)); + if (NULL == lam_mpi_comm_self.bcast_log_reqs) { + return LAM_ERR_OUT_OF_RESOURCE; + } + + /* Some topo setup */ + + /* Some attribute setup stuff */ + return LAM_SUCCESS; } @@ -148,6 +181,7 @@ lam_communicator_t *lam_comm_allocate ( int local_size, int remote_size ) new_comm = NULL; } } + return new_comm; } @@ -157,10 +191,11 @@ lam_communicator_t *lam_comm_allocate ( int local_size, int remote_size ) */ int lam_comm_nextcid ( lam_communicator_t* comm, int mode ) { - static int nextcid=0; + static int nextcid=1; return nextcid++; } + /* ** COunterpart to MPI_Comm_group. To be used within LAM functions. */ @@ -196,7 +231,7 @@ int lam_comm_split ( lam_communicator_t* comm, int color, int key, lam_communicator_t **newcomm ) { lam_group_t *new_group; - /*int myinfo[2];*/ + int myinfo[2]; int size, my_size; int my_grank; int i, loc; @@ -212,18 +247,22 @@ int lam_comm_split ( lam_communicator_t* comm, int color, int key, else { /* sort according to color and rank */ size = lam_comm_size ( comm ); - + results = (int*) malloc ( 2 * size * sizeof(int)); if ( !results ) return MPI_ERR_INTERN; - /* What is the precise name of the allgather which I should call ? 
*/ - /*rc = coll_allgather_intra ( myinfo, 2, MPI_INT, + /* Fill in my information */ + myinfo[0] = color; + myinfo[1] = key; + + /* Gather information from everyone */ + rc = comm->c_coll.coll_allgather_intra ( myinfo, 2, MPI_INT, results, 2, MPI_INT, comm ); if ( rc != LAM_SUCCESS ) { free ( results ); return rc; } - */ + /* now how many do we have in 'my/our' new group */ for ( my_size = 0, i=0; i < size; i++) @@ -273,7 +312,7 @@ int lam_comm_split ( lam_communicator_t* comm, int color, int key, } /* end proc loop */ /* find my rank */ - my_grank=comm->c_local_group->grp_my_rank; + my_grank=comm->c_local_group->grp_my_rank; my_gpointer=comm->c_local_group->grp_proc_pointers[my_grank]; lam_set_group_rank(new_group, my_gpointer); @@ -317,13 +356,60 @@ int lam_comm_create ( lam_communicator_t *comm, lam_group_t *group, newcomp->c_my_rank = group->grp_my_rank; /* determine context id */ - newcomp->c_contextid = lam_comm_nextcid ( comm, LAM_COMM_INTRA_INTRA); + /* newcomp->c_contextid = lam_comm_nextcid ( comm, LAM_COMM_INTRA_INTRA); */ + + /* just for now ! */ + newcomp->c_contextid = newcomp->c_f_to_c_index; } + newcomp->c_cube_dim = lam_cube_dim(group->grp_proc_count); + /* copy error handler */ newcomp->error_handler = comm->error_handler; OBJ_RETAIN ( newcomp->error_handler ); + /* Initialize the PML stuff in the newcomp */ + mca_pml.pml_add_comm(newcomp); + + if (-1 == lam_pointer_array_add(&lam_mpi_communicators, newcomp)) { + return MPI_ERR_INTERN; + } + + /* We add the name here for debugging purposes */ + snprintf(newcomp->c_name, MPI_MAX_OBJECT_NAME, "MPI COMMUNICATOR %d", + newcomp->c_my_rank); + + /* Let the collectives modules fight over who will do + collective on this new comm. 
*/ + if (LAM_ERROR == mca_coll_base_init_comm(newcomp)) + return MPI_ERR_INTERN; + + /* ******* VPS: Remove this later -- need to be in coll module ****** */ + /* VPS: Cache the reqs for bcast */ + newcomp->bcast_lin_reqs = + malloc (mca_coll_base_bcast_collmaxlin * sizeof(lam_request_t*)); + if (NULL == newcomp->bcast_lin_reqs) { + return LAM_ERR_OUT_OF_RESOURCE; + } + newcomp->bcast_log_reqs = + malloc (mca_coll_base_bcast_collmaxdim * sizeof(lam_request_t*)); + if (NULL == newcomp->bcast_log_reqs) { + return LAM_ERR_OUT_OF_RESOURCE; + } + + /* Some topo setup */ + + /* Some attribute setup stuff */ + + if ( MPI_PROC_NULL == newcomp->c_local_group->grp_my_rank ) { + /* we are not part of the new comm, so we have to free it again. + However, we could not avoid the comm_nextcid step, since + all processes of the original comm have to participate in + that function call. Additionally, all errhandler stuff etc. + has to be set to make lam_comm_free happy */ + lam_comm_free ( &newcomp ); + } + *newcomm = newcomp; return MPI_SUCCESS; } @@ -339,27 +425,34 @@ int lam_comm_free ( lam_communicator_t **comm ) comp = (lam_communicator_t *)*comm; + /* Release attributes */ +#if 0 + lam_attr_delete_all ( COMM_ATTR, comp ); +#endif + + /* **************VPS: need a coll_base_finalize ******** */ + + /* Release topology information */ + /* Release local group */ grp = comp->c_local_group; for ( proc = 0; proc < grp->grp_proc_count; proc++ ) OBJ_RELEASE (grp->grp_proc_pointers[proc]); - OBJ_RELEASE(grp); /* Release remote group */ if ( comp->c_flags & LAM_COMM_INTER ) { grp = comp->c_remote_group; for ( proc = 0; proc < grp->grp_proc_count; proc++ ) OBJ_RELEASE (grp->grp_proc_pointers[proc]); - OBJ_RELEASE(grp); } /* Release error handler */ OBJ_RELEASE ( comp->error_handler ); - /* Release attributes */ - lam_attr_delete_all ( COMM_ATTR, comp ); - - /* Release topology information */ + /******** VPS: this goes away *************/ + /* Release the cached bcast requests */ + free 
(comp->bcast_lin_reqs); + free (comp->bcast_log_reqs); /* Release finally the communicator itself */ OBJ_RELEASE ( comp ); @@ -418,7 +511,7 @@ static void lam_comm_construct(lam_communicator_t* comm) comm->c_f_to_c_index = lam_pointer_array_add (&lam_mpi_communicators, comm); comm->c_name[0] = '\0'; - comm->c_contextid = 0; + comm->c_contextid = MPI_UNDEFINED; comm->c_flags = 0; comm->c_my_rank = 0; comm->c_cube_dim = 0; @@ -446,7 +539,6 @@ static void lam_comm_destruct(lam_communicator_t* dead_comm) dead_comm->c_f_to_c_index, NULL); } return; - }