/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2007 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2006-2007 University of Houston.  All rights reserved.
 * Copyright (c) 2006-2007 Los Alamos National Security, LLC.  All rights
 *                         reserved.
 * Copyright (c) 2007      Cisco, Inc.  All rights reserved.
 *
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"

#include <string.h>
#include <stdio.h>
#ifdef HAVE_SYS_UIO_H
#include <sys/uio.h>
#endif
#ifdef HAVE_NET_UIO_H
#include <net/uio.h>
#endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif  /* HAVE_SYS_TIME_H */

#include "opal/util/opal_environ.h"
#include "opal/util/printf.h"
#include "opal/util/convert.h"
#include "opal/threads/mutex.h"
#include "opal/util/bit_ops.h"
#include "opal/util/argv.h"

#include "ompi/communicator/communicator.h"
#include "ompi/request/request.h"
#include "ompi/errhandler/errhandler.h"
#include "ompi/proc/proc.h"
#include "ompi/info/info.h"
#include "ompi/constants.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/runtime/ompi_module_exchange.h"

#include "orte/util/proc_info.h"
#include "orte/dss/dss.h"
#include "orte/mca/ns/ns.h"
#include "orte/mca/gpr/gpr.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/ras/ras_types.h"
#include "orte/mca/rmaps/rmaps_types.h"
#include "orte/mca/rmgr/rmgr.h"
#include "orte/mca/rmgr/base/base.h"
#include "orte/mca/smr/smr_types.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/grpcomm/grpcomm.h"

#include "orte/runtime/runtime.h"

static int ompi_comm_get_rport (orte_process_name_t *port, int send_first,
                                struct ompi_proc_t *proc, orte_rml_tag_t tag,
                                orte_process_name_t *rport);


int ompi_comm_connect_accept ( ompi_communicator_t *comm, int root,
                               orte_process_name_t *port, int send_first,
                               ompi_communicator_t **newcomm, orte_rml_tag_t tag )
{
    int size, rsize, rank, rc;
    orte_std_cntr_t num_vals;
    orte_std_cntr_t rnamebuflen = 0;
    int rnamebuflen_int = 0;
    void *rnamebuf=NULL;

    ompi_communicator_t *newcomp=MPI_COMM_NULL;
    ompi_proc_t **rprocs=NULL;
    ompi_group_t *group=comm->c_local_group;
    orte_process_name_t *rport=NULL, tmp_port_name;
    orte_buffer_t *nbuf=NULL, *nrbuf=NULL;
    ompi_proc_t **proc_list=NULL, **new_proc_list;
    int i,j, new_proc_len;
    ompi_group_t *new_group_pointer;

    size = ompi_comm_size ( comm );
    rank = ompi_comm_rank ( comm );

    /* tell the progress engine to tick the event library more often,
       to make sure that the OOB messages get sent */
    opal_progress_event_users_increment();

    if ( rank == root ) {
        /* The process receiving first does not yet have the contact
           information of the remote process. Therefore, we have to
           exchange that. */
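        /* In outline: the root resolves the remote leader's process name via
           ompi_comm_get_rport(), packs the local group size and proc list
           into a buffer, and swaps that buffer with the remote root over the
           RML; the send_first argument decides which side sends first. */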
        if(!OMPI_GROUP_IS_DENSE(group)) {
            proc_list = (ompi_proc_t **) calloc (group->grp_proc_count,
                                                 sizeof (ompi_proc_t *));
            for(i=0 ; i<group->grp_proc_count ; i++)
                proc_list[i] = ompi_group_peer_lookup(group,i);
        }

        if ( OMPI_COMM_JOIN_TAG != (int)tag ) {
            if(OMPI_GROUP_IS_DENSE(group)){
                rc = ompi_comm_get_rport(port,send_first,
                                         group->grp_proc_pointers[rank], tag,
                                         &tmp_port_name);
            } else {
                rc = ompi_comm_get_rport(port,send_first,
                                         proc_list[rank], tag,
                                         &tmp_port_name);
            }
            if (OMPI_SUCCESS != rc) {
                return rc;
            }
            rport = &tmp_port_name;
        } else {
            rport = port;
        }

        /* Generate the message buffer containing the number of processes
           and the list of participating processes */
        nbuf = OBJ_NEW(orte_buffer_t);
        if (NULL == nbuf) {
            return OMPI_ERROR;
        }

        if (ORTE_SUCCESS != (rc = orte_dss.pack(nbuf, &size, 1, ORTE_INT))) {
            ORTE_ERROR_LOG(rc);
            goto exit;
        }

        if(OMPI_GROUP_IS_DENSE(group)) {
            ompi_proc_pack(group->grp_proc_pointers, size, nbuf);
        } else {
            ompi_proc_pack(proc_list, size, nbuf);
        }

        nrbuf = OBJ_NEW(orte_buffer_t);
        if (NULL == nrbuf ) {
            rc = OMPI_ERROR;
            goto exit;
        }

        /* Exchange the number and the list of processes in the groups */
        if ( send_first ) {
            rc = orte_rml.send_buffer(rport, nbuf, tag, 0);
            rc = orte_rml.recv_buffer(rport, nrbuf, tag, 0);
        } else {
            rc = orte_rml.recv_buffer(rport, nrbuf, tag, 0);
            rc = orte_rml.send_buffer(rport, nbuf, tag, 0);
        }

        if (ORTE_SUCCESS != (rc = orte_dss.unload(nrbuf, &rnamebuf, &rnamebuflen))) {
            ORTE_ERROR_LOG(rc);
            goto exit;
        }
    }

    /* First convert the size_t to an int so we can cast in the bcast to a void *
     * if we don't then we will get badness when using big vs little endian
     * THIS IS NO LONGER REQUIRED AS THE LENGTH IS NOW A STD_CNTR_T, WHICH
     * CORRELATES TO AN INT32 */
    rnamebuflen_int = (int)rnamebuflen;

    /* bcast the buffer-length to all processes in the local comm */
    rc = comm->c_coll.coll_bcast (&rnamebuflen_int, 1, MPI_INT, root, comm,
                                  comm->c_coll.coll_bcast_module);
    if ( OMPI_SUCCESS != rc ) {
        goto exit;
    }
    rnamebuflen = rnamebuflen_int;

    if ( rank != root ) {
        /* non-root processes need to allocate the buffer manually */
        rnamebuf = (char *) malloc(rnamebuflen);
        if ( NULL == rnamebuf ) {
            rc = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }
    }

    /* bcast list of processes to all procs in local group and reconstruct
       the data. Note that proc_get_proclist adds processes which were not
       yet known to our process pool. */
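    /* A reminder of the wire format produced above (and unpacked below): a
       single ORTE_INT holding the group size, followed by the proc entries
       written by ompi_proc_pack(). */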
    rc = comm->c_coll.coll_bcast (rnamebuf, rnamebuflen_int, MPI_BYTE, root, comm,
                                  comm->c_coll.coll_bcast_module);
    if ( OMPI_SUCCESS != rc ) {
        goto exit;
    }

    nrbuf = OBJ_NEW(orte_buffer_t);
    if (NULL == nrbuf) {
        goto exit;
    }
    if ( ORTE_SUCCESS != ( rc = orte_dss.load(nrbuf, rnamebuf, rnamebuflen))) {
        ORTE_ERROR_LOG(rc);
        goto exit;
    }

    num_vals = 1;
    if (ORTE_SUCCESS != (rc = orte_dss.unpack(nrbuf, &rsize, &num_vals, ORTE_INT))) {
        ORTE_ERROR_LOG(rc);
        goto exit;
    }

    rc = ompi_proc_unpack(nrbuf, rsize, &rprocs, &new_proc_len, &new_proc_list);
    if ( OMPI_SUCCESS != rc ) {
        goto exit;
    }

    /* If we added new procs, we need to do the modex and then call
       PML add_procs */
    if (new_proc_len > 0) {
        opal_list_t all_procs;
        orte_namelist_t *name;
        orte_buffer_t mdx_buf, rbuf;

        OBJ_CONSTRUCT(&all_procs, opal_list_t);

        if (send_first) {
            for (i = 0 ; i < group->grp_proc_count ; ++i) {
                name = OBJ_NEW(orte_namelist_t);
                name->name = &(ompi_group_peer_lookup(group, i)->proc_name);
                opal_list_append(&all_procs, &name->item);
            }

            for (i = 0 ; i < rsize ; ++i) {
                name = OBJ_NEW(orte_namelist_t);
                name->name = &(rprocs[i]->proc_name);
                opal_list_append(&all_procs, &name->item);
            }
        } else {
            for (i = 0 ; i < rsize ; ++i) {
                name = OBJ_NEW(orte_namelist_t);
                name->name = &(rprocs[i]->proc_name);
                opal_list_append(&all_procs, &name->item);
            }

            for (i = 0 ; i < group->grp_proc_count ; ++i) {
                name = OBJ_NEW(orte_namelist_t);
                name->name = &(ompi_group_peer_lookup(group, i)->proc_name);
                opal_list_append(&all_procs, &name->item);
            }
        }

        OBJ_CONSTRUCT(&mdx_buf, orte_buffer_t);
        if (OMPI_SUCCESS != (rc = ompi_modex_get_my_buffer(&mdx_buf))) {
            ORTE_ERROR_LOG(rc);
            goto exit;
        }

        OBJ_CONSTRUCT(&rbuf, orte_buffer_t);
        if (OMPI_SUCCESS != (rc = orte_grpcomm.allgather_list(&all_procs, &mdx_buf, &rbuf))) {
            ORTE_ERROR_LOG(rc);
            goto exit;
        }
        OBJ_DESTRUCT(&mdx_buf);

        if (OMPI_SUCCESS != (rc = ompi_modex_process_data(&rbuf))) {
            ORTE_ERROR_LOG(rc);
            goto exit;
        }
        OBJ_DESTRUCT(&rbuf);

        /*
        while (NULL != (item = opal_list_remove_first(&all_procs))) {
            OBJ_RELEASE(item);
        }
        OBJ_DESTRUCT(&all_procs);
        */

        MCA_PML_CALL(add_procs(new_proc_list, new_proc_len));
    }

    OBJ_RELEASE(nrbuf);
    if ( rank == root ) {
        OBJ_RELEASE(nbuf);
    }

    new_group_pointer=ompi_group_allocate(rsize);
    if( NULL == new_group_pointer ) {
        return MPI_ERR_GROUP;
    }

    /* put group elements in the list */
    for (j = 0; j < rsize; j++) {
        new_group_pointer->grp_proc_pointers[j] = rprocs[j];
    }                           /* end proc loop */

    /* increment proc reference counters */
    ompi_group_increment_proc_count(new_group_pointer);

    /* set up communicator structure */
    rc = ompi_comm_set ( &newcomp,                  /* new comm */
                         comm,                      /* old comm */
                         group->grp_proc_count,     /* local_size */
                         NULL,                      /* local_procs */
                         rsize,                     /* remote_size */
                         NULL ,                     /* remote_procs */
                         NULL,                      /* attrs */
                         comm->error_handler,       /* error handler */
                         NULL,                      /* topo component */
                         group,                     /* local group */
                         new_group_pointer          /* remote group */
                         );
    if ( NULL == newcomp ) {
        rc = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    ompi_group_decrement_proc_count (new_group_pointer);
    OBJ_RELEASE(new_group_pointer);
    new_group_pointer = MPI_GROUP_NULL;

    /* allocate comm_cid */
    rc = ompi_comm_nextcid ( newcomp,                  /* new communicator */
                             comm,                     /* old communicator */
                             NULL,                     /* bridge comm */
                             &root,                    /* local leader */
                             rport,                    /* remote leader */
                             OMPI_COMM_CID_INTRA_OOB,  /* mode */
                             send_first );             /* send or recv first */
    if ( OMPI_SUCCESS != rc ) {
        goto exit;
    }

    /* activate comm and init coll-component */
    rc = ompi_comm_activate ( newcomp,                 /* new communicator */
                              comm,                    /* old communicator */
                              NULL,                    /* bridge comm */
                              &root,                   /* local leader */
                              rport,
                              /* remote leader */
                              OMPI_COMM_CID_INTRA_OOB, /* mode */
                              send_first,              /* send or recv first */
                              0);                      /* sync_flag */
    if ( OMPI_SUCCESS != rc ) {
        goto exit;
    }

    /* Question: do we have to re-start some low level stuff
       to enable the usage of fast communication devices
       between the two worlds ? */

 exit:
    /* done with OOB and such - slow our tick rate again */
    opal_progress();
    opal_progress_event_users_decrement();

    if ( NULL != rprocs ) {
        free ( rprocs );
    }
    if ( NULL != proc_list ) {
        free ( proc_list );
    }
    if ( OMPI_SUCCESS != rc ) {
        if ( MPI_COMM_NULL != newcomp && NULL != newcomp ) {
            OBJ_RETAIN(newcomp);
            newcomp = MPI_COMM_NULL;
        }
    }

    *newcomm = newcomp;
    return rc;
}

/**********************************************************************/
/**********************************************************************/
/**********************************************************************/
/*
 * This routine is necessary, since in the connect/accept case, the processes
 * executing the connect operation have the OOB contact information of the
 * leader of the remote group, however, the processes executing the
 * accept get their own port_name = OOB contact information passed in as
 * an argument. This is however useless.
 *
 * Therefore, the two root processes exchange this information at this
 * point.
 *
 */
int ompi_comm_get_rport(orte_process_name_t *port, int send_first,
                        ompi_proc_t *proc, orte_rml_tag_t tag,
                        orte_process_name_t *rport_name)
{
    int rc;
    orte_std_cntr_t num_vals;

    if ( send_first ) {
        orte_buffer_t *sbuf;

        sbuf = OBJ_NEW(orte_buffer_t);
        if (NULL == sbuf) {
            return OMPI_ERROR;
        }
        if (ORTE_SUCCESS != (rc = orte_dss.pack(sbuf, &(proc->proc_name), 1, ORTE_NAME))) {
            ORTE_ERROR_LOG(rc);
            OBJ_RELEASE(sbuf);
            return rc;
        }

        rc = orte_rml.send_buffer(port, sbuf, tag, 0);
        OBJ_RELEASE(sbuf);
        if ( 0 > rc ) {
            ORTE_ERROR_LOG(rc);
            return rc;
        }

        *rport_name = *port;
    } else {
        orte_buffer_t *rbuf;

        rbuf = OBJ_NEW(orte_buffer_t);
        if (NULL == rbuf) {
            return ORTE_ERROR;
        }

        if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer(ORTE_NAME_WILDCARD, rbuf, tag, 0))) {
            ORTE_ERROR_LOG(rc);
            OBJ_RELEASE(rbuf);
            return rc;
        }

        num_vals = 1;
        if (ORTE_SUCCESS != (rc = orte_dss.unpack(rbuf, rport_name, &num_vals, ORTE_NAME))) {
            ORTE_ERROR_LOG(rc);
            OBJ_RELEASE(rbuf);
            return rc;
        }
        OBJ_RELEASE(rbuf);
    }
    return OMPI_SUCCESS;
}

/**********************************************************************/
/**********************************************************************/
/**********************************************************************/
int ompi_comm_start_processes(int count, char **array_of_commands,
                              char ***array_of_argv,
                              int *array_of_maxprocs,
                              MPI_Info *array_of_info,
                              char *port_name)
{
    int rc, i, j, counter;
    int have_wdir=0;
    bool have_prefix;
    int valuelen=OMPI_PATH_MAX, flag=0;
    char cwd[OMPI_PATH_MAX];
    char host[OMPI_PATH_MAX];   /*** should define OMPI_HOST_MAX ***/
    char prefix[OMPI_PATH_MAX];
    char *base_prefix;

    orte_std_cntr_t num_apps, ai;
    orte_jobid_t new_jobid=ORTE_JOBID_INVALID;
    orte_app_context_t **apps=NULL;

    opal_list_t attributes;
    opal_list_item_t *item;

    bool timing = false;
    struct timeval ompistart, ompistop;
    int param, value;

    /* parse the info object */
    /* check potentially for:
       - "host":   desired host where to spawn the processes
       - "prefix": the path to the root of the directory tree where ompi
                   executables and libraries can be found
       - "arch":   desired architecture
       - "wdir":   directory, where the executable can be found
       - "path":   list of directories where to look for the executable
       - "file":   filename, where additional information is provided.
       - "soft":   see page 92 of MPI-2.
    */
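    /* Purely as an illustration of how these keys reach this routine (the
       host name and directory below are made-up values): an application
       typically sets them on an MPI_Info object before calling
       MPI_Comm_spawn, e.g.

           MPI_Info info;
           MPI_Comm intercomm;
           MPI_Info_create(&info);
           MPI_Info_set(info, "host", "node17");
           MPI_Info_set(info, "wdir", "/tmp/spawn-test");
           MPI_Comm_spawn("./worker", MPI_ARGV_NULL, 4, info, 0,
                          MPI_COMM_SELF, &intercomm, MPI_ERRCODES_IGNORE);

       Of the keys listed above, only "wdir", "host", and "ompi_prefix" are
       acted upon below; the others are not yet implemented. */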
    /* make sure the progress engine properly trips the event library */
    opal_progress_event_users_increment();

    /* check to see if we want timing information */
    param = mca_base_param_reg_int_name("ompi", "timing",
                                        "Request that critical timing loops be measured",
                                        false, false, 0, &value);
    if (value != 0) {
        timing = true;
        if (0 != gettimeofday(&ompistart, NULL)) {
            opal_output(0, "ompi_comm_start_procs: could not obtain start time");
            ompistart.tv_sec = 0;
            ompistart.tv_usec = 0;
        }
    }

    /* setup to record the attributes */
    OBJ_CONSTRUCT(&attributes, opal_list_t);

    /* we want to be able to default the prefix to the one used for this job
     * so that the ompi executables and libraries can be found. the user can
     * later override this value by providing an MPI_Info value. for now, though,
     * let's get the default value off the registry
     */
    rc = orte_rmgr.get_app_context(orte_process_info.my_name->jobid, &apps, &num_apps);
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }
    /* we'll just use the prefix from the first member of the app_context array.
     * this shouldn't matter as they all should be the same. it could be NULL, of
     * course (user might not have specified it), so we need to protect against that.
     *
     * It's possible that no app_contexts are returned (e.g., during a comm_spawn
     * from a singleton), so check first
     */
    if (NULL != apps && NULL != apps[0]->prefix_dir) {
        base_prefix = strdup(apps[0]->prefix_dir);
    } else {
        base_prefix = NULL;
    }
    /* cleanup the memory we used */
    if(NULL != apps) {
        for (ai = 0; ai < num_apps; ai++) {
            OBJ_RELEASE(apps[ai]);
        }
        free(apps);
    }

    /* Convert the list of commands to an array of orte_app_context_t
       pointers */
    apps = (orte_app_context_t**)malloc(count * sizeof(orte_app_context_t *));
    if (NULL == apps) {
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        return ORTE_ERR_OUT_OF_RESOURCE;
    }
    for (i = 0; i < count; ++i) {
        apps[i] = OBJ_NEW(orte_app_context_t);
        if (NULL == apps[i]) {
            ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
            /* rollback what was already done */
            for (j=0; j < i; j++) OBJ_RELEASE(apps[j]);
            opal_progress_event_users_decrement();
            return ORTE_ERR_OUT_OF_RESOURCE;
        }
        /* copy over the name of the executable */
        apps[i]->app = strdup(array_of_commands[i]);
        if (NULL == apps[i]->app) {
            ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
            /* rollback what was already done */
            for (j=0; j < i; j++) OBJ_RELEASE(apps[j]);
            opal_progress_event_users_decrement();
            return ORTE_ERR_OUT_OF_RESOURCE;
        }
        /* record the number of procs to be generated */
        apps[i]->num_procs = array_of_maxprocs[i];

        /* copy over the argv array */
        counter = 1;

        if (MPI_ARGVS_NULL != array_of_argv &&
            MPI_ARGV_NULL != array_of_argv[i]) {
            /* first need to find out how many entries there are */
            j=0;
            while (NULL != array_of_argv[i][j]) {
                j++;
            }
            counter += j;
        }

        /* now copy them over, ensuring to NULL terminate the array */
        apps[i]->argv = (char**)malloc((1 + counter) * sizeof(char*));
        if (NULL == apps[i]->argv) {
            ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
            /* rollback what was already done */
            for (j=0; j < i; j++) {
                OBJ_RELEASE(apps[j]);
            }
            opal_progress_event_users_decrement();
            return ORTE_ERR_OUT_OF_RESOURCE;
        }
        apps[i]->argv[0] = strdup(array_of_commands[i]);
        for (j=1; j < counter; j++) {
            apps[i]->argv[j] = strdup(array_of_argv[i][j-1]);
        }
        apps[i]->argv[counter] = NULL;

        /* the environment gets set by the launcher
         * all we need to do is add the specific values
         * needed for comm_spawn
         */
        /* Add environment variable with the contact information for the
           child processes. */
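        /* (The OMPI_PARENT_PORT value exported below is what a spawned child
           reads back during its own MPI initialization so that it can connect
           to the parent job and set up MPI_COMM_PARENT.) */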
        counter = 1;
        apps[i]->env = (char**)malloc((1+counter) * sizeof(char*));
        if (NULL == apps[i]->env) {
            ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
            /* rollback what was already done */
            for (j=0; j < i; j++) OBJ_RELEASE(apps[j]);
            opal_progress_event_users_decrement();
            return ORTE_ERR_OUT_OF_RESOURCE;
        }
        asprintf(&(apps[i]->env[0]), "OMPI_PARENT_PORT=%s", port_name);
        apps[i]->env[1] = NULL;
        for (j = 0; NULL != environ[j]; ++j) {
            if (0 == strncmp("OMPI_", environ[j], 5)) {
                opal_argv_append_nosize(&apps[i]->env, environ[j]);
            }
        }

        /* Check for well-known info keys */
        have_wdir = 0;
        have_prefix = false;
        if ( array_of_info != NULL && array_of_info[i] != MPI_INFO_NULL ) {

            /* check for 'wdir' */
            ompi_info_get (array_of_info[i], "wdir", valuelen, cwd, &flag);
            if ( flag ) {
                apps[i]->cwd = strdup(cwd);
                have_wdir = 1;
            }

            /* check for 'host' */
            ompi_info_get (array_of_info[i], "host", sizeof(host), host, &flag);
            if ( flag ) {
                apps[i]->num_map = 1;
                apps[i]->map_data = (orte_app_context_map_t **) malloc(sizeof(orte_app_context_map_t *));
                apps[i]->map_data[0] = OBJ_NEW(orte_app_context_map_t);
                apps[i]->map_data[0]->map_type = ORTE_APP_CONTEXT_MAP_HOSTNAME;
                apps[i]->map_data[0]->map_data = strdup(host);
            }

            /* 'path', 'arch', 'file', 'soft' -- to be implemented */

            /* check for 'ompi_prefix' (OMPI-specific -- to effect the same
             * behavior as --prefix option to orterun) */
            ompi_info_get (array_of_info[i], "ompi_prefix", sizeof(prefix), prefix, &flag);
            if ( flag ) {
                apps[i]->prefix_dir = strdup(prefix);
                have_prefix = true;
            }
        }

        /* default value: If the user did not tell us where to look for the
           executable, we assume the current working directory */
        if ( !have_wdir ) {
            getcwd(cwd, OMPI_PATH_MAX);
            apps[i]->cwd = strdup(cwd);
        }

        /* if the user told us a new prefix, then we leave it alone.
           otherwise, if a prefix had been provided before, copy that one
           into the new app_context for use by the spawned children */
        if ( !have_prefix && NULL != base_prefix) {
            apps[i]->prefix_dir = strdup(base_prefix);
        }

        /* leave the map info alone - the launcher will
         * decide where to put things */
    } /* for (i = 0 ; i < count ; ++i) */

    /* cleanup */
    if (NULL != base_prefix) {
        free(base_prefix);
    }

    /* tell the RTE that we want the new job to be a child of this process' job */
    if (ORTE_SUCCESS != (rc = orte_rmgr.add_attribute(&attributes, ORTE_NS_USE_PARENT,
                                                      ORTE_JOBID, &(orte_process_info.my_name->jobid),
                                                      ORTE_RMGR_ATTR_OVERRIDE))) {
        ORTE_ERROR_LOG(rc);
        OBJ_DESTRUCT(&attributes);
        opal_progress_event_users_decrement();
        return MPI_ERR_SPAWN;
    }

    /* tell the RTE that we want the children to run inside of our allocation -
     * don't go get one just for them */
    if (ORTE_SUCCESS != (rc = orte_rmgr.add_attribute(&attributes, ORTE_RAS_USE_PARENT_ALLOCATION,
                                                      ORTE_JOBID, &(orte_process_info.my_name->jobid),
                                                      ORTE_RMGR_ATTR_OVERRIDE))) {
        ORTE_ERROR_LOG(rc);
        OBJ_DESTRUCT(&attributes);
        opal_progress_event_users_decrement();
        return MPI_ERR_SPAWN;
    }

    /* tell the RTE that we want the children mapped the same way as their parent */
    if (ORTE_SUCCESS != (rc = orte_rmgr.add_attribute(&attributes, ORTE_RMAPS_USE_PARENT_PLAN,
                                                      ORTE_JOBID, &(orte_process_info.my_name->jobid),
                                                      ORTE_RMGR_ATTR_OVERRIDE))) {
        ORTE_ERROR_LOG(rc);
        OBJ_DESTRUCT(&attributes);
        opal_progress_event_users_decrement();
        return MPI_ERR_SPAWN;
    }

#if 0
    /* tell the RTE that we want to be cross-connected to the children so we receive
     * their ORTE-level information - e.g., OOB contact info - when they
     * reach the STG1 stage gate */
    state = ORTE_PROC_STATE_AT_STG1;
    if (ORTE_SUCCESS != (rc = orte_rmgr.add_attribute(&attributes, ORTE_RMGR_XCONNECT_AT_SPAWN,
                                                      ORTE_PROC_STATE, &state,
                                                      ORTE_RMGR_ATTR_OVERRIDE))) {
        ORTE_ERROR_LOG(rc);
        OBJ_DESTRUCT(&attributes);
        opal_progress_event_users_decrement();
        return MPI_ERR_SPAWN;
    }
#endif

    /* check for timing request - get stop time and report elapsed time if so */
    if (timing) {
        if (0 != gettimeofday(&ompistop, NULL)) {
            opal_output(0, "ompi_comm_start_procs: could not obtain stop time");
        } else {
            opal_output(0, "ompi_comm_start_procs: time from start to prepare to spawn %ld usec",
                        (long int)((ompistop.tv_sec - ompistart.tv_sec)*1000000 +
                                   (ompistop.tv_usec - ompistart.tv_usec)));
            if (0 != gettimeofday(&ompistart, NULL)) {
                opal_output(0, "ompi_comm_start_procs: could not obtain new start time");
                ompistart.tv_sec = ompistop.tv_sec;
                ompistart.tv_usec = ompistop.tv_usec;
            }
        }
    }

    /* spawn procs */
    rc = orte_rmgr.spawn_job(apps, count, &new_jobid, 0, NULL, NULL,
                             ORTE_PROC_STATE_NONE, &attributes);
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
        opal_progress_event_users_decrement();
        return MPI_ERR_SPAWN;
    }

    /* check for timing request - get stop time and report elapsed time if so */
    if (timing) {
        if (0 != gettimeofday(&ompistop, NULL)) {
            opal_output(0, "ompi_comm_start_procs: could not obtain stop time");
        } else {
            opal_output(0, "ompi_comm_start_procs: time to spawn %ld usec",
                        (long int)((ompistop.tv_sec - ompistart.tv_sec)*1000000 +
                                   (ompistop.tv_usec - ompistart.tv_usec)));
        }
    }

    /* clean up */
    opal_progress_event_users_decrement();
    while (NULL != (item = opal_list_remove_first(&attributes))) {
        OBJ_RELEASE(item);
    }
    OBJ_DESTRUCT(&attributes);

    for ( i=0; ic_name, MPI_MAX_OBJECT_NAME, "MPI_COMM_PARENT"); } return OMPI_SUCCESS; }

/**********************************************************************/
/**********************************************************************/
/**********************************************************************/
/* this routine runs through the list of communicators and does the
   disconnect for all dynamic communicators */
int ompi_comm_dyn_finalize (void)
{
    int i,j=0, max=0;
    ompi_comm_disconnect_obj **objs=NULL;
    ompi_communicator_t *comm=NULL;

    if ( 1 size = ompi_comm_remote_size (comm); } else { obj->size = ompi_comm_size (comm); }

    obj->comm = comm;
    obj->reqs = (ompi_request_t **) malloc(2*obj->size*sizeof(ompi_request_t *));
    if ( NULL == obj->reqs ) {
        free (obj);
        return NULL;
    }

    /* initiate all isend_irecvs. We use a dummy buffer stored on
       the object, since we are sending zero size messages anyway. */
    for ( i=0; i < obj->size; i++ ) {
        ret = MCA_PML_CALL(irecv (&(obj->buf), 0, MPI_INT, i,
                                  OMPI_COMM_BARRIER_TAG, comm,
                                  &(obj->reqs[2*i])));
        if ( OMPI_SUCCESS != ret ) {
            free (obj->reqs);
            free (obj);
            return NULL;
        }

        ret = MCA_PML_CALL(isend (&(obj->buf), 0, MPI_INT, i,
                                  OMPI_COMM_BARRIER_TAG,
                                  MCA_PML_BASE_SEND_SYNCHRONOUS,
                                  comm, &(obj->reqs[2*i+1])));
        if ( OMPI_SUCCESS != ret ) {
            free (obj->reqs);
            free (obj);
            return NULL;
        }
    }

    /* return handle */
    return obj;
}

/**********************************************************************/
/**********************************************************************/
/**********************************************************************/
/* - count how many requests are active
 * - generate a request array large enough to hold all active requests
 * - call waitall on the overall request array
 * - free the objects
 */
void ompi_comm_disconnect_waitall (int count, ompi_comm_disconnect_obj **objs)
{
    ompi_request_t **reqs=NULL;
    char *treq=NULL;
    int totalcount = 0;
    int i;
    int ret;

    for (i=0; i<count; i++) {
        totalcount += objs[i]->size;
    }

    reqs = (ompi_request_t **) malloc (2*totalcount*sizeof(ompi_request_t *));
    if ( NULL == reqs ) {
        printf("ompi_comm_disconnect_waitall: error allocating memory\n");
        return;
    }

    /* generate a single, large array of pending requests */
    treq = (char *)reqs;
    for (i=0; i<count; i++) {
        memcpy (treq, objs[i]->reqs, 2*objs[i]->size * sizeof(ompi_request_t *));
        treq += 2*objs[i]->size * sizeof(ompi_request_t *);
    }

    /* force all non-blocking all-to-alls to finish */
    ret = ompi_request_wait_all (2*totalcount, reqs, MPI_STATUSES_IGNORE);

    /* Finally, free everything */
    for (i=0; i< count; i++ ) {
        if (NULL != objs[i]->reqs ) {
            free (objs[i]->reqs );
            free (objs[i]);
        }
    }

    free (reqs);

    /* decrease the counter for dynamic communicators by 'count'.
       Attention, this approach requires that we only use these
       routines for communicators which have been flagged dynamic */
    ompi_comm_num_dyncomm -= count;

    return;
}

/**********************************************************************/
/**********************************************************************/
/**********************************************************************/

#define OMPI_COMM_MAXJOBIDS 64
void ompi_comm_mark_dyncomm (ompi_communicator_t *comm)
{
    int i, j, numjobids=0;
    int size, rsize;
    int found;
    orte_jobid_t jobids[OMPI_COMM_MAXJOBIDS], thisjobid;
    ompi_group_t *grp=NULL;
    ompi_proc_t *proc = NULL;

    /* special case for MPI_COMM_NULL */
    if ( comm == MPI_COMM_NULL ) {
        return;
    }

    size  = ompi_comm_size (comm);
    rsize = ompi_comm_remote_size(comm);
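    /* Worked example of the check performed below: an intercommunicator
       produced by MPI_Comm_spawn spans the parent job and the newly spawned
       job, so the local and remote groups together contain two distinct
       jobids; numjobids ends up greater than one and the communicator is
       flagged as dynamic. A communicator whose processes all belong to a
       single job keeps numjobids == 1 and is left alone. */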
    /* loop over all processes in the local group and count the
       number of different jobids */
    grp = comm->c_local_group;
    for (i=0; i< size; i++) {
        proc = ompi_group_peer_lookup(grp,i);
        thisjobid = proc->proc_name.jobid;
        found = 0;
        for ( j=0; j<numjobids; j++) {
            if (thisjobid == jobids[j]) {
                found = 1;
                break;
            }
        }
        if (!found) {
            jobids[numjobids++] = thisjobid;
        }
    }

    /* loop over all processes in the remote group and count the
       number of different jobids */
    grp = comm->c_remote_group;
    for (i=0; i< rsize; i++) {
        proc = ompi_group_peer_lookup(grp,i);
        thisjobid = proc->proc_name.jobid;
        found = 0;
        for ( j=0; j<numjobids; j++) {
            if (thisjobid == jobids[j]) {
                found = 1;
                break;
            }
        }
        if (!found) {
            jobids[numjobids++] = thisjobid;
        }
    }

    if ( numjobids > 1 ) {
        ompi_comm_num_dyncomm++;
        OMPI_COMM_SET_DYNAMIC(comm);
    }

    return;
}