From 903762e19456d797bec6d05ff4a56e5f936d14bc Mon Sep 17 00:00:00 2001 From: Nathan Hjelm Date: Tue, 22 Sep 2015 16:00:27 -0600 Subject: [PATCH] osc/sm: fix pscw synchronization The osc/sm component was using a simple counter to determine if all expected posts had arrived to start a PSCW access epoch. This is incorrect as a post may arrive from a peer that isn't part of the current start group. There are many ways this could have been fixed. This commit adds an n^2 bitmap. When a process posts it sets a bit in the bitmap associated with the access rank to indicate the post is complete. The access rank checks for and clears the bits associated with all the processes in the start group. The bitmap requires comm_size ^ 2 bits of space. This should be manageable as most nodes have relatively small numbers of processes. If this changes another algorithm can be implemented. Signed-off-by: Nathan Hjelm --- ompi/mca/osc/sm/osc_sm.h | 4 +- ompi/mca/osc/sm/osc_sm_active_target.c | 241 +++++++++++++++++++------ ompi/mca/osc/sm/osc_sm_component.c | 108 ++++++----- 3 files changed, 248 insertions(+), 105 deletions(-) diff --git a/ompi/mca/osc/sm/osc_sm.h b/ompi/mca/osc/sm/osc_sm.h index 344c05661f..7c058465b0 100644 --- a/ompi/mca/osc/sm/osc_sm.h +++ b/ompi/mca/osc/sm/osc_sm.h @@ -38,7 +38,6 @@ struct ompi_osc_sm_lock_t { typedef struct ompi_osc_sm_lock_t ompi_osc_sm_lock_t; struct ompi_osc_sm_node_state_t { - int32_t post_count; int32_t complete_count; ompi_osc_sm_lock_t lock; opal_atomic_lock_t accumulate_lock; @@ -81,6 +80,9 @@ struct ompi_osc_sm_module_t { ompi_osc_sm_global_state_t *global_state; ompi_osc_sm_node_state_t *my_node_state; ompi_osc_sm_node_state_t *node_states; + uint64_t **posts; + + opal_mutex_t lock; }; typedef struct ompi_osc_sm_module_t ompi_osc_sm_module_t; diff --git a/ompi/mca/osc/sm/osc_sm_active_target.c b/ompi/mca/osc/sm/osc_sm_active_target.c index 5cbe4b0855..e78364a9b8 100644 --- a/ompi/mca/osc/sm/osc_sm_active_target.c +++ 
b/ompi/mca/osc/sm/osc_sm_active_target.c @@ -1,7 +1,7 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* * Copyright (c) 2012 Sandia National Laboratories. All rights reserved. - * Copyright (c) 2014 Los Alamos National Security, LLC. All rights + * Copyright (c) 2014-2015 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2014 The University of Tennessee and The University * of Tennessee Research Foundation. All rights @@ -22,6 +22,74 @@ #include "osc_sm.h" +/** + * compare_ranks: + * + * @param[in] ptra Pointer to integer item + * @param[in] ptrb Pointer to integer item + * + * @returns 0 if *ptra == *ptrb + * @returns -1 if *ptra < *ptrb + * @returns 1 otherwise + * + * This function is used to sort the rank list. It can be removed if + * groups are always in order. + */ +static int compare_ranks (const void *ptra, const void *ptrb) +{ + int a = *((int *) ptra); + int b = *((int *) ptrb); + + if (a < b) { + return -1; + } else if (a > b) { + return 1; + } + + return 0; +} + +/** + * ompi_osc_sm_group_ranks: + * + * @param[in] group - Parent group in which to express the ranks + * @param[in] sub_group - Group with ranks to translate + * + * @returns an array of translated ranks on success or NULL on failure + * + * Translate the ranks given in {sub_group} into ranks in + * {group}. 
+ */ +static int *ompi_osc_sm_group_ranks (ompi_group_t *group, ompi_group_t *sub_group) +{ + int size = ompi_group_size(sub_group); + int *ranks1, *ranks2; + int ret; + + ranks1 = calloc (size, sizeof(int)); + ranks2 = calloc (size, sizeof(int)); + if (NULL == ranks1 || NULL == ranks2) { + free (ranks1); + free (ranks2); + return NULL; + } + + for (int i = 0 ; i < size ; ++i) { + ranks1[i] = i; + } + + ret = ompi_group_translate_ranks (sub_group, size, ranks1, group, ranks2); + free (ranks1); + if (OMPI_SUCCESS != ret) { + free (ranks2); + return NULL; + } + + qsort (ranks2, size, sizeof (int), compare_ranks); + + return ranks2; +} + int ompi_osc_sm_fence(int assert, struct ompi_win_t *win) @@ -54,7 +122,6 @@ ompi_osc_sm_fence(int assert, struct ompi_win_t *win) } } - int ompi_osc_sm_start(struct ompi_group_t *group, int assert, @@ -62,20 +129,43 @@ ompi_osc_sm_start(struct ompi_group_t *group, { ompi_osc_sm_module_t *module = (ompi_osc_sm_module_t*) win->w_osc_module; + int my_rank = ompi_comm_rank (module->comm); + + OBJ_RETAIN(group); + + if (!OPAL_ATOMIC_CMPSET(&module->start_group, NULL, group)) { + OBJ_RELEASE(group); + return OMPI_ERR_RMA_SYNC; + } if (0 == (assert & MPI_MODE_NOCHECK)) { int size; - OBJ_RETAIN(group); - module->start_group = group; + int *ranks = ompi_osc_sm_group_ranks (module->comm->c_local_group, group); + if (NULL == ranks) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + size = ompi_group_size(module->start_group); - while (module->my_node_state->post_count != size) { - opal_progress(); - opal_atomic_mb(); - } - } else { - module->start_group = NULL; + for (int i = 0 ; i < size ; ++i) { + int rank_byte = ranks[i] >> 6; + uint64_t old, rank_bit = 1 << (ranks[i] & 0x3f); + + /* wait for rank to post */ + while (!(module->posts[my_rank][rank_byte] & rank_bit)) { + opal_progress(); + opal_atomic_mb(); + } + + opal_atomic_rmb (); + + do { + old = module->posts[my_rank][rank_byte]; + } while (!opal_atomic_cmpset_64 ((int64_t *) 
module->posts[my_rank] + rank_byte, old, old ^ rank_bit)); + } + + free (ranks); } opal_atomic_mb(); @@ -88,30 +178,33 @@ ompi_osc_sm_complete(struct ompi_win_t *win) { ompi_osc_sm_module_t *module = (ompi_osc_sm_module_t*) win->w_osc_module; - int gsize, csize; + ompi_group_t *group; + int gsize; /* ensure all memory operations have completed */ opal_atomic_mb(); - if (NULL != module->start_group) { - module->my_node_state->post_count = 0; - opal_atomic_mb(); - - gsize = ompi_group_size(module->start_group); - csize = ompi_comm_size(module->comm); - for (int i = 0 ; i < gsize ; ++i) { - for (int j = 0 ; j < csize ; ++j) { - if (ompi_group_peer_lookup(module->start_group, i) == - ompi_comm_peer_lookup(module->comm, j)) { - (void)opal_atomic_add_32(&module->node_states[j].complete_count, 1); - } - } - } - - OBJ_RELEASE(module->start_group); - module->start_group = NULL; + group = module->start_group; + if (NULL == group || !OPAL_ATOMIC_CMPSET(&module->start_group, group, NULL)) { + return OMPI_ERR_RMA_SYNC; } + opal_atomic_mb(); + + int *ranks = ompi_osc_sm_group_ranks (module->comm->c_local_group, group); + if (NULL == ranks) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + + gsize = ompi_group_size(group); + for (int i = 0 ; i < gsize ; ++i) { + (void) opal_atomic_add_32(&module->node_states[ranks[i]].complete_count, 1); + } + + free (ranks); + + OBJ_RELEASE(group); + opal_atomic_mb(); return OMPI_SUCCESS; } @@ -124,29 +217,45 @@ ompi_osc_sm_post(struct ompi_group_t *group, { ompi_osc_sm_module_t *module = (ompi_osc_sm_module_t*) win->w_osc_module; - int gsize, csize; + int my_rank = ompi_comm_rank (module->comm); + int my_byte = my_rank >> 6; + uint64_t my_bit = 1 << (my_rank & 0x3f); + int gsize; + + OPAL_THREAD_LOCK(&module->lock); + + if (NULL != module->post_group) { + OPAL_THREAD_UNLOCK(&module->lock); + return OMPI_ERR_RMA_SYNC; + } + + module->post_group = group; + + OBJ_RETAIN(group); if (0 == (assert & MPI_MODE_NOCHECK)) { - OBJ_RETAIN(group); - 
module->post_group = group; + int *ranks = ompi_osc_sm_group_ranks (module->comm->c_local_group, group); + if (NULL == ranks) { + return OMPI_ERR_OUT_OF_RESOURCE; + } module->my_node_state->complete_count = 0; opal_atomic_mb(); gsize = ompi_group_size(module->post_group); - csize = ompi_comm_size(module->comm); for (int i = 0 ; i < gsize ; ++i) { - for (int j = 0 ; j < csize ; ++j) { - if (ompi_group_peer_lookup(module->post_group, i) == - ompi_comm_peer_lookup(module->comm, j)) { - (void)opal_atomic_add_32(&module->node_states[j].post_count, 1); - } - } + (void) opal_atomic_add_64 ((int64_t *) module->posts[ranks[i]] + my_byte, my_bit); } - } else { - module->post_group = NULL; + + opal_atomic_wmb (); + + free (ranks); + + opal_progress (); } + OPAL_THREAD_UNLOCK(&module->lock); + return OMPI_SUCCESS; } @@ -156,19 +265,29 @@ ompi_osc_sm_wait(struct ompi_win_t *win) { ompi_osc_sm_module_t *module = (ompi_osc_sm_module_t*) win->w_osc_module; + ompi_group_t *group; - if (NULL != module->post_group) { - int size = ompi_group_size(module->post_group); + OPAL_THREAD_LOCK(&module->lock); - while (module->my_node_state->complete_count != size) { - opal_progress(); - opal_atomic_mb(); - } - - OBJ_RELEASE(module->post_group); - module->post_group = NULL; + if (NULL == module->post_group) { + OPAL_THREAD_UNLOCK(&module->lock); + return OMPI_ERR_RMA_SYNC; } + group = module->post_group; + + int size = ompi_group_size (group); + + while (module->my_node_state->complete_count != size) { + opal_progress(); + opal_atomic_mb(); + } + + OBJ_RELEASE(group); + module->post_group = NULL; + + OPAL_THREAD_UNLOCK(&module->lock); + /* ensure all memory operations have completed */ opal_atomic_mb(); @@ -183,19 +302,23 @@ ompi_osc_sm_test(struct ompi_win_t *win, ompi_osc_sm_module_t *module = (ompi_osc_sm_module_t*) win->w_osc_module; - if (NULL != module->post_group) { - int size = ompi_group_size(module->post_group); + OPAL_THREAD_LOCK(&module->lock); - if 
(module->my_node_state->complete_count == size) { - OBJ_RELEASE(module->post_group); - module->post_group = NULL; - *flag = 1; - } - } else { - opal_atomic_mb(); - *flag = 0; + if (NULL == module->post_group) { + OPAL_THREAD_UNLOCK(&module->lock); + return OMPI_ERR_RMA_SYNC; } + int size = ompi_group_size(module->post_group); + + if (module->my_node_state->complete_count == size) { + OBJ_RELEASE(module->post_group); + module->post_group = NULL; + *flag = 1; + } + + OPAL_THREAD_UNLOCK(&module->lock); + /* ensure all memory operations have completed */ opal_atomic_mb(); diff --git a/ompi/mca/osc/sm/osc_sm_component.c b/ompi/mca/osc/sm/osc_sm_component.c index 6b18d2bc3e..28d76217b7 100644 --- a/ompi/mca/osc/sm/osc_sm_component.c +++ b/ompi/mca/osc/sm/osc_sm_component.c @@ -58,45 +58,45 @@ ompi_osc_sm_component_t mca_osc_sm_component = { ompi_osc_sm_module_t ompi_osc_sm_module_template = { { - ompi_osc_sm_shared_query, + .osc_win_shared_query = ompi_osc_sm_shared_query, - ompi_osc_sm_attach, - ompi_osc_sm_detach, - ompi_osc_sm_free, + .osc_win_attach = ompi_osc_sm_attach, + .osc_win_detach = ompi_osc_sm_detach, + .osc_free = ompi_osc_sm_free, - ompi_osc_sm_put, - ompi_osc_sm_get, - ompi_osc_sm_accumulate, - ompi_osc_sm_compare_and_swap, - ompi_osc_sm_fetch_and_op, - ompi_osc_sm_get_accumulate, + .osc_put = ompi_osc_sm_put, + .osc_get = ompi_osc_sm_get, + .osc_accumulate = ompi_osc_sm_accumulate, + .osc_compare_and_swap = ompi_osc_sm_compare_and_swap, + .osc_fetch_and_op = ompi_osc_sm_fetch_and_op, + .osc_get_accumulate = ompi_osc_sm_get_accumulate, - ompi_osc_sm_rput, - ompi_osc_sm_rget, - ompi_osc_sm_raccumulate, - ompi_osc_sm_rget_accumulate, + .osc_rput = ompi_osc_sm_rput, + .osc_rget = ompi_osc_sm_rget, + .osc_raccumulate = ompi_osc_sm_raccumulate, + .osc_rget_accumulate = ompi_osc_sm_rget_accumulate, - ompi_osc_sm_fence, + .osc_fence = ompi_osc_sm_fence, - ompi_osc_sm_start, - ompi_osc_sm_complete, - ompi_osc_sm_post, - ompi_osc_sm_wait, - ompi_osc_sm_test, + 
.osc_start = ompi_osc_sm_start, + .osc_complete = ompi_osc_sm_complete, + .osc_post = ompi_osc_sm_post, + .osc_wait = ompi_osc_sm_wait, + .osc_test = ompi_osc_sm_test, - ompi_osc_sm_lock, - ompi_osc_sm_unlock, - ompi_osc_sm_lock_all, - ompi_osc_sm_unlock_all, + .osc_lock = ompi_osc_sm_lock, + .osc_unlock = ompi_osc_sm_unlock, + .osc_lock_all = ompi_osc_sm_lock_all, + .osc_unlock_all = ompi_osc_sm_unlock_all, - ompi_osc_sm_sync, - ompi_osc_sm_flush, - ompi_osc_sm_flush_all, - ompi_osc_sm_flush_local, - ompi_osc_sm_flush_local_all, + .osc_sync = ompi_osc_sm_sync, + .osc_flush = ompi_osc_sm_flush, + .osc_flush_all = ompi_osc_sm_flush_all, + .osc_flush_local = ompi_osc_sm_flush_local, + .osc_flush_local_all = ompi_osc_sm_flush_local_all, - ompi_osc_sm_set_info, - ompi_osc_sm_get_info + .osc_set_info = ompi_osc_sm_set_info, + .osc_get_info = ompi_osc_sm_get_info } }; @@ -163,6 +163,7 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit int flavor, int *model) { ompi_osc_sm_module_t *module = NULL; + int comm_size = ompi_comm_size (comm); int ret = OMPI_ERROR; if (OMPI_SUCCESS != (ret = check_win_ok(comm, flavor))) { @@ -174,6 +175,8 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit calloc(1, sizeof(ompi_osc_sm_module_t)); if (NULL == module) return OMPI_ERR_TEMP_OUT_OF_RESOURCE; + OBJ_CONSTRUCT(&module->lock, opal_mutex_t); + /* fill in the function pointer part */ memcpy(module, &ompi_osc_sm_module_template, sizeof(ompi_osc_base_module_t)); @@ -185,7 +188,7 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit module->flavor = flavor; /* create the segment */ - if (1 == ompi_comm_size(comm)) { + if (1 == comm_size) { module->segment_base = NULL; module->sizes = malloc(sizeof(size_t)); if (NULL == module->sizes) return OMPI_ERR_TEMP_OUT_OF_RESOURCE; @@ -200,13 +203,16 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit if (NULL == 
module->global_state) return OMPI_ERR_TEMP_OUT_OF_RESOURCE; module->node_states = malloc(sizeof(ompi_osc_sm_node_state_t)); if (NULL == module->node_states) return OMPI_ERR_TEMP_OUT_OF_RESOURCE; - + module->posts = calloc (1, sizeof(module->posts[0]) + sizeof (uint64_t)); + if (NULL == module->posts) return OMPI_ERR_TEMP_OUT_OF_RESOURCE; + module->posts[0] = (uint64_t *) (module->posts + 1); } else { unsigned long total, *rbuf; char *data_file; int i, flag; size_t pagesize; size_t state_size; + int posts_size, post_size = (comm_size + 63) / 64; OPAL_OUTPUT_VERBOSE((1, ompi_osc_base_framework.framework_output, "allocating shared memory region of size %ld\n", (long) size)); @@ -214,7 +220,7 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit /* get the pagesize */ pagesize = opal_getpagesize(); - rbuf = malloc(sizeof(unsigned long) * ompi_comm_size(module->comm)); + rbuf = malloc(sizeof(unsigned long) * comm_size); if (NULL == rbuf) return OMPI_ERR_TEMP_OUT_OF_RESOURCE; module->noncontig = false; @@ -235,7 +241,7 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit if (OMPI_SUCCESS != ret) return ret; total = 0; - for (i = 0 ; i < ompi_comm_size(module->comm) ; ++i) { + for (i = 0 ; i < comm_size ; ++i) { total += rbuf[i]; } @@ -247,9 +253,10 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit } /* user opal/shmem directly to create a shared memory segment */ - state_size = sizeof(ompi_osc_sm_global_state_t) + sizeof(ompi_osc_sm_node_state_t) * ompi_comm_size(module->comm); + state_size = sizeof(ompi_osc_sm_global_state_t) + sizeof(ompi_osc_sm_node_state_t) * comm_size; + posts_size = comm_size * post_size * sizeof (uint64_t); if (0 == ompi_comm_rank (module->comm)) { - ret = opal_shmem_segment_create (&module->seg_ds, data_file, total + pagesize + state_size); + ret = opal_shmem_segment_create (&module->seg_ds, data_file, total + pagesize + state_size + posts_size); if 
(OPAL_SUCCESS != ret) { goto error; } @@ -266,15 +273,22 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit goto error; } - module->sizes = malloc(sizeof(size_t) * ompi_comm_size(module->comm)); + module->sizes = malloc(sizeof(size_t) * comm_size); if (NULL == module->sizes) return OMPI_ERR_TEMP_OUT_OF_RESOURCE; - module->bases = malloc(sizeof(void*) * ompi_comm_size(module->comm)); + module->bases = malloc(sizeof(void*) * comm_size); if (NULL == module->bases) return OMPI_ERR_TEMP_OUT_OF_RESOURCE; + module->posts = calloc (comm_size, sizeof (module->posts[0])); + if (NULL == module->posts) return OMPI_ERR_TEMP_OUT_OF_RESOURCE; module->global_state = (ompi_osc_sm_global_state_t *) (module->segment_base); module->node_states = (ompi_osc_sm_node_state_t *) (module->global_state + 1); + module->posts[0] = (uint64_t *) (module->node_states + comm_size); + + for (i = 0, total = state_size + posts_size ; i < comm_size ; ++i) { + if (i > 0) { + module->posts[i] = module->posts[i - 1] + post_size; + } - for (i = 0, total = state_size ; i < ompi_comm_size(module->comm) ; ++i) { module->sizes[i] = rbuf[i]; if (module->sizes[i]) { module->bases[i] = ((char *) module->segment_base) + total; @@ -296,7 +310,7 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit opal_atomic_init(&module->my_node_state->accumulate_lock, OPAL_ATOMIC_UNLOCKED); /* share everyone's displacement units. 
*/ - module->disp_units = malloc(sizeof(int) * ompi_comm_size(module->comm)); + module->disp_units = malloc(sizeof(int) * comm_size); ret = module->comm->c_coll.coll_allgather(&disp_unit, 1, MPI_INT, module->disp_units, 1, MPI_INT, module->comm, @@ -309,7 +323,7 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit /* initialize synchronization code */ module->my_sense = 1; - module->outstanding_locks = calloc(ompi_comm_size(module->comm), sizeof(enum ompi_osc_sm_locktype_t)); + module->outstanding_locks = calloc(comm_size, sizeof(enum ompi_osc_sm_locktype_t)); if (NULL == module->outstanding_locks) { ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE; goto error; @@ -346,7 +360,7 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit } module->global_state->use_barrier_for_fence = 0; module->global_state->sense = module->my_sense; - module->global_state->count = ompi_comm_size(module->comm); + module->global_state->count = comm_size; pthread_mutexattr_destroy(&mattr); } else { module->global_state->use_barrier_for_fence = 1; @@ -367,8 +381,8 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit return OMPI_SUCCESS; error: - if (NULL != module->comm) ompi_comm_free(&module->comm); - if (NULL != module) free(module); + win->w_osc_module = &module->super; + ompi_osc_sm_free (win); return ret; } @@ -459,10 +473,14 @@ ompi_osc_sm_free(struct ompi_win_t *win) free(module->sizes); } + free (module->posts); + /* cleanup */ ompi_comm_free(&module->comm); free(module); + OBJ_DESTRUCT(&module->lock); + return OMPI_SUCCESS; }