Fix the automatic handling of communicator associated requests.
If the array doesn't exist, or if it's size is not adequate then we reallocate it. Otherwise just keep using the same array of requests.
Этот коммит содержится в:
родитель
67d01bd8cd
Коммит
0445670bb9
@ -72,7 +72,7 @@ mca_coll_base_alltoall_intra_basic_inplace(void *rbuf, int rcount,
|
||||
for (i = 0 ; i < size ; ++i) {
|
||||
for (j = i+1 ; j < size ; ++j) {
|
||||
/* Initiate all send/recv to/from others. */
|
||||
preq = base_module->base_data->mcct_reqs;
|
||||
preq = coll_base_comm_get_reqs(base_module->base_data, size * 2);
|
||||
|
||||
if (i == rank) {
|
||||
/* Copy the data into the temporary buffer */
|
||||
@ -188,15 +188,11 @@ int ompi_coll_base_alltoall_intra_bruck(void *sbuf, int scount,
|
||||
struct ompi_communicator_t *comm,
|
||||
mca_coll_base_module_t *module)
|
||||
{
|
||||
int i, k, line = -1, rank, size, err = 0, weallocated = 0;
|
||||
int i, k, line = -1, rank, size, err = 0;
|
||||
int sendto, recvfrom, distance, *displs = NULL, *blen = NULL;
|
||||
char *tmpbuf = NULL, *tmpbuf_free = NULL;
|
||||
ptrdiff_t rlb, slb, tlb, sext, rext, tsext;
|
||||
struct ompi_datatype_t *new_ddt;
|
||||
#ifdef blahblah
|
||||
mca_coll_base_module_t *base_module = (mca_coll_base_module_t*) module;
|
||||
mca_coll_base_comm_t *data = base_module->base_data;
|
||||
#endif
|
||||
|
||||
if (MPI_IN_PLACE == sbuf) {
|
||||
return mca_coll_base_alltoall_intra_basic_inplace (rbuf, rcount, rdtype,
|
||||
@ -219,25 +215,10 @@ int ompi_coll_base_alltoall_intra_bruck(void *sbuf, int scount,
|
||||
if (err != MPI_SUCCESS) { line = __LINE__; goto err_hndl; }
|
||||
|
||||
|
||||
#ifdef blahblah
|
||||
/* try and SAVE memory by using the data segment hung off
|
||||
the communicator if possible */
|
||||
if (data->mcct_num_reqs >= size) {
|
||||
/* we have enought preallocated for displments and lengths */
|
||||
displs = (int*) data->mcct_reqs;
|
||||
blen = (int *) (displs + size);
|
||||
weallocated = 0;
|
||||
}
|
||||
else { /* allocate the buffers ourself */
|
||||
#endif
|
||||
displs = (int *) malloc(size * sizeof(int));
|
||||
if (displs == NULL) { line = __LINE__; err = -1; goto err_hndl; }
|
||||
blen = (int *) malloc(size * sizeof(int));
|
||||
if (blen == NULL) { line = __LINE__; err = -1; goto err_hndl; }
|
||||
weallocated = 1;
|
||||
#ifdef blahblah
|
||||
}
|
||||
#endif
|
||||
displs = (int *) malloc(size * sizeof(int));
|
||||
if (displs == NULL) { line = __LINE__; err = -1; goto err_hndl; }
|
||||
blen = (int *) malloc(size * sizeof(int));
|
||||
if (blen == NULL) { line = __LINE__; err = -1; goto err_hndl; }
|
||||
|
||||
/* tmp buffer allocation for message data */
|
||||
tmpbuf_free = (char *) malloc(tsext + ((ptrdiff_t)scount * (ptrdiff_t)size - 1) * sext);
|
||||
@ -312,10 +293,8 @@ int ompi_coll_base_alltoall_intra_bruck(void *sbuf, int scount,
|
||||
|
||||
/* Step 4 - clean up */
|
||||
if (tmpbuf != NULL) free(tmpbuf_free);
|
||||
if (weallocated) {
|
||||
if (displs != NULL) free(displs);
|
||||
if (blen != NULL) free(blen);
|
||||
}
|
||||
if (displs != NULL) free(displs);
|
||||
if (blen != NULL) free(blen);
|
||||
return OMPI_SUCCESS;
|
||||
|
||||
err_hndl:
|
||||
@ -623,7 +602,7 @@ int ompi_coll_base_alltoall_intra_basic_linear(void *sbuf, int scount,
|
||||
|
||||
/* Initiate all send/recv to/from others. */
|
||||
|
||||
req = rreq = data->mcct_reqs;
|
||||
req = rreq = coll_base_comm_get_reqs(data, (size - 1) * 2);
|
||||
sreq = rreq + size - 1;
|
||||
|
||||
prcv = (char *) rbuf;
|
||||
@ -638,7 +617,7 @@ int ompi_coll_base_alltoall_intra_basic_linear(void *sbuf, int scount,
|
||||
(prcv + (ptrdiff_t)i * rcvinc, rcount, rdtype, i,
|
||||
MCA_COLL_BASE_TAG_ALLTOALL, comm, rreq));
|
||||
if (MPI_SUCCESS != err) {
|
||||
ompi_coll_base_free_reqs(req, rreq - req);
|
||||
ompi_coll_base_free_reqs(req, nreqs);
|
||||
return err;
|
||||
}
|
||||
}
|
||||
@ -655,12 +634,11 @@ int ompi_coll_base_alltoall_intra_basic_linear(void *sbuf, int scount,
|
||||
MCA_COLL_BASE_TAG_ALLTOALL,
|
||||
MCA_PML_BASE_SEND_STANDARD, comm, sreq));
|
||||
if (MPI_SUCCESS != err) {
|
||||
ompi_coll_base_free_reqs(req, sreq - req);
|
||||
ompi_coll_base_free_reqs(req, nreqs);
|
||||
return err;
|
||||
}
|
||||
}
|
||||
|
||||
nreqs = (size - 1) * 2;
|
||||
/* Start your engines. This will never return an error. */
|
||||
|
||||
MCA_PML_CALL(start(nreqs, req));
|
||||
|
@ -78,7 +78,7 @@ mca_coll_base_alltoallv_intra_basic_inplace(void *rbuf, const int *rcounts, cons
|
||||
for (i = 0 ; i < size ; ++i) {
|
||||
for (j = i+1 ; j < size ; ++j) {
|
||||
/* Initiate all send/recv to/from others. */
|
||||
preq = base_module->base_data->mcct_reqs;
|
||||
preq = coll_base_comm_get_reqs(base_module->base_data, 2);
|
||||
|
||||
if (i == rank && rcounts[j]) {
|
||||
/* Copy the data into the temporary buffer */
|
||||
@ -239,7 +239,7 @@ ompi_coll_base_alltoallv_intra_basic_linear(void *sbuf, int *scounts, int *sdisp
|
||||
|
||||
/* Now, initiate all send/recv to/from others. */
|
||||
nreqs = 0;
|
||||
preq = data->mcct_reqs;
|
||||
preq = coll_base_comm_get_reqs(data, 2 * size);
|
||||
|
||||
/* Post all receives first */
|
||||
for (i = 0; i < size; ++i) {
|
||||
|
@ -638,7 +638,7 @@ ompi_coll_base_bcast_intra_basic_linear(void *buff, int count,
|
||||
{
|
||||
int i, size, rank, err;
|
||||
mca_coll_base_comm_t *data = module->base_data;
|
||||
ompi_request_t **preq, **reqs = data->mcct_reqs;
|
||||
ompi_request_t **preq, **reqs;
|
||||
|
||||
|
||||
size = ompi_comm_size(comm);
|
||||
@ -655,8 +655,8 @@ ompi_coll_base_bcast_intra_basic_linear(void *buff, int count,
|
||||
}
|
||||
|
||||
/* Root sends data to all others. */
|
||||
|
||||
for (i = 0, preq = reqs; i < size; ++i) {
|
||||
preq = reqs = coll_base_comm_get_reqs(data, size-1);
|
||||
for (i = 0; i < size; ++i) {
|
||||
if (i == rank) {
|
||||
continue;
|
||||
}
|
||||
@ -666,6 +666,7 @@ ompi_coll_base_bcast_intra_basic_linear(void *buff, int count,
|
||||
MCA_PML_BASE_SEND_STANDARD,
|
||||
comm, preq++));
|
||||
if (MPI_SUCCESS != err) {
|
||||
ompi_coll_base_free_reqs(data->mcct_reqs, i);
|
||||
return err;
|
||||
}
|
||||
}
|
||||
|
@ -56,49 +56,74 @@ static void coll_base_module_construct(mca_coll_base_module_t *m)
|
||||
static void
|
||||
coll_base_module_destruct(mca_coll_base_module_t *module)
|
||||
{
|
||||
mca_coll_base_comm_t* data = module->base_data;
|
||||
|
||||
if (NULL != data) {
|
||||
if( NULL != data->mcct_reqs ) {
|
||||
for( int i = 0; i < data->mcct_num_reqs; ++i ) {
|
||||
if( MPI_REQUEST_NULL != data->mcct_reqs[i] )
|
||||
ompi_request_free(&data->mcct_reqs[i]);
|
||||
}
|
||||
free(data->mcct_reqs);
|
||||
data->mcct_reqs = NULL;
|
||||
data->mcct_num_reqs = 0;
|
||||
}
|
||||
assert(0 == data->mcct_num_reqs);
|
||||
|
||||
/* free any cached information that has been allocated */
|
||||
if (data->cached_ntree) { /* destroy general tree if defined */
|
||||
ompi_coll_base_topo_destroy_tree (&data->cached_ntree);
|
||||
}
|
||||
if (data->cached_bintree) { /* destroy bintree if defined */
|
||||
ompi_coll_base_topo_destroy_tree (&data->cached_bintree);
|
||||
}
|
||||
if (data->cached_bmtree) { /* destroy bmtree if defined */
|
||||
ompi_coll_base_topo_destroy_tree (&data->cached_bmtree);
|
||||
}
|
||||
if (data->cached_in_order_bmtree) { /* destroy bmtree if defined */
|
||||
ompi_coll_base_topo_destroy_tree (&data->cached_in_order_bmtree);
|
||||
}
|
||||
if (data->cached_chain) { /* destroy general chain if defined */
|
||||
ompi_coll_base_topo_destroy_tree (&data->cached_chain);
|
||||
}
|
||||
if (data->cached_pipeline) { /* destroy pipeline if defined */
|
||||
ompi_coll_base_topo_destroy_tree (&data->cached_pipeline);
|
||||
}
|
||||
if (data->cached_in_order_bintree) { /* destroy in order bintree if defined */
|
||||
ompi_coll_base_topo_destroy_tree (&data->cached_in_order_bintree);
|
||||
}
|
||||
|
||||
free(data);
|
||||
if (NULL != module->base_data) {
|
||||
OBJ_RELEASE(module->base_data);
|
||||
}
|
||||
}
|
||||
|
||||
OBJ_CLASS_INSTANCE(mca_coll_base_module_t, opal_object_t,
|
||||
coll_base_module_construct, coll_base_module_destruct);
|
||||
|
||||
|
||||
static void
|
||||
coll_base_comm_destruct(mca_coll_base_comm_t *data)
|
||||
{
|
||||
if( NULL != data->mcct_reqs ) {
|
||||
for( int i = 0; i < data->mcct_num_reqs; ++i ) {
|
||||
if( MPI_REQUEST_NULL != data->mcct_reqs[i] )
|
||||
ompi_request_free(&data->mcct_reqs[i]);
|
||||
}
|
||||
free(data->mcct_reqs);
|
||||
data->mcct_reqs = NULL;
|
||||
data->mcct_num_reqs = 0;
|
||||
}
|
||||
assert(0 == data->mcct_num_reqs);
|
||||
|
||||
/* free any cached information that has been allocated */
|
||||
if (data->cached_ntree) { /* destroy general tree if defined */
|
||||
ompi_coll_base_topo_destroy_tree (&data->cached_ntree);
|
||||
}
|
||||
if (data->cached_bintree) { /* destroy bintree if defined */
|
||||
ompi_coll_base_topo_destroy_tree (&data->cached_bintree);
|
||||
}
|
||||
if (data->cached_bmtree) { /* destroy bmtree if defined */
|
||||
ompi_coll_base_topo_destroy_tree (&data->cached_bmtree);
|
||||
}
|
||||
if (data->cached_in_order_bmtree) { /* destroy bmtree if defined */
|
||||
ompi_coll_base_topo_destroy_tree (&data->cached_in_order_bmtree);
|
||||
}
|
||||
if (data->cached_chain) { /* destroy general chain if defined */
|
||||
ompi_coll_base_topo_destroy_tree (&data->cached_chain);
|
||||
}
|
||||
if (data->cached_pipeline) { /* destroy pipeline if defined */
|
||||
ompi_coll_base_topo_destroy_tree (&data->cached_pipeline);
|
||||
}
|
||||
if (data->cached_in_order_bintree) { /* destroy in order bintree if defined */
|
||||
ompi_coll_base_topo_destroy_tree (&data->cached_in_order_bintree);
|
||||
}
|
||||
}
|
||||
|
||||
OBJ_CLASS_INSTANCE(mca_coll_base_comm_t, opal_object_t,
|
||||
NULL, coll_base_comm_destruct);
|
||||
|
||||
ompi_request_t** coll_base_comm_get_reqs(mca_coll_base_comm_t* data, int nreqs)
|
||||
{
|
||||
int startfrom = data->mcct_num_reqs;
|
||||
|
||||
if( NULL == data->mcct_reqs ) {
|
||||
assert(0 == data->mcct_num_reqs);
|
||||
data->mcct_reqs = (ompi_request_t**)malloc(sizeof(ompi_request_t*) * nreqs);
|
||||
} else if( data->mcct_num_reqs <= nreqs ) {
|
||||
data->mcct_reqs = (ompi_request_t**)realloc(data->mcct_reqs, sizeof(ompi_request_t*) * nreqs);
|
||||
}
|
||||
if( NULL != data->mcct_reqs ) {
|
||||
data->mcct_num_reqs = nreqs;
|
||||
for( int i = startfrom; i < data->mcct_num_reqs; i++ )
|
||||
data->mcct_reqs[i] = MPI_REQUEST_NULL;
|
||||
} else
|
||||
data->mcct_num_reqs = 0; /* nothing to return */
|
||||
return data->mcct_reqs;
|
||||
}
|
||||
|
||||
MCA_BASE_FRAMEWORK_DECLARE(ompi, coll, "Collectives", NULL, NULL, NULL,
|
||||
mca_coll_base_static_components, 0);
|
||||
|
@ -346,4 +346,10 @@ static inline void ompi_coll_base_free_reqs(ompi_request_t **reqs, int count)
|
||||
ompi_request_free(&reqs[i]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the array of requests on the data. If the array was not initialized
|
||||
* or if it's size was too small, allocate it to fit the requested size.
|
||||
*/
|
||||
ompi_request_t** coll_base_comm_get_reqs(mca_coll_base_comm_t* data, int nreqs);
|
||||
|
||||
#endif /* MCA_COLL_BASE_EXPORT_H */
|
||||
|
@ -618,47 +618,3 @@ int ompi_coll_base_topo_dump_tree (ompi_coll_tree_t* tree, int rank)
|
||||
return (0);
|
||||
}
|
||||
|
||||
mca_coll_base_comm_t* ompi_coll_base_topo_construct( mca_coll_base_comm_t* data )
|
||||
{
|
||||
if( NULL == data ) {
|
||||
data = (mca_coll_base_comm_t*)calloc(1, sizeof(mca_coll_base_comm_t));
|
||||
}
|
||||
return data;
|
||||
}
|
||||
|
||||
void ompi_coll_base_topo_destruct( mca_coll_base_comm_t* data )
|
||||
{
|
||||
if(NULL == data) return;
|
||||
|
||||
#if OPAL_ENABLE_DEBUG
|
||||
/* Reset the reqs to NULL/0 -- they'll be freed as part of freeing
|
||||
the generel c_coll_selected_data */
|
||||
data->mcct_reqs = NULL;
|
||||
data->mcct_num_reqs = 0;
|
||||
#endif
|
||||
|
||||
/* free any cached information that has been allocated */
|
||||
if (data->cached_ntree) { /* destroy general tree if defined */
|
||||
ompi_coll_base_topo_destroy_tree (&data->cached_ntree);
|
||||
}
|
||||
if (data->cached_bintree) { /* destroy bintree if defined */
|
||||
ompi_coll_base_topo_destroy_tree (&data->cached_bintree);
|
||||
}
|
||||
if (data->cached_bmtree) { /* destroy bmtree if defined */
|
||||
ompi_coll_base_topo_destroy_tree (&data->cached_bmtree);
|
||||
}
|
||||
if (data->cached_in_order_bmtree) { /* destroy bmtree if defined */
|
||||
ompi_coll_base_topo_destroy_tree (&data->cached_in_order_bmtree);
|
||||
}
|
||||
if (data->cached_chain) { /* destroy general chain if defined */
|
||||
ompi_coll_base_topo_destroy_tree (&data->cached_chain);
|
||||
}
|
||||
if (data->cached_pipeline) { /* destroy pipeline if defined */
|
||||
ompi_coll_base_topo_destroy_tree (&data->cached_pipeline);
|
||||
}
|
||||
if (data->cached_in_order_bintree) { /* destroy in order bintree if defined */
|
||||
ompi_coll_base_topo_destroy_tree (&data->cached_in_order_bintree);
|
||||
}
|
||||
|
||||
free(data);
|
||||
}
|
||||
|
@ -206,9 +206,7 @@ tuned_module_enable( mca_coll_base_module_t *module,
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
if( size <= ompi_coll_tuned_preallocate_memory_comm_size_limit ) {
|
||||
data->mcct_num_reqs = size * 2;
|
||||
data->mcct_reqs = (ompi_request_t**)malloc(sizeof(ompi_request_t*) * data->mcct_num_reqs);
|
||||
if (NULL == data->mcct_reqs) {
|
||||
if (NULL == coll_base_comm_get_reqs(data, size * 2)) {
|
||||
OBJ_RELEASE(data);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user