1
1

OSHMEM: Fix deadlock for collect operation using various data sizes.

A deadlock occurs when using the shmem_collect32()/shmem_collect64() routines if any of the non-root PEs passes 0 as the number of elements.
The algorithm in _algorithm_central_collector() uses 0 as a special value, and thus does not break out of the loop.

fixed by IgorI, reviewed by MikeD

cmr=v1.8.2:reviewer=ompi-rm1.8

This commit was SVN r31814.
Этот коммит содержится в:
Mike Dubman 2014-05-19 06:17:53 +00:00
родитель 772bbc2e3d
Коммит d531a2ccad

Просмотреть файл

@ -541,13 +541,13 @@ static int _algorithm_central_collector(struct oshmem_group_t *group,
group->my_pe);
/* Set own data size */
pSync[0] = nlong;
pSync[0] = (nlong ? nlong : SHMEM_SYNC_READY);
if (PE_root == group->my_pe) {
long value = 0;
int pe_cur = 0;
long wait_pe_count = 0;
size_t* wait_pe_array = NULL;
long* wait_pe_array = NULL;
wait_pe_count = group->proc_count;
wait_pe_array = malloc(sizeof(*wait_pe_array) * wait_pe_count);
@ -569,9 +569,8 @@ static int _algorithm_central_collector(struct oshmem_group_t *group,
value = 0;
rc = MCA_SPML_CALL(get((void*)pSync, sizeof(value), (void*)&value, pe_cur));
if ((rc == OSHMEM_SUCCESS)
&& (value != _SHMEM_SYNC_VALUE)
&& (value > 0)) {
wait_pe_array[i] = (size_t) value;
&& (value != _SHMEM_SYNC_VALUE)) {
wait_pe_array[i] = value;
wait_pe_count--;
SCOLL_VERBOSE(14,
"Got source data size as %d from #%d (wait list counter: %d)",
@ -588,17 +587,23 @@ static int _algorithm_central_collector(struct oshmem_group_t *group,
for (i = 1; (i < group->proc_count) && (rc == OSHMEM_SUCCESS);
i++) {
/* Skip zero size data */
if (wait_pe_array[i] == SHMEM_SYNC_READY) {
continue;
}
/* Get PE ID of a peer from the group */
pe_cur = oshmem_proc_pe(group->proc_array[i]);
/* Get data from the current peer */
rc = MCA_SPML_CALL(get((void *)source, wait_pe_array[i], (void*)((unsigned char*)target + offset), pe_cur));
rc = MCA_SPML_CALL(get((void *)source, (size_t)wait_pe_array[i], (void*)((unsigned char*)target + offset), pe_cur));
SCOLL_VERBOSE(14,
"Got %d bytes of data from #%d (offset: %d)",
(int)wait_pe_array[i], pe_cur, (int)offset);
offset += wait_pe_array[i];
offset += (size_t)wait_pe_array[i];
}
free(wait_pe_array);