/*
 * Copyright (c) 2013      Mellanox Technologies, Inc.
 *                         All rights reserved.
 * Copyright (c) 2014      Cisco Systems, Inc.  All rights reserved.
 * Copyright (c) 2015      Research Organization for Information Science
 *                         and Technology (RIST). All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */
/**
 * @file
 *
 * Distributed lock implementation.
 *
 * A lock is a 4- or 8-byte symmetric variable that is treated as two
 * half-size "words":
 *   - the first word is the upper half of the value,
 *   - the second word is the lower half of the value.
 *
 * The copy of the lock that lives on the "server" PE (see
 * shmem_lock_get_server()) stores (pe_last + 1, pe_next + 1).
 * The copies on the other PEs store (counter, pe_next + 1); the top bit of
 * the whole lock value is used there as the "informing" flag that is raised
 * by the previous owner when it releases the lock.
 * PE numbers are stored incremented by one so that 0 means "no PE" (-1).
 */

#include "oshmem_config.h"

#include "oshmem/constants.h"
#include "oshmem/include/shmem.h"
#include "oshmem/runtime/params.h"
#include "oshmem/runtime/runtime.h"
#include <stdlib.h>
#include <string.h>

#include "oshmem/shmem/shmem_api_logger.h"
#include "oshmem/shmem/shmem_lock.h"
#include "oshmem/mca/memheap/memheap.h"
#include "oshmem/mca/memheap/base/base.h"
#include "oshmem/mca/atomic/atomic.h"

#define OPAL_BITWISE_SIZEOF_LONG (SIZEOF_LONG * 8)

/* Node of the local list that keeps the nesting counter of every lock
 * for which this PE is the server. */
struct oshmem_lock_counter {
    void *lock;
    int counter;
    struct oshmem_lock_counter *next;
    struct oshmem_lock_counter *prev;
};
typedef struct oshmem_lock_counter oshmem_lock_counter_t;

/* Node of the local list that remembers, per lock, the PE that precedes
 * this PE in the waiting queue (used only when this PE is the server). */
struct oshmem_lock_prev_pe_container {
    void *lock;
    int prev_pe;
    struct oshmem_lock_prev_pe_container *next;
    struct oshmem_lock_prev_pe_container *prev;
};
typedef struct oshmem_lock_prev_pe_container oshmem_lock_prev_pe_container_t;

oshmem_lock_counter_t *lock_counter_head = NULL;
oshmem_lock_prev_pe_container_t *lock_prev_pe_container_head = NULL;

/* Integers obtained from the memheap private allocator in shmem_lock_init() */
static int *lock_turn;
static int *lock_inform;
static int *lock_last_ticket;

static int lock_save_prev_pe(void *lock, int prev_pe);
static int lock_restore_prev_pe(void *lock, int *prev_pe);
static int shmem_lock_try_inform_server(void *lock, int lock_size);

static void shmem_get_wrapper(uint64_t *target,
                              const void *source,
                              int source_size,
                              size_t nelems,
                              int pe);
static void shmem_wait_wrapper(void *target, int target_size, uint64_t value);

static int shmem_lock_extract_pe_next(void *lock, int lock_size, int *pe_next);
static int shmem_lock_pack_pe_next_pe_last(void *lock,
                                           int lock_size,
                                           const int *pe_next,
                                           const int *pe_last);
static int shmem_lock_pack_pe_next(void *lock,
                                   int lock_size,
                                   const int *pe_next);

static void shmem_lock_increment_counter(void *lock, int lock_size);
static int shmem_lock_decrement_counter(void *lock, int lock_size);

static int shmem_lock_get_server(void *lock);
static int shmem_lock_is_mine(void *lock, int lock_size);
static int shmem_lock_get_ticket(void *lock);
static int shmem_lock_wait_for_ticket(void *lock,
                                      int lock_size,
                                      int ticket,
                                      int *pe_last);
static int shmem_lock_subscribe_for_informing(void *lock,
                                              int lock_size,
                                              int pe_last);
static int shmem_lock_wait_for_informing(void *lock, int lock_size);
static int shmem_lock_inform_next(void *lock, int lock_size, int pe_next);

/***************************************************************************/
/**************************Init/Finalize************************************/
/***************************************************************************/

/*
 * Allocate one zero-initialized int from the memheap private allocator.
 *
 * slot  in/out: current pointer value is handed to the allocator exactly as
 *       before; on success it is replaced by the new allocation.
 * Returns OSHMEM_SUCCESS, or OSHMEM_ERROR when no memory was obtained
 * (in which case *slot is left untouched and nothing is dereferenced).
 */
static int lock_alloc_private_int(int **slot)
{
    void *ptr = (void *) *slot;

    MCA_MEMHEAP_CALL(private_alloc(sizeof(int), &ptr));
    if (NULL == ptr) {
        SHMEM_API_ERROR("Unable to allocate private memory for SHMEM distributed locking");
        return OSHMEM_ERROR;
    }

    *slot = (int *) ptr;
    **slot = 0;

    return OSHMEM_SUCCESS;
}

/*
 * Set up the distributed locking module: validates sizeof(long) / number of
 * PEs, allocates the turn, last-ticket and inform integers and resets the
 * local bookkeeping lists.
 *
 * Returns OSHMEM_SUCCESS or OSHMEM_ERROR.
 */
int shmem_lock_init(void)
{
#if (OPAL_BITWISE_SIZEOF_LONG == 32)
    int number_of_pes = shmem_n_pes();
    if (number_of_pes >= 65534) {
        SHMEM_API_ERROR("SHMEM distributed locking implementation does not support total number of PEs greater than 65534 if sizeof(long) = 4");
        return OSHMEM_ERROR;
    }
#endif

#if ((OPAL_BITWISE_SIZEOF_LONG != 64) && (OPAL_BITWISE_SIZEOF_LONG != 32))
    /* sizeof yields size_t; cast so that the argument matches %i */
    SHMEM_API_ERROR("SHMEM distributed locking implementation does not support sizeof(long) = %i", (int) sizeof(long));
    return OSHMEM_ERROR;
#endif

    /* The allocation order (turn, last ticket, inform) is kept unchanged */
    if (OSHMEM_SUCCESS != lock_alloc_private_int(&lock_turn)) {
        return OSHMEM_ERROR;
    }
    if (OSHMEM_SUCCESS != lock_alloc_private_int(&lock_last_ticket)) {
        return OSHMEM_ERROR;
    }
    if (OSHMEM_SUCCESS != lock_alloc_private_int(&lock_inform)) {
        return OSHMEM_ERROR;
    }

    lock_counter_head = 0;
    lock_prev_pe_container_head = 0;

    return OSHMEM_SUCCESS;
}

/*
 * Release everything shmem_lock_init() and the lock operations allocated:
 * the three private integers and both local bookkeeping lists.
 *
 * Always returns OSHMEM_SUCCESS.
 */
int shmem_lock_finalize(void)
{
    oshmem_lock_counter_t *current_counter = lock_counter_head;
    oshmem_lock_prev_pe_container_t *current_pe_container =
            lock_prev_pe_container_head;

    if (0 != lock_turn) {
        MCA_MEMHEAP_CALL(private_free(lock_turn));
    }
    if (0 != lock_last_ticket) {
        MCA_MEMHEAP_CALL(private_free(lock_last_ticket));
    }
    if (0 != lock_inform) {
        MCA_MEMHEAP_CALL(private_free(lock_inform));
    }
    lock_turn = 0;
    lock_last_ticket = 0;
    lock_inform = 0;

    while (0 != current_counter) {
        oshmem_lock_counter_t *counter_to_free = current_counter;
        current_counter = current_counter->next;
        free(counter_to_free);
    }
    lock_counter_head = 0;

    while (0 != current_pe_container) {
        oshmem_lock_prev_pe_container_t *container_to_free =
                current_pe_container;
        current_pe_container = current_pe_container->next;
        free(container_to_free);
    }
    lock_prev_pe_container_head = 0;

    return OSHMEM_SUCCESS;
}

/*
 * Map a lock address to the PE that serves it: the offset of the lock inside
 * its memheap segment, in units of 8 bytes, modulo the number of PEs.
 *
 * Aborts (and returns 0) when the address does not belong to any segment.
 */
static int shmem_lock_get_server(void *lock)
{
    map_segment_t *s;
    uintptr_t offset;

    s = memheap_find_va(lock);
    if (NULL == s) {
        SHMEM_API_ERROR("PE#%i lock %p is not a shared variable", shmem_my_pe(), lock);
        oshmem_shmem_abort(-1);
        return 0;
    }

    /* Stay in uintptr_t until after the modulo: narrowing the raw offset to
     * int first could turn a large offset into a negative server PE. */
    offset = (uintptr_t) lock - (uintptr_t) s->super.va_base;

    return (int) ((offset / 8) % (uintptr_t) shmem_n_pes());
}

/*
 * Read a 4- or 8-byte lock value. Any other lock_size yields 0.
 */
static uint64_t get_lock_value(const void *lock, int lock_size)
{
    uint64_t lock_value = 0;

    if (lock_size == 4) {
        lock_value = *(uint32_t *) lock;
    } else if (lock_size == 8) {
        lock_value = *(uint64_t *) lock;
    }

    return lock_value;
}

/*
 * Store lock_value into a 4- or 8-byte lock (truncated to 32 bits for a
 * 4-byte lock). Any other lock_size is ignored.
 */
static void set_lock_value(void *lock, int lock_size, uint64_t lock_value)
{
    if (lock_size == 8) {
        memcpy(lock, &lock_value, 8);
    } else if (lock_size == 4) {
        uint32_t lock_value_32 = (uint32_t) lock_value;
        memcpy(lock, &lock_value_32, 4);
    }
}

/***************************************************************************/
/**************************Pack/Extract*************************************/
/***************************************************************************/

/*
 * Split a lock value into its two words.
 *
 * one  receives the upper half of the value.
 * two  receives the lower half of the value (left untouched when lock_size
 *      is neither 4 nor 8).
 * Returns OSHMEM_ERROR if any pointer is NULL, OSHMEM_SUCCESS otherwise.
 */
static int extract_2_words(const void *lock, int lock_size, int *one, int *two)
{
    int lock_bitwise_size = lock_size * 8;
    uint64_t lock_value;

    /* Validate before touching the lock memory */
    if (lock == 0 || one == 0 || two == 0) {
        return OSHMEM_ERROR;
    }

    lock_value = get_lock_value(lock, lock_size);

    *one = (int) (lock_value >> (lock_bitwise_size / 2));
    if (lock_size == 8) {
        *two = (int) ((lock_value << 32) >> 32);
    } else if (lock_size == 4) {
        *two = (int) ((lock_value << 48) >> 48);
    }

    return OSHMEM_SUCCESS;
}

/*
 * Build a lock value out of two words and store it into lock:
 * *one goes to the upper half, *two to the lower half.
 * No masking is applied, so both values are expected to be non-negative and
 * to fit into half of the lock.
 *
 * Returns OSHMEM_ERROR if any pointer is NULL, OSHMEM_SUCCESS otherwise.
 */
static int pack_2_words(void *lock, int lock_size, const int *one, const int *two)
{
    uint64_t lock_value = 0;
    int lock_bitwise_size = lock_size * 8;

    if (lock == 0 || one == 0 || two == 0) {
        return OSHMEM_ERROR;
    }

    lock_value = (uint64_t) *two
            | (((uint64_t) *one) << (lock_bitwise_size / 2));
    set_lock_value(lock, lock_size, lock_value);

    return OSHMEM_SUCCESS;
}

/* Extract only the first (upper) word of the lock value. */
static int extract_first_word(void *lock, int lock_size, int *one)
{
    int two = 0;

    return extract_2_words(lock, lock_size, one, &two);
}

/* Extract only the second (lower) word of the lock value. */
static int extract_second_word(void *lock, int lock_size, int *two)
{
    int one = 0;

    return extract_2_words(lock, lock_size, &one, two);
}

/*
 * Compare-and-swap on the 4- or 8-byte target located on PE pe:
 * if the remote value equals cond it is replaced by value.
 *
 * Returns the value fetched by the atomic operation (0 for an unsupported
 * target_size, in which case nothing is done).
 */
static uint64_t shmem_lock_cswap(void *target,
                                 int target_size,
                                 uint64_t cond,
                                 uint64_t value,
                                 int pe)
{
    uint64_t prev_value = 0;

    if (target_size == 8) {
        MCA_ATOMIC_CALL(cswap( target, (void*)&prev_value, (const void*)&cond, (const void*)&value, target_size, pe));
    } else if (target_size == 4) {
        uint32_t prev_value_32 = 0;
        uint32_t cond32 = (uint32_t) cond;
        uint32_t value32 = (uint32_t) value;

        MCA_ATOMIC_CALL(cswap( target, (void*)&prev_value_32, (const void*)&cond32, (const void*)&value32, target_size, pe));

        prev_value = prev_value_32;
    }

    return prev_value;
}

/* Function is used to busy wait for the value.
 * Call opal_progress() so that ompi will not deadlock
 * (for example may need to respond to rkey requests)
 */
static uint64_t shmem_lock_cswap_poll(void *target,
                                      int target_size,
                                      uint64_t cond,
                                      uint64_t value,
                                      int pe)
{
    uint64_t prev_value;

    prev_value = shmem_lock_cswap(target, target_size, cond, value, pe);
    opal_progress();

    return prev_value;
}

/*
 * Fetch-and-add on the target located on PE pe, dispatched on target_size to
 * the int / long / long long SHMEM routine.
 *
 * Returns the value produced by the fetch-add call (0 when target_size
 * matches none of the supported types).
 */
static uint64_t shmem_lock_fadd(void *target,
                                int target_size,
                                uint64_t value,
                                int pe)
{
    uint64_t prev_value = 0;

    if (target_size == sizeof(int)) {
        prev_value = (uint64_t) shmem_int_fadd((int *) target, (int) value, pe);
    } else if (target_size == sizeof(long)) {
        prev_value = (uint64_t) shmem_long_fadd((long *) target, (long) value, pe);
    } else if (target_size == sizeof(long long)) {
        prev_value = (uint64_t) shmem_longlong_fadd((long long *) target, (long long) value, pe);
    }

    return prev_value;
}

/*
 * Replace the first word of the lock with *one, keeping the second word.
 *
 * use_atomic != 0: the lock is updated with a compare-and-swap loop issued
 *                  to this PE, retried until no concurrent change interferes.
 * use_atomic == 0: the lock is a plain local value; it is rewritten in place
 *                  until the first word reads back as *one.
 *
 * Returns OSHMEM_ERROR if lock or one is NULL, OSHMEM_SUCCESS otherwise.
 */
static int pack_first_word(void *lock,
                           int lock_size,
                           const int *one,
                           int use_atomic)
{
    int my_pe = shmem_my_pe();
    uint64_t lock_value = 0;
    uint64_t new_long_value = 0;
    uint64_t temp = 0;
    int two = 0;

    if (lock == 0 || one == 0) {
        return OSHMEM_ERROR;
    }

    if (use_atomic) {
        lock_value = get_lock_value(lock, lock_size);
        extract_second_word(&lock_value, lock_size, &two);
        pack_2_words(&new_long_value, lock_size, one, &two);
        while (lock_value
                != (temp = shmem_lock_cswap_poll(lock, lock_size, lock_value, new_long_value, my_pe))) {
            /* Somebody changed the lock meanwhile: rebuild and retry */
            lock_value = temp;
            extract_second_word(&lock_value, lock_size, &two);
            pack_2_words(&new_long_value, lock_size, one, &two);
        }
    } else {
        /* Mask that keeps only the second (lower) word */
        uint64_t zero_mask = 0xFFFFFFFF;
        int zero = 0;
        int written_one = 0;

        if (lock_size == 4) {
            zero_mask = 0xFFFF;
        }

        pack_2_words(&new_long_value, lock_size, one, &zero);
        do {
            lock_value = get_lock_value(lock, lock_size);
            lock_value &= zero_mask;
            lock_value |= new_long_value;
            set_lock_value(lock, lock_size, lock_value);
            extract_first_word(lock, lock_size, &written_one);
        } while (written_one != *one);
    }

    return OSHMEM_SUCCESS;
}

/*
 * Replace the second word of the lock with *two, keeping the first word.
 * See pack_first_word() for the meaning of use_atomic.
 *
 * Returns OSHMEM_ERROR if lock or two is NULL, OSHMEM_SUCCESS otherwise.
 */
static int pack_second_word(void *lock,
                            int lock_size,
                            const int *two,
                            int use_atomic)
{
    int my_pe = shmem_my_pe();
    uint64_t lock_value = 0;
    uint64_t new_long_value = 0;
    uint64_t temp = 0;
    int one = 0;

    if (lock == 0 || two == 0) {
        return OSHMEM_ERROR;
    }

    if (use_atomic) {
        lock_value = get_lock_value(lock, lock_size);
        extract_first_word(&lock_value, lock_size, &one);
        pack_2_words(&new_long_value, lock_size, &one, two);
        while (lock_value
                != (temp = shmem_lock_cswap_poll(lock, lock_size, lock_value, new_long_value, my_pe))) {
            /* Somebody changed the lock meanwhile: rebuild and retry */
            lock_value = temp;
            extract_first_word(&lock_value, lock_size, &one);
            pack_2_words(&new_long_value, lock_size, &one, two);
        }
    } else {
        /* Mask that keeps only the first (upper) word */
        uint64_t zero_mask = 0xFFFFFFFF00000000;
        int zero = 0;
        int written_two = 0;

        if (lock_size == 4) {
            zero_mask = 0xFFFF0000;
        }

        pack_2_words(&new_long_value, lock_size, &zero, two);
        do {
            lock_value = get_lock_value(lock, lock_size);
            lock_value &= zero_mask;
            lock_value |= new_long_value;
            set_lock_value(lock, lock_size, lock_value);
            extract_second_word(lock, lock_size, &written_two);
        } while (written_two != *two);
    }

    return OSHMEM_SUCCESS;
}

/*
 * Clear the informing flag from an extracted first word.
 *
 * The flag is the top bit of the whole lock value (see
 * shmem_lock_inform_next()), i.e. the top bit of the first word:
 * bit 31 for an 8-byte lock and bit 15 for a 4-byte lock.
 */
static int lock_strip_inform_bit(int word, int lock_size)
{
    unsigned int inform_bit = ((unsigned int) 1) << (lock_size * 8 / 2 - 1);

    return (int) ((unsigned int) word & ~inform_bit);
}

/*
 * Read (counter, pe_next) from a lock value.
 * The counter is returned without the informing bit; pe_next is converted
 * back from its stored "+1" form, so "no next PE" is reported as -1.
 */
static int lock_extract_pe_next_counter(void *lock,
                                        int lock_size,
                                        int *pe_next,
                                        int *counter)
{
    int status = extract_2_words(lock, lock_size, counter, pe_next);

    if (OSHMEM_SUCCESS != status) {
        /* Nothing was extracted (NULL argument): do not touch the outputs */
        return status;
    }

    /* make sure counter does not have the last bit - informing bit */
    *counter = lock_strip_inform_bit(*counter, lock_size);
    if (*pe_next >= 0) {
        *pe_next -= 1;
    }

    return status;
}

/*
 * Read pe_next (second word) from a lock value; -1 means "no next PE".
 */
static int shmem_lock_extract_pe_next(void *lock, int lock_size, int *pe_next)
{
    int status = extract_second_word(lock, lock_size, pe_next);

    if (OSHMEM_SUCCESS != status) {
        return status;
    }

    if (*pe_next >= 0) {
        *pe_next -= 1;
    }

    return status;
}

/*
 * Read pe_last (first word) from a lock value; -1 means "no last PE".
 */
static int lock_extract_pe_last(void *lock, int lock_size, int *pe_last)
{
    int status = extract_first_word(lock, lock_size, pe_last);

    if (OSHMEM_SUCCESS != status) {
        return status;
    }

    if (*pe_last >= 0) {
        *pe_last -= 1;
    }

    return status;
}

/*
 * Read the counter (first word, informing bit removed) from a lock value.
 */
static int lock_extract_counter(void *lock, int lock_size, int *count)
{
    int status = extract_first_word(lock, lock_size, count);

    if (OSHMEM_SUCCESS != status) {
        return status;
    }

    /* make sure counter does not have the last bit - informing bit */
    *count = lock_strip_inform_bit(*count, lock_size);

    return status;
}

/* Store (pe_last + 1, pe_next + 1) into the lock value. */
static int shmem_lock_pack_pe_next_pe_last(void *lock,
                                           int lock_size,
                                           const int *pe_next,
                                           const int *pe_last)
{
    int pe_next_plus_one = *pe_next + 1;
    int pe_last_plus_one = *pe_last + 1;

    return pack_2_words(lock, lock_size, &pe_last_plus_one, &pe_next_plus_one);
}

/* Store (counter, pe_next + 1) into the lock value. */
static int lock_pack_pe_next_counter(void *lock,
                                     int lock_size,
                                     const int *pe_next,
                                     const int *counter)
{
    int pe_next_plus_one = *pe_next + 1;

    return pack_2_words(lock, lock_size, counter, &pe_next_plus_one);
}

/* Atomically replace the second word of the lock with pe_next + 1. */
static int shmem_lock_pack_pe_next(void *lock,
                                   int lock_size,
                                   const int *pe_next)
{
    int pe_next_plus_one = *pe_next + 1;

    return pack_second_word(lock, lock_size, &pe_next_plus_one, 1);
}

/* Replace the first word of the lock with the counter. */
static int lock_pack_counter(void *lock,
                             int lock_size,
                             const int *counter,
                             int use_atomic)
{
    return pack_first_word(lock, lock_size, counter, use_atomic);
}

/* Replace the first word of the lock with pe_last + 1. */
static int lock_pack_pe_last(void *lock,
                             int lock_size,
                             const int *pe_last,
                             int use_atomic)
{
    int pe_last_plus_one = *pe_last + 1;

    return pack_first_word(lock, lock_size, &pe_last_plus_one, use_atomic);
}

/***************************************************************************/
/**************************Lock counters************************************/
/***************************************************************************/

/* Find the local counter node of the lock; returns 0 when there is none. */
static oshmem_lock_counter_t *lock_find_counter(void *lock)
{
    oshmem_lock_counter_t *current_counter = lock_counter_head;

    while (0 != current_counter) {
        if (current_counter->lock == lock) {
            return current_counter;
        }
        current_counter = current_counter->next;
    }

    return 0;
}

/*
 * Increment the local counter of the lock, creating the node (with counter
 * 1, at the head of the list) when the lock has none yet.
 *
 * Returns OSHMEM_SUCCESS, or OSHMEM_ERROR when the node cannot be allocated.
 */
static int shmem_lock_insert_counter(void *lock)
{
    oshmem_lock_counter_t *counter = lock_find_counter(lock);

    if (counter) {
        counter->counter += 1;
        return OSHMEM_SUCCESS;
    }

    counter = malloc(sizeof(*counter));
    if (NULL == counter) {
        SHMEM_API_ERROR("PE#%i unable to allocate a counter for lock %p", shmem_my_pe(), lock);
        return OSHMEM_ERROR;
    }

    counter->lock = lock;
    counter->counter = 1;
    counter->next = lock_counter_head;
    counter->prev = 0;
    if (lock_counter_head) {
        lock_counter_head->prev = counter;
    }
    lock_counter_head = counter;

    return OSHMEM_SUCCESS;
}

/* Unlink and free the local counter node of the lock, if it exists. */
static int shmem_lock_remove_counter(void *lock)
{
    oshmem_lock_counter_t *counter = lock_find_counter(lock);

    if (counter) {
        oshmem_lock_counter_t *prev = counter->prev;
        oshmem_lock_counter_t *next = counter->next;

        if (next) {
            next->prev = prev;
        }
        if (prev) {
            prev->next = next;
        }
        if (lock_counter_head == counter) {
            lock_counter_head = next;
        }
        free(counter);
    }

    return OSHMEM_SUCCESS;
}

/*
 * Increment the nesting counter of the lock for this PE.
 * On the server PE the counter lives in the local list; on any other PE it
 * is the first word of the local copy of the lock.
 * Aborts when the server-side counter node cannot be allocated.
 */
static void shmem_lock_increment_counter(void *lock, int lock_size)
{
    int my_pe = shmem_my_pe();
    int server_pe = shmem_lock_get_server(lock);

    if (my_pe == server_pe) {
        if (OSHMEM_SUCCESS != shmem_lock_insert_counter(lock)) {
            oshmem_shmem_abort(-1);
        }
    } else {
        int counter = 0;

        lock_extract_counter(lock, lock_size, &counter);
        counter++;
        lock_pack_counter(lock, lock_size, &counter, 1);
    }
}

/*
 * Decrement the nesting counter of the lock for this PE. When
 * oshmem_shmem_lock_recursive is not set the counter drops straight to 0.
 *
 * Returns the new counter value, or -1 when this PE had no positive counter
 * for the lock.
 */
static int shmem_lock_decrement_counter(void *lock, int lock_size)
{
    int my_pe = shmem_my_pe();
    int server_pe = shmem_lock_get_server(lock);
    int current_lock_counter = -1;

    if (my_pe == server_pe) {
        oshmem_lock_counter_t *counter = lock_find_counter(lock);

        if (counter) {
            if (oshmem_shmem_lock_recursive) {
                counter->counter -= 1;
            } else {
                counter->counter = 0;
            }

            if ((current_lock_counter = counter->counter) <= 0) {
                shmem_lock_remove_counter(lock);
                current_lock_counter = 0;
            }
        }
    } else {
        int pe_next = 0, counter = 0;

        lock_extract_pe_next_counter(lock, lock_size, &pe_next, &counter);
        if (counter > 0) {
            if (oshmem_shmem_lock_recursive) {
                current_lock_counter = counter - 1;
            } else {
                current_lock_counter = 0;
            }
            lock_pack_counter(lock, lock_size, &current_lock_counter, 1);
        }
    }

    return current_lock_counter;
}

/* Return the current nesting counter of the lock for this PE (0 if none). */
static int lock_get_count(void *lock, int lock_size)
{
    int my_pe = shmem_my_pe();
    int server_pe = shmem_lock_get_server(lock);
    int count = 0;

    if (my_pe == server_pe) {
        oshmem_lock_counter_t *counter = lock_find_counter(lock);

        if (counter) {
            count = counter->counter;
        }
    } else {
        lock_extract_counter(lock, lock_size, &count);
    }

    return count;
}

/* Non-zero when this PE has a positive nesting counter for the lock. */
static int shmem_lock_is_mine(void *lock, int lock_size)
{
    return (lock_get_count(lock, lock_size) > 0);
}

/***************************************************************************/
/**************************Ticket utilities*********************************/
/***************************************************************************/

/*
 * Take a ticket: fetch-and-increment of lock_last_ticket on the server PE
 * of the lock. Returns the value produced by shmem_int_finc().
 */
static int shmem_lock_get_ticket(void *lock)
{
    int server_pe = shmem_lock_get_server(lock);
    int my_ticket = shmem_int_finc(lock_last_ticket, server_pe);

    return my_ticket;
}

/*
 * Get a 4- or 8-byte value from PE pe into a 64-bit target.
 * All callers in this file pass nelems == 1; the 4-byte path stores into a
 * single temporary and therefore supports no more than that.
 */
static void shmem_get_wrapper(uint64_t *target,
                              const void *source,
                              int source_size,
                              size_t nelems,
                              int pe)
{
    if (source_size == 8) {
        shmem_get64(target, source, nelems, pe);
    } else if (source_size == 4) {
        uint32_t temp32 = 0;

        shmem_get32(&temp32, source, nelems, pe);
        *target = temp32;
    }
}

/*
 * Wait until the turn counter on the server equals our ticket, then register
 * this PE as pe_last in the server copy of the lock.
 *
 * pe_last  receives the PE that was last in the queue before us, or my_pe
 *          when the queue was empty.
 * Always returns OSHMEM_SUCCESS.
 */
static int shmem_lock_wait_for_ticket(void *lock,
                                      int lock_size,
                                      int ticket,
                                      int *pe_last)
{
    int my_pe = shmem_my_pe();
    int server_pe = shmem_lock_get_server(lock);
    int remote_new_pe_last = 0;
    int remote_turn = 0;
    uint64_t server_lock = 0;
    uint64_t new_server_lock = 0;
    uint64_t temp = 0;

    /* Spin until it is our turn */
    do {
        shmem_int_get(&remote_turn, lock_turn, 1, server_pe);
    } while (remote_turn != ticket);

    shmem_get_wrapper(&temp, lock, lock_size, 1, server_pe);
    do {
        /* Another process has ignored the queue, possibly due to shmem_test_lock */
        new_server_lock = server_lock = temp;
        lock_pack_pe_last(&new_server_lock, lock_size, &my_pe, 0);
    } while (server_lock
            != (temp = shmem_lock_cswap_poll(lock, lock_size, server_lock, new_server_lock, server_pe)));

    lock_extract_pe_last(&server_lock, lock_size, pe_last);
    if (*pe_last == -1) {
        /* we are first in queue for the lock */
        *pe_last = my_pe;
    }

    /* Since quiet is too slow in ikrit then check directly
     * shmem_quiet();
     */
    do {
        shmem_get_wrapper(&new_server_lock, lock, lock_size, 1, server_pe);
        lock_extract_pe_last(&new_server_lock, lock_size, &remote_new_pe_last);
    } while (remote_new_pe_last != my_pe);

    /* Let the owner of the next ticket proceed */
    shmem_int_finc(lock_turn, server_pe);

    return OSHMEM_SUCCESS;
}

/*
 * Register this PE as pe_next in the copy of the lock that lives on pe_last,
 * so that pe_last informs us when it releases the lock.
 * When this PE is the server, pe_last is also remembered locally for
 * shmem_lock_wait_for_informing(); failure to remember it aborts.
 *
 * Always returns OSHMEM_SUCCESS.
 */
static int shmem_lock_subscribe_for_informing(void *lock,
                                              int lock_size,
                                              int pe_last)
{
    int my_pe = shmem_my_pe();
    int server_pe = shmem_lock_get_server(lock);
    int remote_prev_pe_next = 0;
    uint64_t prev_remote_value = 1;

    /* pe_next is expected to be stored as 0, so adding my_pe + 1 sets it */
    prev_remote_value = shmem_lock_fadd(lock, lock_size, my_pe + 1, pe_last);

    if (my_pe == server_pe) {
        if (OSHMEM_SUCCESS != lock_save_prev_pe(lock, pe_last)) {
            oshmem_shmem_abort(-1);
        }
    }

    /* The previous value of pe_next must be -1 (stored as 0);
     * anything else is a bug in the distributed locking implementation
     */
    shmem_lock_extract_pe_next(&prev_remote_value,
                               lock_size,
                               &remote_prev_pe_next);
    if (remote_prev_pe_next != -1) {
        int remote_counter = 0;
        uint64_t new_remote_value = 0;
        uint64_t temp_value = 0;

        SHMEM_API_ERROR("PE #%i noticed incorrect pe_next value=%i on PE#%i", my_pe, remote_prev_pe_next, pe_last);

        /* Trying to restore */
        lock_extract_counter(&prev_remote_value, lock_size, &remote_counter);
        lock_pack_pe_next_counter(&new_remote_value,
                                  lock_size,
                                  &my_pe,
                                  &remote_counter);
        /* Value that the remote lock holds after our fetch-add */
        prev_remote_value += my_pe + 1;
        while (prev_remote_value
                != (temp_value = shmem_lock_cswap_poll(lock, lock_size, prev_remote_value, new_remote_value, pe_last))) {
            prev_remote_value = temp_value;
            lock_extract_counter(&prev_remote_value,
                                 lock_size,
                                 &remote_counter);
            lock_pack_pe_next_counter(&new_remote_value,
                                      lock_size,
                                      &my_pe,
                                      &remote_counter);
        }
    }

    return OSHMEM_SUCCESS;
}

/*
 * Dispatch on target_size to the int / long / long long SHMEM wait routine,
 * passing value as the comparison value.
 */
static void shmem_wait_wrapper(void *target, int target_size, uint64_t value)
{
    if (target_size == sizeof(int)) {
        shmem_int_wait((int *) target, (int) value);
    } else if (target_size == sizeof(long)) {
        shmem_long_wait((long *) target, (long) value);
    } else if (target_size == sizeof(long long)) {
        shmem_longlong_wait((long long *) target, (long long) value);
    }
}

/***************************************************************************/
/********************Release lock informing functions***********************/
/***************************************************************************/

/*
 * Block until the previous owner informs this PE that the lock is released.
 *
 * Non-server PE: wait until the informing (top) bit appears in the local
 * copy of the lock, then restore the counter, which clears the flag.
 * Server PE: the previous owner increments lock_inform instead; wait on it
 * while the previous PE still reports a positive counter and still has this
 * PE as pe_next.
 *
 * Always returns OSHMEM_SUCCESS (aborts when the previous PE is unknown).
 */
static int shmem_lock_wait_for_informing(void *lock, int lock_size)
{
    int lock_bitwise_size = lock_size * 8;
    int my_pe = shmem_my_pe();
    int server_pe = shmem_lock_get_server(lock);

    if (my_pe != server_pe) {
        int original_counter = 1;
        uint64_t prev_value = get_lock_value(lock, lock_size);
        int informed = (prev_value >> (lock_bitwise_size - 1));

        lock_extract_counter(&prev_value, lock_size, &original_counter);
        while (!informed) {
            shmem_wait_wrapper(lock, lock_size, prev_value);
            prev_value = get_lock_value(lock, lock_size);
            informed = (prev_value >> (lock_bitwise_size - 1));
        }
        lock_pack_counter(lock, lock_size, &original_counter, 1);
    } else {
        int prev_value = *lock_inform;
        int prev_pe = -1;
        int remote_pe_next = 0;
        int remote_counter = 1;

        if (OSHMEM_SUCCESS != lock_restore_prev_pe(lock, &prev_pe)) {
            SHMEM_API_ERROR("Unable to restore prev_pe on server PE#%i", my_pe);
            oshmem_shmem_abort(-1);
        }

        if (prev_pe == server_pe) {
            SHMEM_API_ERROR("prev_pe (%i) is me", prev_pe);
        }

        do {
            uint64_t remote_lock = 0;

            shmem_get_wrapper(&remote_lock, lock, lock_size, 1, prev_pe);
            lock_extract_pe_next_counter(&remote_lock,
                                         lock_size,
                                         &remote_pe_next,
                                         &remote_counter);
            if ((remote_counter > 0) && (remote_pe_next == my_pe)) {
                shmem_int_wait(lock_inform, prev_value);
                prev_value = *lock_inform;
            }
        } while ((remote_counter > 0) && (remote_pe_next == my_pe));
    }

    return OSHMEM_SUCCESS;
}

/*
 * Tell pe_next that the lock has been released.
 * A non-server PE gets the informing (top) bit set in its copy of the lock;
 * the server PE gets its lock_inform integer incremented.
 *
 * Always returns OSHMEM_SUCCESS.
 */
static int shmem_lock_inform_next(void *lock, int lock_size, int pe_next)
{
    int lock_bitwise_size = lock_size * 8;
    int server_pe = shmem_lock_get_server(lock);

    if (server_pe != pe_next) {
        uint64_t temp_value = 0, remote_value = 0;
        uint64_t new_remote_value;

        shmem_get_wrapper(&remote_value, lock, lock_size, 1, pe_next);
        new_remote_value = remote_value
                | (((uint64_t) 1) << (lock_bitwise_size - 1));

        while (remote_value
                != (temp_value = shmem_lock_cswap_poll(lock, lock_size, remote_value, new_remote_value, pe_next))) {
            remote_value = temp_value;
            new_remote_value = remote_value
                    | (((uint64_t) 1) << (lock_bitwise_size - 1));
        }
    } else {
        shmem_int_inc(lock_inform, pe_next);
    }

    return OSHMEM_SUCCESS;
}

/*
 * Remember prev_pe for the lock in the local list (overwrites an existing
 * entry for the same lock).
 *
 * Returns OSHMEM_SUCCESS, or OSHMEM_ERROR when a new entry cannot be
 * allocated.
 */
static int lock_save_prev_pe(void *lock, int prev_pe)
{
    oshmem_lock_prev_pe_container_t *container = lock_prev_pe_container_head;

    while (container != 0) {
        if (container->lock == lock) {
            break;
        }
        container = container->next;
    }

    if (container) {
        container->prev_pe = prev_pe;
        return OSHMEM_SUCCESS;
    }

    container = malloc(sizeof(*container));
    if (NULL == container) {
        SHMEM_API_ERROR("PE#%i unable to allocate a prev_pe entry for lock %p", shmem_my_pe(), lock);
        return OSHMEM_ERROR;
    }

    container->lock = lock;
    container->prev_pe = prev_pe;
    container->next = lock_prev_pe_container_head;
    container->prev = 0;
    if (lock_prev_pe_container_head) {
        lock_prev_pe_container_head->prev = container;
    }
    lock_prev_pe_container_head = container;

    return OSHMEM_SUCCESS;
}

/*
 * Fetch and forget the prev_pe remembered for the lock.
 *
 * Returns OSHMEM_SUCCESS with *prev_pe set, or OSHMEM_ERROR with
 * *prev_pe == -1 when nothing was remembered for the lock.
 */
static int lock_restore_prev_pe(void *lock, int *prev_pe)
{
    oshmem_lock_prev_pe_container_t *container = lock_prev_pe_container_head;

    while (container != 0) {
        if (container->lock == lock) {
            break;
        }
        container = container->next;
    }

    if (container) {
        oshmem_lock_prev_pe_container_t *next = container->next;
        oshmem_lock_prev_pe_container_t *prev = container->prev;

        *prev_pe = container->prev_pe;

        if (prev) {
            prev->next = next;
        }
        if (next) {
            next->prev = prev;
        }
        if (lock_prev_pe_container_head == container) {
            lock_prev_pe_container_head = next;
        }
        free(container);

        return OSHMEM_SUCCESS;
    } else {
        *prev_pe = -1;
        return OSHMEM_ERROR;
    }
}

/*
 * Try to mark the lock as free on the server: succeeds only when the server
 * copy still reads (pe_last == my_pe, pe_next == -1), in which case it is
 * reset to 0.
 *
 * Returns 0 on success and non-zero when the server copy held anything else.
 */
static int shmem_lock_try_inform_server(void *lock, int lock_size)
{
    int my_pe = shmem_my_pe();
    int server_pe = shmem_lock_get_server(lock);
    int zero = 0;
    int incorrect_pe = -1;
    uint64_t remote_value = 0;

    shmem_lock_pack_pe_next_pe_last(&remote_value,
                                    lock_size,
                                    &incorrect_pe,
                                    &my_pe);

    return !(remote_value
            == shmem_lock_cswap_poll(lock, lock_size, remote_value, zero, server_pe));
}

/***************************************************************************/
/**************************API wrappers*************************************/
/***************************************************************************/

/*
 * Acquire the lock, blocking until it is available.
 *
 * If this PE already has a positive counter for the lock, only the counter
 * is incremented. Otherwise a non-blocking attempt is made first; when that
 * fails the PE takes a ticket, waits for its turn on the server and, unless
 * it turned out to be first in the queue, subscribes at its predecessor and
 * waits to be informed about the release.
 */
void _shmem_set_lock(void *lock, int lock_size)
{
    const int self_pe = shmem_my_pe();
    int predecessor_pe = -1;
    int ticket;

    if (shmem_lock_is_mine(lock, lock_size)) {
        /* Nested acquisition */
        shmem_lock_increment_counter(lock, lock_size);
        return;
    }

    if (0 == _shmem_test_lock(lock, lock_size)) {
        /* Fast path: the lock was free */
        return;
    }

    ticket = shmem_lock_get_ticket(lock);
    shmem_lock_increment_counter(lock, lock_size);
    shmem_lock_wait_for_ticket(lock, lock_size, ticket, &predecessor_pe);

    if (predecessor_pe == self_pe) {
        /* Nobody was queued in front of this PE */
        return;
    }

    shmem_lock_subscribe_for_informing(lock, lock_size, predecessor_pe);
    shmem_lock_wait_for_informing(lock, lock_size);
}

/*
 * Try to acquire the lock without blocking.
 *
 * The counter is incremented up front and rolled back when the
 * compare-and-swap on the server finds the lock taken.
 *
 * Returns 0 when the lock is now held by this PE (newly acquired, or it was
 * already held), 1 otherwise.
 */
int _shmem_test_lock(void *lock, int lock_size)
{
    const int home_pe = shmem_lock_get_server(lock);
    const int no_next_pe = -1;
    const int self_pe = shmem_my_pe();
    uint64_t claimed_word = 0;
    uint64_t observed_word;
    const int already_held = shmem_lock_is_mine(lock, lock_size);

    shmem_lock_increment_counter(lock, lock_size);

    if (already_held) {
        return 0;
    }

    /* Value to install on the server: pe_last = this PE, no pe_next */
    if (shmem_lock_pack_pe_next_pe_last(&claimed_word,
                                        lock_size,
                                        &no_next_pe,
                                        &self_pe)) {
        return 1;
    }

    observed_word = shmem_lock_cswap(lock, lock_size, 0, claimed_word, home_pe);
    if (0 == observed_word) {
        return 0;
    }

    /* Somebody else owns the lock: undo the counter increment */
    shmem_lock_decrement_counter(lock, lock_size);

    return 1;
}

/*
 * Release the lock.
 *
 * The counter is decremented; only when it reaches 0 is the lock handed
 * over: either the registered next PE is informed, or - when no PE is
 * registered - the server copy is reset. The loop repeats while neither is
 * possible. Finally the local pe_next is reset to -1.
 */
void _shmem_clear_lock(void *lock, int lock_size)
{
    int successor_pe = 0;

    if (0 != shmem_lock_decrement_counter(lock, lock_size)) {
        return;
    }

    for (;;) {
        shmem_lock_extract_pe_next(lock, lock_size, &successor_pe);
        if (successor_pe >= 0) {
            shmem_lock_inform_next(lock, lock_size, successor_pe);
            break;
        }

        /* It seems I'm the last in queue */
        if (!shmem_lock_try_inform_server(lock, lock_size)) {
            break;
        }
    }

    successor_pe = -1;
    shmem_lock_pack_pe_next(lock, lock_size, &successor_pe);
}