Add support for MPI_Comm_dup_with_info, MPI_Comm_create_group, and MPI_Comm_idup.

As part of this work I implemented a basic request scheduler in
ompi/communicator/comm_request.c. This scheduler might be useful for more
than just communicator requests and could be moved to ompi/request
if there is demand. Otherwise I will leave it where it is.
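
For illustration, a minimal sketch (not part of the commit) of how a two-step
non-blocking operation can be composed with this scheduler. The
ompi_comm_request_* calls are the API added in this commit; my_context_t,
my_finish, and my_op_start are hypothetical names, and error paths are
trimmed to the essentials.

#include <stdlib.h>
#include "ompi/communicator/communicator.h"
#include "ompi/communicator/comm_request.h"
#include "ompi/constants.h"

typedef struct my_context_t {
    int inbuf;
    int outbuf;
} my_context_t;

/* step 2: runs once the iallreduce subrequest from step 1 completes */
static int my_finish (ompi_comm_request_t *request)
{
    my_context_t *context = (my_context_t *) request->context;
    (void) context->outbuf;  /* the reduced value is valid here */
    return OMPI_SUCCESS;     /* schedule now empty -> request completes */
}

static int my_op_start (ompi_communicator_t *comm, ompi_request_t **req)
{
    ompi_comm_request_t *request = ompi_comm_request_get ();
    my_context_t *context = calloc (1, sizeof (*context));
    ompi_request_t *subreq;
    int rc;

    if (NULL == request || NULL == context) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    context->inbuf = 1;
    request->context = context;  /* freed by ompi_comm_request_return */

    /* step 1: any non-blocking call that yields a subrequest */
    rc = comm->c_coll.coll_iallreduce (&context->inbuf, &context->outbuf,
                                       1, MPI_INT, MPI_SUM, comm, &subreq,
                                       comm->c_coll.coll_iallreduce_module);
    if (OMPI_SUCCESS != rc) {
        ompi_comm_request_return (request);
        return rc;
    }

    /* run my_finish once subreq completes, then complete the request */
    ompi_comm_request_schedule_append (request, my_finish, &subreq, 1);
    ompi_comm_request_start (request);

    *req = &request->super;  /* caller waits on this like any MPI request */
    return OMPI_SUCCESS;
}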

Added a non-blocking version of ompi_comm_set to support ompi_comm_idup.
ompi_comm_set makes a recursive call to ompi_comm_dup, so a non-blocking
version was needed. To simplify the code, the blocking version now calls the
non-blocking version and waits on the resulting request if one exists.
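
At the MPI level the new entry points behave as sketched below (a plain
MPI-3 illustration, not part of the commit): MPI_Comm_idup returns a
request immediately, work can be overlapped, and the duplicate is only
valid after the request completes.

#include <mpi.h>
#include <stdio.h>

int main (int argc, char *argv[])
{
    MPI_Comm newcomm;
    MPI_Request req;
    int rank;

    MPI_Init (&argc, &argv);

    /* non-blocking duplicate of MPI_COMM_WORLD */
    MPI_Comm_idup (MPI_COMM_WORLD, &newcomm, &req);

    /* ... unrelated computation can overlap with the duplication ... */

    MPI_Wait (&req, MPI_STATUS_IGNORE);  /* newcomm is usable after this */
    MPI_Comm_rank (newcomm, &rank);
    printf ("rank %d in the duplicated communicator\n", rank);

    MPI_Comm_free (&newcomm);
    MPI_Finalize ();
    return 0;
}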

cmr=v1.7.4:reviewer=jsquyres:ticket=trac:3796

This commit was SVN r29334.

The following Trac tickets were found above:
  Ticket 3796 --> https://svn.open-mpi.org/trac/ompi/ticket/3796
This commit is contained in:
Nathan Hjelm 2013-10-02 14:26:40 +00:00
parent 7bedf62dd8
commit 195c892a9d
16 changed files with 1635 additions and 200 deletions

@@ -10,6 +10,8 @@
# University of Stuttgart. All rights reserved.
# Copyright (c) 2004-2005 The Regents of the University of California.
# All rights reserved.
# Copyright (c) 2013 Los Alamos National Security, LLC. All rights
# reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
@@ -20,9 +22,11 @@
# This makefile.am does not stand on its own - it is included from ompi/Makefile.am
headers += \
communicator/communicator.h
communicator/communicator.h \
communicator/comm_request.h
libmpi_la_SOURCES += \
communicator/comm_init.c \
communicator/comm.c \
communicator/comm_cid.c
communicator/comm_cid.c \
communicator/comm_request.c

@@ -99,10 +99,41 @@ int ompi_comm_set ( ompi_communicator_t **ncomm,
bool copy_topocomponent,
ompi_group_t *local_group,
ompi_group_t *remote_group )
{
ompi_request_t *req;
int rc;
rc = ompi_comm_set_nb (ncomm, oldcomm, local_size, local_ranks, remote_size, remote_ranks,
attr, errh, copy_topocomponent, local_group, remote_group, &req);
if (OMPI_SUCCESS != rc) {
return rc;
}
if (NULL != req) {
ompi_request_wait (&req, MPI_STATUS_IGNORE);
}
return OMPI_SUCCESS;
}
int ompi_comm_set_nb ( ompi_communicator_t **ncomm,
ompi_communicator_t *oldcomm,
int local_size,
int *local_ranks,
int remote_size,
int *remote_ranks,
opal_hash_table_t *attr,
ompi_errhandler_t *errh,
bool copy_topocomponent,
ompi_group_t *local_group,
ompi_group_t *remote_group,
ompi_request_t **req )
{
ompi_communicator_t *newcomm = NULL;
int ret;
*req = NULL;
/* ompi_comm_allocate */
newcomm = OBJ_NEW(ompi_communicator_t);
/* fill in the inscribing hyper-cube dimensions */
@@ -134,9 +165,9 @@ int ompi_comm_set ( ompi_communicator_t **ncomm,
}
newcomm->c_flags |= OMPI_COMM_INTER;
if ( OMPI_COMM_IS_INTRA(oldcomm) ) {
ompi_comm_dup(oldcomm, &newcomm->c_local_comm);
ompi_comm_idup(oldcomm, &newcomm->c_local_comm, req);
} else {
ompi_comm_dup(oldcomm->c_local_comm, &newcomm->c_local_comm);
ompi_comm_idup(oldcomm->c_local_comm, &newcomm->c_local_comm, req);
}
} else {
newcomm->c_remote_group = newcomm->c_local_group;
@@ -845,6 +876,14 @@ ompi_comm_split_type(ompi_communicator_t *comm,
/**********************************************************************/
/**********************************************************************/
int ompi_comm_dup ( ompi_communicator_t * comm, ompi_communicator_t **newcomm )
{
return ompi_comm_dup_with_info (comm, NULL, newcomm);
}
/**********************************************************************/
/**********************************************************************/
/**********************************************************************/
int ompi_comm_dup_with_info ( ompi_communicator_t * comm, ompi_info_t *info, ompi_communicator_t **newcomm )
{
ompi_communicator_t *newcomp = NULL;
int rsize = 0, mode = OMPI_COMM_CID_INTRA, rc = OMPI_SUCCESS;
@@ -906,6 +945,202 @@ int ompi_comm_dup ( ompi_communicator_t * comm, ompi_communicator_t **newcomm )
*newcomm = newcomp;
return MPI_SUCCESS;
}
struct ompi_comm_idup_with_info_context {
ompi_communicator_t *comm;
ompi_communicator_t *newcomp;
ompi_communicator_t **newcomm;
};
static int ompi_comm_idup_with_info_activate (ompi_comm_request_t *request);
static int ompi_comm_idup_with_info_finish (ompi_comm_request_t *request);
static int ompi_comm_idup_getcid (ompi_comm_request_t *request);
int ompi_comm_idup (ompi_communicator_t *comm, ompi_communicator_t **newcomm, ompi_request_t **req)
{
return ompi_comm_idup_with_info (comm, NULL, newcomm, req);
}
int ompi_comm_idup_with_info (ompi_communicator_t *comm, ompi_info_t *info, ompi_communicator_t **newcomm, ompi_request_t **req)
{
struct ompi_comm_idup_with_info_context *context;
ompi_comm_request_t *request;
ompi_request_t *subreq;
int rsize = 0, rc;
*newcomm = MPI_COMM_NULL;
request = ompi_comm_request_get ();
if (NULL == request) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
context = calloc (1, sizeof (*context));
if (NULL == context) {
ompi_comm_request_return (request);
return OMPI_ERR_OUT_OF_RESOURCE;
}
context->comm = comm;
context->newcomm = newcomm;
request->context = context;
rc = ompi_comm_set_nb (&context->newcomp, /* new comm */
comm, /* old comm */
comm->c_local_group->grp_proc_count, /* local_size */
NULL, /* local_procs */
rsize, /* remote_size */
NULL, /* remote_procs */
comm->c_keyhash, /* attrs */
comm->error_handler, /* error handler */
true, /* copy the topo */
comm->c_local_group, /* local group */
comm->c_remote_group, /* remote group */
&subreq); /* new subrequest */
if (NULL == context->newcomp) {
ompi_comm_request_return (request);
return rc;
}
ompi_comm_request_schedule_append (request, ompi_comm_idup_getcid, &subreq, subreq ? 1 : 0);
/* kick off the request */
ompi_comm_request_start (request);
*req = &request->super;
return OMPI_SUCCESS;
}
static int ompi_comm_idup_getcid (ompi_comm_request_t *request)
{
struct ompi_comm_idup_with_info_context *context =
(struct ompi_comm_idup_with_info_context *) request->context;
ompi_request_t *subreq;
int rc, mode;
if (OMPI_COMM_IS_INTER(context->comm)){
mode = OMPI_COMM_CID_INTER;
} else {
mode = OMPI_COMM_CID_INTRA;
}
/* Determine context id. It is identical to f_2_c_handle */
rc = ompi_comm_nextcid_nb (context->newcomp, /* new communicator */
context->comm, /* old comm */
NULL, /* bridge comm */
mode, /* mode */
&subreq); /* new subrequest */
if (OMPI_SUCCESS != rc) {
ompi_comm_request_return (request);
return rc;
}
ompi_comm_request_schedule_append (request, ompi_comm_idup_with_info_activate, &subreq, 1);
return OMPI_SUCCESS;
}
static int ompi_comm_idup_with_info_activate (ompi_comm_request_t *request)
{
struct ompi_comm_idup_with_info_context *context =
(struct ompi_comm_idup_with_info_context *) request->context;
ompi_request_t *subreq;
int rc, mode;
if (OMPI_COMM_IS_INTER(context->comm)){
mode = OMPI_COMM_CID_INTER;
} else {
mode = OMPI_COMM_CID_INTRA;
}
/* Set name for debugging purposes */
snprintf(context->newcomp->c_name, MPI_MAX_OBJECT_NAME, "MPI COMMUNICATOR %d DUP FROM %d",
context->newcomp->c_contextid, context->comm->c_contextid );
/* activate communicator and init coll-module */
rc = ompi_comm_activate_nb (&context->newcomp, context->comm, NULL, mode, &subreq);
if ( OMPI_SUCCESS != rc ) {
return rc;
}
ompi_comm_request_schedule_append (request, ompi_comm_idup_with_info_finish, &subreq, 1);
return OMPI_SUCCESS;
}
static int ompi_comm_idup_with_info_finish (ompi_comm_request_t *request)
{
struct ompi_comm_idup_with_info_context *context =
(struct ompi_comm_idup_with_info_context *) request->context;
*context->newcomm = context->newcomp;
/* done */
return MPI_SUCCESS;
}
/**********************************************************************/
/**********************************************************************/
/**********************************************************************/
int ompi_comm_create_group (ompi_communicator_t *comm, ompi_group_t *group, int tag, ompi_communicator_t **newcomm)
{
ompi_communicator_t *newcomp = NULL;
int mode = OMPI_COMM_CID_GROUP, rc = OMPI_SUCCESS;
*newcomm = MPI_COMM_NULL;
rc = ompi_comm_set ( &newcomp, /* new comm */
comm, /* old comm */
group->grp_proc_count, /* local_size */
NULL, /* local_procs*/
0, /* remote_size */
NULL, /* remote_procs */
comm->c_keyhash, /* attrs */
comm->error_handler, /* error handler */
true, /* copy the topo */
group, /* local group */
NULL); /* remote group */
if ( NULL == newcomp ) {
rc = MPI_ERR_INTERN;
return rc;
}
if ( MPI_SUCCESS != rc) {
return rc;
}
/* Determine context id. It is identical to f_2_c_handle */
rc = ompi_comm_nextcid ( newcomp, /* new communicator */
comm, /* old comm */
newcomp, /* bridge comm (used to pass the group into the group allreduce) */
&tag, /* user defined tag */
NULL, /* remote_leader */
mode, /* mode */
-1 ); /* send_first */
if ( OMPI_SUCCESS != rc ) {
return rc;
}
/* Set name for debugging purposes */
snprintf(newcomp->c_name, MPI_MAX_OBJECT_NAME, "MPI COMMUNICATOR %d GROUP FROM %d",
newcomp->c_contextid, comm->c_contextid );
/* activate communicator and init coll-module */
rc = ompi_comm_activate( &newcomp, /* new communicator */
comm,
newcomp,
&tag,
NULL,
mode,
-1 );
if ( OMPI_SUCCESS != rc ) {
return rc;
}
*newcomm = newcomp;
return MPI_SUCCESS;
}
/**********************************************************************/
/**********************************************************************/
/**********************************************************************/

@@ -90,6 +90,29 @@ static int ompi_comm_allreduce_intra_oob (int *inbuf, int* outbuf,
void* remote_leader,
int send_first );
static int ompi_comm_allreduce_group (int *inbuf, int* outbuf,
int count, struct ompi_op_t *op,
ompi_communicator_t *intercomm,
ompi_communicator_t *bridgecomm,
void* local_leader,
void* remote_leader,
int send_first);
/* non-blocking intracommunicator allreduce */
static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf,
int count, struct ompi_op_t *op,
ompi_communicator_t *comm,
ompi_communicator_t *bridgecomm,
ompi_request_t **req);
/* non-blocking intercommunicator allreduce */
static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf,
int count, struct ompi_op_t *op,
ompi_communicator_t *intercomm,
ompi_communicator_t *bridgecomm,
ompi_request_t **req);
static int ompi_comm_register_cid (uint32_t contextid);
static int ompi_comm_unregister_cid (uint32_t contextid);
static uint32_t ompi_comm_lowest_cid ( void );
@@ -158,10 +181,10 @@ int ompi_comm_cid_init (void)
#else
ompi_comm_world_thread_level_mult = 0; // silence compiler warning if not used
#endif
return OMPI_SUCCESS;
}
int ompi_comm_nextcid ( ompi_communicator_t* newcomm,
ompi_communicator_t* comm,
ompi_communicator_t* bridgecomm,
@@ -199,19 +222,18 @@ int ompi_comm_nextcid ( ompi_communicator_t* newcomm,
case OMPI_COMM_CID_INTRA_OOB:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_oob;
break;
case OMPI_COMM_CID_GROUP:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_group;
break;
default:
return MPI_UNDEFINED;
break;
}
do {
/* Only one communicator creation function allowed at a time on the
* same communicator.
*/
OPAL_THREAD_LOCK(&ompi_cid_lock);
response = ompi_comm_register_cid (comm->c_contextid);
OPAL_THREAD_UNLOCK(&ompi_cid_lock);
} while (OMPI_SUCCESS != response );
ret = ompi_comm_register_cid (comm->c_contextid);
if (OMPI_SUCCESS != ret) {
return ret;
}
start = ompi_mpi_communicators.lowest_free;
while (!done) {
@@ -284,13 +306,187 @@ int ompi_comm_nextcid ( ompi_communicator_t* newcomm,
opal_pointer_array_set_item (&ompi_mpi_communicators, nextcid, newcomm);
release_and_return:
OPAL_THREAD_LOCK(&ompi_cid_lock);
ompi_comm_unregister_cid (comm->c_contextid);
OPAL_THREAD_UNLOCK(&ompi_cid_lock);
return ret;
}
/* Non-blocking version of ompi_comm_nextcid */
struct mca_comm_nextcid_context {
ompi_communicator_t* newcomm;
ompi_communicator_t* comm;
ompi_communicator_t* bridgecomm;
int mode;
int nextcid;
int nextlocal_cid;
int start;
int flag, rflag;
};
/* find the next available local cid and start an allreduce */
static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request);
/* verify that the maximum cid is locally available and start an allreduce */
static int ompi_comm_checkcid (ompi_comm_request_t *request);
/* verify that the cid was available globally */
static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request);
int ompi_comm_nextcid_nb (ompi_communicator_t* newcomm,
ompi_communicator_t* comm,
ompi_communicator_t* bridgecomm,
int mode, ompi_request_t **req)
{
struct mca_comm_nextcid_context *context;
ompi_comm_request_t *request;
int ret;
/**
* Determine which implementation of allreduce we have to use
* for the current scenario
*/
if (OMPI_COMM_CID_INTRA != mode && OMPI_COMM_CID_INTER != mode) {
return MPI_UNDEFINED;
}
ret = ompi_comm_register_cid (comm->c_contextid);
if (OMPI_SUCCESS != ret) {
return ret;
}
context = calloc (1, sizeof (*context));
if (NULL == context) {
ompi_comm_unregister_cid (comm->c_contextid);
return OMPI_ERR_OUT_OF_RESOURCE;
}
request = ompi_comm_request_get ();
if (NULL == request) {
ompi_comm_unregister_cid (comm->c_contextid);
free (context);
return OMPI_ERR_OUT_OF_RESOURCE;
}
context->newcomm = newcomm;
context->comm = comm;
context->bridgecomm = bridgecomm;
context->mode = mode;
context->start = ompi_mpi_communicators.lowest_free;
request->context = context;
ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0);
ompi_comm_request_start (request);
*req = &request->super;
return OMPI_SUCCESS;
}
static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request)
{
struct mca_comm_nextcid_context *context = request->context;
ompi_request_t *subreq;
unsigned int i;
bool flag;
int ret;
/**
* This is the real algorithm described in the doc
*/
OPAL_THREAD_LOCK(&ompi_cid_lock);
if (context->comm->c_contextid != ompi_comm_lowest_cid() ) {
/* if not the lowest cid, do not continue; reschedule this step and try again */
OPAL_THREAD_UNLOCK(&ompi_cid_lock);
ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0);
return OMPI_SUCCESS;
}
OPAL_THREAD_UNLOCK(&ompi_cid_lock);
for (i = context->start ; i < mca_pml.pml_max_contextid ; ++i) {
flag = opal_pointer_array_test_and_set_item(&ompi_mpi_communicators,
i, context->comm);
if (true == flag) {
context->nextlocal_cid = i;
break;
}
}
if (context->mode == OMPI_COMM_CID_INTRA) {
ret = ompi_comm_allreduce_intra_nb (&context->nextlocal_cid, &context->nextcid, 1, MPI_MAX,
context->comm, context->bridgecomm, &subreq);
} else {
ret = ompi_comm_allreduce_inter_nb (&context->nextlocal_cid, &context->nextcid, 1, MPI_MAX,
context->comm, context->bridgecomm, &subreq);
}
if (OMPI_SUCCESS != ret) {
return ret;
}
/* next we want to verify that the resulting commid is ok */
ompi_comm_request_schedule_append (request, ompi_comm_checkcid, &subreq, 1);
return OMPI_SUCCESS;
}
static int ompi_comm_checkcid (ompi_comm_request_t *request)
{
struct mca_comm_nextcid_context *context = request->context;
ompi_request_t *subreq;
int ret;
context->flag = (context->nextcid == context->nextlocal_cid);
if (!context->flag) {
opal_pointer_array_set_item(&ompi_mpi_communicators, context->nextlocal_cid, NULL);
context->flag = opal_pointer_array_test_and_set_item(&ompi_mpi_communicators,
context->nextcid, context->comm);
}
if (context->mode == OMPI_COMM_CID_INTRA) {
ret = ompi_comm_allreduce_intra_nb (&context->flag, &context->rflag, 1, MPI_MIN, context->comm,
context->bridgecomm, &subreq);
} else {
ret = ompi_comm_allreduce_inter_nb (&context->flag, &context->rflag, 1, MPI_MIN, context->comm,
context->bridgecomm, &subreq);
}
if (OMPI_SUCCESS != ret) {
return ret;
}
ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, &subreq, 1);
return OMPI_SUCCESS;
}
static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request)
{
struct mca_comm_nextcid_context *context = request->context;
if (1 == context->rflag) {
/* set the corresponding values in the newcomm */
context->newcomm->c_contextid = context->nextcid;
context->newcomm->c_f_to_c_index = context->newcomm->c_contextid;
opal_pointer_array_set_item (&ompi_mpi_communicators, context->nextcid, context->newcomm);
ompi_comm_unregister_cid (context->comm->c_contextid);
/* done! */
return OMPI_SUCCESS;
}
if (1 == context->flag) {
/* we could use this cid, but others don't agree */
opal_pointer_array_set_item(&ompi_mpi_communicators, context->nextcid, NULL);
context->start = context->nextcid + 1; /* that's where we can start the next round */
}
/* try again */
return ompi_comm_allreduce_getnextcid (request);
}
/**************************************************************************/
/**************************************************************************/
/**************************************************************************/
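
To make the CID negotiation above concrete, here is a small self-contained
simulation (a sketch, not OMPI code; the per-rank CID tables are invented).
Each rank proposes its lowest locally free CID, the MAX of the proposals is
agreed on (the first allreduce), every rank then tries to reserve that CID,
and the MIN of the success flags (the second allreduce) decides whether to
finish or retry from nextcid + 1 -- mirroring ompi_comm_allreduce_getnextcid,
ompi_comm_checkcid, and ompi_comm_nextcid_check_flag. (The real code reserves
the local proposal up front via test_and_set; the sketch folds that into the
check step.)

#include <stdbool.h>
#include <stdio.h>

#define NPROCS 3
#define MAXCID 16

/* locally used CIDs per simulated rank (invented data) */
static bool used[NPROCS][MAXCID] = {
    { true, true, false, true },  /* rank 0: cids 0, 1 and 3 taken */
    { true, true, true },         /* rank 1: cids 0-2 taken */
    { true },                     /* rank 2: cid 0 taken */
};

int main (void)
{
    int start[NPROCS] = { 0, 0, 0 };

    for (;;) {
        int nextcid = 0;

        /* each rank proposes its lowest free cid; allreduce (MAX) */
        for (int r = 0 ; r < NPROCS ; ++r) {
            int c = start[r];
            while (used[r][c]) c++;
            if (c > nextcid) nextcid = c;
        }

        /* each rank tries to reserve the winner; allreduce (MIN) of flags */
        int flag[NPROCS], rflag = 1;
        for (int r = 0 ; r < NPROCS ; ++r) {
            flag[r] = !used[r][nextcid];
            if (flag[r]) used[r][nextcid] = true;  /* tentative reserve */
            if (flag[r] < rflag) rflag = flag[r];
        }

        if (rflag) {
            printf ("agreed on cid %d\n", nextcid);  /* prints: cid 4 */
            return 0;
        }

        /* disagreement: release tentative reservations and retry from
         * nextcid + 1, exactly as ompi_comm_nextcid_check_flag does */
        for (int r = 0 ; r < NPROCS ; ++r) {
            if (flag[r]) used[r][nextcid] = false;
            start[r] = nextcid + 1;
        }
    }
}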
@@ -318,16 +514,21 @@ void ompi_comm_reg_finalize (void)
static int ompi_comm_register_cid (uint32_t cid)
{
opal_list_item_t *item;
ompi_comm_reg_t *regcom;
ompi_comm_reg_t *newentry = OBJ_NEW(ompi_comm_reg_t);
bool registered = false;
do {
/* Only one communicator creation function allowed at a time on the
* same communicator.
*/
OPAL_THREAD_LOCK(&ompi_cid_lock);
newentry->cid = cid;
if ( !(opal_list_is_empty (&ompi_registered_comms)) ) {
for (item = opal_list_get_first(&ompi_registered_comms);
item != opal_list_get_end(&ompi_registered_comms);
item = opal_list_get_next(item)) {
regcom = (ompi_comm_reg_t *)item;
bool ok = true;
OPAL_LIST_FOREACH(regcom, &ompi_registered_comms, ompi_comm_reg_t) {
if ( regcom->cid > cid ) {
break;
}
@@ -341,36 +542,44 @@ static int ompi_comm_register_cid (uint32_t cid )
* execution order. This test only allows one communicator
* creation function based on the same communicator.
*/
OBJ_RELEASE(newentry);
return OMPI_ERROR;
ok = false;
break;
}
#endif /* OMPI_ENABLE_THREAD_MULTIPLE */
}
opal_list_insert_pos (&ompi_registered_comms, item,
if (ok) {
opal_list_insert_pos (&ompi_registered_comms, (opal_list_item_t *) regcom,
(opal_list_item_t *)newentry);
registered = true;
}
else {
} else {
opal_list_append (&ompi_registered_comms, (opal_list_item_t *)newentry);
registered = true;
}
/* drop the lock before trying again */
OPAL_THREAD_UNLOCK(&ompi_cid_lock);
} while (!registered);
return OMPI_SUCCESS;
}
static int ompi_comm_unregister_cid (uint32_t cid)
{
ompi_comm_reg_t *regcom;
opal_list_item_t *item;
for (item = opal_list_get_first(&ompi_registered_comms);
item != opal_list_get_end(&ompi_registered_comms);
item = opal_list_get_next(item)) {
regcom = (ompi_comm_reg_t *)item;
OPAL_THREAD_LOCK(&ompi_cid_lock);
OPAL_LIST_FOREACH(regcom, &ompi_registered_comms, ompi_comm_reg_t) {
if(regcom->cid == cid) {
opal_list_remove_item(&ompi_registered_comms, item);
opal_list_remove_item(&ompi_registered_comms, (opal_list_item_t *) regcom);
OBJ_RELEASE(regcom);
break;
}
}
OPAL_THREAD_UNLOCK(&ompi_cid_lock);
return OMPI_SUCCESS;
}
@@ -429,17 +638,20 @@ int ompi_comm_activate ( ompi_communicator_t** newcomm,
case OMPI_COMM_CID_INTRA_OOB:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_oob;
break;
case OMPI_COMM_CID_GROUP:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_group;
break;
default:
return MPI_UNDEFINED;
break;
}
if (MPI_UNDEFINED != (*newcomm)->c_local_group->grp_my_rank) {
/* Initialize the PML stuff in the newcomm */
if ( OMPI_SUCCESS != (ret = MCA_PML_CALL(add_comm(*newcomm))) ) {
goto bail_on_error;
}
OMPI_COMM_SET_PML_ADDED(*newcomm);
}
@@ -519,6 +731,151 @@ int ompi_comm_activate ( ompi_communicator_t** newcomm,
return ret;
}
/* Non-blocking version of ompi_comm_activate */
struct ompi_comm_activate_nb_context {
ompi_communicator_t **newcomm;
ompi_communicator_t *comm;
/* storage for activate barrier */
int ok;
};
static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request);
int ompi_comm_activate_nb (ompi_communicator_t **newcomm,
ompi_communicator_t *comm,
ompi_communicator_t *bridgecomm,
int mode, ompi_request_t **req)
{
struct ompi_comm_activate_nb_context *context;
ompi_comm_request_t *request;
ompi_request_t *subreq;
int ret = 0;
request = ompi_comm_request_get ();
if (NULL == request) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
context = calloc (1, sizeof (*context));
if (NULL == context) {
ompi_comm_request_return (request);
return OMPI_ERR_OUT_OF_RESOURCE;
}
context->newcomm = newcomm;
context->comm = comm;
request->context = context;
if (OMPI_COMM_CID_INTRA != mode && OMPI_COMM_CID_INTER != mode) {
return MPI_UNDEFINED;
}
if (MPI_UNDEFINED != (*newcomm)->c_local_group->grp_my_rank) {
/* Initialize the PML stuff in the newcomm */
if ( OMPI_SUCCESS != (ret = MCA_PML_CALL(add_comm(*newcomm))) ) {
OBJ_RELEASE(newcomm);
*newcomm = MPI_COMM_NULL;
return ret;
}
OMPI_COMM_SET_PML_ADDED(*newcomm);
}
/* Step 1: the barrier, after which it is allowed to
* send messages over the new communicator
*/
if (mode == OMPI_COMM_CID_INTRA) {
ret = ompi_comm_allreduce_intra_nb (&context->ok, &context->ok, 1, MPI_MIN,
context->comm, bridgecomm, &subreq);
} else {
ret = ompi_comm_allreduce_inter_nb (&context->ok, &context->ok, 1, MPI_MIN,
context->comm, bridgecomm, &subreq);
}
if (OMPI_SUCCESS != ret) {
ompi_comm_request_return (request);
return ret;
}
ompi_comm_request_schedule_append (request, ompi_comm_activate_nb_complete, &subreq, 1);
ompi_comm_request_start (request);
*req = &request->super;
return OMPI_SUCCESS;
}
static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request)
{
struct ompi_comm_activate_nb_context *context =
(struct ompi_comm_activate_nb_context *) request->context;
int ret;
/**
* Check to see if this process is in the new communicator.
*
* Specifically, this function is invoked by all processes in the
* old communicator, regardless of whether they are in the new
* communicator or not. This is because it is far simpler to use
* MPI collective functions on the old communicator to determine
* some data for the new communicator (e.g., remote_leader) than
* to kludge up our own pseudo-collective routines over just the
* processes in the new communicator. Hence, *all* processes in
* the old communicator need to invoke this function.
*
* That being said, only processes in the new communicator need to
* select a coll module for the new communicator. More
* specifically, processes who are not in the new communicator
* should *not* select a coll module -- for example,
* ompi_comm_rank(newcomm) returns MPI_UNDEFINED for processes who
* are not in the new communicator. This can cause errors in the
* selection / initialization of a coll module. Plus, it's
* wasteful -- processes in the new communicator will end up
* freeing the new communicator anyway, so we might as well leave
* the coll selection as NULL (the coll base comm unselect code
* handles that case properly).
*/
if (MPI_UNDEFINED == (*context->newcomm)->c_local_group->grp_my_rank) {
return OMPI_SUCCESS;
}
/* Let the collectives components fight over who will do
collective on this new comm. */
if (OMPI_SUCCESS != (ret = mca_coll_base_comm_select(*context->newcomm))) {
OBJ_RELEASE(*context->newcomm);
*context->newcomm = MPI_COMM_NULL;
return ret;
}
/* For an inter communicator, we have to deal with the potential
* problem of what is happening if the local_comm that we created
* has a lower CID than the parent comm. This is not a problem
* as long as the user calls MPI_Comm_free on the inter communicator.
* However, if the communicators are not freed by the user but released
* by Open MPI in MPI_Finalize, we walk through the list of still available
* communicators and free them one by one. Thus, local_comm is freed before
* the actual inter-communicator. However, the local_comm pointer in the
* inter communicator will still contain the 'previous' address of the local_comm
* and thus this will lead to a segmentation violation. In order to prevent
* that from happening, we increase the reference counter local_comm
* by one if its CID is lower than the parent. We cannot increase however
* its reference counter if the CID of local_comm is larger than
* the CID of the inter communicators, since a regular MPI_Comm_free would
* leave the local_comm hanging around in that case, and thus we would not
* recycle CIDs properly, which was the reason and the cause for this trouble.
*/
if (OMPI_COMM_IS_INTER(*context->newcomm)) {
if (OMPI_COMM_CID_IS_LOWER(*context->newcomm, context->comm)) {
OMPI_COMM_SET_EXTRA_RETAIN (*context->newcomm);
OBJ_RETAIN (*context->newcomm);
}
}
/* done */
return OMPI_SUCCESS;
}
/**************************************************************************/
/**************************************************************************/
/**************************************************************************/
@@ -536,11 +893,21 @@ static int ompi_comm_allreduce_intra ( int *inbuf, int *outbuf,
void* remote_leader,
int send_first )
{
return comm->c_coll.coll_allreduce ( inbuf, outbuf, count, MPI_INT,
op,comm,
return comm->c_coll.coll_allreduce ( inbuf, outbuf, count, MPI_INT, op, comm,
comm->c_coll.coll_allreduce_module );
}
static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf,
int count, struct ompi_op_t *op,
ompi_communicator_t *comm,
ompi_communicator_t *bridgecomm,
ompi_request_t **req)
{
return comm->c_coll.coll_iallreduce (inbuf, outbuf, count, MPI_INT, op, comm,
req, comm->c_coll.coll_iallreduce_module);
}
/* Arguments not used in this implementation:
* - bridgecomm
* - local_leader
@@ -556,17 +923,12 @@ static int ompi_comm_allreduce_inter ( int *inbuf, int *outbuf,
int send_first )
{
int local_rank, rsize;
int i, rc;
int rc;
int *sbuf;
int *tmpbuf=NULL;
int *rcounts=NULL, scount=0;
int *rdisps=NULL;
if ( &ompi_mpi_op_sum.op != op && &ompi_mpi_op_prod.op != op &&
&ompi_mpi_op_max.op != op && &ompi_mpi_op_min.op != op ) {
return MPI_ERR_OP;
}
if ( !OMPI_COMM_IS_INTER (intercomm)) {
return MPI_ERR_COMM;
}
@@ -618,30 +980,7 @@ static int ompi_comm_allreduce_inter ( int *inbuf, int *outbuf,
goto exit;
}
if ( &ompi_mpi_op_max.op == op ) {
for ( i = 0 ; i < count; i++ ) {
if (tmpbuf[i] > outbuf[i]) {
outbuf[i] = tmpbuf[i];
}
}
}
else if ( &ompi_mpi_op_min.op == op ) {
for ( i = 0 ; i < count; i++ ) {
if (tmpbuf[i] < outbuf[i]) {
outbuf[i] = tmpbuf[i];
}
}
}
else if ( &ompi_mpi_op_sum.op == op ) {
for ( i = 0 ; i < count; i++ ) {
outbuf[i] += tmpbuf[i];
}
}
else if ( &ompi_mpi_op_prod.op == op ) {
for ( i = 0 ; i < count; i++ ) {
outbuf[i] *= tmpbuf[i];
}
}
ompi_op_reduce (op, tmpbuf, outbuf, count, MPI_INT);
}
/* distribute the overall result to all processes in the other group.
@@ -671,6 +1010,213 @@ static int ompi_comm_allreduce_inter ( int *inbuf, int *outbuf,
return (rc);
}
/* Non-blocking version of ompi_comm_allreduce_inter */
struct ompi_comm_allreduce_inter_context {
int *inbuf;
int *outbuf;
int count;
struct ompi_op_t *op;
ompi_communicator_t *intercomm;
ompi_communicator_t *bridgecomm;
int *tmpbuf;
int *rcounts;
int *rdisps;
};
static void ompi_comm_allreduce_inter_context_free (struct ompi_comm_allreduce_inter_context *context)
{
if (context->tmpbuf) {
free (context->tmpbuf);
}
if (context->rdisps) {
free (context->rdisps);
}
if (context->rcounts) {
free (context->rcounts);
}
free (context);
}
static int ompi_comm_allreduce_inter_leader_exchange (ompi_comm_request_t *request);
static int ompi_comm_allreduce_inter_leader_reduce (ompi_comm_request_t *request);
static int ompi_comm_allreduce_inter_allgather (ompi_comm_request_t *request);
static int ompi_comm_allreduce_inter_allgather_complete (ompi_comm_request_t *request);
/* Arguments not used in this implementation:
* - bridgecomm
*/
static int ompi_comm_allreduce_inter_nb (int *inbuf, int *outbuf,
int count, struct ompi_op_t *op,
ompi_communicator_t *intercomm,
ompi_communicator_t *bridgecomm,
ompi_request_t **req)
{
struct ompi_comm_allreduce_inter_context *context = NULL;
ompi_comm_request_t *request = NULL;
ompi_request_t *subreq;
int local_rank, rsize, rc;
if (!OMPI_COMM_IS_INTER (intercomm)) {
return MPI_ERR_COMM;
}
request = ompi_comm_request_get ();
if (NULL == request) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
context = calloc (1, sizeof (*context));
if (NULL == context) {
rc = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
context->inbuf = inbuf;
context->outbuf = outbuf;
context->count = count;
context->op = op;
context->intercomm = intercomm;
context->bridgecomm = bridgecomm;
/* Allocate temporary arrays */
rsize = ompi_comm_remote_size (intercomm);
local_rank = ompi_comm_rank (intercomm);
context->tmpbuf = (int *) calloc (count, sizeof(int));
context->rdisps = (int *) calloc (rsize, sizeof(int));
context->rcounts = (int *) calloc (rsize, sizeof(int));
if (OPAL_UNLIKELY (NULL == context->tmpbuf || NULL == context->rdisps || NULL == context->rcounts)) {
rc = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
request->context = context;
/* Execute the inter-allreduce: the result from the local group will be in the
* buffer of the remote group and vice-versa. */
rc = intercomm->c_coll.coll_iallreduce (inbuf, context->tmpbuf, count, MPI_INT, op, intercomm,
&subreq, intercomm->c_coll.coll_iallreduce_module);
if (OMPI_SUCCESS != rc) {
goto exit;
}
if (0 == local_rank) {
ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_leader_exchange, &subreq, 1);
} else {
ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_allgather, &subreq, 1);
}
ompi_comm_request_start (request);
*req = &request->super;
exit:
if (OMPI_SUCCESS != rc) {
if (context) {
ompi_comm_allreduce_inter_context_free (context);
}
if (request) {
request->context = NULL;
ompi_comm_request_return (request);
}
}
return rc;
}
static int ompi_comm_allreduce_inter_leader_exchange (ompi_comm_request_t *request)
{
struct ompi_comm_allreduce_inter_context *context =
(struct ompi_comm_allreduce_inter_context *) request->context;
ompi_request_t *subreqs[2];
int rc;
/* local leaders exchange their data and determine the overall result
for both groups */
rc = MCA_PML_CALL(irecv (context->outbuf, context->count, MPI_INT, 0, OMPI_COMM_ALLREDUCE_TAG,
context->intercomm, subreqs));
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
rc = MCA_PML_CALL(isend (context->tmpbuf, context->count, MPI_INT, 0, OMPI_COMM_ALLREDUCE_TAG,
MCA_PML_BASE_SEND_STANDARD, context->intercomm, subreqs + 1));
if ( OMPI_SUCCESS != rc ) {
goto exit;
}
ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_leader_reduce, subreqs, 2);
exit:
if (OMPI_SUCCESS != rc) {
ompi_comm_allreduce_inter_context_free (context);
request->context = NULL;
}
return rc;
}
static int ompi_comm_allreduce_inter_leader_reduce (ompi_comm_request_t *request)
{
struct ompi_comm_allreduce_inter_context *context =
(struct ompi_comm_allreduce_inter_context *) request->context;
ompi_op_reduce (context->op, context->tmpbuf, context->outbuf, context->count, MPI_INT);
return ompi_comm_allreduce_inter_allgather (request);
}
static int ompi_comm_allreduce_inter_allgather (ompi_comm_request_t *request)
{
struct ompi_comm_allreduce_inter_context *context =
(struct ompi_comm_allreduce_inter_context *) request->context;
ompi_request_t *subreq;
int scount = 0, rc;
/* distribute the overall result to all processes in the other group.
Instead of using bcast we use allgatherv here to avoid a possible
deadlock. Otherwise, we would need an algorithm to determine
which group sends first in the inter-bcast and which receives
the result first.
*/
if (0 != ompi_comm_rank (context->intercomm)) {
context->rcounts[0] = context->count;
} else {
scount = context->count;
}
rc = context->intercomm->c_coll.coll_iallgatherv (context->outbuf, scount, MPI_INT, context->outbuf,
context->rcounts, context->rdisps, MPI_INT,
context->intercomm, &subreq,
context->intercomm->c_coll.coll_iallgatherv_module);
if (OMPI_SUCCESS != rc) {
ompi_comm_allreduce_inter_context_free (context);
request->context = NULL;
return rc;
}
ompi_comm_request_schedule_append (request, ompi_comm_allreduce_inter_allgather_complete, &subreq, 1);
return OMPI_SUCCESS;
}
static int ompi_comm_allreduce_inter_allgather_complete (ompi_comm_request_t *request)
{
/* free this request's context */
ompi_comm_allreduce_inter_context_free (request->context);
/* prevent a double-free from the progress engine */
request->context = NULL;
/* done */
return OMPI_SUCCESS;
}
/* Arguments not used in this implementation:
* - send_first
*/
@@ -798,7 +1344,6 @@ static int ompi_comm_allreduce_intra_oob (int *inbuf, int *outbuf,
int send_first )
{
int *tmpbuf=NULL;
int i;
int rc;
int local_leader, local_rank;
ompi_process_name_t *remote_leader=NULL;
@@ -809,12 +1354,6 @@ static int ompi_comm_allreduce_intra_oob (int *inbuf, int *outbuf,
remote_leader = (ompi_process_name_t*)rleader;
size_count = count;
if ( &ompi_mpi_op_sum.op != op && &ompi_mpi_op_prod.op != op &&
&ompi_mpi_op_max.op != op && &ompi_mpi_op_min.op != op ) {
return MPI_ERR_OP;
}
local_rank = ompi_comm_rank ( comm );
tmpbuf = (int *) malloc ( count * sizeof(int));
if ( NULL == tmpbuf ) {
@@ -873,30 +1412,7 @@ static int ompi_comm_allreduce_intra_oob (int *inbuf, int *outbuf,
OBJ_DESTRUCT(&rcid.buf);
count = (int)size_count;
if ( &ompi_mpi_op_max.op == op ) {
for ( i = 0 ; i < count; i++ ) {
if (tmpbuf[i] > outbuf[i]) {
outbuf[i] = tmpbuf[i];
}
}
}
else if ( &ompi_mpi_op_min.op == op ) {
for ( i = 0 ; i < count; i++ ) {
if (tmpbuf[i] < outbuf[i]) {
outbuf[i] = tmpbuf[i];
}
}
}
else if ( &ompi_mpi_op_sum.op == op ) {
for ( i = 0 ; i < count; i++ ) {
outbuf[i] += tmpbuf[i];
}
}
else if ( &ompi_mpi_op_prod.op == op ) {
for ( i = 0 ; i < count; i++ ) {
outbuf[i] *= tmpbuf[i];
}
}
ompi_op_reduce (op, tmpbuf, outbuf, count, MPI_INT);
}
rc = comm->c_coll.coll_bcast (outbuf, count, MPI_INT,
@@ -911,4 +1427,76 @@ static int ompi_comm_allreduce_intra_oob (int *inbuf, int *outbuf,
return (rc);
}
static int ompi_comm_allreduce_group (int *inbuf, int* outbuf,
int count, struct ompi_op_t *op,
ompi_communicator_t *comm,
ompi_communicator_t *newcomm,
void* local_leader,
void* remote_leader,
int send_first)
{
ompi_group_t *group = newcomm->c_local_group;
int peers_group[3], peers_comm[3];
const int group_size = ompi_group_size (group);
const int group_rank = ompi_group_rank (group);
int tag = *((int *) local_leader);
int *tmp1;
int i, rc;
/* basic tree-based allreduce on the group: a binary-tree reduce followed by a broadcast down the same tree */
peers_group[0] = group_rank ? ((group_rank - 1) >> 1) : MPI_PROC_NULL;
peers_group[1] = (group_rank * 2 + 1) < group_size ? group_rank * 2 + 1: MPI_PROC_NULL;
peers_group[2] = (group_rank * 2 + 2) < group_size ? group_rank * 2 + 2 : MPI_PROC_NULL;
/* translate the ranks into the ranks of the parent communicator */
ompi_group_translate_ranks (group, 3, peers_group, comm->c_local_group, peers_comm);
tmp1 = malloc (sizeof (int) * count);
if (NULL == tmp1) {
    return OMPI_ERR_OUT_OF_RESOURCE;
}
/* reduce */
memmove (outbuf, inbuf, sizeof (int) * count);
for (i = 1 ; i < 3 ; ++i) {
if (MPI_PROC_NULL != peers_comm[i]) {
rc = MCA_PML_CALL(recv(tmp1, count, MPI_INT, peers_comm[i], tag, comm,
MPI_STATUS_IGNORE));
if (OMPI_SUCCESS != rc) {
goto out;
}
/* this is integer reduction so we do not care about ordering */
ompi_op_reduce (op, tmp1, outbuf, count, MPI_INT);
}
}
if (MPI_PROC_NULL != peers_comm[0]) {
rc = MCA_PML_CALL(send(outbuf, count, MPI_INT, peers_comm[0],
tag, MCA_PML_BASE_SEND_STANDARD, comm));
if (OMPI_SUCCESS != rc) {
goto out;
}
rc = MCA_PML_CALL(recv(outbuf, count, MPI_INT, peers_comm[0],
tag, comm, MPI_STATUS_IGNORE));
if (OMPI_SUCCESS != rc) {
goto out;
}
}
/* broadcast */
for (i = 1 ; i < 3 ; ++i) {
if (MPI_PROC_NULL != peers_comm[i]) {
rc = MCA_PML_CALL(send(outbuf, count, MPI_INT, peers_comm[i], tag,
MCA_PML_BASE_SEND_STANDARD, comm));
if (OMPI_SUCCESS != rc) {
goto out;
}
}
}
out:
free (tmp1);
return rc;
}
END_C_DECLS
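
For reference, the peer computation in ompi_comm_allreduce_group above maps
each group rank onto a binary tree (parent (r - 1) / 2, children 2r + 1 and
2r + 2), then reduces up and broadcasts down that tree. A tiny standalone
sketch of the mapping (illustration only):

#include <stdio.h>

int main (void)
{
    const int group_size = 7;

    /* -1 stands in for MPI_PROC_NULL */
    for (int rank = 0 ; rank < group_size ; ++rank) {
        int parent = rank ? (rank - 1) >> 1 : -1;
        int left   = (2 * rank + 1) < group_size ? 2 * rank + 1 : -1;
        int right  = (2 * rank + 2) < group_size ? 2 * rank + 2 : -1;
        printf ("rank %d: parent %2d, children %2d %2d\n",
                rank, parent, left, right);
    }
    return 0;
}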

ompi/communicator/comm_request.c (new file, 248 lines)

@@ -0,0 +1,248 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2013 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "comm_request.h"
opal_free_list_t ompi_comm_requests;
opal_list_t ompi_comm_requests_active;
opal_mutex_t ompi_comm_request_mutex;
bool ompi_comm_request_progress_active;
bool ompi_comm_request_initialized = false;
typedef struct ompi_comm_request_item_t {
opal_list_item_t super;
ompi_comm_request_callback_fn_t callback;
ompi_request_t *subreqs[OMPI_COMM_REQUEST_MAX_SUBREQ];
int subreq_count;
} ompi_comm_request_item_t;
OBJ_CLASS_DECLARATION(ompi_comm_request_item_t);
static int ompi_comm_request_progress (void);
void ompi_comm_request_init (void)
{
OBJ_CONSTRUCT(&ompi_comm_requests, opal_free_list_t);
(void) opal_free_list_init (&ompi_comm_requests, sizeof (ompi_comm_request_t),
OBJ_CLASS(ompi_comm_request_t), 0, -1, 8);
OBJ_CONSTRUCT(&ompi_comm_requests_active, opal_list_t);
ompi_comm_request_progress_active = false;
OBJ_CONSTRUCT(&ompi_comm_request_mutex, opal_mutex_t);
ompi_comm_request_initialized = true;
}
void ompi_comm_request_fini (void)
{
if (!ompi_comm_request_initialized) {
return;
}
ompi_comm_request_initialized = false;
opal_mutex_lock (&ompi_comm_request_mutex);
if (ompi_comm_request_progress_active) {
opal_progress_unregister (ompi_comm_request_progress);
}
opal_mutex_unlock (&ompi_comm_request_mutex);
OBJ_DESTRUCT(&ompi_comm_request_mutex);
OBJ_DESTRUCT(&ompi_comm_requests_active);
OBJ_DESTRUCT(&ompi_comm_requests);
}
int ompi_comm_request_schedule_append (ompi_comm_request_t *request, ompi_comm_request_callback_fn_t callback,
ompi_request_t *subreqs[], int subreq_count)
{
ompi_comm_request_item_t *request_item;
int i;
if (subreq_count > OMPI_COMM_REQUEST_MAX_SUBREQ) {
return OMPI_ERR_BAD_PARAM;
}
request_item = OBJ_NEW(ompi_comm_request_item_t);
if (NULL == request_item) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
request_item->callback = callback;
for (i = 0 ; i < subreq_count ; ++i) {
request_item->subreqs[i] = subreqs[i];
}
request_item->subreq_count = subreq_count;
opal_list_append (&request->schedule, &request_item->super);
return OMPI_SUCCESS;
}
static int ompi_comm_request_progress (void)
{
ompi_comm_request_t *request, *next;
if (opal_mutex_trylock (&ompi_comm_request_mutex)) {
return 0;
}
OPAL_LIST_FOREACH_SAFE(request, next, &ompi_comm_requests_active, ompi_comm_request_t) {
int rc = OMPI_SUCCESS;
if (opal_list_get_size (&request->schedule)) {
ompi_comm_request_item_t *request_item = (ompi_comm_request_item_t *) opal_list_remove_first (&request->schedule);
int item_complete = true;
/* don't call ompi_request_test_all as it causes a recursive call into opal_progress */
while (request_item->subreq_count) {
ompi_request_t *subreq = request_item->subreqs[request_item->subreq_count-1];
if (true == subreq->req_complete) {
ompi_request_free (&subreq);
request_item->subreq_count--;
} else {
item_complete = false;
break;
}
}
if (item_complete) {
if (request_item->callback) {
opal_mutex_unlock (&ompi_comm_request_mutex);
rc = request_item->callback (request);
opal_mutex_lock (&ompi_comm_request_mutex);
}
OBJ_RELEASE(request_item);
} else {
opal_list_prepend (&request->schedule, &request_item->super);
}
}
/* if the request schedule is empty then the request is complete */
if (0 == opal_list_get_size (&request->schedule)) {
opal_list_remove_item (&ompi_comm_requests_active, (opal_list_item_t *) request);
request->super.req_status.MPI_ERROR = (OMPI_SUCCESS == rc) ? MPI_SUCCESS : MPI_ERR_INTERN;
ompi_request_complete (&request->super, true);
}
}
if (0 == opal_list_get_size (&ompi_comm_requests_active)) {
/* no more active requests. disable this progress function */
ompi_comm_request_progress_active = false;
opal_progress_unregister (ompi_comm_request_progress);
}
opal_mutex_unlock (&ompi_comm_request_mutex);
return 1;
}
void ompi_comm_request_start (ompi_comm_request_t *request)
{
opal_mutex_lock (&ompi_comm_request_mutex);
opal_list_append (&ompi_comm_requests_active, (opal_list_item_t *) request);
/* check if we need to start the communicator request progress function */
if (!ompi_comm_request_progress_active) {
opal_progress_register (ompi_comm_request_progress);
ompi_comm_request_progress_active = true;
}
opal_mutex_unlock (&ompi_comm_request_mutex);
}
static int ompi_comm_request_cancel (struct ompi_request_t *ompi_req, int complete)
{
ompi_comm_request_t *tmp, *request = (ompi_comm_request_t *) ompi_req;
ompi_comm_request_item_t *item, *next;
opal_mutex_lock (&ompi_comm_request_mutex);
OPAL_LIST_FOREACH_SAFE(item, next, &request->schedule, ompi_comm_request_item_t) {
for (int i = 0 ; i < item->subreq_count ; ++i) {
ompi_request_cancel (item->subreqs[i]);
}
opal_list_remove_item (&request->schedule, &item->super);
OBJ_RELEASE(item);
}
/* remove the request from the list of active requests */
OPAL_LIST_FOREACH(tmp, &ompi_comm_requests_active, ompi_comm_request_t) {
if (tmp == request) {
opal_list_remove_item (&ompi_comm_requests_active, (opal_list_item_t *) request);
break;
}
}
opal_mutex_unlock (&ompi_comm_request_mutex);
return MPI_ERR_REQUEST;
}
static int ompi_comm_request_free (struct ompi_request_t **ompi_req)
{
ompi_comm_request_t *request = (ompi_comm_request_t *) *ompi_req;
if (!(*ompi_req)->req_complete) {
return MPI_ERR_REQUEST;
}
(*ompi_req)->req_complete = false;
ompi_comm_request_return (request);
*ompi_req = MPI_REQUEST_NULL;
return OMPI_SUCCESS;
}
static void ompi_comm_request_construct (ompi_comm_request_t *request)
{
request->context = NULL;
request->super.req_type = OMPI_REQUEST_COMM;
request->super.req_status._cancelled = 0;
request->super.req_free = ompi_comm_request_free;
request->super.req_cancel = ompi_comm_request_cancel;
OBJ_CONSTRUCT(&request->schedule, opal_list_t);
}
static void ompi_comm_request_destruct (ompi_comm_request_t *request)
{
OBJ_DESTRUCT(&request->schedule);
}
OBJ_CLASS_INSTANCE(ompi_comm_request_t, ompi_request_t,
ompi_comm_request_construct,
ompi_comm_request_destruct);
OBJ_CLASS_INSTANCE(ompi_comm_request_item_t, opal_list_item_t, NULL, NULL);
ompi_comm_request_t *ompi_comm_request_get (void)
{
opal_free_list_item_t *item;
int rc;
OPAL_FREE_LIST_GET(&ompi_comm_requests, item, rc);
(void) rc;
return (ompi_comm_request_t *) item;
}
void ompi_comm_request_return (ompi_comm_request_t *request)
{
if (request->context) {
free (request->context);
request->context = NULL;
}
OPAL_FREE_LIST_RETURN(&ompi_comm_requests, (opal_free_list_item_t *) request);
}

ompi/communicator/comm_request.h (new file, 40 lines)

@@ -0,0 +1,40 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2013 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#if !defined(OMPI_COMM_REQ_H)
#define OMPI_COMM_REQ_H
#include "opal/class/opal_list.h"
#include "opal/class/opal_free_list.h"
#include "ompi/request/request.h"
/* increase this number if more subrequests are needed */
#define OMPI_COMM_REQUEST_MAX_SUBREQ 2
typedef struct ompi_comm_request_t {
ompi_request_t super;
void *context;
opal_list_t schedule;
} ompi_comm_request_t;
OBJ_CLASS_DECLARATION(ompi_comm_request_t);
typedef int (*ompi_comm_request_callback_fn_t) (ompi_comm_request_t *);
void ompi_comm_request_init (void);
void ompi_comm_request_fini (void);
int ompi_comm_request_schedule_append (ompi_comm_request_t *request, ompi_comm_request_callback_fn_t callback,
ompi_request_t *subreqs[], int subreq_count);
void ompi_comm_request_start (ompi_comm_request_t *request);
ompi_comm_request_t *ompi_comm_request_get (void);
void ompi_comm_request_return (ompi_comm_request_t *request);
#endif /* OMPI_COMM_REQ_H */

@@ -31,6 +31,7 @@
#include "opal/class/opal_object.h"
#include "ompi/errhandler/errhandler.h"
#include "opal/threads/mutex.h"
#include "ompi/communicator/comm_request.h"
#include "mpi.h"
#include "ompi/group/group.h"
@@ -91,6 +92,7 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_communicator_t);
#define OMPI_COMM_CID_INTER 0x00000040
#define OMPI_COMM_CID_INTRA_BRIDGE 0x00000080
#define OMPI_COMM_CID_INTRA_OOB 0x00000100
#define OMPI_COMM_CID_GROUP 0x00000200
/**
* The block of CIDs allocated for MPI_COMM_WORLD
@@ -359,6 +361,13 @@ OMPI_DECLSPEC int ompi_comm_group (ompi_communicator_t *comm, ompi_group_t **gro
int ompi_comm_create (ompi_communicator_t* comm, ompi_group_t *group,
ompi_communicator_t** newcomm);
/**
* Non-collective create communicator based on a group
*/
int ompi_comm_create_group (ompi_communicator_t *comm, ompi_group_t *group, int tag,
ompi_communicator_t **newcomm);
/**
* Take an almost complete communicator and reserve the CID as well
* as activate it (initialize the collective and the topologies).
@@ -416,6 +425,37 @@ OMPI_DECLSPEC int ompi_comm_split_type(ompi_communicator_t *comm,
* @param newcomm: the new communicator or MPI_COMM_NULL if any error is detected.
*/
OMPI_DECLSPEC int ompi_comm_dup (ompi_communicator_t *comm, ompi_communicator_t **newcomm);
/**
* dup a communicator (non-blocking). Parameters are identical to the MPI counterpart
* of the function. It has been extracted, since we need to be able
* to dup a communicator internally as well.
*
* @param comm: input communicator
* @param newcomm: the new communicator or MPI_COMM_NULL if any error is detected.
*/
OMPI_DECLSPEC int ompi_comm_idup (ompi_communicator_t *comm, ompi_communicator_t **newcomm, ompi_request_t **request);
/**
* dup a communicator with info. Parameters are identical to the MPI counterpart
* of the function. It has been extracted, since we need to be able
* to dup a communicator internally as well.
*
* @param comm: input communicator
* @param newcomm: the new communicator or MPI_COMM_NULL if any error is detected.
*/
OMPI_DECLSPEC int ompi_comm_dup_with_info (ompi_communicator_t *comm, ompi_info_t *info, ompi_communicator_t **newcomm);
/**
* dup a communicator (non-blocking) with info. Parameters are identical to the
* MPI counterpart of the function. It has been extracted, since we need to be able
* to dup a communicator internally as well.
*
* @param comm: input communicator
* @param newcomm: the new communicator or MPI_COMM_NULL if any error is detected.
*/
OMPI_DECLSPEC int ompi_comm_idup_with_info (ompi_communicator_t *comm, ompi_info_t *info, ompi_communicator_t **newcomm, ompi_request_t **req);
/**
* compare two communicators.
*
@@ -469,6 +509,21 @@ OMPI_DECLSPEC int ompi_comm_nextcid ( ompi_communicator_t* newcomm,
int mode,
int send_first);
/**
* allocate new communicator ID (non-blocking)
* @param newcomm: pointer to the new communicator
* @param oldcomm: original comm
* @param bridgecomm: bridge comm for intercomm_create
* @param mode: combination of input
* OMPI_COMM_CID_INTRA: intra-comm
* OMPI_COMM_CID_INTER: inter-comm
* This routine has to be thread safe in the final version.
*/
OMPI_DECLSPEC int ompi_comm_nextcid_nb (ompi_communicator_t* newcomm,
ompi_communicator_t* comm,
ompi_communicator_t* bridgecomm,
int mode, ompi_request_t **req);
/**
* shut down the communicator infrastructure.
*/
@@ -489,6 +544,20 @@ OMPI_DECLSPEC int ompi_comm_set ( ompi_communicator_t** newcomm,
bool copy_topocomponent,
ompi_group_t *local_group,
ompi_group_t *remote_group );
OMPI_DECLSPEC int ompi_comm_set_nb ( ompi_communicator_t **ncomm,
ompi_communicator_t *oldcomm,
int local_size,
int *local_ranks,
int remote_size,
int *remote_ranks,
opal_hash_table_t *attr,
ompi_errhandler_t *errh,
bool copy_topocomponent,
ompi_group_t *local_group,
ompi_group_t *remote_group,
ompi_request_t **req );
/**
* This is a short-hand routine used in intercomm_create.
* The routine makes sure that all processes have afterwards
@@ -525,6 +594,11 @@ OMPI_DECLSPEC int ompi_comm_activate ( ompi_communicator_t** newcomm,
int mode,
int send_first );
OMPI_DECLSPEC int ompi_comm_activate_nb (ompi_communicator_t **newcomm,
ompi_communicator_t *comm,
ompi_communicator_t *bridgecomm,
int mode, ompi_request_t **req);
/**
* a simple function to dump the structure
*/

@@ -1240,10 +1240,13 @@ OMPI_DECLSPEC int MPI_Comm_create_errhandler(MPI_Comm_errhandler_function *func
OMPI_DECLSPEC int MPI_Comm_create_keyval(MPI_Comm_copy_attr_function *comm_copy_attr_fn,
MPI_Comm_delete_attr_function *comm_delete_attr_fn,
int *comm_keyval, void *extra_state);
OMPI_DECLSPEC int MPI_Comm_create_group(MPI_Comm comm, MPI_Group group, int tag, MPI_Comm *newcomm);
OMPI_DECLSPEC int MPI_Comm_create(MPI_Comm comm, MPI_Group group, MPI_Comm *newcomm);
OMPI_DECLSPEC int MPI_Comm_delete_attr(MPI_Comm comm, int comm_keyval);
OMPI_DECLSPEC int MPI_Comm_disconnect(MPI_Comm *comm);
OMPI_DECLSPEC int MPI_Comm_dup(MPI_Comm comm, MPI_Comm *newcomm);
OMPI_DECLSPEC int MPI_Comm_idup(MPI_Comm comm, MPI_Comm *newcomm, MPI_Request *request);
OMPI_DECLSPEC int MPI_Comm_dup_with_info(MPI_Comm comm, MPI_Info info, MPI_Comm *newcomm);
OMPI_DECLSPEC MPI_Comm MPI_Comm_f2c(MPI_Fint comm);
OMPI_DECLSPEC int MPI_Comm_free_keyval(int *comm_keyval);
OMPI_DECLSPEC int MPI_Comm_free(MPI_Comm *comm);
@@ -1909,10 +1912,13 @@ OMPI_DECLSPEC int PMPI_Comm_create_errhandler(MPI_Comm_errhandler_function *fun
OMPI_DECLSPEC int PMPI_Comm_create_keyval(MPI_Comm_copy_attr_function *comm_copy_attr_fn,
MPI_Comm_delete_attr_function *comm_delete_attr_fn,
int *comm_keyval, void *extra_state);
OMPI_DECLSPEC int PMPI_Comm_create_group(MPI_Comm comm, MPI_Group group, int tag, MPI_Comm *newcomm);
OMPI_DECLSPEC int PMPI_Comm_create(MPI_Comm comm, MPI_Group group, MPI_Comm *newcomm);
OMPI_DECLSPEC int PMPI_Comm_delete_attr(MPI_Comm comm, int comm_keyval);
OMPI_DECLSPEC int PMPI_Comm_disconnect(MPI_Comm *comm);
OMPI_DECLSPEC int PMPI_Comm_dup(MPI_Comm comm, MPI_Comm *newcomm);
OMPI_DECLSPEC int PMPI_Comm_idup(MPI_Comm comm, MPI_Comm *newcomm, MPI_Request *request);
OMPI_DECLSPEC int PMPI_Comm_dup_with_info(MPI_Comm comm, MPI_Info info, MPI_Comm *newcomm);
OMPI_DECLSPEC MPI_Comm PMPI_Comm_f2c(MPI_Fint comm);
OMPI_DECLSPEC int PMPI_Comm_free_keyval(int *comm_keyval);
OMPI_DECLSPEC int PMPI_Comm_free(MPI_Comm *comm);

@@ -11,7 +11,9 @@
# All rights reserved.
# Copyright (c) 2009-2012 Cisco Systems, Inc. All rights reserved.
# Copyright (c) 2011 Sandia National Laboratories. All rights reserved.
# Copyright (c) 2012 Oak Rigde National Laboratory. All rights reserved.
# Copyright (c) 2012 Oak Ridge National Laboratory. All rights reserved.
# Copyright (c) 2013 Los Alamos National Security, LLC. All rights
# reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
@@ -106,10 +108,13 @@ libmpi_c_mpi_la_SOURCES = \
comm_connect.c \
comm_create.c \
comm_create_errhandler.c \
comm_create_group.c \
comm_create_keyval.c \
comm_delete_attr.c \
comm_disconnect.c \
comm_dup.c \
comm_dup_with_info.c \
comm_idup.c \
comm_f2c.c \
comm_free.c \
comm_free_keyval.c \

ompi/mpi/c/comm_create_group.c (new file, 80 lines)

@@ -0,0 +1,80 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2008 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006-2007 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2013 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include <stdio.h>
#include "ompi/mpi/c/bindings.h"
#include "ompi/runtime/params.h"
#include "ompi/communicator/communicator.h"
#include "ompi/errhandler/errhandler.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/memchecker.h"
#if OPAL_HAVE_WEAK_SYMBOLS && OMPI_PROFILING_DEFINES
#pragma weak MPI_Comm_create_group = PMPI_Comm_create_group
#endif
#if OMPI_PROFILING_DEFINES
#include "ompi/mpi/c/profile/defines.h"
#endif
static const char FUNC_NAME[] = "MPI_Comm_create_group";
int MPI_Comm_create_group (MPI_Comm comm, MPI_Group group, int tag, MPI_Comm *newcomm) {
int rc;
MEMCHECKER(
memchecker_comm(comm);
);
if ( MPI_PARAM_CHECK ) {
OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
if (ompi_comm_invalid (comm))
return OMPI_ERRHANDLER_INVOKE(MPI_COMM_WORLD, MPI_ERR_COMM,
FUNC_NAME);
if (tag < 0 || tag > mca_pml.pml_max_tag)
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_TAG,
FUNC_NAME);
if ( NULL == group )
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_GROUP,
FUNC_NAME);
if ( NULL == newcomm )
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG,
FUNC_NAME);
}
if (MPI_GROUP_NULL == group || MPI_UNDEFINED == ompi_group_rank (group)) {
*newcomm = MPI_COMM_NULL;
return MPI_SUCCESS;
}
OPAL_CR_ENTER_LIBRARY();
rc = ompi_comm_create_group ((ompi_communicator_t *) comm, (ompi_group_t *) group,
tag, (ompi_communicator_t **) newcomm);
OMPI_ERRHANDLER_RETURN (rc, comm, rc, FUNC_NAME);
}

ompi/mpi/c/comm_dup_with_info.c (new file, 72 lines)

@@ -0,0 +1,72 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2008 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2008 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2006-2008 University of Houston. All rights reserved.
* Copyright (c) 2013 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include <stdio.h>
#include "ompi/mpi/c/bindings.h"
#include "ompi/runtime/params.h"
#include "ompi/communicator/communicator.h"
#include "ompi/errhandler/errhandler.h"
#include "ompi/memchecker.h"
#if OPAL_HAVE_WEAK_SYMBOLS && OMPI_PROFILING_DEFINES
#pragma weak MPI_Comm_dup_with_info = PMPI_Comm_dup_with_info
#endif
#if OMPI_PROFILING_DEFINES
#include "ompi/mpi/c/profile/defines.h"
#endif
static const char FUNC_NAME[] = "MPI_Comm_dup_with_info";
int MPI_Comm_dup_with_info(MPI_Comm comm, MPI_Info info, MPI_Comm *newcomm)
{
int rc;
MEMCHECKER(
memchecker_comm(comm);
);
/* argument checking */
if ( MPI_PARAM_CHECK ) {
OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
if (ompi_comm_invalid (comm))
return OMPI_ERRHANDLER_INVOKE(MPI_COMM_WORLD, MPI_ERR_COMM,
FUNC_NAME);
if (NULL == info || ompi_info_is_freed(info)) {
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_INFO,
FUNC_NAME);
}
if ( NULL == newcomm )
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG,
FUNC_NAME);
}
OPAL_CR_ENTER_LIBRARY();
rc = ompi_comm_dup_with_info (comm, info, newcomm);
OMPI_ERRHANDLER_RETURN(rc, comm, rc, FUNC_NAME);
}

ompi/mpi/c/comm_idup.c (new file, 68 lines)

@@ -0,0 +1,68 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2008 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2008 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2006-2008 University of Houston. All rights reserved.
* Copyright (c) 2013 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include <stdio.h>
#include "ompi/mpi/c/bindings.h"
#include "ompi/runtime/params.h"
#include "ompi/communicator/communicator.h"
#include "ompi/errhandler/errhandler.h"
#include "ompi/memchecker.h"
#if OPAL_HAVE_WEAK_SYMBOLS && OMPI_PROFILING_DEFINES
#pragma weak MPI_Comm_idup = PMPI_Comm_idup
#endif
#if OMPI_PROFILING_DEFINES
#include "ompi/mpi/c/profile/defines.h"
#endif
static const char FUNC_NAME[] = "MPI_Comm_idup";
int MPI_Comm_idup(MPI_Comm comm, MPI_Comm *newcomm, MPI_Request *request)
{
int rc;
MEMCHECKER(
memchecker_comm(comm);
);
/* argument checking */
if ( MPI_PARAM_CHECK ) {
OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
if (ompi_comm_invalid (comm))
return OMPI_ERRHANDLER_INVOKE(MPI_COMM_WORLD, MPI_ERR_COMM,
FUNC_NAME);
if ( NULL == newcomm )
return OMPI_ERRHANDLER_INVOKE(comm, MPI_ERR_ARG,
FUNC_NAME);
}
OPAL_CR_ENTER_LIBRARY();
rc = ompi_comm_idup (comm, newcomm, request);
OMPI_ERRHANDLER_RETURN(rc, comm, rc, FUNC_NAME);
}

@@ -12,7 +12,9 @@
# All rights reserved.
# Copyright (c) 2009-2012 Cisco Systems, Inc. All rights reserved.
# Copyright (c) 2011 Sandia National Laboratories. All rights reserved.
# Copyright (c) 2012 Oak Rigde National Laboratory. All rights reserved.
# Copyright (c) 2012 Oak Ridge National Laboratory. All rights reserved.
# Copyright (c) 2013 Los Alamos National Security, LLC. All rights
# reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
@@ -88,10 +90,13 @@ nodist_libmpi_c_pmpi_la_SOURCES = \
pcomm_connect.c \
pcomm_create.c \
pcomm_create_errhandler.c \
pcomm_create_group.c \
pcomm_create_keyval.c \
pcomm_delete_attr.c \
pcomm_disconnect.c \
pcomm_dup.c \
pcomm_dup_with_info.c \
pcomm_idup.c \
pcomm_f2c.c \
pcomm_free.c \
pcomm_free_keyval.c \

@@ -76,10 +76,13 @@
#define MPI_Comm_connect PMPI_Comm_connect
#define MPI_Comm_create_errhandler PMPI_Comm_create_errhandler
#define MPI_Comm_create_keyval PMPI_Comm_create_keyval
#define MPI_Comm_create_group PMPI_Comm_create_group
#define MPI_Comm_create PMPI_Comm_create
#define MPI_Comm_delete_attr PMPI_Comm_delete_attr
#define MPI_Comm_disconnect PMPI_Comm_disconnect
#define MPI_Comm_dup PMPI_Comm_dup
#define MPI_Comm_dup_with_info PMPI_Comm_dup_with_info
#define MPI_Comm_idup PMPI_Comm_idup
#define MPI_Comm_f2c PMPI_Comm_f2c
#define MPI_Comm_free_keyval PMPI_Comm_free_keyval
#define MPI_Comm_free PMPI_Comm_free

@@ -27,6 +27,7 @@ typedef enum {
OMPI_REQUEST_COLL, /**< MPI-3 non-blocking collectives request */
OMPI_REQUEST_NULL, /**< NULL request */
OMPI_REQUEST_NOOP, /**< A request that does nothing (e.g., to PROC_NULL) */
OMPI_REQUEST_COMM, /**< MPI-3 non-blocking communicator duplication */
OMPI_REQUEST_MAX /**< Maximum request type */
} ompi_request_type_t;

@@ -283,6 +283,9 @@ int ompi_mpi_finalize(void)
return ret;
}
/* release resources held by comm requests */
ompi_comm_request_fini ();
if (OMPI_SUCCESS != (ret = ompi_message_finalize())) {
return ret;
}

@@ -871,6 +871,9 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
goto error;
}
/* Prepare communicator requests */
ompi_comm_request_init ();
/* Init coll for the comms. This has to be after dpm_base_select,
(since dpm.mark_dyncomm is not set in the communicator creation
function else), but before dpm.dyncom_init, since this function