
This is a large check-in related to switching from

os/atomic.h  -->  include/sys/atomic.h

WARNING:  There are almost certainly some bugs introduced here, but I
believe that using this system will get us to a stable and portable
library faster in the long run.

Other changes:

threads/mutex

  Reorganized to use pthreads or asm atomic operations as available.
  Untested Windows implementation added using the InterlockedExchange() function.

threads/thread

  Added an untested Windows implementation

other places

  Updates to include the "right" header files or to use
  ompi_atomic_add_int() rather than fetchNadd() etc. (a usage sketch
  follows below).
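
For illustration only (not part of the commit), the renamed API is used like
this; the counter name is hypothetical:

    #include "include/sys/atomic.h"

    static volatile int hypothetical_counter = 0;

    void bump(void)
    {
        /* was: fetchNadd(&hypothetical_counter, 1);              */
        /* now: returns the value of the counter after the add    */
        int newval = ompi_atomic_add_int(&hypothetical_counter, 1);
        (void) newval;
    }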

This commit was SVN r2221.
This commit is contained in:
David Daniel 2004-08-18 23:24:27 +00:00
parent bbcd4b0330
commit 927ef83454
59 changed files with 1173 additions and 485 deletions

View file

@@ -2,6 +2,10 @@
  * $HEADER$
  */
+#ifdef HAVE_CONFIG_H
+#include "ompi_config.h"
+#endif
 #include "class/ompi_bitmap.h"
 #define SIZE_OF_CHAR (sizeof(char) * 8)

View file

@@ -7,7 +7,7 @@
 #include "include/constants.h"
 #include "include/sys/cache.h"
-#include "os/atomic.h"
+#include "include/sys/atomic.h"
 #include "mca/mpool/mpool.h"
 #include "util/pow2.h"

View file

@@ -1,7 +1,11 @@
 /*
  * $HEADER$
  */
+#ifdef HAVE_CONFIG_H
 #include "ompi_config.h"
+#endif
 #include "class/ompi_free_list.h"

View file

@@ -2,10 +2,13 @@
  * $HEADER$
  */
+#ifdef HAVE_CONFIG_H
+#include "ompi_config.h"
+#endif
 #include <string.h>
 #include <stdlib.h>
-#include "ompi_config.h"
 #include "include/constants.h"
 #include "util/output.h"
 #include "class/ompi_hash_table.h"

View file

@@ -2,6 +2,10 @@
  * $HEADER$
  */
+#ifdef HAVE_CONFIG_H
+#include "ompi_config.h"
+#endif
 #include "class/ompi_list.h"
 /*

View file

@@ -8,6 +8,10 @@
  * Implementation of ompi_object_t, the base ompi foundation class
  */
+#ifdef HAVE_CONFIG_H
+#include "ompi_config.h"
+#endif
 #include <stdio.h>
 #include "include/constants.h"

View file

@@ -103,11 +103,11 @@
 #include <assert.h>
 #include <stdlib.h>
-#ifdef HAVE_CONFIG_H
-#include "ompi_config.h"
-#endif
+#ifdef __WINDOWS__
+#include <windows.h>
+#else
 #include "include/sys/atomic.h"
+#endif
 /*
  * BEGIN_C_DECLS should be used at the beginning of your declarations,
@@ -151,8 +151,8 @@
 ompi_class_t NAME ## _class = { \
     # NAME, \
     OBJ_CLASS(PARENT), \
-    (ompi_construct_t)CONSTRUCTOR, \
-    (ompi_destruct_t)DESTRUCTOR, \
+    (ompi_construct_t) CONSTRUCTOR, \
+    (ompi_destruct_t) DESTRUCTOR, \
     0, 0, NULL, NULL \
 }
@@ -299,7 +299,6 @@ OBJ_CLASS_DECLARATION(ompi_object_t);
 /* declarations *******************************************************/
 BEGIN_C_DECLS
-
 /**
  * Lazy initialization of class descriptor.
  *
@@ -311,7 +310,6 @@ BEGIN_C_DECLS
 void ompi_class_initialize(ompi_class_t *);
 END_C_DECLS
-
 /**
  * Run the hierarchy of class constructors for this object, in a
  * parent-first order.
@@ -324,7 +322,7 @@ END_C_DECLS
  * Hardwired for fairly shallow inheritance trees
  * @param size Pointer to the object.
  */
-static inline void ompi_obj_run_constructors(ompi_object_t *object)
+static inline void ompi_obj_run_constructors(ompi_object_t * object)
 {
     ompi_class_t *cls;
     int i;
@@ -335,7 +333,7 @@ static inline void ompi_obj_run_constructors(ompi_object_t *object)
     cls = object->obj_class;
     for (i = cls->cls_depth - 1; i >= 0; i--) {
         if (cls->cls_construct_array[i]) {
-            (cls->cls_construct_array[i])(object);
+            (cls->cls_construct_array[i]) (object);
         }
     }
 }
@@ -349,7 +347,7 @@ static inline void ompi_obj_run_constructors(ompi_object_t *object)
  *
  * @param size Pointer to the object.
  */
-static inline void ompi_obj_run_destructors(ompi_object_t *object)
+static inline void ompi_obj_run_destructors(ompi_object_t * object)
 {
     ompi_class_t *cls;
     int i;
@@ -360,7 +358,7 @@ static inline void ompi_obj_run_destructors(ompi_object_t *object)
     cls = object->obj_class;
     for (i = 0; i < cls->cls_depth; i++) {
         if (cls->cls_destruct_array[i]) {
-            (cls->cls_destruct_array[i])(object);
+            (cls->cls_destruct_array[i]) (object);
         }
     }
 }
@@ -376,7 +374,7 @@ static inline void ompi_obj_run_destructors(ompi_object_t *object)
  * @param cls Pointer to the class descriptor of this object
  * @return Pointer to the object
  */
-static inline ompi_object_t *ompi_obj_new(size_t size, ompi_class_t *cls)
+static inline ompi_object_t *ompi_obj_new(size_t size, ompi_class_t * cls)
 {
     ompi_object_t *object;
@@ -407,8 +405,17 @@ static inline ompi_object_t *ompi_obj_new(size_t size, ompi_class_t *cls)
  */
 static inline int ompi_obj_update(ompi_object_t *object, int inc)
 {
+#ifdef __WINDOWS__
+    int newval;
+    LONG volatile *addr;
+    addr = (LONG volatile *) &(object->obj_reference_count);
+    newval = (int) InterlockedExchangeAdd(addr, (LONG) inc) + inc;
+#elif OMPI_SYS_ARCH_ATOMIC_H
     int newval;
+#if 0
     int oldval;
     volatile int *addr;
@@ -417,10 +424,14 @@ static inline int ompi_obj_update(ompi_object_t *object, int inc)
         oldval = *addr;
         newval = oldval + inc;
     } while (ompi_atomic_cmpset_int(addr, oldval, newval) == 0);
 #else
     object->obj_reference_count += inc;
     newval = object->obj_reference_count;
 #endif
     return newval;
 }
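
Aside (not part of the commit): the retry loop in ompi_obj_update() above is a
standard compare-and-swap pattern. A self-contained sketch of the same idea,
written against C11 atomics purely for illustration:

    #include <stdatomic.h>

    /* Add 'inc' to a reference count and return the updated value.
     * Mirrors the cmpset loop above, using standard C11 primitives. */
    static inline int refcount_update(_Atomic int *addr, int inc)
    {
        int oldval, newval;
        do {
            oldval = atomic_load(addr);
            newval = oldval + inc;
        } while (!atomic_compare_exchange_weak(addr, &oldval, newval));
        return newval;
    }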

View file

@@ -7,6 +7,10 @@
  * Utility functions to manage fortran <-> c opaque object translation
  */
+#ifdef HAVE_CONFIG_H
+#include "ompi_config.h"
+#endif
 #include <stdlib.h>
 #include <stdio.h>
 #include <assert.h>

View file

@@ -5,6 +5,10 @@
  * @file
  */
+#ifdef HAVE_CONFIG_H
+#include "ompi_config.h"
+#endif
 #include "class/ompi_rb_tree.h"
 /* declare the instance of the classes */

View file

@@ -2,6 +2,10 @@
  * $HEADER$
  */
+#ifdef HAVE_CONFIG_H
+#include "ompi_config.h"
+#endif
 #include "class/ompi_value_array.h"

View file

@@ -26,9 +26,36 @@
 #define STATIC_INLINE
 #endif
-/*
- * prototypes
+/**
+ * Volatile lock object (with optional padding).
  */
+struct ompi_lock_t {
+    union {
+        volatile int lock;         /**< The lock address (an integer) */
+        char padding[sizeof(int)]; /**< Array for optional padding */
+    } u;
+};
+typedef struct ompi_lock_t ompi_lock_t;
+/**
+ * Memory barrier
+ */
+STATIC_INLINE void ompi_atomic_mb(void);
+/**
+ * Read memory barrier
+ */
+STATIC_INLINE void ompi_atomic_rmb(void);
+/**
+ * Write memory barrier.
+ */
+STATIC_INLINE void ompi_atomic_wmb(void);
 /**
  * Atomic compare and set of unsigned 32-bit integer.
@@ -220,7 +247,7 @@ STATIC_INLINE int ompi_atomic_cmpset_rel_ptr(volatile void *addr,
  * @param delta Value to add.
  * @return New value of integer.
  */
-STATIC_INLINE uint32_t ompi_atomic_add_32(uint32_t *addr, int delta);
+static inline uint32_t ompi_atomic_add_32(volatile uint32_t *addr, int delta);
 /**
@@ -230,7 +257,7 @@ STATIC_INLINE uint32_t ompi_atomic_add_32(uint32_t *addr, int delta);
  * @param delta Value to add.
  * @return New value of integer.
  */
-STATIC_INLINE uint64_t ompi_atomic_add_64(uint64_t *addr, int delta);
+static inline uint64_t ompi_atomic_add_64(volatile uint64_t *addr, int delta);
 /**
@@ -240,7 +267,42 @@ STATIC_INLINE uint64_t ompi_atomic_add_64(uint64_t *addr, int delta);
  * @param delta Value to add.
  * @return New value of integer.
  */
-STATIC_INLINE int ompi_atomic_add_int(int *addr, int delta);
+static inline int ompi_atomic_add_int(volatile int *addr, int delta);
+/**
+ * Atomically set an integer, returning its previous value.
+ *
+ * @param addr Address of integer.
+ * @param newval Value to set.
+ * @return Old value of integer.
+ */
+static inline int ompi_atomic_fetch_and_set_int(volatile int *addr, int newval);
+/**
+ * Try to acquire a lock.
+ *
+ * @param lock Address of the lock.
+ * @return 0 if the lock was acquired, 1 otherwise.
+ */
+static inline int ompi_atomic_trylock(ompi_lock_t *lock);
+/**
+ * Acquire a lock by spinning.
+ *
+ * @param lock Address of the lock.
+ */
+static inline void ompi_atomic_lock(ompi_lock_t *lock);
+/**
+ * Release a lock.
+ *
+ * @param lock Address of the lock.
+ */
+static inline void ompi_atomic_unlock(ompi_lock_t *lock);
 #ifdef __GNUC__
@@ -398,7 +460,7 @@ static inline int ompi_atomic_cmpset_rel_ptr(volatile void *addr,
 #endif
-static inline uint32_t ompi_atomic_add_32(uint32_t *addr, int delta)
+static inline uint32_t ompi_atomic_add_32(volatile uint32_t *addr, int delta)
 {
     uint32_t oldval;
@@ -409,7 +471,7 @@ static inline uint32_t ompi_atomic_add_32(uint32_t *addr, int delta)
 }
-static inline uint64_t ompi_atomic_add_64(uint64_t *addr, int delta)
+static inline uint64_t ompi_atomic_add_64(volatile uint64_t *addr, int delta)
 {
     uint64_t oldval;
@@ -420,7 +482,7 @@ static inline uint64_t ompi_atomic_add_64(uint64_t *addr, int delta)
 }
-static inline int ompi_atomic_add_int(int *addr, int delta)
+static inline int ompi_atomic_add_int(volatile int *addr, int delta)
 {
     int oldval;
@@ -430,4 +492,59 @@ static inline int ompi_atomic_add_int(int *addr, int delta)
     return (oldval + delta);
 }
+static inline int ompi_atomic_fetch_and_set_int(volatile int *addr, int newval)
+{
+    int oldval;
+    do {
+        oldval = *addr;
+    } while (0 == ompi_atomic_cmpset_int(addr, oldval, newval));
+    return (oldval);
+}
+/*
+ * Atomic locks
+ */
+/**
+ * Enumeration of lock states
+ */
+enum {
+    OMPI_ATOMIC_UNLOCKED = 0,
+    OMPI_ATOMIC_LOCKED = 1
+};
+static inline int ompi_atomic_trylock(ompi_lock_t *lock)
+{
+    return ompi_atomic_cmpset_acq_int((volatile int *) lock,
+                                      OMPI_ATOMIC_UNLOCKED,
+                                      OMPI_ATOMIC_LOCKED);
+}
+static inline void ompi_atomic_lock(ompi_lock_t *lock)
+{
+    while (!ompi_atomic_cmpset_acq_int((volatile int *) lock,
+                                       OMPI_ATOMIC_UNLOCKED,
+                                       OMPI_ATOMIC_LOCKED)) {
+        while (lock->u.lock == OMPI_ATOMIC_LOCKED) {
+            /* spin */ ;
+        }
+    }
+}
+static inline void ompi_atomic_unlock(ompi_lock_t *lock)
+{
+    if (0) {
+        ompi_atomic_cmpset_rel_int((volatile int *) lock,
+                                   OMPI_ATOMIC_LOCKED,
+                                   OMPI_ATOMIC_UNLOCKED);
+    } else {
+        ompi_atomic_wmb();
+        lock->u.lock = OMPI_ATOMIC_UNLOCKED;
+    }
+}
 #endif /* OMPI_SYS_ATOMIC_H */
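
Aside (not part of the commit): the intended usage of the new lock primitives,
with a hypothetical lock and counter, is roughly:

    #include "include/sys/atomic.h"

    static ompi_lock_t my_lock;     /* hypothetical; zero-initialized, and
                                       OMPI_ATOMIC_UNLOCKED == 0 */
    static int shared_count = 0;    /* hypothetical shared state */

    void increment_shared(void)
    {
        ompi_atomic_lock(&my_lock);     /* spin until acquired */
        shared_count++;                 /* critical section */
        ompi_atomic_unlock(&my_lock);   /* barrier, then release */
    }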

src/include/sys/numa.h (new file, 30 lines)

View file

@@ -0,0 +1,30 @@
+/*
+ * $HEADER$
+ */
+#include "include/constants.h"
+#include "class/ompi_object.h"
+typedef int affinity_t;
+#ifndef ENABLE_NUMA
+static inline int ompi_set_affinity(void *addr, size_t size, affinity_t affinity)
+{
+    return 1;
+}
+static inline int ompi_get_cpu_set(void)
+{
+    return OMPI_SUCCESS;
+}
+#else
+/* OS / architecture specific implementation elsewhere */
+int ompi_set_affinity(void *addr, size_t size, affinity_t affinity);
+int ompi_get_cpu_set(void);
+#endif

View file

@@ -167,7 +167,7 @@ mca_common_sm_mmap_t* mca_common_sm_mmap_init(size_t size, char *file_name,
     /* initialize the segment - only the first process to open the file */
     if( !file_previously_opened ) {
-        spinunlock(&seg->seg_lock);
+        ompi_atomic_unlock(&seg->seg_lock);
         seg->seg_inited = false;
         seg->seg_offset = mem_offset;
         seg->seg_size = size;
@@ -201,7 +201,7 @@ void* mca_common_sm_mmap_alloc(size_t* size)
     mca_common_sm_file_header_t* seg = map->map_seg;
     void* addr;
-    spinlock(&seg->seg_lock);
+    ompi_atomic_lock(&seg->seg_lock);
     if(seg->seg_offset + *size > map->map_size) {
         addr = NULL;
     } else {
@@ -209,7 +209,7 @@ void* mca_common_sm_mmap_alloc(size_t* size)
         addr = map->data_addr + seg->seg_offset;
         seg->seg_offset += *size;
     }
-    spinunlock(&seg->seg_lock);
+    ompi_atomic_unlock(&seg->seg_lock);
     return addr;
 }

View file

@@ -2,12 +2,12 @@
 #define _COMMON_SM_MMAP_H_
 #include "class/ompi_object.h"
-#include "os/atomic.h"
 #include "class/ompi_list.h"
+#include "include/sys/atomic.h"
 struct mca_common_sm_file_header_t {
     /* lock to control atomic access */
-    ompi_lock_data_t seg_lock;
+    ompi_lock_t seg_lock;
     /* is the segment ready for use */
     volatile bool seg_inited;
     /* Offset to next available memory location available for allocation */

View file

@@ -4,7 +4,8 @@
 /** @file:
  *
  */
-#define _GNU_SOURCE
+/* #define _GNU_SOURCE */
 #include <stdio.h>
 #include <string.h>
 #include <stddef.h>

View file

@@ -2,6 +2,8 @@
  * $HEADER$
  */
+#include "ompi_config.h"
 #include <string.h>
 #include "pml_ptl_array.h"

View file

@@ -2,6 +2,8 @@
  * $HEADER$
  */
+#include "ompi_config.h"
 #include <stdlib.h>
 #include <string.h>
 #include "mca/pml/pml.h"

View file

@@ -1,3 +1,9 @@
+/*
+ * $HEADER$
+ */
+#include "ompi_config.h"
 #include "pml_teg.h"

View file

@@ -2,6 +2,8 @@
  * $HEADER$
  */
+#include "ompi_config.h"
 #include "event/event.h"
 #include "mpi.h"
 #include "mca/pml/pml.h"

View file

@@ -1,3 +1,9 @@
+/*
+ * $HEADER$
+ */
+#include "ompi_config.h"
 #include "pml_teg.h"
 #include "pml_teg_sendreq.h"
 #include "mca/pml/base/pml_base_request.h"

View file

@@ -1,84 +1,75 @@
+/*
+ * $HEADER$
+ */
+#include "ompi_config.h"
 #include "pml_teg_recvreq.h"
-int mca_pml_teg_iprobe(
-    int src,
+int mca_pml_teg_iprobe(int src,
     int tag,
-    struct ompi_communicator_t* comm,
-    int* matched,
-    ompi_status_public_t* status)
+    struct ompi_communicator_t *comm,
+    int *matched, ompi_status_public_t * status)
 {
     int rc;
     mca_pml_base_recv_request_t recvreq;
     recvreq.req_base.req_ompi.req_type = OMPI_REQUEST_PML;
     recvreq.req_base.req_type = MCA_PML_REQUEST_IPROBE;
-    MCA_PML_BASE_RECV_REQUEST_INIT(
-        &recvreq,
-        NULL,
-        0,
-        NULL,
-        src,
-        tag,
-        comm,
-        true);
-    if((rc = mca_pml_teg_recv_request_start(&recvreq)) != OMPI_SUCCESS) {
+    MCA_PML_BASE_RECV_REQUEST_INIT(&recvreq,
+        NULL, 0, NULL, src, tag, comm, true);
+    if ((rc = mca_pml_teg_recv_request_start(&recvreq)) != OMPI_SUCCESS) {
         OBJ_DESTRUCT(&recvreq);
         return rc;
     }
-    if((*matched = recvreq.req_base.req_mpi_done) == true && (NULL != status)) {
+    if ((*matched = recvreq.req_base.req_mpi_done) == true
+        && (NULL != status)) {
         *status = recvreq.req_base.req_status;
     }
     return OMPI_SUCCESS;
 }
-int mca_pml_teg_probe(
-    int src,
+int mca_pml_teg_probe(int src,
     int tag,
-    struct ompi_communicator_t* comm,
-    ompi_status_public_t* status)
+    struct ompi_communicator_t *comm,
+    ompi_status_public_t * status)
 {
     int rc;
     mca_pml_base_recv_request_t recvreq;
     recvreq.req_base.req_ompi.req_type = OMPI_REQUEST_PML;
     recvreq.req_base.req_type = MCA_PML_REQUEST_PROBE;
-    MCA_PML_BASE_RECV_REQUEST_INIT(
-        &recvreq,
-        NULL,
-        0,
-        NULL,
-        src,
-        tag,
-        comm,
-        true);
-    if((rc = mca_pml_teg_recv_request_start(&recvreq)) != OMPI_SUCCESS) {
+    MCA_PML_BASE_RECV_REQUEST_INIT(&recvreq,
+        NULL, 0, NULL, src, tag, comm, true);
+    if ((rc = mca_pml_teg_recv_request_start(&recvreq)) != OMPI_SUCCESS) {
         OBJ_DESTRUCT(&recvreq);
         return rc;
     }
-    if(recvreq.req_base.req_mpi_done == false) {
+    if (recvreq.req_base.req_mpi_done == false) {
         /* give up and sleep until completion */
-        if(ompi_using_threads()) {
+        if (ompi_using_threads()) {
             ompi_mutex_lock(&mca_pml_teg.teg_request_lock);
             mca_pml_teg.teg_request_waiting++;
-            while(recvreq.req_base.req_mpi_done == false)
-                ompi_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock);
+            while (recvreq.req_base.req_mpi_done == false)
+                ompi_condition_wait(&mca_pml_teg.teg_request_cond,
                                    &mca_pml_teg.teg_request_lock);
             mca_pml_teg.teg_request_waiting--;
             ompi_mutex_unlock(&mca_pml_teg.teg_request_lock);
         } else {
             mca_pml_teg.teg_request_waiting++;
-            while(recvreq.req_base.req_mpi_done == false)
-                ompi_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock);
+            while (recvreq.req_base.req_mpi_done == false)
+                ompi_condition_wait(&mca_pml_teg.teg_request_cond,
                                    &mca_pml_teg.teg_request_lock);
             mca_pml_teg.teg_request_waiting--;
         }
     }
-    if(NULL != status) {
+    if (NULL != status) {
         *status = recvreq.req_base.req_status;
     }
     return OMPI_SUCCESS;
 }

View file

@@ -1,42 +1,40 @@
+/*
+ * $HEADER$
+ */
+#include "ompi_config.h"
 #include "pml_teg_recvreq.h"
-int mca_pml_teg_irecv_init(
-    void *addr,
+int mca_pml_teg_irecv_init(void *addr,
     size_t count,
-    ompi_datatype_t *datatype,
+    ompi_datatype_t * datatype,
     int src,
     int tag,
-    struct ompi_communicator_t* comm,
+    struct ompi_communicator_t *comm,
     struct ompi_request_t **request)
 {
     int rc;
     mca_pml_base_recv_request_t *recvreq;
     MCA_PML_TEG_RECV_REQUEST_ALLOC(recvreq, rc);
-    if(NULL == recvreq)
+    if (NULL == recvreq)
         return rc;
-    MCA_PML_BASE_RECV_REQUEST_INIT(
-        recvreq,
+    MCA_PML_BASE_RECV_REQUEST_INIT(recvreq,
         addr,
-        count,
-        datatype,
-        src,
-        tag,
-        comm,
-        true);
-    *request = (ompi_request_t*)recvreq;
+        count, datatype, src, tag, comm, true);
+    *request = (ompi_request_t *) recvreq;
     return OMPI_SUCCESS;
 }
-int mca_pml_teg_irecv(
-    void *addr,
+int mca_pml_teg_irecv(void *addr,
     size_t count,
-    ompi_datatype_t *datatype,
+    ompi_datatype_t * datatype,
     int src,
     int tag,
-    struct ompi_communicator_t* comm,
+    struct ompi_communicator_t *comm,
     struct ompi_request_t **request)
 {
     int rc;
@@ -46,74 +44,63 @@ int mca_pml_teg_irecv(
 #if MCA_PML_TEG_STATISTICS
     mca_pml_teg.teg_irecvs++;
 #endif
-    if(NULL == recvreq)
+    if (NULL == recvreq)
         return rc;
-    MCA_PML_BASE_RECV_REQUEST_INIT(
-        recvreq,
+    MCA_PML_BASE_RECV_REQUEST_INIT(recvreq,
         addr,
-        count,
-        datatype,
-        src,
-        tag,
-        comm,
-        false);
-    if((rc = mca_pml_teg_recv_request_start(recvreq)) != OMPI_SUCCESS) {
+        count, datatype, src, tag, comm, false);
+    if ((rc = mca_pml_teg_recv_request_start(recvreq)) != OMPI_SUCCESS) {
         MCA_PML_TEG_RECV_REQUEST_RETURN(recvreq);
         return rc;
     }
-    *request = (ompi_request_t*)recvreq;
+    *request = (ompi_request_t *) recvreq;
     return OMPI_SUCCESS;
 }
-int mca_pml_teg_recv(
-    void *addr,
+int mca_pml_teg_recv(void *addr,
     size_t count,
-    ompi_datatype_t *datatype,
+    ompi_datatype_t * datatype,
     int src,
     int tag,
-    struct ompi_communicator_t* comm,
-    ompi_status_public_t* status)
+    struct ompi_communicator_t *comm,
+    ompi_status_public_t * status)
 {
     int rc;
     mca_pml_base_recv_request_t *recvreq;
     MCA_PML_TEG_RECV_REQUEST_ALLOC(recvreq, rc);
 #if MCA_PML_TEG_STATISTICS
     mca_pml_teg.teg_recvs++;
 #endif
-    if(NULL == recvreq)
+    if (NULL == recvreq)
         return rc;
-    MCA_PML_BASE_RECV_REQUEST_INIT(
-        recvreq,
+    MCA_PML_BASE_RECV_REQUEST_INIT(recvreq,
        addr,
-        count,
-        datatype,
-        src,
-        tag,
-        comm,
-        false);
-    if((rc = mca_pml_teg_recv_request_start(recvreq)) != OMPI_SUCCESS) {
+        count, datatype, src, tag, comm, false);
+    if ((rc = mca_pml_teg_recv_request_start(recvreq)) != OMPI_SUCCESS) {
         MCA_PML_TEG_RECV_REQUEST_RETURN(recvreq);
         return rc;
     }
-    if(recvreq->req_base.req_mpi_done == false) {
+    if (recvreq->req_base.req_mpi_done == false) {
         /* give up and sleep until completion */
-        if(ompi_using_threads()) {
+        if (ompi_using_threads()) {
             ompi_mutex_lock(&mca_pml_teg.teg_request_lock);
             mca_pml_teg.teg_request_waiting++;
-            while(recvreq->req_base.req_mpi_done == false)
-                ompi_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock);
+            while (recvreq->req_base.req_mpi_done == false)
+                ompi_condition_wait(&mca_pml_teg.teg_request_cond,
                                    &mca_pml_teg.teg_request_lock);
             mca_pml_teg.teg_request_waiting--;
             ompi_mutex_unlock(&mca_pml_teg.teg_request_lock);
         } else {
             mca_pml_teg.teg_request_waiting++;
-            while(recvreq->req_base.req_mpi_done == false)
-                ompi_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock);
+            while (recvreq->req_base.req_mpi_done == false)
+                ompi_condition_wait(&mca_pml_teg.teg_request_cond,
                                    &mca_pml_teg.teg_request_lock);
             mca_pml_teg.teg_request_waiting--;
         }
     }
@@ -126,4 +113,3 @@ int mca_pml_teg_recv(
     MCA_PML_TEG_RECV_REQUEST_RETURN(recvreq);
     return OMPI_SUCCESS;
 }

View file

@@ -2,136 +2,118 @@
  * $HEADER$
  */
+#include "ompi_config.h"
 #include "pml_teg.h"
 #include "pml_teg_proc.h"
 #include "pml_teg_sendreq.h"
-int mca_pml_teg_isend_init(
-    void *buf,
+int mca_pml_teg_isend_init(void *buf,
     size_t count,
-    ompi_datatype_t *datatype,
+    ompi_datatype_t * datatype,
     int dst,
     int tag,
     mca_pml_base_send_mode_t sendmode,
-    ompi_communicator_t* comm,
-    ompi_request_t **request)
+    ompi_communicator_t * comm,
+    ompi_request_t ** request)
 {
     int rc;
-    mca_pml_base_send_request_t* sendreq;
-    MCA_PML_TEG_SEND_REQUEST_ALLOC(comm,dst,sendreq,rc);
+    mca_pml_base_send_request_t *sendreq;
+    MCA_PML_TEG_SEND_REQUEST_ALLOC(comm, dst, sendreq, rc);
-    if(rc != OMPI_SUCCESS)
+    if (rc != OMPI_SUCCESS)
         return rc;
-    MCA_PML_BASE_SEND_REQUEST_INIT(
-        sendreq,
+    MCA_PML_BASE_SEND_REQUEST_INIT(sendreq,
         buf,
         count,
         datatype,
-        dst,
-        tag,
-        comm,
-        sendmode,
-        true
-        );
-    *request = (ompi_request_t*)sendreq;
+        dst, tag, comm, sendmode, true);
+    *request = (ompi_request_t *) sendreq;
     return OMPI_SUCCESS;
 }
-int mca_pml_teg_isend(
-    void *buf,
+int mca_pml_teg_isend(void *buf,
     size_t count,
-    ompi_datatype_t *datatype,
+    ompi_datatype_t * datatype,
     int dst,
     int tag,
     mca_pml_base_send_mode_t sendmode,
-    ompi_communicator_t* comm,
-    ompi_request_t **request)
+    ompi_communicator_t * comm,
+    ompi_request_t ** request)
 {
     int rc;
-    mca_pml_base_send_request_t* sendreq;
-    MCA_PML_TEG_SEND_REQUEST_ALLOC(comm,dst,sendreq,rc);
+    mca_pml_base_send_request_t *sendreq;
+    MCA_PML_TEG_SEND_REQUEST_ALLOC(comm, dst, sendreq, rc);
 #if MCA_PML_TEG_STATISTICS
     mca_pml_teg.teg_isends++;
 #endif
-    if(rc != OMPI_SUCCESS)
+    if (rc != OMPI_SUCCESS)
         return rc;
-    MCA_PML_BASE_SEND_REQUEST_INIT(
-        sendreq,
+    MCA_PML_BASE_SEND_REQUEST_INIT(sendreq,
         buf,
         count,
         datatype,
-        dst,
-        tag,
-        comm,
-        sendmode,
-        false
-        );
-    if((rc = mca_pml_teg_send_request_start(sendreq)) != OMPI_SUCCESS)
+        dst, tag, comm, sendmode, false);
+    if ((rc = mca_pml_teg_send_request_start(sendreq)) != OMPI_SUCCESS)
         return rc;
-    *request = (ompi_request_t*)sendreq;
+    *request = (ompi_request_t *) sendreq;
     return OMPI_SUCCESS;
 }
-int mca_pml_teg_send(
-    void *buf,
+int mca_pml_teg_send(void *buf,
     size_t count,
-    ompi_datatype_t *datatype,
+    ompi_datatype_t * datatype,
     int dst,
     int tag,
     mca_pml_base_send_mode_t sendmode,
-    ompi_communicator_t* comm)
+    ompi_communicator_t * comm)
 {
     int rc;
-    mca_pml_base_send_request_t* sendreq;
-    MCA_PML_TEG_SEND_REQUEST_ALLOC(comm,dst,sendreq,rc);
+    mca_pml_base_send_request_t *sendreq;
+    MCA_PML_TEG_SEND_REQUEST_ALLOC(comm, dst, sendreq, rc);
 #if MCA_PML_TEG_STATISTICS
     mca_pml_teg.teg_sends++;
 #endif
-    if(rc != OMPI_SUCCESS)
+    if (rc != OMPI_SUCCESS)
         return rc;
-    MCA_PML_BASE_SEND_REQUEST_INIT(
-        sendreq,
+    MCA_PML_BASE_SEND_REQUEST_INIT(sendreq,
         buf,
         count,
         datatype,
-        dst,
-        tag,
-        comm,
-        sendmode,
-        false
-        );
-    if((rc = mca_pml_teg_send_request_start(sendreq)) != OMPI_SUCCESS) {
-        MCA_PML_TEG_FREE((ompi_request_t**)&sendreq);
+        dst, tag, comm, sendmode, false);
+    if ((rc = mca_pml_teg_send_request_start(sendreq)) != OMPI_SUCCESS) {
+        MCA_PML_TEG_FREE((ompi_request_t **) & sendreq);
         return rc;
     }
-    if(sendreq->req_base.req_mpi_done == false) {
+    if (sendreq->req_base.req_mpi_done == false) {
         /* give up and sleep until completion */
-        if(ompi_using_threads()) {
+        if (ompi_using_threads()) {
             ompi_mutex_lock(&mca_pml_teg.teg_request_lock);
             mca_pml_teg.teg_request_waiting++;
-            while(sendreq->req_base.req_mpi_done == false)
-                ompi_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock);
+            while (sendreq->req_base.req_mpi_done == false)
+                ompi_condition_wait(&mca_pml_teg.teg_request_cond,
                                    &mca_pml_teg.teg_request_lock);
             mca_pml_teg.teg_request_waiting--;
             ompi_mutex_unlock(&mca_pml_teg.teg_request_lock);
         } else {
             mca_pml_teg.teg_request_waiting++;
-            while(sendreq->req_base.req_mpi_done == false)
-                ompi_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock);
+            while (sendreq->req_base.req_mpi_done == false)
+                ompi_condition_wait(&mca_pml_teg.teg_request_cond,
                                    &mca_pml_teg.teg_request_lock);
             mca_pml_teg.teg_request_waiting--;
         }
     }
     /* return request to pool */
-    MCA_PML_TEG_FREE((ompi_request_t**)&sendreq);
+    MCA_PML_TEG_FREE((ompi_request_t **) & sendreq);
     return OMPI_SUCCESS;
 }

View file

@@ -1,3 +1,9 @@
+/*
+ * $HEADER$
+ */
+#include "ompi_config.h"
 #include "pml_teg.h"
 #include "pml_teg_sendreq.h"

View file

@@ -1,3 +1,9 @@
+/*
+ * $HEADER$
+ */
+#include "ompi_config.h"
 #include "pml_teg_ptl.h"

View file

@@ -1,9 +1,13 @@
 /*
- *
+ * $HEADER$
  */
 /**
  * @file
  */
+#include "ompi_config.h"
 #include "pml_teg_recvfrag.h"
 #include "pml_teg_proc.h"

View file

@@ -1,3 +1,9 @@
+/*
+ * $HEADER$
+ */
+#include "ompi_config.h"
 #include "mca/ptl/ptl.h"
 #include "mca/ptl/base/ptl_base_comm.h"
 #include "pml_teg_recvreq.h"

View file

@@ -1,6 +1,9 @@
 /*
  * $HEADER$
  */
+#include "ompi_config.h"
 /*%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%*/
 #include "include/constants.h"

View file

@@ -24,7 +24,7 @@
     mca_ptl_proc_t* ptl_proc; \
     mca_pml_base_ptl_t* ptl_base; \
     \
-    THREAD_SCOPED_LOCK(&proc->proc_lock, \
+    OMPI_THREAD_SCOPED_LOCK(&proc->proc_lock, \
        (ptl_proc = mca_ptl_array_get_next(&proc->proc_ptl_first))); \
     ptl_base = ptl_proc->ptl_base; \
     /* \

View file

@@ -1,3 +1,9 @@
+/*
+ * $HEADER$
+ */
+#include "ompi_config.h"
 #include "pml_teg.h"
 #include "pml_teg_recvreq.h"
 #include "pml_teg_sendreq.h"
@@ -12,10 +18,11 @@ int mca_pml_teg_start(size_t count, ompi_request_t** requests)
         if(NULL == pml_request)
             continue;
-        /* If the persistent request is currently active - obtain the request lock
-         * and verify the status is incomplete. if the pml layer has not completed
-         * the request - mark the request as free called - so that it will be freed
-         * when the request completes - and create a new request.
+        /* If the persistent request is currently active - obtain the
+         * request lock and verify the status is incomplete. if the
+         * pml layer has not completed the request - mark the request
+         * as free called - so that it will be freed when the request
+         * completes - and create a new request.
          */
         switch(pml_request->req_ompi.req_state) {

View file

@@ -1,80 +1,84 @@
+/*
+ * $HEADER$
+ */
+#include "ompi_config.h"
 #include "pml_teg.h"
 #include "pml_teg_sendreq.h"
-int mca_pml_teg_test(
-    size_t count,
-    ompi_request_t** requests,
+int mca_pml_teg_test(size_t count,
+    ompi_request_t ** requests,
     int *index,
-    int *completed,
-    ompi_status_public_t* status)
+    int *completed, ompi_status_public_t * status)
 {
     size_t i;
     ompi_atomic_mb();
-    for(i=0; i<count; i++) {
-        mca_pml_base_request_t* pml_request = (mca_pml_base_request_t*)requests[i];
-        if(pml_request == NULL)
+    for (i = 0; i < count; i++) {
+        mca_pml_base_request_t *pml_request =
+            (mca_pml_base_request_t *) requests[i];
+        if (pml_request == NULL)
             continue;
-        if(pml_request->req_mpi_done) {
+        if (pml_request->req_mpi_done) {
             *index = i;
             *completed = true;
             if (NULL != status)
                 *status = pml_request->req_status;
-            MCA_PML_TEG_FINI(requests+i);
+            MCA_PML_TEG_FINI(requests + i);
             return OMPI_SUCCESS;
         }
     }
     *index = MPI_UNDEFINED;
     *completed = false;
-    if(NULL != status)
+    if (NULL != status)
         *status = mca_pml_teg.teg_request_null.req_status;
     return OMPI_SUCCESS;
 }
-int mca_pml_teg_test_all(
-    size_t count,
-    ompi_request_t** requests,
-    int *completed,
-    ompi_status_public_t* statuses)
+int mca_pml_teg_test_all(size_t count,
    ompi_request_t ** requests,
+    int *completed, ompi_status_public_t * statuses)
 {
     size_t i;
     size_t num_completed;
     ompi_atomic_mb();
-    for(i=0; i<count; i++) {
-        mca_pml_base_request_t* pml_request = (mca_pml_base_request_t*)requests[i];
-        if(pml_request == NULL || pml_request->req_mpi_done)
+    for (i = 0; i < count; i++) {
+        mca_pml_base_request_t *pml_request =
+            (mca_pml_base_request_t *) requests[i];
+        if (pml_request == NULL || pml_request->req_mpi_done)
            num_completed++;
     }
-    if(num_completed != count) {
+    if (num_completed != count) {
         *completed = false;
         return OMPI_SUCCESS;
     }
     *completed = true;
-    if(NULL != statuses) {
+    if (NULL != statuses) {
         /* fill out completion status and free request if required */
-        for(i=0; i<count; i++) {
-            mca_pml_base_request_t* pml_request = (mca_pml_base_request_t*)requests[i];
-            if(NULL == pml_request) {
+        for (i = 0; i < count; i++) {
+            mca_pml_base_request_t *pml_request =
+                (mca_pml_base_request_t *) requests[i];
+            if (NULL == pml_request) {
                 statuses[i] = mca_pml_teg.teg_request_null.req_status;
             } else {
                 statuses[i] = pml_request->req_status;
-                MCA_PML_TEG_FINI(requests+i);
+                MCA_PML_TEG_FINI(requests + i);
             }
         }
     } else {
         /* free request if required */
-        for(i=0; i<count; i++) {
-            mca_pml_base_request_t* pml_request = (mca_pml_base_request_t*)requests[i];
-            if(NULL != pml_request)
-                MCA_PML_TEG_FINI(requests+i);
+        for (i = 0; i < count; i++) {
+            mca_pml_base_request_t *pml_request =
+                (mca_pml_base_request_t *) requests[i];
+            if (NULL != pml_request)
+                MCA_PML_TEG_FINI(requests + i);
        }
     }
     return OMPI_SUCCESS;
 }

View file

@@ -1,22 +1,25 @@
+/*
+ * $HEADER$
+ */
+#include "ompi_config.h"
 #include "pml_teg.h"
 #include "pml_teg_sendreq.h"
 #include "mca/ptl/base/ptl_base_comm.h"
 #include "mca/pml/base/pml_base_request.h"
-int mca_pml_teg_wait(
-    size_t count,
-    ompi_request_t** request,
-    int *index,
-    ompi_status_public_t* status)
+int mca_pml_teg_wait(size_t count,
+    ompi_request_t ** request,
+    int *index, ompi_status_public_t * status)
 {
 #if OMPI_HAVE_THREADS
     int c;
 #endif
     int i;
     int completed = -1;
-    mca_pml_base_request_t* pml_request;
+    mca_pml_base_request_t *pml_request;
 #if MCA_PML_TEG_STATISTICS
     mca_pml_teg.teg_waits++;
@@ -26,12 +29,12 @@ int mca_pml_teg_wait(
     /* poll for completion */
     ompi_atomic_mb();
-    for(c=0; completed < 0 && c < mca_pml_teg.teg_poll_iterations; c++) {
-        for(i=0; i<count; i++) {
-            pml_request = (mca_pml_base_request_t*)request[i];
-            if(pml_request == NULL)
+    for (c = 0; completed < 0 && c < mca_pml_teg.teg_poll_iterations; c++) {
+        for (i = 0; i < count; i++) {
+            pml_request = (mca_pml_base_request_t *) request[i];
+            if (pml_request == NULL)
                 continue;
-            if(pml_request->req_mpi_done == true) {
+            if (pml_request->req_mpi_done == true) {
                 completed = i;
                 break;
             }
@@ -39,25 +42,26 @@
     }
 #endif
-    if(completed < 0) {
+    if (completed < 0) {
         /* give up and sleep until completion */
         OMPI_THREAD_LOCK(&mca_pml_teg.teg_request_lock);
         mca_pml_teg.teg_request_waiting++;
         do {
-            for(i=0; i<count; i++) {
-                pml_request = (mca_pml_base_request_t*)request[i];
-                if(pml_request->req_mpi_done == true) {
+            for (i = 0; i < count; i++) {
+                pml_request = (mca_pml_base_request_t *) request[i];
+                if (pml_request->req_mpi_done == true) {
                     completed = i;
                     break;
                 }
            }
-            if(completed < 0) {
+            if (completed < 0) {
 #if MCA_PML_TEG_STATISTICS
                 mca_pml_teg.teg_condition_waits++;
 #endif
-                ompi_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock);
+                ompi_condition_wait(&mca_pml_teg.teg_request_cond,
                                    &mca_pml_teg.teg_request_lock);
            }
-        } while(completed < 0);
+        } while (completed < 0);
         mca_pml_teg.teg_request_waiting--;
         OMPI_THREAD_UNLOCK(&mca_pml_teg.teg_request_lock);
     }
@@ -74,63 +78,69 @@ int mca_pml_teg_wait(
 }
-int mca_pml_teg_wait_all(
-    size_t count,
-    ompi_request_t** requests,
-    ompi_status_public_t* statuses)
+int mca_pml_teg_wait_all(size_t count,
+    ompi_request_t ** requests,
+    ompi_status_public_t * statuses)
 {
     int completed = 0, i;
-    for(i=0; i<count; i++) {
-        mca_pml_base_request_t* pml_request = (mca_pml_base_request_t*)requests[i];
-        if(pml_request == NULL || pml_request->req_mpi_done == true) {
+    for (i = 0; i < count; i++) {
+        mca_pml_base_request_t *pml_request =
+            (mca_pml_base_request_t *) requests[i];
+        if (pml_request == NULL || pml_request->req_mpi_done == true) {
            completed++;
        }
    }
-    /* if all requests have not completed -- defer requiring lock unless required */
-    if(completed != count) {
+    /* if all requests have not completed -- defer requiring lock
+     * unless required */
+    if (completed != count) {
         /*
-         * acquire lock and test for completion - if all requests are not completed
-         * pend on condition variable until a request completes
+         * acquire lock and test for completion - if all requests are
+         * not completed pend on condition variable until a request
+         * completes
         */
         OMPI_THREAD_LOCK(&mca_pml_teg.teg_request_lock);
         mca_pml_teg.teg_request_waiting++;
         do {
             completed = 0;
-            for(i=0; i<count; i++) {
-                mca_pml_base_request_t* pml_request = (mca_pml_base_request_t*)requests[i];
-                if(pml_request == NULL || pml_request->req_mpi_done == true) {
+            for (i = 0; i < count; i++) {
+                mca_pml_base_request_t *pml_request =
+                    (mca_pml_base_request_t *) requests[i];
+                if (pml_request == NULL
+                    || pml_request->req_mpi_done == true) {
                     completed++;
                     continue;
                }
            }
-            if(completed != count)
-                ompi_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock);
+            if (completed != count)
+                ompi_condition_wait(&mca_pml_teg.teg_request_cond,
                                    &mca_pml_teg.teg_request_lock);
         } while (completed != count);
         mca_pml_teg.teg_request_waiting--;
         OMPI_THREAD_UNLOCK(&mca_pml_teg.teg_request_lock);
     }
-    if(NULL != statuses) {
+    if (NULL != statuses) {
         /* fill out status and free request if required */
-        for(i=0; i<count; i++) {
-            mca_pml_base_request_t* pml_request = (mca_pml_base_request_t*)requests[i];
+        for (i = 0; i < count; i++) {
+            mca_pml_base_request_t *pml_request =
+                (mca_pml_base_request_t *) requests[i];
             if (NULL == pml_request) {
                 statuses[i] = mca_pml_teg.teg_request_null.req_status;
             } else {
                 statuses[i] = pml_request->req_status;
-                MCA_PML_TEG_FINI(requests+i);
            }
        }
     } else {
        /* free request if required */
-        for(i=0; i<count; i++) {
-            mca_pml_base_request_t* pml_request = (mca_pml_base_request_t*)requests[i];
+        for (i = 0; i < count; i++) {
+            mca_pml_base_request_t *pml_request =
+                (mca_pml_base_request_t *) requests[i];
             if (NULL != pml_request) {
-                MCA_PML_TEG_FINI(requests+i);
+                MCA_PML_TEG_FINI(requests + i);
            }
        }
    }
     return OMPI_SUCCESS;
 }

View file

@@ -53,7 +53,8 @@ extern int mca_pml_ptl_comm_init_size(mca_pml_ptl_comm_t* comm, size_t size);
 static inline mca_ptl_sequence_t mca_pml_ptl_comm_send_sequence(mca_pml_ptl_comm_t* comm, int dst)
 {
     mca_ptl_sequence_t sequence;
-    THREAD_SCOPED_LOCK(&comm->c_matching_lock, sequence = comm->c_msg_seq[dst]++);
+    OMPI_THREAD_SCOPED_LOCK(&comm->c_matching_lock,
+                            sequence = comm->c_msg_seq[dst]++);
     return sequence;
 }

View file

@@ -1,9 +1,15 @@
 /*
  * $HEADER$
  */
+#ifdef HAVE_CONFIG_H
+#include "ompi_config.h"
+#endif
 #include <unistd.h>
 #include <sys/types.h>
 #include <sys/errno.h>
 #include "types.h"
 #include "datatype/datatype.h"
 #include "mca/pml/base/pml_base_sendreq.h"
@@ -166,7 +172,7 @@ mca_ptl_elan_send_desc_done (
             & MCA_PTL_FLAGS_ACK_MATCHED)
             || mca_pml_base_send_request_matched(req)) {
-            if(fetchNset (&desc->frag_progressed, 1) == 0) {
+            if(ompi_atomic_fetch_and_set_int (&desc->frag_progressed, 1) == 0) {
                 ptl->super.ptl_send_progress(ptl, req,
                     header->hdr_frag.hdr_frag_length);
             }
@@ -208,7 +214,7 @@ mca_ptl_elan_send_desc_done (
          * the start of following fragments. As the logic is not there.
          */
-        if(fetchNset (&desc->frag_progressed, 1) == 0) {
+        if(ompi_atomic_fetch_and_set_int (&desc->frag_progressed, 1) == 0) {
            ptl->super.ptl_send_progress(ptl, req,
                header->hdr_frag.hdr_frag_length);
        }

View file

@@ -6,7 +6,7 @@
 #include <string.h>
-#include "sys/atomic.h"
+#include "include/sys/atomic.h"
 #include "class/ompi_hash_table.h"
 #include "mca/base/mca_base_module_exchange.h"
 #include "ptl_elan.h"

View file

@@ -6,7 +6,7 @@
 #include <string.h>
-#include "sys/atomic.h"
+#include "include/sys/atomic.h"
 #include "class/ompi_hash_table.h"
 #include "mca/base/mca_base_module_exchange.h"
 #include "ptl_gm.h"

View file

@@ -10,7 +10,7 @@
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <netinet/in.h>
-#include "os/atomic.h"
+#include "include/sys/atomic.h"
 #include "ompi_config.h"
 #include "mca/pml/base/pml_base_sendreq.h"
 #include "mca/pml/base/pml_base_recvreq.h"

View file

@@ -1,7 +1,7 @@
 #ifndef MCA_PTL_IB_RECV_FRAG_H
 #define MCA_PTL_IB_RECV_FRAG_H
-#include "os/atomic.h"
+#include "include/sys/atomic.h"
 #include "mca/ptl/ptl.h"
 #include "mca/ptl/base/ptl_base_recvfrag.h"

View file

@@ -2,7 +2,7 @@
 #ifndef MCA_PTL_IB_SEND_FRAG_H
 #define MCA_PTL_IB_SEND_FRAG_H
-#include "os/atomic.h"
+#include "include/sys/atomic.h"
 #include "ompi_config.h"
 #include "mca/pml/base/pml_base_sendreq.h"
 #include "mca/ptl/base/ptl_base_sendfrag.h"

View file

@@ -10,7 +10,7 @@
 #include <string.h>
 #include <sys/types.h>
-#include "os/atomic.h"
+#include "include/sys/atomic.h"
 #include "mca/ptl/ptl.h"
 #include "mca/ptl/base/ptl_base_recvfrag.h"
 #include "ptl_sm.h"

View file

@@ -12,7 +12,7 @@
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <netinet/in.h>
-#include "os/atomic.h"
+#include "include/sys/atomic.h"
 #include "mca/ptl/ptl.h"
 #include "mca/ptl/base/ptl_base_recvfrag.h"
 #include "ptl_tcp.h"
@@ -95,7 +95,7 @@ static inline void mca_ptl_tcp_recv_frag_progress(mca_ptl_tcp_recv_frag_t* frag)
 {
     if((frag)->frag_msg_cnt >= frag->frag_recv.frag_base.frag_header.hdr_frag.hdr_frag_length) {
         /* make sure this only happens once for threaded case */
-        if(fetchNset(&frag->frag_progressed, 1) == 0) {
+        if(ompi_atomic_fetch_and_set_int(&frag->frag_progressed, 1) == 0) {
             mca_pml_base_recv_request_t* request = frag->frag_recv.frag_request;
             if(frag->frag_recv.frag_is_buffered) {
                 mca_ptl_base_match_header_t* header = &(frag)->frag_recv.frag_base.frag_header.hdr_match;

View file

@@ -10,7 +10,7 @@
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <netinet/in.h>
-#include "os/atomic.h"
+#include "include/sys/atomic.h"
 #include "ompi_config.h"
 #include "mca/pml/base/pml_base_sendreq.h"
 #include "mca/ptl/base/ptl_base_sendfrag.h"
@@ -84,7 +84,7 @@ static inline void mca_ptl_tcp_send_frag_progress(mca_ptl_tcp_send_frag_t* frag)
         mca_pml_base_send_request_matched(request))) {
         /* make sure this only happens once in threaded case */
-        if(fetchNset(&frag->frag_progressed,1) == 0) {
+        if(ompi_atomic_fetch_and_set_int(&frag->frag_progressed,1) == 0) {
             /* update request status */
             frag->frag_send.frag_base.frag_owner->ptl_send_progress(

View file

@@ -2,6 +2,10 @@
  * $HEADER
  */
+#ifdef HAVE_CONFIG_H
+#include "ompi_config.h"
+#endif
 #include <string.h>
 #include "threads/mutex.h"
@@ -31,7 +35,7 @@ ompi_class_t ompi_proc_t_class = {
 void ompi_proc_construct(ompi_proc_t* proc)
 {
     static int init = 0;
-    if(fetchNset(&init,1) == 0) {
+    if(ompi_atomic_fetch_and_set_int(&init,1) == 0) {
         OBJ_CONSTRUCT(&ompi_proc_list, ompi_list_t);
         OBJ_CONSTRUCT(&ompi_proc_lock, ompi_mutex_t);
     }

View file

@@ -3,7 +3,9 @@
  */
 /** @file **/
-#define _GNU_SOURCE
+/* #define _GNU_SOURCE */
 #include "ompi_config.h"
 #include "include/constants.h"

View file

@@ -2,26 +2,28 @@
  * $HEADER$
  */
+#ifdef HAVE_CONFIG_H
+#include "ompi_config.h"
+#endif
 #include "mutex.h"
 #include "condition.h"
 #if OMPI_HAVE_POSIX_THREADS
-static void ompi_condition_construct(ompi_condition_t* c)
+static void ompi_condition_construct(ompi_condition_t * c)
 {
     pthread_cond_init(&c->c_cond, NULL);
 }
-static void ompi_condition_destruct(ompi_condition_t* c)
+static void ompi_condition_destruct(ompi_condition_t * c)
 {
     pthread_cond_destroy(&c->c_cond);
 }
-OBJ_CLASS_INSTANCE(
-    ompi_condition_t,
+OBJ_CLASS_INSTANCE(ompi_condition_t,
     ompi_object_t,
     ompi_condition_construct,
-    ompi_condition_destruct
-);
+    ompi_condition_destruct);
 #endif

View file

@@ -5,7 +5,7 @@
 #define OMPI_CONDITION_PTHREAD_H
 #include <pthread.h>
-#include "threads/mutex_pthread.h"
+#include "threads/mutex.h"
 struct ompi_condition_t {
@@ -19,12 +19,12 @@ OBJ_CLASS_DECLARATION(ompi_condition_t);
 static inline int ompi_condition_wait(ompi_condition_t* c, ompi_mutex_t* m)
 {
-    return pthread_cond_wait(&c->c_cond, &m->m_lock);
+    return pthread_cond_wait(&c->c_cond, &m->m_lock.thread);
 }
 static inline int ompi_condition_timedwait(ompi_condition_t* c, ompi_mutex_t* m, const struct timespec *abstime)
 {
-    return pthread_cond_timedwait(&c->c_cond, &m->m_lock, abstime);
+    return pthread_cond_timedwait(&c->c_cond, &m->m_lock.thread, abstime);
 }
 static inline int ompi_condition_signal(ompi_condition_t* c)

View file

@@ -2,31 +2,31 @@
  * $HEADER$
  */
+#ifdef HAVE_CONFIG_H
+#include "ompi_config.h"
+#endif
 #include "mutex.h"
 #include "condition.h"
 #if (OMPI_HAVE_THREADS == 0)
-static void ompi_condition_construct(ompi_condition_t* c)
+static void ompi_condition_construct(ompi_condition_t * c)
 {
     c->c_waiting = 0;
     c->c_signaled = 0;
 }
-static void ompi_condition_destruct(ompi_condition_t* c)
+static void ompi_condition_destruct(ompi_condition_t * c)
 {
 }
-OBJ_CLASS_INSTANCE(
-    ompi_condition_t,
+OBJ_CLASS_INSTANCE(ompi_condition_t,
     ompi_object_t,
     ompi_condition_construct,
-    ompi_condition_destruct
-);
+    ompi_condition_destruct);
 #endif

View file

@@ -2,28 +2,28 @@
  * $HEADER$
  */
+#ifdef HAVE_CONFIG_H
+#include "ompi_config.h"
+#endif
 #include "mutex.h"
 #include "condition.h"
 #if defined(OMPI_USE_SPINWAIT)
-static void ompi_condition_construct(ompi_condition_t* c)
+static void ompi_condition_construct(ompi_condition_t * c)
 {
     pthread_cond_init(&c->c_cond, NULL);
 }
-static void ompi_condition_destruct(ompi_condition_t* c)
+static void ompi_condition_destruct(ompi_condition_t * c)
 {
     pthread_cond_destroy(&c->c_cond);
 }
-OBJ_CLASS_INSTANCE(
-    ompi_condition_t,
+OBJ_CLASS_INSTANCE(ompi_condition_t,
     ompi_object_t,
     ompi_condition_construct,
-    ompi_condition_destruct
-);
+    ompi_condition_destruct);
 #endif

View file

@@ -23,11 +23,11 @@ static inline int ompi_condition_wait(ompi_condition_t* c, ompi_mutex_t* m)
     int rc;
     pthread_mutex_lock(&m->m_lock);
     /* release the spinlock */
-    fetchNset(&m->m_spinlock, 0);
+    ompi_atomic_fetch_and_set_int(&m->m_spinlock, 0);
     if(m->m_waiting)
         pthread_cond_signal(&m->m_cond);
     rc = pthread_cond_wait(&c->c_cond, &m->m_lock);
-    fetchNset(&m->m_spinlock, 1);
+    ompi_atomic_fetch_and_set_int(&m->m_spinlock, 1);
     return rc;
 }
@@ -36,11 +36,11 @@ static inline int ompi_condition_timedwait(ompi_condition_t* c, ompi_mutex_t* m,
     int rc;
     pthread_mutex_lock(&m->m_lock);
     /* release the spinlock */
-    fetchNset(&m->m_spinlock, 0);
+    ompi_atomic_fetch_and_set_int(&m->m_spinlock, 0);
     if(m->m_waiting)
         pthread_cond_signal(&m->m_cond);
     rc = pthread_cond_timedwait(&c->c_cond, &m->m_lock, abstime);
-    fetchNset(&m->m_spinlock, 1);
+    ompi_atomic_fetch_and_set_int(&m->m_spinlock, 1);
     return rc;
 }

Просмотреть файл

@ -2,18 +2,77 @@
* $HEADER$ * $HEADER$
*/ */
/** @file **/ #ifndef OMPI_MUTEX_H
#define OMPI_MUTEX_H 1
#ifndef OMPI_MUTEX_H_
#define OMPI_MUTEX_H_
#ifdef HAVE_CONFIG_H
#include "ompi_config.h" #include "ompi_config.h"
#if OMPI_HAVE_POSIX_THREADS
#include "threads/mutex_pthread.h"
#else
#include "threads/mutex_spinlock.h"
#endif #endif
/**
* @file:
*
* Mutual exclusion functions.
*
* Functions for locking of critical sections.
*/
/**
* Opaque mutex object
*/
typedef struct ompi_mutex_t ompi_mutex_t;
/**
* Try to acquire a mutex.
*
* @param mutex Address of the mutex.
* @return 0 if the mutex was acquired, 1 otherwise.
*/
static inline int ompi_mutex_trylock(ompi_mutex_t *mutex);
/**
* Acquire a mutex.
*
* @param mutex Address of the mutex.
*/
static inline void ompi_mutex_lock(ompi_mutex_t *mutex);
/**
* Release a mutex.
*
* @param mutex Address of the mutex.
*/
static inline void ompi_mutex_unlock(ompi_mutex_t *mutex);
/**
* Try to acquire a mutex using atomic operations.
*
* @param mutex Address of the mutex.
* @return 0 if the mutex was acquired, 1 otherwise.
*/
static inline int ompi_mutex_atomic_trylock(ompi_mutex_t *mutex);
/**
* Acquire a mutex using atomic operations.
*
* @param mutex Address of the mutex.
*/
static inline void ompi_mutex_atomic_lock(ompi_mutex_t *mutex);
/**
* Release a mutex using atomic operations.
*
* @param mutex Address of the mutex.
*/
static inline void ompi_mutex_atomic_unlock(ompi_mutex_t *mutex);
/** /**
* Check and see if the process is using multiple threads. * Check and see if the process is using multiple threads.
* *
@ -57,30 +116,6 @@ static inline bool ompi_using_threads(void)
} }
/**
* Lock a mutex if ompi_using_threads() says that multiple threads may
* be active in the process for the duration of the specified action.
*
* @param mutex Pointer to a ompi_mutex_t to lock.
* @param action A scope over which the lock is held.
*
* If there is a possibility that multiple threads are running in the
* process (as determined by ompi_using_threads()), this function will
* acquire the lock before invoking the specified action and release
* it on return.
*
* If there is no possibility that multiple threads are running in the
* process, invoke the action without acquiring the lock.
*/
#define THREAD_SCOPED_LOCK(mutex,action) \
if(ompi_using_threads()) { \
ompi_mutex_lock(mutex); \
(action); \
ompi_mutex_unlock(mutex); \
} else { \
(action); \
}
/** /**
* Set whether the process is using multiple threads or not. * Set whether the process is using multiple threads or not.
* *
@ -108,6 +143,7 @@ static inline bool ompi_set_using_threads(bool have)
return ompi_uses_threads; return ompi_uses_threads;
} }
/** /**
* Lock a mutex if ompi_using_threads() says that multiple threads may * Lock a mutex if ompi_using_threads() says that multiple threads may
* be active in the process. * be active in the process.
@ -121,8 +157,12 @@ static inline bool ompi_set_using_threads(bool have)
* If there is no possibility that multiple threads are running in the * If there is no possibility that multiple threads are running in the
* process, return immediately. * process, return immediately.
*/ */
-#define OMPI_THREAD_LOCK(a) if (ompi_using_threads()) \
-    ompi_mutex_lock((a));
+#define OMPI_THREAD_LOCK(mutex) \
+    do { \
+        if (ompi_using_threads()) { \
+            ompi_mutex_lock(mutex); \
+        } \
+    } while (0)
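
Editorial note: the do { } while (0) wrapper makes the macro expand to a single statement, so it composes safely with surrounding if/else. A hypothetical caller (need_lock, lock, and idle() are illustrative only) showing what the wrapper guards against:

    if (need_lock)
        OMPI_THREAD_LOCK(&lock);
    else
        idle();   /* with a bare if-statement macro, this else would
                   * bind to the if hidden inside the expansion */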
/*
 * Unlock a mutex if ompi_using_threads() says that multiple threads
@@ -137,8 +177,40 @@ static inline bool ompi_set_using_threads(bool have)
 * If there is no possibility that multiple threads are running in the
 * process, return immediately without modifying the mutex.
 */
-#define OMPI_THREAD_UNLOCK(a) if (ompi_using_threads()) \
-    ompi_mutex_unlock((a));
+#define OMPI_THREAD_UNLOCK(mutex) \
+    do { \
+        if (ompi_using_threads()) { \
+            ompi_mutex_unlock(mutex); \
+        } \
+    } while (0)
/**
* Lock a mutex if ompi_using_threads() says that multiple threads may
* be active in the process for the duration of the specified action.
*
 * @param mutex Pointer to an ompi_mutex_t to lock.
* @param action A scope over which the lock is held.
*
* If there is a possibility that multiple threads are running in the
* process (as determined by ompi_using_threads()), this function will
* acquire the lock before invoking the specified action and release
* it on return.
*
* If there is no possibility that multiple threads are running in the
* process, invoke the action without acquiring the lock.
*/
#define OMPI_THREAD_SCOPED_LOCK(mutex, action) \
do { \
if(ompi_using_threads()) { \
ompi_mutex_lock(mutex); \
(action); \
ompi_mutex_unlock(mutex); \
} else { \
(action); \
} \
} while (0)
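
A usage sketch (editorial, not from the commit). Because action is a single expression argument, a call might look like the following, where list_lock, pending, and ompi_list_remove_first() are hypothetical:

    OMPI_THREAD_SCOPED_LOCK(&list_lock,
                            item = ompi_list_remove_first(&pending));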
/**
 * Always locks a mutex (never compile- or run-time removed)
@@ -150,7 +222,7 @@ static inline bool ompi_set_using_threads(bool have)
 * multiple threads or not. This is useful, for example, with shared
 * memory.
 */
-#define OMPI_LOCK(a) ompi_mutex_lock((a))
+#define OMPI_LOCK(mutex) ompi_mutex_atomic_lock(mutex)

/**
 * Always unlocks a mutex (never compile- or run-time removed)
@@ -162,7 +234,32 @@ static inline bool ompi_set_using_threads(bool have)
 * process has multiple threads or not. This is useful, for example,
 * with shared memory.
 */
-#define OMPI_UNLOCK(a) ompi_mutex_unlock((a));
+#define OMPI_UNLOCK(mutex) ompi_mutex_atomic_unlock(mutex)
/**
* Lock a mutex for the duration of the specified action.
*
 * @param mutex Pointer to an ompi_mutex_t to lock.
* @param action A scope over which the lock is held.
*
* This is the macro that you should use for mutexes that should
* always be locked, regardless of whether the process has multiple
* threads or not. This is useful, for example, with shared memory.
*/
#define OMPI_SCOPED_LOCK(mutex, action) \
do { \
ompi_mutex_lock(mutex); \
(action); \
ompi_mutex_unlock(mutex); \
} while (0)
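
An editorial sketch (not from the commit) of where the always-lock variants matter; shm_lock and bump() are hypothetical, with the mutex assumed to live in a shared-memory segment as the comments above suggest:

    static ompi_mutex_t *shm_lock;   /* hypothetical lock in shared memory */

    static void bump(volatile int *shared_counter)
    {
        OMPI_LOCK(shm_lock);    /* other processes may contend even if this
                                 * process is single-threaded, so the lock
                                 * can never be compiled or branched away */
        (*shared_counter)++;
        OMPI_UNLOCK(shm_lock);
    }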
#ifdef __WINDOWS__
#error Windows code is untested
#include "mutex_windows.h"
#else
#include "mutex_unix.h"
#endif
#endif /* OMPI_MUTEX_H */


@@ -7,7 +7,7 @@
#include <pthread.h>
#include "class/ompi_object.h"
-#include "os/atomic.h"
+#include "include/sys/atomic.h"

struct ompi_mutex_t {


@@ -6,7 +6,7 @@
#define OMPI_MUTEX_SPINLOCK_

#include "class/ompi_object.h"
-#include "os/atomic.h"
+#include "include/sys/atomic.h"

struct ompi_mutex_t {


@@ -6,8 +6,9 @@
#define OMPI_MUTEX_SPINWAIT_

#include <pthread.h>
#include "class/ompi_object.h"
-#include "os/atomic.h"
+#include "include/sys/atomic.h"

#ifndef MUTEX_SPINWAIT
#define MUTEX_SPINWAIT 10000

@@ -29,16 +30,16 @@ OBJ_CLASS_DECLARATION(ompi_mutex_t);
static inline void ompi_mutex_lock(ompi_mutex_t* m)
{
-    if(fetchNset(&m->m_spinlock, 1) == 1) {
+    if(ompi_atomic_fetch_and_set_int(&m->m_spinlock, 1) == 1) {
        unsigned long cnt = 0;
        int locked;
-        fetchNadd(&m->m_waiting, 1);
+        ompi_atomic_add_int(&m->m_waiting, 1);
-        while( ((locked = fetchNset(&m->m_spinlock, 1)) == 1)
+        while( ((locked = ompi_atomic_fetch_and_set_int(&m->m_spinlock, 1)) == 1)
               && (cnt++ < MUTEX_SPINWAIT) )
            ;
        if(locked) {
            pthread_mutex_lock(&m->m_lock);
-            while(fetchNset(&m->m_spinlock, 1) == 1)
+            while(ompi_atomic_fetch_and_set_int(&m->m_spinlock, 1) == 1)
                pthread_cond_wait(&m->m_cond, &m->m_lock);
            pthread_mutex_unlock(&m->m_lock);
        }

@@ -49,13 +50,13 @@ static inline void ompi_mutex_lock(ompi_mutex_t* m)
static inline int ompi_mutex_trylock(ompi_mutex_t* m)
{
-    return (fetchNset(&m->m_spinlock, 1) == 0);
+    return (ompi_atomic_fetch_and_set_int(&m->m_spinlock, 1) == 0);
}

static inline void ompi_mutex_unlock(ompi_mutex_t* m)
{
-    fetchNset(&m->m_spinlock, 0);
+    ompi_atomic_fetch_and_set_int(&m->m_spinlock, 0);
    if(m->m_waiting) {
        pthread_cond_signal(&m->m_cond);
    }

src/threads/mutex_unix.c (new file, 31 lines)

@@ -0,0 +1,31 @@
/*
* $HEADER$
*/
#ifdef HAVE_CONFIG_H
#include "ompi_config.h"
#endif
#include "threads/mutex.h"
static void ompi_mutex_construct(ompi_mutex_t *m)
{
#if OMPI_HAVE_POSIX_THREADS
pthread_mutex_init(&m->m_lock.thread, 0);
#endif
#if OMPI_SYS_ARCH_ATOMIC_H
    ompi_atomic_unlock(&m->m_lock.atomic);
#endif
}
static void ompi_mutex_destruct(ompi_mutex_t *m)
{
#if OMPI_HAVE_POSIX_THREADS
    pthread_mutex_destroy(&m->m_lock.thread);
#endif
}
OBJ_CLASS_INSTANCE(ompi_mutex_t,
ompi_object_t,
ompi_mutex_construct,
ompi_mutex_destruct);
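
An editorial sketch (not from the commit) of the lifecycle this class instance enables, assuming the OBJ_NEW/OBJ_RELEASE machinery from the class system:

    ompi_mutex_t *m = OBJ_NEW(ompi_mutex_t);   /* runs ompi_mutex_construct() */
    ompi_mutex_lock(m);
    /* ... critical section ... */
    ompi_mutex_unlock(m);
    OBJ_RELEASE(m);                            /* runs ompi_mutex_destruct()
                                                * when the refcount hits zero */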

src/threads/mutex_unix.h (new file, 164 lines)

@@ -0,0 +1,164 @@
/*
* $HEADER$
*/
#ifndef OMPI_MUTEX_UNIX_H
#define OMPI_MUTEX_UNIX_H 1
/**
* @file:
*
 * Mutual exclusion functions: Unix implementation.
*
* Functions for locking of critical sections.
*
* On unix, use pthreads or our own atomic operations as
* available.
*/
#if OMPI_HAVE_POSIX_THREADS
#include <pthread.h>
#endif
#include "class/ompi_object.h"
#include "include/sys/atomic.h"
struct ompi_mutex_t {
ompi_object_t super;
union {
#if OMPI_HAVE_POSIX_THREADS
pthread_mutex_t thread;
#endif
#if OMPI_SYS_ARCH_ATOMIC_H
ompi_lock_t atomic;
#endif
} m_lock;
};
OBJ_CLASS_DECLARATION(ompi_mutex_t);
#if OMPI_SYS_ARCH_ATOMIC_H && OMPI_HAVE_POSIX_THREADS
/*
* ompi_mutex_* implemented using pthreads
* ompi_mutex_atomic_* implemented using atomic operations
*/
static inline int ompi_mutex_trylock(ompi_mutex_t * m)
{
return pthread_mutex_trylock(&m->m_lock.thread);
}
static inline void ompi_mutex_lock(ompi_mutex_t * m)
{
pthread_mutex_lock(&m->m_lock.thread);
}
static inline void ompi_mutex_unlock(ompi_mutex_t * m)
{
pthread_mutex_unlock(&m->m_lock.thread);
}
static inline void ompi_mutex_atomic_lock(ompi_mutex_t * m)
{
ompi_atomic_lock(&m->m_lock.atomic);
}
static inline int ompi_mutex_atomic_trylock(ompi_mutex_t * m)
{
return ompi_atomic_trylock(&m->m_lock.atomic);
}
static inline void ompi_mutex_atomic_unlock(ompi_mutex_t * m)
{
ompi_atomic_unlock(&m->m_lock.atomic);
}
#elif OMPI_HAVE_POSIX_THREADS
/*
* ompi_mutex_* and ompi_mutex_atomic_* implemented using pthreads
*/
static inline int ompi_mutex_trylock(ompi_mutex_t * m)
{
return pthread_mutex_trylock(&m->m_lock.thread);
}
static inline void ompi_mutex_lock(ompi_mutex_t * m)
{
pthread_mutex_lock(&m->m_lock.thread);
}
static inline void ompi_mutex_unlock(ompi_mutex_t * m)
{
pthread_mutex_unlock(&m->m_lock.thread);
}
static inline int ompi_mutex_atomic_trylock(ompi_mutex_t * m)
{
return ompi_mutex_trylock(m);
}
static inline void ompi_mutex_atomic_lock(ompi_mutex_t * m)
{
ompi_mutex_lock(m);
}
static inline void ompi_mutex_atomic_unlock(ompi_mutex_t * m)
{
ompi_mutex_unlock(m);
}
#elif OMPI_SYS_ARCH_ATOMIC_H
/*
* ompi_mutex_* and ompi_mutex_atomic_* implemented using atomic
* operations
*/
static inline int ompi_mutex_trylock(ompi_mutex_t * m)
{
return ompi_atomic_trylock(&m->m_lock.atomic);
}
static inline void ompi_mutex_lock(ompi_mutex_t * m)
{
ompi_atomic_lock(&m->m_lock.atomic);
}
static inline void ompi_mutex_unlock(ompi_mutex_t * m)
{
ompi_atomic_unlock(&m->m_lock.atomic);
}
static inline int ompi_mutex_atomic_trylock(ompi_mutex_t * m)
{
return ompi_mutex_trylock(m);
}
static inline void ompi_mutex_atomic_lock(ompi_mutex_t * m)
{
ompi_mutex_lock(m);
}
static inline void ompi_mutex_atomic_unlock(ompi_mutex_t * m)
{
ompi_mutex_unlock(m);
}
#else
#error No mutex definition
#endif
#endif /* OMPI_MUTEX_UNIX_H */

src/threads/mutex_windows.c (new file, 19 lines)

@@ -0,0 +1,19 @@
/*
* $HEADER$
*/
#ifdef HAVE_CONFIG_H
#include "ompi_config.h"
#endif
#include "threads/mutex.h"
static void ompi_mutex_construct(ompi_mutex_t *m)
{
InterlockedExchange(&m->m_lock, 0);
}
OBJ_CLASS_INSTANCE(ompi_mutex_t,
ompi_object_t,
ompi_mutex_construct,
NULL);

src/threads/mutex_windows.h (new file, 72 lines)

@@ -0,0 +1,72 @@
/*
* $HEADER$
*/
#ifndef OMPI_MUTEX_WINDOWS_H
#define OMPI_MUTEX_WINDOWS_H 1
/**
* @file:
*
* Mutual exclusion functions: Windows implementation.
*
* Functions for locking of critical sections.
*
* On Windows, base everything on InterlockedExchange().
*/
#error Windows code is untested
#include <windows.h>
#include "class/ompi_object.h"
struct ompi_mutex_t {
ompi_object_t super;
volatile LONG m_lock;
};
OBJ_CLASS_DECLARATION(ompi_mutex_t);
static inline int ompi_mutex_trylock(ompi_mutex_t *m)
{
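    /* InterlockedExchange() returns the previous value: 0 means the
     * lock was free and is now ours, 1 means it was already held. */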
return (int) InterlockedExchange(&m->m_lock, 1);
}
static inline void ompi_mutex_lock(ompi_mutex_t *m)
{
while (InterlockedExchange(&m->m_lock, 1)) {
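        /* Someone else holds the lock: spin on a plain read until it
         * looks free, then retry the interlocked exchange.  This
         * test-and-test-and-set pattern avoids hammering the bus with
         * atomic writes while waiting. */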
while (m->m_lock == 1) {
/* spin */;
}
}
}
static inline void ompi_mutex_unlock(ompi_mutex_t *m)
{
InterlockedExchange(&m->m_lock, 0);
}
static inline int ompi_mutex_atomic_trylock(ompi_mutex_t *m)
{
return ompi_mutex_trylock(m);
}
static inline void ompi_mutex_atomic_lock(ompi_mutex_t *m)
{
ompi_mutex_lock(m);
}
static inline void ompi_mutex_atomic_unlock(ompi_mutex_t *m)
{
ompi_mutex_unlock(m);
}
#endif /* OMPI_MUTEX_WINDOWS_H */


@@ -2,54 +2,121 @@
 * $HEADER$
 */

+#ifdef HAVE_CONFIG_H
+#include "ompi_config.h"
+#endif

#include "include/constants.h"
#include "threads/thread.h"

-static void ompi_thread_construct(ompi_thread_t* t)
+static void ompi_thread_construct(ompi_thread_t *t)
{
    t->t_run = 0;
+#ifdef __WINDOWS__
+    t->t_handle = (HANDLE) -1;
+#elif OMPI_HAVE_POSIX_THREADS
    t->t_handle = (pthread_t) -1;
+#endif
}

-static void ompi_thread_destruct(ompi_thread_t* t)
+static void ompi_thread_destruct(ompi_thread_t *t)
{
}

-OBJ_CLASS_INSTANCE(
-    ompi_thread_t,
+OBJ_CLASS_INSTANCE(ompi_thread_t,
                   ompi_object_t,
                   ompi_thread_construct,
-    ompi_thread_destruct
-);
+                   ompi_thread_destruct);

-typedef void* (*pthread_start_fn_t)(void*);
-
-int ompi_thread_start(ompi_thread_t* t)
-{
-#if OMPI_HAVE_POSIX_THREADS
-    int rc;
-#if OMPI_ENABLE_DEBUG
-    if(NULL == t->t_run || t->t_handle != (pthread_t) -1)
-        return OMPI_ERR_BAD_PARAM;
-#endif
-    rc = pthread_create(&t->t_handle, NULL, (pthread_start_fn_t)t->t_run, t);
-    return (rc == 0) ? OMPI_SUCCESS: OMPI_ERROR;
-#else
-    return OMPI_ERROR;
-#endif
-}
-
-int ompi_thread_join(ompi_thread_t* t, void** thr_return)
-{
-#if OMPI_HAVE_POSIX_THREADS
-    int rc = pthread_join(t->t_handle, thr_return);
-    return (rc == 0) ? OMPI_SUCCESS: OMPI_ERROR;
-#else
-    return OMPI_ERROR;
-#endif
-}

+#ifdef __WINDOWS__
+#error Windows code is untested
+
+int ompi_thread_start(ompi_thread_t *t)
+{
+    DWORD tid;
+
+    if (OMPI_ENABLE_DEBUG) {
+        if (NULL == t->t_run || t->t_handle != (HANDLE) -1L) {
+            return OMPI_ERR_BAD_PARAM;
+        }
+    }
+
+    t->t_handle = CreateThread(NULL,    /* default security attributes */
+                               0,       /* default stack size */
+                               (LPTHREAD_START_ROUTINE) t->t_run,
+                               t,       /* argument */
+                               0,       /* default creation flags */
+                               &tid);
+    if (t->t_handle == NULL) {
+        return OMPI_ERROR;
+    }
+    return OMPI_SUCCESS;
+}
+
+int ompi_thread_join(ompi_thread_t *t, void **thr_return)
+{
+    DWORD rc;
+
+    if (WaitForSingleObject(t->t_handle, INFINITE) != WAIT_OBJECT_0) {
+        return OMPI_ERROR;
+    }
+    if (!GetExitCodeThread(t->t_handle, &rc)) {
+        return OMPI_ERROR;
+    }
+    if (NULL != thr_return) {
+        *thr_return = (void *) rc;
+    }
+    return OMPI_SUCCESS;
+}
+
+#elif OMPI_HAVE_POSIX_THREADS
+
+int ompi_thread_start(ompi_thread_t *t)
+{
+    int rc;
+
+    if (OMPI_ENABLE_DEBUG) {
+        if (NULL == t->t_run || t->t_handle != (pthread_t) -1) {
+            return OMPI_ERR_BAD_PARAM;
+        }
+    }
+
+    rc = pthread_create(&t->t_handle, NULL, (void *(*)(void *)) t->t_run, t);
+    return (rc == 0) ? OMPI_SUCCESS : OMPI_ERROR;
+}
+
+int ompi_thread_join(ompi_thread_t *t, void **thr_return)
+{
+    int rc = pthread_join(t->t_handle, thr_return);
+    return (rc == 0) ? OMPI_SUCCESS : OMPI_ERROR;
+}
+
+#else
+
+int ompi_thread_start(ompi_thread_t *t)
+{
+    return OMPI_ERROR;
+}
+
+int ompi_thread_join(ompi_thread_t *t, void **thr_return)
+{
+    return OMPI_ERROR;
+}
+
+#endif


@@ -3,27 +3,36 @@
 */

#ifndef OMPI_THREAD_H
-#define OMPI_THREAD_H
+#define OMPI_THREAD_H 1

+#ifdef __WINDOWS__
+#include <windows.h>
+#elif OMPI_HAVE_POSIX_THREADS
#include <pthread.h>
+#endif

#include "class/ompi_object.h"

-typedef void* (*ompi_thread_fn_t)(ompi_object_t*);
+typedef void *(*ompi_thread_fn_t) (ompi_object_t *);

-struct ompi_thread_t
-{
+struct ompi_thread_t {
    ompi_object_t super;
    ompi_thread_fn_t t_run;
+#ifdef __WINDOWS__
+    HANDLE t_handle;
+#elif OMPI_HAVE_POSIX_THREADS
    pthread_t t_handle;
+#endif
};

typedef struct ompi_thread_t ompi_thread_t;

OBJ_CLASS_DECLARATION(ompi_thread_t);

-int ompi_thread_start(ompi_thread_t*);
-int ompi_thread_join(ompi_thread_t*, void** thread_return);
+int ompi_thread_start(ompi_thread_t *);
+int ompi_thread_join(ompi_thread_t *, void **thread_return);

#endif /* OMPI_THREAD_H */
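
An editorial sketch (not from the commit) of driving this interface, assuming a POSIX build and the OBJ_NEW/OBJ_RELEASE machinery from the class system; worker() and spawn_and_wait() are hypothetical:

    static void *worker(ompi_object_t *arg)
    {
        /* ... do work on arg ... */
        return NULL;
    }

    static void spawn_and_wait(void)
    {
        void *ret;
        ompi_thread_t *t = OBJ_NEW(ompi_thread_t);

        t->t_run = worker;
        if (OMPI_SUCCESS == ompi_thread_start(t)) {
            ompi_thread_join(t, &ret);
        }
        OBJ_RELEASE(t);
    }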