
support for building w/out threads

This commit was SVN r1014.
This commit is contained in:
Tim Woodall 2004-04-06 16:13:17 +00:00
parent d742429fee
commit a4422f55f8
15 changed files: 60 additions and 80 deletions
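The pattern throughout is the one the commit message names: anything that requires a dispatch thread is fenced behind LAM_HAVE_THREADS, and the threadless build falls back to polling/spinlock implementations. A minimal standalone sketch of the guard (do_progress() is a hypothetical stand-in for the real progress engine, not a function from this tree):

    #include <stdio.h>

    #define LAM_HAVE_THREADS 0   /* set by configure in the real tree */

    /* Hypothetical stand-in for the library's progress engine. */
    static void do_progress(volatile int *done) { *done = 1; }

    static void wait_for_completion(volatile int *done)
    {
    #if LAM_HAVE_THREADS
        while (!*done)            /* a dispatch thread makes progress */
            ;
    #else
        while (!*done)            /* no threads: the waiter drives progress */
            do_progress(done);
    #endif
    }

    int main(void)
    {
        int done = 0;
        wait_for_completion(&done);
        printf("completed\n");
        return 0;
    }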

View file

@@ -161,7 +161,10 @@ int
 lam_event_init(void)
 {
     static int inited = false;
-    int i, rc;
+    int i;
+#if LAM_HAVE_THREADS
+    int rc;
+#endif
     if(inited)
         return LAM_SUCCESS;
@@ -185,11 +188,13 @@ lam_event_init(void)
     if (lam_evbase == NULL)
         errx(1, "%s: no event mechanism available", __func__);
+#if LAM_HAVE_THREADS
     /* spin up a thread to dispatch events */
     OBJ_CONSTRUCT(&lam_event_thread, lam_thread_t);
     lam_event_thread.t_run = lam_event_run;
     if((rc = lam_thread_start(&lam_event_thread)) != LAM_SUCCESS)
         return rc;
+#endif
 #if defined(USE_LOG) && defined(USE_DEBUG)
     log_to(stderr);
@@ -302,7 +307,6 @@ lam_event_loop(int flags)
         }
     }
     lam_mutex_unlock(&lam_event_lock);
-    lam_output(0, "lam_event_loop: done");
     return (0);
 }
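The two hunks above keep both the rc variable and the event-dispatch thread out of a threadless build, so nothing thread-related is even compiled; events are then dispatched only when a caller enters lam_event_loop() directly. A reduced sketch of the same guard using raw pthreads in place of the lam_thread_t wrappers (event_run() here is illustrative, not the real dispatch loop):

    #include <pthread.h>
    #include <stdio.h>

    #define LAM_HAVE_THREADS 1   /* flip to 0 for a threadless build */

    #if LAM_HAVE_THREADS
    static void* event_run(void* arg)
    {
        (void)arg;                /* dispatch events until shutdown */
        return NULL;
    }
    #endif

    static int event_init(void)
    {
    #if LAM_HAVE_THREADS
        int rc;                   /* only declared when a thread is started */
        pthread_t event_thread;
        /* spin up a thread to dispatch events */
        if ((rc = pthread_create(&event_thread, NULL, event_run, NULL)) != 0)
            return rc;
        pthread_detach(event_thread);
    #endif
        return 0;
    }

    int main(void)
    {
        printf("event_init: %d\n", event_init());
        return 0;
    }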

View file

@@ -14,6 +14,7 @@ int mca_pml_teg_wait(
     int completed = -1;
     mca_pml_base_request_t* pml_request;
+#if LAM_HAVE_THREADS
     /* poll for completion */
     for(c=0; completed < 0 && c < mca_pml_teg.teg_poll_iterations; c++) {
         for(i=0; i<count; i++) {
@@ -26,6 +27,7 @@ int mca_pml_teg_wait(
             }
         }
     }
+#endif
     if(completed < 0) {
         /* give up and sleep until completion */
@@ -39,7 +41,7 @@ int mca_pml_teg_wait(
                 break;
             }
         }
-        if(completed < 0)
+        if(completed < 0)
             lam_condition_wait(&mca_pml_teg.teg_request_cond, &mca_pml_teg.teg_request_lock);
     } while(completed < 0);
     mca_pml_teg.teg_request_waiting--;
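mca_pml_teg_wait() first spins through teg_poll_iterations looking for a completed request and only then sleeps on teg_request_cond; without threads the poll loop is compiled out, because the threadless condition wait shown further down drives progress itself. A condensed, hypothetical version of the poll-then-sleep shape:

    #include <stdio.h>

    #define LAM_HAVE_THREADS 0

    /* Hypothetical condensed form of the wait logic above: flags[i] != 0
       means request i has completed. */
    static int wait_any(volatile const int* flags, int count, int poll_iterations)
    {
        int i, completed = -1;
    #if LAM_HAVE_THREADS
        int c;
        /* poll for completion before paying for a condition wait */
        for (c = 0; completed < 0 && c < poll_iterations; c++)
            for (i = 0; i < count; i++)
                if (flags[i]) { completed = i; break; }
    #else
        (void)poll_iterations;
    #endif
        while (completed < 0) {
            /* give up and sleep: the real code waits on teg_request_cond
               here instead of re-scanning in a tight loop */
            for (i = 0; i < count; i++)
                if (flags[i]) { completed = i; break; }
        }
        return completed;
    }

    int main(void)
    {
        int flags[3] = { 0, 1, 0 };
        printf("completed request: %d\n", wait_any(flags, 3, 100));
        return 0;
    }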

View file

@@ -19,7 +19,6 @@
 static void mca_ptl_tcp_peer_construct(mca_ptl_base_peer_t* ptl_peer);
 static void mca_ptl_tcp_peer_destruct(mca_ptl_base_peer_t* ptl_peer);
 static int mca_ptl_tcp_peer_start_connect(mca_ptl_base_peer_t*);
-static void mca_ptl_tcp_peer_close_i(mca_ptl_base_peer_t*);
 static void mca_ptl_tcp_peer_connected(mca_ptl_base_peer_t*);
 static void mca_ptl_tcp_peer_recv_handler(int sd, short flags, void* user);
 static void mca_ptl_tcp_peer_send_handler(int sd, short flags, void* user);
@@ -81,7 +80,7 @@ static inline void mca_ptl_tcp_peer_event_init(mca_ptl_base_peer_t* ptl_peer, in
 static void mca_ptl_tcp_peer_destruct(mca_ptl_base_peer_t* ptl_peer)
 {
     mca_ptl_tcp_proc_remove(ptl_peer->peer_proc, ptl_peer);
-    mca_ptl_tcp_peer_close_i(ptl_peer);
+    mca_ptl_tcp_peer_close(ptl_peer);
 }
@@ -137,7 +136,7 @@ static int mca_ptl_tcp_peer_send_blocking(mca_ptl_base_peer_t* ptl_peer, void* d
     if(retval < 0) {
         if(errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
             lam_output(0, "mca_ptl_tcp_peer_send_blocking: send() failed with errno=%d\n",errno);
-            mca_ptl_tcp_peer_close_i(ptl_peer);
+            mca_ptl_tcp_peer_close(ptl_peer);
             return -1;
         }
         continue;
@@ -187,10 +186,10 @@ bool mca_ptl_tcp_peer_accept(mca_ptl_base_peer_t* ptl_peer, struct sockaddr_in*
     if((ptl_peer->peer_sd < 0) ||
        (ptl_peer->peer_state != MCA_PTL_TCP_CONNECTED &&
         peer_proc->proc_lam->proc_vpid < this_proc->proc_lam->proc_vpid)) {
-        mca_ptl_tcp_peer_close_i(ptl_peer);
+        mca_ptl_tcp_peer_close(ptl_peer);
         ptl_peer->peer_sd = sd;
         if(mca_ptl_tcp_peer_send_connect_ack(ptl_peer) != LAM_SUCCESS) {
-            mca_ptl_tcp_peer_close_i(ptl_peer);
+            mca_ptl_tcp_peer_close(ptl_peer);
             THREAD_UNLOCK(&ptl_peer->peer_send_lock);
             PROGRESS_THREAD_UNLOCK(&ptl_peer->peer_recv_lock);
             return false;
@@ -208,20 +207,6 @@ bool mca_ptl_tcp_peer_accept(mca_ptl_base_peer_t* ptl_peer, struct sockaddr_in*
     return false;
 }
-/*
- * An external I/F to close a peer. Called in the event of failure
- * on read or write. Note that this must acquire the peer lock
- * prior to delegating to the internal routine.
- */
-void mca_ptl_tcp_peer_close(mca_ptl_base_peer_t* ptl_peer)
-{
-    THREAD_LOCK(&ptl_peer->peer_recv_lock);
-    THREAD_LOCK(&ptl_peer->peer_send_lock);
-    mca_ptl_tcp_peer_close_i(ptl_peer);
-    THREAD_UNLOCK(&ptl_peer->peer_send_lock);
-    THREAD_UNLOCK(&ptl_peer->peer_recv_lock);
-}
 /*
  * Remove any event registrations associated with the socket
@@ -229,7 +214,7 @@ void mca_ptl_tcp_peer_close(mca_ptl_base_peer_t* ptl_peer)
  * been closed.
  */
-static void mca_ptl_tcp_peer_close_i(mca_ptl_base_peer_t* ptl_peer)
+void mca_ptl_tcp_peer_close(mca_ptl_base_peer_t* ptl_peer)
 {
     if(ptl_peer->peer_sd >= 0) {
         lam_event_del(&ptl_peer->peer_recv_event);
@@ -271,7 +256,7 @@ static int mca_ptl_tcp_peer_recv_blocking(mca_ptl_base_peer_t* ptl_peer, void* d
     /* remote closed connection */
     if(retval == 0) {
-        mca_ptl_tcp_peer_close_i(ptl_peer);
+        mca_ptl_tcp_peer_close(ptl_peer);
         return -1;
     }
@@ -279,7 +264,7 @@ static int mca_ptl_tcp_peer_recv_blocking(mca_ptl_base_peer_t* ptl_peer, void* d
     if(retval < 0) {
         if(errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
             lam_output(0, "mca_ptl_tcp_peer_recv_blocking: recv() failed with errno=%d\n",errno);
-            mca_ptl_tcp_peer_close_i(ptl_peer);
+            mca_ptl_tcp_peer_close(ptl_peer);
             return -1;
         }
         continue;
@@ -323,7 +308,7 @@ static int mca_ptl_tcp_peer_recv_connect_ack(mca_ptl_base_peer_t* ptl_peer)
     /* compare this to the expected values */
     if(size_h != ptl_proc->proc_guid_size || memcmp(ptl_proc->proc_guid, guid, size_h) != 0) {
         lam_output(0, "mca_ptl_tcp_peer_connect: received unexpected process identifier");
-        mca_ptl_tcp_peer_close_i(ptl_peer);
+        mca_ptl_tcp_peer_close(ptl_peer);
         return LAM_ERR_UNREACH;
     }
@@ -376,7 +361,7 @@ static int mca_ptl_tcp_peer_start_connect(mca_ptl_base_peer_t* ptl_peer)
             lam_event_add(&ptl_peer->peer_send_event, 0);
             return LAM_SUCCESS;
         }
-        mca_ptl_tcp_peer_close_i(ptl_peer);
+        mca_ptl_tcp_peer_close(ptl_peer);
         ptl_peer->peer_retries++;
         return LAM_ERR_UNREACH;
     }
@@ -386,7 +371,7 @@ static int mca_ptl_tcp_peer_start_connect(mca_ptl_base_peer_t* ptl_peer)
         ptl_peer->peer_state = MCA_PTL_TCP_CONNECT_ACK;
         lam_event_add(&ptl_peer->peer_recv_event, 0);
     } else {
-        mca_ptl_tcp_peer_close_i(ptl_peer);
+        mca_ptl_tcp_peer_close(ptl_peer);
     }
     return rc;
 }
@@ -409,7 +394,7 @@ static void mca_ptl_tcp_peer_complete_connect(mca_ptl_base_peer_t* ptl_peer)
     /* check connect completion status */
     if(getsockopt(ptl_peer->peer_sd, SOL_SOCKET, SO_ERROR, &so_error, &so_length) < 0) {
         lam_output(0, "mca_ptl_tcp_peer_complete_connect: getsockopt() failed with errno=%d\n", errno);
-        mca_ptl_tcp_peer_close_i(ptl_peer);
+        mca_ptl_tcp_peer_close(ptl_peer);
         return;
     }
     if(so_error == EINPROGRESS) {
@@ -418,7 +403,7 @@ static void mca_ptl_tcp_peer_complete_connect(mca_ptl_base_peer_t* ptl_peer)
     }
     if(so_error != 0) {
         lam_output(0, "mca_ptl_tcp_peer_complete_connect: connect() failed with errno=%d\n", so_error);
-        mca_ptl_tcp_peer_close_i(ptl_peer);
+        mca_ptl_tcp_peer_close(ptl_peer);
         return;
     }
@@ -426,7 +411,7 @@ static void mca_ptl_tcp_peer_complete_connect(mca_ptl_base_peer_t* ptl_peer)
         ptl_peer->peer_state = MCA_PTL_TCP_CONNECT_ACK;
         lam_event_add(&ptl_peer->peer_recv_event, 0);
     } else {
-        mca_ptl_tcp_peer_close_i(ptl_peer);
+        mca_ptl_tcp_peer_close(ptl_peer);
     }
 }
@@ -469,7 +454,7 @@ static void mca_ptl_tcp_peer_recv_handler(int sd, short flags, void* user)
     default:
     {
         lam_output(0, "mca_ptl_tcp_peer_recv_handler: invalid socket state(%d)", ptl_peer->peer_state);
-        mca_ptl_tcp_peer_close_i(ptl_peer);
+        mca_ptl_tcp_peer_close(ptl_peer);
         break;
     }
     }
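The bulk of this file is the mechanical rename: every internal caller of mca_ptl_tcp_peer_close_i() now calls mca_ptl_tcp_peer_close(), which is the old internal routine promoted to the public name. The deleted wrapper existed only to take the peer locks before closing; presumably, with the locks compiling away in a threadless build and close only reached from the single progress path, it bought nothing. A sketch of that idea with hypothetical names (THREAD_LOCK here mimics the shape of the real macros, not their exact definition):

    #include <stdio.h>

    #define LAM_HAVE_THREADS 0

    #if LAM_HAVE_THREADS
    #include <pthread.h>
    typedef pthread_mutex_t lock_t;
    #define THREAD_LOCK(l)   pthread_mutex_lock(l)
    #define THREAD_UNLOCK(l) pthread_mutex_unlock(l)
    #else
    typedef int lock_t;                 /* single-threaded: nothing to exclude */
    #define THREAD_LOCK(l)   ((void)(l))
    #define THREAD_UNLOCK(l) ((void)(l))
    #endif

    struct peer { lock_t recv_lock; lock_t send_lock; int sd; };

    /* The former close_i(), now exported directly: it closes the socket
       without taking locks itself. */
    static void peer_close(struct peer* p)
    {
        if (p->sd >= 0) {
            /* delete event registrations, close the socket, ... */
            p->sd = -1;
        }
    }

    int main(void)
    {
        struct peer p = { 0, 0, 5 };
        THREAD_LOCK(&p.recv_lock);      /* expands to nothing without threads */
        peer_close(&p);
        THREAD_UNLOCK(&p.recv_lock);
        printf("sd after close: %d\n", p.sd);
        return 0;
    }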

View file

@@ -62,20 +62,17 @@ void mca_ptl_tcp_recv_frag_init(mca_ptl_tcp_recv_frag_t* frag, mca_ptl_base_peer
 bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t* frag, int sd)
 {
     /* read common header */
-    if(frag->frag_hdr_cnt < sizeof(mca_ptl_base_common_header_t))
-        if(mca_ptl_tcp_recv_frag_header(frag, sd, sizeof(mca_ptl_base_common_header_t)) == false)
+    if(frag->frag_hdr_cnt < sizeof(mca_ptl_base_header_t))
+        if(mca_ptl_tcp_recv_frag_header(frag, sd, sizeof(mca_ptl_base_header_t)) == false)
             return false;
     switch(frag->frag_header.hdr_common.hdr_type) {
     case MCA_PTL_HDR_TYPE_MATCH:
-        assert(frag->frag_header.hdr_common.hdr_size == sizeof(mca_ptl_base_match_header_t));
         return mca_ptl_tcp_recv_frag_match(frag, sd);
     case MCA_PTL_HDR_TYPE_FRAG:
-        assert(frag->frag_header.hdr_common.hdr_size == sizeof(mca_ptl_base_frag_header_t));
         return mca_ptl_tcp_recv_frag_frag(frag, sd);
     case MCA_PTL_HDR_TYPE_ACK:
     case MCA_PTL_HDR_TYPE_NACK:
-        assert(frag->frag_header.hdr_common.hdr_size == sizeof(mca_ptl_base_ack_header_t));
         return mca_ptl_tcp_recv_frag_ack(frag, sd);
     default:
         lam_output(0, "mca_ptl_tcp_recv_frag_handler: invalid message type: %08X",
@@ -122,10 +119,6 @@ static bool mca_ptl_tcp_recv_frag_ack(mca_ptl_tcp_recv_frag_t* frag, int sd)
 {
     mca_ptl_tcp_send_frag_t* sendfrag;
     mca_ptl_base_send_request_t* sendreq;
-    if (frag->frag_hdr_cnt < sizeof(mca_ptl_base_ack_header_t))
-        if (mca_ptl_tcp_recv_frag_header(frag, sd, sizeof(mca_ptl_base_ack_header_t)) == false)
-            return false;
     sendfrag = (mca_ptl_tcp_send_frag_t*)frag->frag_header.hdr_ack.hdr_src_ptr.pval;
     sendreq = sendfrag->super.frag_request;
     sendreq->req_peer_request = frag->frag_header.hdr_ack.hdr_dst_ptr;
@@ -137,10 +130,6 @@ static bool mca_ptl_tcp_recv_frag_ack(mca_ptl_tcp_recv_frag_t* frag, int sd)
 static bool mca_ptl_tcp_recv_frag_match(mca_ptl_tcp_recv_frag_t* frag, int sd)
 {
-    if(frag->frag_hdr_cnt < sizeof(mca_ptl_base_match_header_t))
-        if(mca_ptl_tcp_recv_frag_header(frag, sd, sizeof(mca_ptl_base_match_header_t)) == false)
-            return false;
     /* first pass through - attempt a match */
     if(NULL == frag->super.frag_request && 0 == frag->frag_msg_cnt) {
         /* attempt to match a posted recv */
@@ -178,10 +167,6 @@ static bool mca_ptl_tcp_recv_frag_match(mca_ptl_tcp_recv_frag_t* frag, int sd)
 static bool mca_ptl_tcp_recv_frag_frag(mca_ptl_tcp_recv_frag_t* frag, int sd)
 {
-    if(frag->frag_hdr_cnt < sizeof(mca_ptl_base_frag_header_t))
-        if(mca_ptl_tcp_recv_frag_header(frag, sd, sizeof(mca_ptl_base_frag_header_t)) == false)
-            return false;
     /* get request from header */
     if(frag->frag_msg_cnt == 0) {
         frag->super.frag_request = frag->frag_header.hdr_frag.hdr_dst_ptr.pval;
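These hunks trade the two-step header read (common header first, then the type-specific remainder in each handler) for a single read of sizeof(mca_ptl_base_header_t), evidently a layout covering every header type; that is why the per-type hdr_size asserts disappear, and it pairs with the send-side change below that always transmits the full-size header. A toy version of the one-read dispatch, with hypothetical header layouts:

    #include <stdio.h>
    #include <string.h>

    /* Hypothetical header layouts, mirroring the common/match/ack split. */
    struct common_hdr { int type; };
    struct match_hdr  { struct common_hdr common; int tag; };
    struct ack_hdr    { struct common_hdr common; void* src; };

    /* Reading this union covers every header type in a single recv(). */
    union base_hdr {
        struct common_hdr common;
        struct match_hdr  match;
        struct ack_hdr    ack;
    };

    enum { HDR_MATCH = 1, HDR_ACK = 2 };

    static void handle(const union base_hdr* hdr)
    {
        switch (hdr->common.type) {
        case HDR_MATCH:
            printf("match, tag=%d\n", hdr->match.tag);
            break;
        case HDR_ACK:
            printf("ack, src=%p\n", hdr->ack.src);
            break;
        default:
            printf("invalid message type: %08X\n", hdr->common.type);
            break;
        }
    }

    int main(void)
    {
        union base_hdr hdr;
        memset(&hdr, 0, sizeof(hdr));
        hdr.match.common.type = HDR_MATCH;
        hdr.match.tag = 42;
        handle(&hdr);
        return 0;
    }

The cost is that the receiver always reads the largest header size; the gain is a single code path for header assembly.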

View file

@@ -94,17 +94,14 @@ int mca_ptl_tcp_send_frag_init(
     } else {
         convertor = &sendfrag->frag_convertor;
-        if((rc = lam_convertor_copy(&sendreq->req_convertor, convertor)) != LAM_SUCCESS)
-            return rc;
-        if((rc = lam_convertor_init_for_send(
+        lam_convertor_copy(&sendreq->req_convertor, convertor);
+        lam_convertor_init_for_send(
             convertor,
             0,
             sendreq->super.req_datatype,
             sendreq->super.req_count,
             sendreq->super.req_addr,
-            sendreq->req_offset)) != LAM_SUCCESS)
-            return rc;
+            sendreq->req_offset);
     }
     /* if data is contigous convertor will return an offset
@@ -113,8 +110,8 @@ int mca_ptl_tcp_send_frag_init(
     */
     sendfrag->frag_vec[1].iov_base = NULL;
     sendfrag->frag_vec[1].iov_len = size;
-    if((rc = lam_convertor_pack(convertor, &sendfrag->frag_vec[1], 1)) != LAM_SUCCESS)
-        return rc;
+    if((rc = lam_convertor_pack(convertor, &sendfrag->frag_vec[1], 1)) < 0)
+        return LAM_ERROR;
     /* adjust size and request offset to reflect actual number of bytes packed by convertor */
     size = sendfrag->frag_vec[1].iov_len;
@@ -133,7 +130,7 @@ int mca_ptl_tcp_send_frag_init(
     sendfrag->frag_vec_ptr = sendfrag->frag_vec;
     sendfrag->frag_vec_cnt = (size == 0) ? 1 : 2;
     sendfrag->frag_vec[0].iov_base = (lam_iov_base_ptr_t)hdr;
-    sendfrag->frag_vec[0].iov_len = hdr->hdr_common.hdr_size;
+    sendfrag->frag_vec[0].iov_len = sizeof(mca_ptl_base_header_t);
     return LAM_SUCCESS;
 }
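The last hunk is the send-side half of the fixed-size header scheme: frag_vec[0] now always carries sizeof(mca_ptl_base_header_t) bytes, so the receiver can read a full header unconditionally, while frag_vec[1] carries whatever the convertor packed. A minimal sketch of the two-element gathered send (POSIX writev; base_hdr is hypothetical, and a real implementation must also loop on short writes):

    #include <stddef.h>
    #include <sys/uio.h>
    #include <unistd.h>

    struct base_hdr { int type; int tag; };   /* hypothetical fixed-size header */

    /* One gathered write for header + payload; a fixed header iov_len keeps
       sender and receiver agreed on how many bytes make up a header. */
    static int send_frag(int sd, const struct base_hdr* hdr,
                         const void* payload, size_t size)
    {
        struct iovec vec[2];
        vec[0].iov_base = (void*)hdr;
        vec[0].iov_len  = sizeof(struct base_hdr);
        vec[1].iov_base = (void*)payload;
        vec[1].iov_len  = size;
        return writev(sd, vec, size == 0 ? 1 : 2) < 0 ? -1 : 0;
    }

    int main(void)
    {
        struct base_hdr hdr = { 1, 42 };
        const char payload[] = "hi";
        /* write to stdout (fd 1) just to exercise the path */
        return send_frag(1, &hdr, payload, sizeof(payload)) == 0 ? 0 : 1;
    }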

View file

@@ -21,6 +21,7 @@ libruntime_la_SOURCES = \
     lam_init.c \
     lam_mpi_init.c \
     lam_mpi_finalize.c \
+    lam_progress.c \
     lam_rte_finalize.c \
     lam_rte_init.c

View file

@@ -122,7 +122,8 @@ int lam_mpi_init(int argc, char **argv, int requested, int *provided)
        default */
     mca_base_param_lookup_int(param, &value);
-    lam_mpi_param_check = (bool) value;
+    /* lam_mpi_param_check = (bool) value; */
+    lam_mpi_param_check = false;
     /* do module exchange */
     if (LAM_SUCCESS != (ret = mca_base_modex_exchange())) {

View file

@@ -5,16 +5,11 @@
 #define LAM_CONDITION_H
 #include "lam_config.h"
 #include "threads/mutex.h"
-#if defined(LAM_USE_SPINLOCK)
-#include "condition_spinlock.h"
-#elif defined(LAM_USE_SPINWAIT)
-#include "condition_spinwait.h"
-#elif defined(LAM_USE_PTHREADS)
+#if LAM_HAVE_POSIX_THREADS
 #include "condition_pthread.h"
 #else
-#error "concurrency model not configured"
+#include "condition_spinlock.h"
 #endif
 #endif
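This header (and threads/mutex.h below) swaps the old three-way LAM_USE_SPINLOCK / LAM_USE_SPINWAIT / LAM_USE_PTHREADS selection, which hit an #error when nothing was defined, for a two-way configure-driven choice: POSIX threads when available, the spinlock fallback otherwise, so a threadless build always resolves to something. The shape of the pattern, reduced to one standalone file with hypothetical names:

    #include <stdio.h>

    #define HAVE_POSIX_THREADS 0   /* normally set by configure */

    #if HAVE_POSIX_THREADS
    #include <pthread.h>
    typedef pthread_mutex_t my_mutex_t;
    #define MY_MUTEX_INIT      PTHREAD_MUTEX_INITIALIZER
    #define my_mutex_lock(m)   pthread_mutex_lock(m)
    #define my_mutex_unlock(m) pthread_mutex_unlock(m)
    #else
    typedef int my_mutex_t;        /* fallback: nothing to lock */
    #define MY_MUTEX_INIT      0
    #define my_mutex_lock(m)   ((void)(m))
    #define my_mutex_unlock(m) ((void)(m))
    #endif

    int main(void)
    {
        my_mutex_t m = MY_MUTEX_INIT;
        my_mutex_lock(&m);
        printf("in critical section\n");
        my_mutex_unlock(&m);
        return 0;
    }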

View file

@@ -4,7 +4,7 @@
 #include "mutex.h"
 #include "condition.h"
-#if defined(LAM_USE_PTHREADS)
+#if LAM_HAVE_POSIX_THREADS
 static void lam_condition_construct(lam_condition_t* c)
 {

View file

@@ -5,7 +5,7 @@
 #define LAM_CONDITION_PTHREAD_H
 #include <pthread.h>
-#include "threads/mutex.h"
+#include "threads/mutex_pthread.h"
 struct lam_condition_t {

View file

@@ -5,11 +5,13 @@
 #include "mutex.h"
 #include "condition.h"
-#if defined(LAM_USE_SPINLOCK)
+#if (LAM_HAVE_THREADS == 0)
 static void lam_condition_construct(lam_condition_t* c)
 {
+    c->c_waiting = 0;
+    c->c_signaled = 0;
 }

View file

@@ -5,10 +5,13 @@
 #define LAM_CONDITION_SPINLOCK_H
 #include "threads/condition.h"
 #include "threads/mutex_spinlock.h"
+#include "runtime/lam_progress.h"
 struct lam_condition_t {
     volatile int c_waiting;
+    volatile int c_signaled;
 };
 typedef struct lam_condition_t lam_condition_t;
@@ -17,6 +20,14 @@ OBJ_CLASS_DECLARATION(lam_condition_t);
 static inline int lam_condition_wait(lam_condition_t* c, lam_mutex_t* m)
 {
     c->c_waiting++;
+    while(c->c_signaled == 0) {
+        lam_mutex_unlock(m);
+        lam_progress();
+        lam_mutex_lock(m);
+    }
+    c->c_signaled--;
     c->c_waiting--;
     return 0;
 }
@@ -27,11 +38,15 @@ static inline int lam_condition_timedwait(lam_condition_t* c, lam_mutex_t* m, co
 static inline int lam_condition_signal(lam_condition_t* c)
 {
+    if(c->c_waiting) {
+        c->c_signaled++;
+    }
     return 0;
 }
 static inline int lam_condition_broadcast(lam_condition_t* c)
 {
+    c->c_signaled += c->c_waiting;
     return 0;
 }
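The threadless condition variable above never blocks: lam_condition_wait() records itself in c_waiting, then loops dropping the mutex, calling lam_progress() so the I/O that will eventually signal it can actually run, and retaking the mutex to recheck c_signaled; signal only bumps c_signaled when a waiter exists, and broadcast credits every current waiter at once. A self-contained sketch of the same discipline (single-threaded, so the mutex stand-ins are no-ops and progress() is hypothetical):

    #include <stdio.h>

    struct cond { volatile int c_waiting, c_signaled; };

    static struct cond request_cond = { 0, 0 };

    static void cond_signal(struct cond* c)
    {
        if (c->c_waiting)            /* a signal with no waiter is lost */
            c->c_signaled++;
    }

    /* Hypothetical progress engine: pretend one poll completes the request. */
    static void progress(void)
    {
        cond_signal(&request_cond);  /* e.g. a recv handler finished a request */
    }

    /* No-op "mutex": in the threadless build there is nothing to exclude. */
    static void mutex_lock(void)   { }
    static void mutex_unlock(void) { }

    static void cond_wait(struct cond* c)
    {
        c->c_waiting++;
        while (c->c_signaled == 0) {
            mutex_unlock();          /* drop the lock so handlers can run */
            progress();              /* drive the I/O that will signal us */
            mutex_lock();
        }
        c->c_signaled--;
        c->c_waiting--;
    }

    int main(void)
    {
        cond_wait(&request_cond);
        printf("request complete\n");
        return 0;
    }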

View file

@@ -8,17 +8,10 @@
 #define LAM_MUTEX_H_
 #include "lam_config.h"
-#define LAM_USE_PTHREADS
-#if defined(LAM_USE_SPINLOCK)
-#include "threads/mutex_spinlock.h"
-#elif defined(LAM_USE_SPINWAIT)
-#include "threads/mutex_spinwait.h"
-#elif defined(LAM_USE_PTHREADS)
+#if LAM_HAVE_POSIX_THREADS
 #include "threads/mutex_pthread.h"
 #else
-#error "concurrency model not configured"
+#include "threads/mutex_spinlock.h"
 #endif
 /**

View file

@@ -2,7 +2,7 @@
  * $HEADER$
  */
 #include "threads/mutex.h"
-#if defined(LAM_USE_PTHREADS)
+#if LAM_HAVE_POSIX_THREADS
 static void lam_mutex_construct(lam_mutex_t* m)
 {

View file

@@ -3,7 +3,7 @@
  */
 #include "mutex.h"
-#if defined(LAM_USE_SPINLOCK)
+#if (LAM_HAVE_THREADS == 0)
 static void lam_mutex_construct(lam_mutex_t* m)
 {