Correctly handle the homogeneous pack. We support multiple iovec, and we respect the ioc's size as well as the total amount that should be packed as requested by the user. Several minor bugs have been removed.
This commit was SVN r4116.
Этот коммит содержится в:
родитель
5ca7b580c4
Коммит
37721f96a3
@ -298,7 +298,7 @@ int ompi_convertor_pack_no_conversion( ompi_convertor_t* pConv,
|
||||
dt_stack_t* pStack; /* pointer to the position on the stack */
|
||||
int pos_desc; /* actual position in the description of the derived datatype */
|
||||
int i; /* index for basic elements with extent */
|
||||
uint32_t iov_pos = 0; /* index in the iovec that we put data inside */
|
||||
uint32_t iov_pos = 0; /* index in the iovec where we put data inside */
|
||||
int bConverted = 0; /* number of bytes converted/moved this time */
|
||||
uint32_t space_on_iovec; /* amount of free space on the current iovec */
|
||||
long lastDisp = 0, last_count = 0;
|
||||
@ -319,240 +319,225 @@ int ompi_convertor_pack_no_conversion( ompi_convertor_t* pConv,
|
||||
/* retrieve the context of the last call */
|
||||
pos_desc = pStack->index;
|
||||
last_count = pStack->count;
|
||||
last_blength = last_count * ompi_ddt_basicDatatypes[pElems[pos_desc].type]->size;
|
||||
lastDisp = pStack->disp;
|
||||
savePos = (char*)pConv->pBaseBuf + pStack->disp;
|
||||
/*saveLength = last_count * ompi_ddt_basicDatatypes[pElems[pos_desc].type]->size;*/
|
||||
saveLength = 0;
|
||||
pStack--;
|
||||
pConv->stack_pos--;
|
||||
|
||||
*freeAfter = 0;
|
||||
space_on_iovec = iov[0].iov_len;
|
||||
if( iov[0].iov_base == NULL )
|
||||
space_on_iovec = 0;
|
||||
|
||||
while( pos_desc >= 0 ) {
|
||||
if( pElems[pos_desc].type == DT_END_LOOP ) { /* end of the current loop */
|
||||
if( --(pStack->count) == 0 ) { /* end of loop */
|
||||
if( pConv->stack_pos == 0 ) { /* finish everything */
|
||||
if( saveLength != 0 ) {
|
||||
/* there is still a chunk of memory to be handled, but here we dont allocate more
|
||||
* memory. We just copy what we can in the right place and update the values to be
|
||||
* saved on the next round.
|
||||
*/
|
||||
if( iov_pos <= (*out_size) ) { /* still some place in the iovec */
|
||||
if( iov[iov_pos].iov_base == NULL ) {
|
||||
/* prepare a new iovec */
|
||||
iov[iov_pos].iov_base = savePos;
|
||||
iov[iov_pos].iov_len = saveLength;
|
||||
bConverted += saveLength;
|
||||
saveLength = 0;
|
||||
iov_pos++;
|
||||
space_on_iovec = 0;
|
||||
/* let's go out of here */
|
||||
} else {
|
||||
if( space_on_iovec > saveLength ) {
|
||||
OMPI_DDT_SAFEGUARD_POINTER( savePos, saveLength,
|
||||
pConv->pBaseBuf, pData, pConv->count );
|
||||
MEMCPY( pDestBuf, savePos, saveLength );
|
||||
savePos += saveLength;
|
||||
pDestBuf += saveLength;
|
||||
bConverted += saveLength;
|
||||
space_on_iovec -= saveLength;
|
||||
saveLength = 0;
|
||||
} else {
|
||||
OMPI_DDT_SAFEGUARD_POINTER( savePos, space_on_iovec,
|
||||
pConv->pBaseBuf, pData, pConv->count );
|
||||
MEMCPY( pDestBuf, savePos, space_on_iovec );
|
||||
savePos += space_on_iovec;
|
||||
pDestBuf += space_on_iovec;
|
||||
saveLength -= space_on_iovec;
|
||||
bConverted += space_on_iovec;
|
||||
space_on_iovec = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
iov[iov_pos].iov_len -= space_on_iovec;
|
||||
last_count = 0;
|
||||
pos_desc = -1;
|
||||
goto end_loop;
|
||||
}
|
||||
if( --(pStack->count) == 0 ) { /* end of loop */
|
||||
if( pConv->stack_pos == 0 ) { /* finish everything */
|
||||
if( saveLength != 0 ) {
|
||||
/* there is still a chunk of memory to be handled, but here we dont allocate more
|
||||
* memory. We just copy what we can in the right place and update the values to be
|
||||
* saved on the next round.
|
||||
*/
|
||||
if( iov_pos < (*out_size) ) { /* still some place in the iovec */
|
||||
if( iov[iov_pos].iov_base == NULL ) {
|
||||
/* prepare a new iovec */
|
||||
iov[iov_pos].iov_base = savePos;
|
||||
iov[iov_pos].iov_len = saveLength;
|
||||
bConverted += saveLength;
|
||||
saveLength = 0;
|
||||
iov_pos++;
|
||||
space_on_iovec = 0;
|
||||
/* let's go out of here */
|
||||
} else {
|
||||
uint32_t copy_length = saveLength;
|
||||
if( space_on_iovec < saveLength ) {
|
||||
copy_length = space_on_iovec;
|
||||
}
|
||||
OMPI_DDT_SAFEGUARD_POINTER( savePos, copy_length,
|
||||
pConv->pBaseBuf, pData, pConv->count );
|
||||
MEMCPY( pDestBuf, savePos, copy_length );
|
||||
savePos += copy_length;
|
||||
pDestBuf += copy_length;
|
||||
bConverted += copy_length;
|
||||
space_on_iovec -= copy_length;
|
||||
saveLength -= copy_length;
|
||||
}
|
||||
}
|
||||
}
|
||||
iov[iov_pos].iov_len -= space_on_iovec;
|
||||
last_count = 0;
|
||||
pos_desc = -1;
|
||||
goto end_loop;
|
||||
}
|
||||
pConv->stack_pos--;
|
||||
pStack--;
|
||||
} else {
|
||||
pos_desc = pStack->index; /* DT_LOOP index */
|
||||
if( pos_desc == -1 )
|
||||
pStack->disp += (pData->ub - pData->lb);
|
||||
else
|
||||
pStack->disp += pElems[pos_desc].extent;
|
||||
}
|
||||
pos_desc++; /* go to the next element */
|
||||
lastDisp = pStack->disp + pElems[pos_desc].disp;
|
||||
last_count = pElems[pos_desc].count;
|
||||
continue; /* next loop */
|
||||
}
|
||||
while( pElems[pos_desc].type == DT_LOOP ) {
|
||||
int stop_in_loop = 0;
|
||||
} else {
|
||||
pos_desc = pStack->index; /* DT_LOOP index */
|
||||
if( pos_desc == -1 )
|
||||
pStack->disp += (pData->ub - pData->lb);
|
||||
else
|
||||
pStack->disp += pElems[pos_desc].extent;
|
||||
}
|
||||
pos_desc++; /* go to the next element */
|
||||
lastDisp = pStack->disp + pElems[pos_desc].disp;
|
||||
last_count = pElems[pos_desc].count;
|
||||
last_blength = last_count * ompi_ddt_basicDatatypes[pElems[pos_desc].type]->size;
|
||||
continue; /* next loop */
|
||||
}
|
||||
while( pElems[pos_desc].type == DT_LOOP ) {
|
||||
int stop_in_loop = 0;
|
||||
|
||||
/* If the loop container is contiguous then we can do some
|
||||
* optimizations.
|
||||
*/
|
||||
if( pElems[pos_desc].flags & DT_FLAG_CONTIGUOUS ) {
|
||||
/* point to the end of loop element */
|
||||
dt_elem_desc_t* pLast = &(pElems[pos_desc + pElems[pos_desc].disp]);
|
||||
if( iov[iov_pos].iov_base == NULL ) {
|
||||
iov[iov_pos].iov_base = pConv->memAlloc_fn( &(iov[iov_pos].iov_len) );
|
||||
space_on_iovec = iov[iov_pos].iov_len;
|
||||
pDestBuf = iov[iov_pos].iov_base;
|
||||
(*freeAfter) |= (1 << iov_pos);
|
||||
}
|
||||
/* compute the maximum amount of data to be packed */
|
||||
if( (pLast->extent * last_count) > space_on_iovec ) {
|
||||
stop_in_loop = last_count;
|
||||
last_count = space_on_iovec / pLast->extent;
|
||||
}
|
||||
/* Now let's do it */
|
||||
for( i = 0; i < last_count; i++ ) {
|
||||
/* If the loop container is contiguous then we can do some
|
||||
* optimizations.
|
||||
*/
|
||||
if( pElems[pos_desc].flags & DT_FLAG_CONTIGUOUS ) {
|
||||
/* point to the end of loop element */
|
||||
dt_elem_desc_t* pLast = &(pElems[pos_desc + pElems[pos_desc].disp]);
|
||||
if( iov[iov_pos].iov_base == NULL ) {
|
||||
iov[iov_pos].iov_base = pConv->memAlloc_fn( &(iov[iov_pos].iov_len) );
|
||||
space_on_iovec = iov[iov_pos].iov_len;
|
||||
pDestBuf = iov[iov_pos].iov_base;
|
||||
(*freeAfter) |= (1 << iov_pos);
|
||||
}
|
||||
/* compute the maximum amount of data to be packed */
|
||||
if( (pLast->extent * last_count) > space_on_iovec ) {
|
||||
stop_in_loop = last_count;
|
||||
last_count = space_on_iovec / pLast->extent;
|
||||
}
|
||||
/* Now let's do it */
|
||||
for( i = 0; i < last_count; i++ ) {
|
||||
OMPI_DDT_SAFEGUARD_POINTER( pConv->pBaseBuf + lastDisp, pLast->extent,
|
||||
pConv->pBaseBuf, pData, pConv->count );
|
||||
MEMCPY( pDestBuf, pConv->pBaseBuf + lastDisp, pLast->extent );
|
||||
lastDisp += pElems[pos_desc].extent;
|
||||
pDestBuf += pLast->extent;
|
||||
}
|
||||
i = pLast->extent * last_count; /* temporary value */
|
||||
space_on_iovec -= i;
|
||||
space -= i;
|
||||
bConverted += i;
|
||||
if( stop_in_loop == 0 ) { /* did I stop before the end */
|
||||
/* the pElems point to the LOOP struct */
|
||||
pos_desc += pElems[pos_desc].disp + 1;
|
||||
last_count = pElems[pos_desc].count;
|
||||
lastDisp = pStack->disp + pElems[pos_desc].disp;
|
||||
continue;
|
||||
}
|
||||
/* mark some of the iterations as completed */
|
||||
last_count = stop_in_loop - last_count;
|
||||
last_blength = 0;
|
||||
/* Save the stack with the correct last_count value. */
|
||||
}
|
||||
PUSH_STACK( pStack, pConv->stack_pos, pos_desc, last_count,
|
||||
pStack->disp, pos_desc + pElems[pos_desc].disp );
|
||||
pos_desc++;
|
||||
lastDisp = pStack->disp + pElems[pos_desc].disp;
|
||||
last_count = pElems[pos_desc].count;
|
||||
MEMCPY( pDestBuf, pConv->pBaseBuf + lastDisp, pLast->extent );
|
||||
lastDisp += pElems[pos_desc].extent;
|
||||
pDestBuf += pLast->extent;
|
||||
}
|
||||
i = pLast->extent * last_count; /* temporary value */
|
||||
space_on_iovec -= i;
|
||||
space -= i;
|
||||
bConverted += i;
|
||||
if( stop_in_loop == 0 ) { /* did I stop before the end */
|
||||
/* the pElems point to the LOOP struct */
|
||||
pos_desc += pElems[pos_desc].disp + 1;
|
||||
last_count = pElems[pos_desc].count;
|
||||
last_blength = last_count * ompi_ddt_basicDatatypes[pElems[pos_desc].type]->size;
|
||||
lastDisp = pStack->disp + pElems[pos_desc].disp;
|
||||
continue;
|
||||
}
|
||||
/* mark some of the iterations as completed */
|
||||
last_count = stop_in_loop - last_count;
|
||||
last_blength = 0;
|
||||
/* Save the stack with the correct last_count value. */
|
||||
}
|
||||
PUSH_STACK( pStack, pConv->stack_pos, pos_desc, last_count,
|
||||
pStack->disp, pos_desc + pElems[pos_desc].disp );
|
||||
pos_desc++;
|
||||
lastDisp = pStack->disp + pElems[pos_desc].disp;
|
||||
last_count = pElems[pos_desc].count;
|
||||
last_blength = last_count * ompi_ddt_basicDatatypes[pElems[pos_desc].type]->size;
|
||||
}
|
||||
/* now here we have a basic datatype */
|
||||
while( pElems[pos_desc].flags & DT_FLAG_DATA ) {
|
||||
/* do we have enough space in the buffer ? */
|
||||
last_blength = last_count * ompi_ddt_basicDatatypes[pElems[pos_desc].type]->size;
|
||||
|
||||
/* first let's see if it's contiguous with the previous chunk of memory */
|
||||
if( (savePos + saveLength) == (pConv->pBaseBuf + lastDisp) ) {
|
||||
/* ok still contiguous */
|
||||
saveLength += last_blength;
|
||||
/* nothing else to do, we act the next time */
|
||||
} else {
|
||||
/* Now we have 2 piece of non contiguous memory. One start at savePos
|
||||
* with a length of saveLength, the other start at
|
||||
* pConv->pBaseBuf + lastDisp with a length of last_blength bytes.
|
||||
* First we have to pack the old buffer and then we should decide
|
||||
* what we do with the new one.
|
||||
*/
|
||||
do {
|
||||
if( saveLength > IOVEC_MEM_LIMIT ) {
|
||||
/* If the user didn't provide any memory, then we are free
|
||||
* to handle this case how we want.
|
||||
*/
|
||||
if( iov[iov_pos].iov_base == NULL ) {
|
||||
iov[iov_pos].iov_base = savePos;
|
||||
iov[iov_pos].iov_len = saveLength;
|
||||
savePos = pConv->pBaseBuf + lastDisp;
|
||||
/* update the pack counters values */
|
||||
bConverted += saveLength;
|
||||
space -= saveLength;
|
||||
saveLength = last_blength;
|
||||
iov_pos++;
|
||||
space_on_iovec = 0;
|
||||
break;
|
||||
}
|
||||
/* Now the user provided some memory or we allocate some on the
|
||||
* previous round. We are force to copy some data into the user buffer.
|
||||
*/
|
||||
} else {
|
||||
if( iov[iov_pos].iov_base == NULL ) { /* we have to allocate some memory */
|
||||
iov[iov_pos].iov_base = pConv->memAlloc_fn( &(iov[iov_pos].iov_len) );
|
||||
(*freeAfter) |= (1 << iov_pos);
|
||||
pDestBuf = iov[iov_pos].iov_base;
|
||||
space_on_iovec = iov[iov_pos].iov_len;
|
||||
}
|
||||
}
|
||||
/* In all the others cases we simply copy as much data as possible */
|
||||
if( space_on_iovec > saveLength ) {
|
||||
/* now here we have a basic datatype */
|
||||
while( pElems[pos_desc].flags & DT_FLAG_DATA ) {
|
||||
/* first let's see if it's contiguous with the previous chunk of memory and
|
||||
* we still have enough room in the buffer...
|
||||
*/
|
||||
if( ((savePos + saveLength) == (pConv->pBaseBuf + lastDisp))
|
||||
&& ((saveLength + last_blength) < space_on_iovec) ) {
|
||||
/* ok still contiguous */
|
||||
saveLength += last_blength;
|
||||
/* nothing else to do, we act the next time */
|
||||
} else {
|
||||
/* Now we have 2 piece of non contiguous memory. One start at savePos
|
||||
* with a length of saveLength, the other start at
|
||||
* pConv->pBaseBuf + lastDisp with a length of last_blength bytes.
|
||||
* First we have to pack the old buffer and then we should decide
|
||||
* what we do with the new one.
|
||||
*/
|
||||
do {
|
||||
if( iov[iov_pos].iov_base == NULL ) {
|
||||
if( saveLength > IOVEC_MEM_LIMIT ) {
|
||||
/* If the user didn't provide any memory, then we are free
|
||||
* to handle this case as we want.
|
||||
*/
|
||||
iov[iov_pos].iov_base = savePos;
|
||||
iov[iov_pos].iov_len = saveLength;
|
||||
savePos = pConv->pBaseBuf + lastDisp;
|
||||
/* update the pack counters values */
|
||||
bConverted += saveLength;
|
||||
space -= saveLength;
|
||||
saveLength = last_blength;
|
||||
if( ++iov_pos == (*out_size) ) goto end_loop;
|
||||
last_blength = 0;
|
||||
pDestBuf = iov[iov_pos].iov_base;
|
||||
space_on_iovec = iov[iov_pos].iov_len;
|
||||
break;
|
||||
}
|
||||
/* Let's allocate some. */
|
||||
iov[iov_pos].iov_base = pConv->memAlloc_fn( &(iov[iov_pos].iov_len) );
|
||||
(*freeAfter) |= (1 << iov_pos);
|
||||
pDestBuf = iov[iov_pos].iov_base;
|
||||
space_on_iovec = iov[iov_pos].iov_len;
|
||||
}
|
||||
/* In all the others cases we simply copy as much data as possible */
|
||||
if( space_on_iovec > saveLength ) {
|
||||
OMPI_DDT_SAFEGUARD_POINTER( savePos, saveLength,
|
||||
pConv->pBaseBuf, pData, pConv->count );
|
||||
MEMCPY( pDestBuf, savePos, saveLength );
|
||||
pDestBuf += saveLength;
|
||||
/* update the pack counters values */
|
||||
bConverted += saveLength;
|
||||
space -= saveLength;
|
||||
space_on_iovec -= saveLength;
|
||||
savePos = pConv->pBaseBuf + lastDisp;
|
||||
saveLength = last_blength;
|
||||
break;
|
||||
} else {
|
||||
OMPI_DDT_SAFEGUARD_POINTER( savePos, space_on_iovec,
|
||||
pConv->pBaseBuf, pData, pConv->count );
|
||||
MEMCPY( pDestBuf, savePos, space_on_iovec );
|
||||
/* let's prepare for the next round. As I keep trace of the amount that I still
|
||||
* have to pack, the next time when I came here, I'll try to append something.
|
||||
* If I already fill-up the amount of data required by the upper level, I will
|
||||
* simply save all informations in the stack, if not I'll take care of allocating
|
||||
* new memory and packing the data inside.
|
||||
*/
|
||||
savePos += space_on_iovec;
|
||||
saveLength -= space_on_iovec;
|
||||
/* update the pack counters values */
|
||||
bConverted += space_on_iovec;
|
||||
space -= space_on_iovec;
|
||||
/* check for the next step */
|
||||
if( iov_pos < (*out_size) ) {
|
||||
iov_pos++;
|
||||
pDestBuf = iov[iov_pos].iov_base;
|
||||
space_on_iovec = iov[iov_pos].iov_len;
|
||||
} else {
|
||||
pDestBuf = NULL;
|
||||
space_on_iovec = 0;
|
||||
last_count = 0;
|
||||
last_blength = 0;
|
||||
}
|
||||
}
|
||||
if( iov_pos == (*out_size) ) goto end_loop;
|
||||
} while(1); /* continue forever */
|
||||
}
|
||||
if( iov_pos == (*out_size) ) goto end_loop;
|
||||
|
||||
pos_desc++; /* advance to the next data */
|
||||
lastDisp = pStack->disp + pElems[pos_desc].disp;
|
||||
last_count = pElems[pos_desc].count;
|
||||
MEMCPY( pDestBuf, savePos, saveLength );
|
||||
pDestBuf += saveLength;
|
||||
/* update the pack counters values */
|
||||
bConverted += saveLength;
|
||||
space -= saveLength;
|
||||
space_on_iovec -= saveLength;
|
||||
savePos = pConv->pBaseBuf + lastDisp;
|
||||
saveLength = last_blength;
|
||||
last_blength = 0;
|
||||
break;
|
||||
}
|
||||
OMPI_DDT_SAFEGUARD_POINTER( savePos, space_on_iovec,
|
||||
pConv->pBaseBuf, pData, pConv->count );
|
||||
MEMCPY( pDestBuf, savePos, space_on_iovec );
|
||||
/* let's prepare for the next round. As I keep trace of the amount that I still
|
||||
* have to pack, the next time when I came here, I'll try to append something.
|
||||
* If I already fill-up the amount of data required by the upper level, I will
|
||||
* simply save all informations in the stack, if not I'll take care of allocating
|
||||
* new memory and packing the data inside.
|
||||
*/
|
||||
savePos += space_on_iovec;
|
||||
saveLength -= space_on_iovec;
|
||||
/* update the pack counters values */
|
||||
bConverted += space_on_iovec;
|
||||
space -= space_on_iovec;
|
||||
lastDisp += space_on_iovec;
|
||||
/* check for the next step */
|
||||
if( ++iov_pos == (*out_size) ) { /* are there more iovecs to fill ? */
|
||||
pDestBuf = NULL;
|
||||
space_on_iovec = 0;
|
||||
last_count = 0;
|
||||
last_blength = 0;
|
||||
goto end_loop;
|
||||
}
|
||||
pDestBuf = iov[iov_pos].iov_base;
|
||||
space_on_iovec = iov[iov_pos].iov_len;
|
||||
} while(1); /* continue forever */
|
||||
}
|
||||
if( saveLength > space ) /* this will be the last element copied this time */
|
||||
continue;
|
||||
pos_desc++; /* advance to the next data */
|
||||
lastDisp = pStack->disp + pElems[pos_desc].disp;
|
||||
last_count = pElems[pos_desc].count;
|
||||
last_blength = last_count * ompi_ddt_basicDatatypes[pElems[pos_desc].type]->size;
|
||||
}
|
||||
}
|
||||
last_count = 0; /* complete the data */
|
||||
end_loop:
|
||||
|
||||
if( pos_desc >= 0 ) { /* if the pack is not finish add a new entry in the stack */
|
||||
PUSH_STACK( pStack, pConv->stack_pos, pos_desc, last_blength,
|
||||
lastDisp, pos_desc );
|
||||
if( (pConv->pBaseBuf + lastDisp) == savePos )
|
||||
PUSH_STACK( pStack, pConv->stack_pos, 0, 0, 0, 0 );
|
||||
else
|
||||
PUSH_STACK( pStack, pConv->stack_pos, 0, 0, (long)savePos, saveLength );
|
||||
PUSH_STACK( pStack, pConv->stack_pos, pos_desc, saveLength, lastDisp, pos_desc );
|
||||
}
|
||||
|
||||
assert( last_blength == 0 );
|
||||
pConv->bConverted += bConverted; /* update the byte converted field in the convertor */
|
||||
*max_data = bConverted; /* update the length in the iovec */
|
||||
if( ((*out_size) == iov_pos) || (iov[iov_pos].iov_base == NULL) ) *out_size = iov_pos;
|
||||
else *out_size = iov_pos + 1;
|
||||
assert( pConv->bConverted <= (pData->size * pConv->count) );
|
||||
return (pConv->bConverted == (pData->size * pConv->count));
|
||||
}
|
||||
|
||||
@ -755,6 +740,7 @@ int ompi_convertor_init_for_send( ompi_convertor_t* pConv,
|
||||
/* this datatype is improper for conversion. Commit it first */
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
assert( datatype != NULL );
|
||||
convertor_init_generic( pConv, datatype, count, pUserBuf );
|
||||
|
||||
pConv->flags = CONVERTOR_SEND | CONVERTOR_HOMOGENEOUS; /* by default set to homogeneous */
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user