OSHMEM: memheap mkey exchange fix
fix situations where cluster nodes can have different btls Fixed by Roman, reviewed by Igor, Mike cmr=v1.7.5:reviewer=ompi-rm1.7 This commit was SVN r30877.
This commit is contained in:
parent
d702307521
commit
d584869dda
@ -14,7 +14,7 @@
|
||||
int main(int argc, char* argv[])
|
||||
{
|
||||
int proc, nproc;
|
||||
|
||||
|
||||
start_pes(0);
|
||||
nproc = _num_pes();
|
||||
proc = _my_pe();
|
||||
|
@ -520,7 +520,8 @@ void mca_memheap_modex_recv_all(void)
|
||||
void *send_buffer;
|
||||
char *rcv_buffer;
|
||||
void *dummy_buffer;
|
||||
int32_t size, dummy_size;
|
||||
int size, dummy_size;
|
||||
int *rcv_size, *rcv_n_transports, *rcv_offsets;
|
||||
int rc;
|
||||
|
||||
if (!mca_memheap_base_key_exchange) {
|
||||
@ -531,6 +532,30 @@ void mca_memheap_modex_recv_all(void)
|
||||
nprocs = oshmem_num_procs();
|
||||
my_pe = oshmem_my_proc_id();
|
||||
|
||||
/* buffer allocation for num_transports
|
||||
* message sizes and offsets */
|
||||
|
||||
rcv_size = (int *)malloc(nprocs * sizeof(int));
|
||||
if (NULL == rcv_size) {
|
||||
MEMHEAP_ERROR("failed to get rcv_size buffer");
|
||||
oshmem_shmem_abort(-1);
|
||||
return;
|
||||
}
|
||||
|
||||
rcv_offsets = (int *)malloc(nprocs * sizeof(int));
|
||||
if (NULL == rcv_offsets) {
|
||||
MEMHEAP_ERROR("failed to get rcv_offsets buffer");
|
||||
oshmem_shmem_abort(-1);
|
||||
return;
|
||||
}
|
||||
|
||||
rcv_n_transports = (int *)malloc(nprocs * sizeof(int));
|
||||
if (NULL == rcv_offsets) {
|
||||
MEMHEAP_ERROR("failed to get rcv_offsets buffer");
|
||||
oshmem_shmem_abort(-1);
|
||||
return;
|
||||
}
|
||||
|
||||
/* serialize our own mkeys */
|
||||
msg = OBJ_NEW(opal_buffer_t);
|
||||
if (NULL == msg) {
|
||||
@ -543,18 +568,46 @@ void mca_memheap_modex_recv_all(void)
|
||||
pack_local_mkeys(msg, 0, j, 1);
|
||||
}
|
||||
|
||||
/* we assume here that int32_t returned by opal_dss.unload
|
||||
* is equal to size of int we use for MPI_Allgather, MPI_Allgatherv */
|
||||
|
||||
assert(sizeof(int32_t) == sizeof(int));
|
||||
|
||||
/* Do allgather */
|
||||
opal_dss.unload(msg, &send_buffer, &size);
|
||||
MEMHEAP_VERBOSE(1, "local keys packed into %d bytes, %d segments", size, memheap_map->n_segments);
|
||||
rcv_buffer = malloc(size * nprocs);
|
||||
if (NULL == msg) {
|
||||
|
||||
/* we need to send num_transports and message sizes separately
|
||||
* since message sizes depend on types of btl used */
|
||||
|
||||
rc = oshmem_shmem_allgather(&memheap_map->num_transports, rcv_n_transports, sizeof(int));
|
||||
if (MPI_SUCCESS != rc) {
|
||||
MEMHEAP_ERROR("allgather failed");
|
||||
oshmem_shmem_abort(-1);
|
||||
}
|
||||
|
||||
rc = oshmem_shmem_allgather(&size, rcv_size, sizeof(int));
|
||||
if (MPI_SUCCESS != rc) {
|
||||
MEMHEAP_ERROR("allgather failed");
|
||||
oshmem_shmem_abort(-1);
|
||||
}
|
||||
|
||||
/* calculating offsets (displacements) for allgatherv */
|
||||
|
||||
rcv_offsets[0] = 0;
|
||||
for (i = 1; i < nprocs; i++) {
|
||||
rcv_offsets[i] = rcv_offsets[i - 1] + rcv_size[i - 1];
|
||||
}
|
||||
|
||||
rcv_buffer = malloc(rcv_offsets[nprocs - 1] + rcv_size[nprocs - 1]);
|
||||
if (NULL == rcv_buffer) {
|
||||
MEMHEAP_ERROR("failed to allocate recieve buffer");
|
||||
oshmem_shmem_abort(-1);
|
||||
}
|
||||
|
||||
rc = oshmem_shmem_allgather(send_buffer, rcv_buffer, size);
|
||||
rc = oshmem_shmem_allgatherv(send_buffer, rcv_buffer, size, rcv_size, rcv_offsets);
|
||||
if (MPI_SUCCESS != rc) {
|
||||
MEMHEAP_ERROR("allgather failed");
|
||||
MEMHEAP_ERROR("allgatherv failed");
|
||||
oshmem_shmem_abort(-1);
|
||||
}
|
||||
|
||||
@ -565,7 +618,7 @@ void mca_memheap_modex_recv_all(void)
|
||||
continue;
|
||||
}
|
||||
|
||||
opal_dss.load(msg, rcv_buffer + i*size, size);
|
||||
opal_dss.load(msg, (void*)((uint8_t *)rcv_buffer + rcv_offsets[i]), rcv_size[i]);
|
||||
for (j = 0; j < memheap_map->n_segments; j++) {
|
||||
map_segment_t *s;
|
||||
|
||||
@ -573,7 +626,7 @@ void mca_memheap_modex_recv_all(void)
|
||||
if (NULL != s->mkeys_cache[i]) {
|
||||
MEMHEAP_VERBOSE(10, "PE%d: segment%d already exists, mkey will be replaced", i, j);
|
||||
} else {
|
||||
s->mkeys_cache[i] = (sshmem_mkey_t *) calloc(memheap_map->num_transports,
|
||||
s->mkeys_cache[i] = (sshmem_mkey_t *) calloc(rcv_n_transports[i],
|
||||
sizeof(sshmem_mkey_t));
|
||||
if (NULL == s->mkeys_cache[i]) {
|
||||
MEMHEAP_ERROR("PE%d: segment%d: Failed to allocate mkeys cache entry", i, j);
|
||||
@ -587,6 +640,9 @@ void mca_memheap_modex_recv_all(void)
|
||||
}
|
||||
|
||||
OPAL_THREAD_UNLOCK(&memheap_oob.lck);
|
||||
free(rcv_size);
|
||||
free(rcv_offsets);
|
||||
free(rcv_n_transports);
|
||||
free(send_buffer);
|
||||
free(rcv_buffer);
|
||||
OBJ_RELEASE(msg);
|
||||
|
@ -78,7 +78,12 @@ static inline char *mca_spml_base_mkey2str(sshmem_mkey_t *mkey)
|
||||
{
|
||||
static char buf[64];
|
||||
|
||||
snprintf(buf, sizeof(buf), "mkey: base=%p len=%d key=%" PRIu64, mkey->va_base, mkey->len, mkey->u.key);
|
||||
if (mkey->len == 0) {
|
||||
snprintf(buf, sizeof(buf), "mkey: base=%p len=%d key=%" PRIu64, mkey->va_base, mkey->len, mkey->u.key);
|
||||
} else {
|
||||
snprintf(buf, sizeof(buf), "mkey: base=%p len=%d data=0x%p", mkey->va_base, mkey->len, mkey->u.data);
|
||||
}
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
||||
|
@ -24,6 +24,17 @@ int oshmem_shmem_allgather(void *send_buf, void *rcv_buf, int elem_size)
|
||||
return rc;
|
||||
}
|
||||
|
||||
int oshmem_shmem_allgatherv(void *send_buf, void* rcv_buf, int send_count,
|
||||
int* rcv_size, int* displs)
|
||||
{
|
||||
int rc;
|
||||
|
||||
rc = MPI_Allgatherv(send_buf, send_count, MPI_BYTE,
|
||||
rcv_buf, rcv_size, displs, MPI_BYTE, oshmem_comm_world);
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
void oshmem_shmem_barrier(void)
|
||||
{
|
||||
MPI_Barrier(oshmem_comm_world);
|
||||
|
@ -112,6 +112,12 @@ OSHMEM_DECLSPEC int oshmem_shmem_abort(int errcode);
|
||||
*/
|
||||
OSHMEM_DECLSPEC int oshmem_shmem_allgather(void *send_buf, void *rcv_buf, int elem_size);
|
||||
|
||||
/**
|
||||
* Allgatherv between all PEs
|
||||
*/
|
||||
OSHMEM_DECLSPEC int oshmem_shmem_allgatherv(void *send_buf, void* rcv_buf, int send_count,
|
||||
int *rcv_size, int* displs);
|
||||
|
||||
/**
|
||||
* Barrier between all PEs
|
||||
*/
|
||||
|
Loading…
x
Reference in New Issue
Block a user