/* -*- Mode: C; c-basic-offset:4 ; -*- */ /* * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana * University Research and Technology * Corporation. All rights reserved. * Copyright (c) 2004-2007 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. * Copyright (c) 2006-2007 University of Houston. All rights reserved. * Copyright (c) 2006-2007 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2007 Cisco, Inc. All rights reserved. * * $COPYRIGHT$ * * Additional copyrights may follow * * $HEADER$ */ #include "ompi_config.h" #include #include #include "ompi/request/request.h" #include "ompi/mca/dpm/dpm.h" #include "ompi/mca/pml/pml.h" #include "ompi/mca/dpm/base/base.h" char* ompi_dpm_base_dyn_init (void) { char *envvarname=NULL, *port_name=NULL; /* check for appropriate env variable */ asprintf(&envvarname, "OMPI_PARENT_PORT"); port_name = getenv(envvarname); free (envvarname); return port_name; } /**********************************************************************/ /**********************************************************************/ /**********************************************************************/ /* this routine runs through the list of communicators and does the disconnect for all dynamic communicators */ int ompi_dpm_base_dyn_finalize (void) { int i,j=0, max=0; ompi_dpm_base_disconnect_obj **objs=NULL; ompi_communicator_t *comm=NULL; if ( 1 size = ompi_comm_remote_size (comm); } else { obj->size = ompi_comm_size (comm); } obj->comm = comm; obj->reqs = (ompi_request_t **) malloc(2*obj->size*sizeof(ompi_request_t *)); if ( NULL == obj->reqs ) { free (obj); return NULL; } /* initiate all isend_irecvs. We use a dummy buffer stored on the object, since we are sending zero size messages anyway. */ for ( i=0; i < obj->size; i++ ) { ret = MCA_PML_CALL(irecv (&(obj->buf), 0, MPI_INT, i, OMPI_COMM_BARRIER_TAG, comm, &(obj->reqs[2*i]))); if ( OMPI_SUCCESS != ret ) { free (obj->reqs); free (obj); return NULL; } ret = MCA_PML_CALL(isend (&(obj->buf), 0, MPI_INT, i, OMPI_COMM_BARRIER_TAG, MCA_PML_BASE_SEND_SYNCHRONOUS, comm, &(obj->reqs[2*i+1]))); if ( OMPI_SUCCESS != ret ) { free (obj->reqs); free (obj); return NULL; } } /* return handle */ return obj; } /**********************************************************************/ /**********************************************************************/ /**********************************************************************/ /* - count how many requests are active * - generate a request array large enough to hold all active requests * - call waitall on the overall request array * - free the objects */ void ompi_dpm_base_disconnect_waitall (int count, ompi_dpm_base_disconnect_obj **objs) { ompi_request_t **reqs=NULL; char *treq=NULL; int totalcount = 0; int i; int ret; for (i=0; isize; } reqs = (ompi_request_t **) malloc (2*totalcount*sizeof(ompi_request_t *)); if ( NULL == reqs ) { printf("ompi_comm_disconnect_waitall: error allocating memory\n"); return; } /* generate a single, large array of pending requests */ treq = (char *)reqs; for (i=0; ireqs, 2*objs[i]->size * sizeof(ompi_request_t *)); treq += 2*objs[i]->size * sizeof(ompi_request_t *); } /* force all non-blocking all-to-alls to finish */ ret = ompi_request_wait_all (2*totalcount, reqs, MPI_STATUSES_IGNORE); /* Finally, free everything */ for (i=0; i< count; i++ ) { if (NULL != objs[i]->reqs ) { free (objs[i]->reqs ); free (objs[i]); } } free (reqs); /* decrease the counter for dynamic communicators by 'count'. Attention, this approach now requires, that we are just using these routines for communicators which have been flagged dynamic */ ompi_comm_num_dyncomm -=count; return; } /**********************************************************************/ /**********************************************************************/ /**********************************************************************/ #define OMPI_DPM_BASE_MAXJOBIDS 64 void ompi_dpm_base_mark_dyncomm (ompi_communicator_t *comm) { int i, j, numjobids=0; int size, rsize; int found; orte_jobid_t jobids[OMPI_DPM_BASE_MAXJOBIDS], thisjobid; ompi_group_t *grp=NULL; ompi_proc_t *proc = NULL; /* special case for MPI_COMM_NULL */ if ( comm == MPI_COMM_NULL ) { return; } size = ompi_comm_size (comm); rsize = ompi_comm_remote_size(comm); /* loop over all processes in local group and count number of different jobids. */ grp = comm->c_local_group; for (i=0; i< size; i++) { proc = ompi_group_peer_lookup(grp,i); thisjobid = proc->proc_name.jobid; found = 0; for ( j=0; jc_remote_group; for (i=0; i< rsize; i++) { proc = ompi_group_peer_lookup(grp,i); thisjobid = proc->proc_name.jobid; found = 0; for ( j=0; j 1 ) { ompi_comm_num_dyncomm++; OMPI_COMM_SET_DYNAMIC(comm); } return; }