1
1
- added pml callbacks to ptl interface

This commit was SVN r746.
Этот коммит содержится в:
Tim Woodall 2004-02-12 20:55:10 +00:00
родитель 844f3f9ad3
Коммит 44ffcda358
21 изменённых файлов: 151 добавлений и 163 удалений

Просмотреть файл

@ -10,3 +10,4 @@ stamp-h1
.libs .libs
event.lo event.lo
libevent.la libevent.la
libtool

Просмотреть файл

@ -72,7 +72,7 @@ static int poll_del (void *, struct lam_event *);
static int poll_recalc (void *, int); static int poll_recalc (void *, int);
static int poll_dispatch (void *, struct timeval *); static int poll_dispatch (void *, struct timeval *);
struct lam_eventop lam_pollops = { const struct lam_eventop lam_pollops = {
"poll", "poll",
poll_init, poll_init,
poll_add, poll_add,

Просмотреть файл

@ -14,9 +14,8 @@
#include "lam/util/output.h" #include "lam/util/output.h"
#endif #endif
/* /*
* Array of elements maintained by value. * @file Array of elements maintained by value.
*/ */
extern lam_class_info_t lam_value_array_t_class_info; extern lam_class_info_t lam_value_array_t_class_info;
@ -53,11 +52,37 @@ extern "C" {
static inline int lam_value_array_init(lam_value_array_t *array, size_t item_sizeof) static inline int lam_value_array_init(lam_value_array_t *array, size_t item_sizeof)
{ {
array->array_item_sizeof = item_sizeof; array->array_item_sizeof = item_sizeof;
array->array_alloc_size = 1; array->array_alloc_size = 1;
array->array_items = (unsigned char*) malloc(item_sizeof * array->array_alloc_size); array->array_size = 0;
array->array_items = (unsigned char*)realloc(array->array_items, item_sizeof * array->array_alloc_size);
return (NULL != array->array_items) ? LAM_SUCCESS : LAM_ERR_OUT_OF_RESOURCE; return (NULL != array->array_items) ? LAM_SUCCESS : LAM_ERR_OUT_OF_RESOURCE;
} }
/**
* Reserve space in the array for new elements, but do not change the size.
*
* @param array The input array (IN).
* @param size The anticipated size of the array (IN).
* @return LAM error code.
*/
static inline int lam_value_array_reserve(lam_value_array_t* array, size_t size)
{
if(size > array->array_alloc_size) {
array->array_items = (unsigned char*)realloc(array->array_items, array->array_item_sizeof * size);
if(NULL == array->array_items) {
array->array_size = 0;
array->array_alloc_size = 0;
return LAM_ERR_OUT_OF_RESOURCE;
}
array->array_alloc_size = 1;
}
return LAM_SUCCESS;
}
/** /**
* Retreives the number of elements in the array. * Retreives the number of elements in the array.
* *

Просмотреть файл

@ -11,6 +11,7 @@
#include "lam/mem/malloc.h" #include "lam/mem/malloc.h"
#include "lam/util/output.h" #include "lam/util/output.h"
#include "lam/threads/mutex.h" #include "lam/threads/mutex.h"
#include "lam/event/event.h"
/** /**
* First function that must be called in a LAM process. * First function that must be called in a LAM process.
@ -51,6 +52,10 @@ int lam_init(int argc, char *argv[])
lam_malloc_init(); lam_malloc_init();
/* Initialize event handling */
lam_event_init();
/* Other things that we'll probably need: /* Other things that we'll probably need:
- session directory setup - session directory setup

Просмотреть файл

@ -7,10 +7,13 @@
#include "mca/mpi/ptl/ptl.h" #include "mca/mpi/ptl/ptl.h"
#include "mca/mpi/ptl/base/base.h" #include "mca/mpi/ptl/base/base.h"
#include "mca/mpi/ptl/base/ptl_base_comm.h" #include "mca/mpi/ptl/base/ptl_base_comm.h"
#include "mca/mpi/ptl/base/ptl_base_sendreq.h" #include "mca/mpi/ptl/base/ptl_base_header.h"
#include "mca/mpi/ptl/base/ptl_base_recvreq.h" #include "mca/mpi/ptl/base/ptl_base_recvfrag.h"
#include "mca/mpi/ptl/base/ptl_base_sendfrag.h"
#include "pml_teg.h" #include "pml_teg.h"
#include "pml_teg_proc.h" #include "pml_teg_proc.h"
#include "pml_teg_recvreq.h"
#include "pml_teg_sendreq.h"
mca_pml_teg_t mca_pml_teg = { mca_pml_teg_t mca_pml_teg = {
@ -88,6 +91,11 @@ int mca_pml_teg_add_ptls(lam_list_t *ptls)
break; break;
if(i == mca_pml_teg.teg_num_ptl_modules) if(i == mca_pml_teg.teg_num_ptl_modules)
mca_pml_teg.teg_ptl_modules[mca_pml_teg.teg_num_ptl_modules++] = ptl->ptl_module; mca_pml_teg.teg_ptl_modules[mca_pml_teg.teg_num_ptl_modules++] = ptl->ptl_module;
/* setup ptl */
ptl->ptl_match = mca_ptl_base_recv_frag_match;
ptl->ptl_send_progress = mca_pml_teg_send_request_progress;
ptl->ptl_recv_progress = mca_pml_teg_recv_request_progress;
} }
/* sort ptl list by exclusivity */ /* sort ptl list by exclusivity */

Просмотреть файл

@ -1,2 +1,10 @@
#include "pml_teg_recvreq.h" #include "pml_teg_recvreq.h"
void mca_pml_teg_recv_request_progress(
mca_ptl_base_recv_request_t* request,
mca_ptl_base_recv_frag_t* frag)
{
}

Просмотреть файл

@ -42,6 +42,11 @@ static inline int mca_pml_teg_recv_request_start(mca_ptl_base_recv_request_t* re
return LAM_SUCCESS; return LAM_SUCCESS;
} }
void mca_pml_teg_recv_request_progress(
mca_ptl_base_recv_request_t* recv_request,
mca_ptl_base_recv_frag_t* recv_frag
);
#endif #endif

Просмотреть файл

@ -76,3 +76,10 @@ int mca_pml_teg_send_request_schedule(mca_ptl_base_send_request_t* req, bool* co
return LAM_SUCCESS; return LAM_SUCCESS;
} }
void mca_pml_teg_send_request_progress(
mca_ptl_base_send_request_t* req,
mca_ptl_base_send_frag_t* frag)
{
}

Просмотреть файл

@ -56,6 +56,10 @@ static inline int mca_pml_teg_send_request_start(
return LAM_SUCCESS; return LAM_SUCCESS;
} }
void mca_pml_teg_send_request_progress(
mca_ptl_base_send_request_t* send_request,
mca_ptl_base_send_frag_t* send_frag
);
#endif #endif

Просмотреть файл

@ -27,26 +27,26 @@ static inline void mca_ptl_base_recv_frag_process(mca_ptl_base_recv_frag_t* frag
mca_ptl_base_recv_request_t* request = frag->frag_request; mca_ptl_base_recv_request_t* request = frag->frag_request;
mca_ptl_base_frag_header_t* header = &frag->super.frag_header.hdr_frag; mca_ptl_base_frag_header_t* header = &frag->super.frag_header.hdr_frag;
mca_ptl_t* ptl = frag->super.frag_owner; mca_ptl_t* ptl = frag->super.frag_owner;
/* determine the offset and size of posted buffer */ /* determine the offset and size of posted buffer */
if (request->super.req_length < header->hdr_frag_offset) { if (request->super.req_length < header->hdr_frag_offset) {
/* user buffer is to small - discard entire fragment */ /* user buffer is to small - discard entire fragment */
frag->super.frag_addr = 0; frag->super.frag_addr = 0;
frag->super.frag_size = 0; frag->super.frag_size = 0;
} else if (request->super.req_length < header->hdr_frag_offset + header->hdr_frag_length) { } else if (request->super.req_length < header->hdr_frag_offset + header->hdr_frag_length) {
/* user buffer is to small - discard part of fragment */ /* user buffer is to small - discard part of fragment */
frag->super.frag_addr = ((unsigned char*)request->super.req_addr + header->hdr_frag_offset); frag->super.frag_addr = ((unsigned char*)request->super.req_addr + header->hdr_frag_offset);
frag->super.frag_size = request->super.req_length - header->hdr_frag_offset; frag->super.frag_size = request->super.req_length - header->hdr_frag_offset;
} else { } else {
/* user buffer is large enough for this fragment */ /* user buffer is large enough for this fragment */
frag->super.frag_addr = ((unsigned char*)request->super.req_addr + header->hdr_frag_offset); frag->super.frag_addr = ((unsigned char*)request->super.req_addr + header->hdr_frag_offset);
frag->super.frag_size = header->hdr_frag_length; frag->super.frag_size = header->hdr_frag_length;
} }
/* indicate to the ptl that the fragment can be delivered */ /* indicate to the ptl that the fragment can be delivered */
@ -54,19 +54,21 @@ static inline void mca_ptl_base_recv_frag_process(mca_ptl_base_recv_frag_t* frag
} }
static inline int mca_ptl_base_recv_frag_match(mca_ptl_base_recv_frag_t* frag, mca_ptl_base_match_header_t* header) static inline int mca_ptl_base_recv_frag_match(
mca_ptl_base_recv_frag_t* frag,
mca_ptl_base_match_header_t* header)
{ {
bool matched; bool matched;
lam_list_t matched_frags; lam_list_t matched_frags;
int rc = mca_ptl_base_match(header, frag, &matched, &matched_frags); int rc = mca_ptl_base_match(header, frag, &matched, &matched_frags);
if(rc != LAM_SUCCESS) if(rc != LAM_SUCCESS)
return rc; return rc;
if(matched) { if(matched) {
do { do {
/* process current fragment */ /* process current fragment */
mca_ptl_base_recv_frag_process(frag); mca_ptl_base_recv_frag_process(frag);
/* process any additional fragments that arrived out of order */ /* process any additional fragments that arrived out of order */
frag = (mca_ptl_base_recv_frag_t*)lam_list_remove_first(&matched_frags); frag = (mca_ptl_base_recv_frag_t*)lam_list_remove_first(&matched_frags);
} while(NULL != frag); } while(NULL != frag);

Просмотреть файл

@ -144,11 +144,3 @@ static bool mca_ptl_base_recv_request_match_specific_proc(mca_ptl_base_recv_requ
return false; return false;
} }
void mca_ptl_base_recv_request_progress(
mca_ptl_base_recv_request_t* request,
mca_ptl_base_recv_frag_t* frag)
{
}

Просмотреть файл

@ -12,15 +12,15 @@ extern lam_class_info_t mca_ptl_base_recv_request_t_class_info;;
struct mca_ptl_base_recv_frag_t; struct mca_ptl_base_recv_frag_t;
typedef struct { struct mca_ptl_base_recv_request_t {
mca_pml_base_request_t super; mca_pml_base_request_t super;
mca_ptl_base_sequence_t req_sequence; mca_ptl_base_sequence_t req_sequence;
} mca_ptl_base_recv_request_t; };
typedef struct mca_ptl_base_recv_request_t mca_ptl_base_recv_request_t;
void mca_ptl_base_recv_request_match_wild(mca_ptl_base_recv_request_t*); void mca_ptl_base_recv_request_match_wild(mca_ptl_base_recv_request_t*);
void mca_ptl_base_recv_request_match_specific(mca_ptl_base_recv_request_t*); void mca_ptl_base_recv_request_match_specific(mca_ptl_base_recv_request_t*);
void mca_ptl_base_recv_request_progress(mca_ptl_base_recv_request_t*, struct mca_ptl_base_recv_frag_t*);
static inline void mca_ptl_base_recv_request_reinit( static inline void mca_ptl_base_recv_request_reinit(

Просмотреть файл

@ -28,10 +28,3 @@ static void mca_ptl_base_send_request_destruct(mca_ptl_base_send_request_t* req)
OBJ_DESTRUCT_SUPER(&req->req_unacked_frags, mca_pml_base_request_t); OBJ_DESTRUCT_SUPER(&req->req_unacked_frags, mca_pml_base_request_t);
} }
void mca_ptl_base_send_request_progress(
mca_ptl_base_send_request_t* req,
mca_ptl_base_send_frag_t* frag)
{
}

Просмотреть файл

@ -48,12 +48,6 @@ struct mca_ptl_base_send_request_t {
typedef struct mca_ptl_base_send_request_t mca_ptl_base_send_request_t; typedef struct mca_ptl_base_send_request_t mca_ptl_base_send_request_t;
void mca_ptl_base_send_request_progress(
mca_ptl_base_send_request_t* request,
struct mca_ptl_base_send_frag_t* frag
);
static inline void mca_ptl_base_send_request_reinit( static inline void mca_ptl_base_send_request_reinit(
mca_ptl_base_send_request_t *request, mca_ptl_base_send_request_t *request,
void *addr, void *addr,

Просмотреть файл

@ -25,6 +25,7 @@ struct mca_ptl_base_recv_request_t;
struct mca_ptl_base_send_request_t; struct mca_ptl_base_send_request_t;
struct mca_ptl_base_recv_frag_t; struct mca_ptl_base_recv_frag_t;
struct mca_ptl_base_send_frag_t; struct mca_ptl_base_send_frag_t;
struct mca_ptl_base_match_header_t;
typedef uint64_t mca_ptl_base_sequence_t; typedef uint64_t mca_ptl_base_sequence_t;
typedef uint64_t mca_ptl_base_tstamp_t; typedef uint64_t mca_ptl_base_tstamp_t;
@ -145,6 +146,11 @@ typedef void (*mca_ptl_base_recv_fn_t)(
struct mca_ptl_base_recv_frag_t* recv_frag struct mca_ptl_base_recv_frag_t* recv_frag
); );
typedef int (*mca_ptl_base_match_fn_t)(
struct mca_ptl_base_recv_frag_t* frag,
struct mca_ptl_base_match_header_t* header
);
typedef void (*mca_ptl_base_recv_progress_fn_t)( typedef void (*mca_ptl_base_recv_progress_fn_t)(
struct mca_ptl_base_recv_request_t* recv_request, struct mca_ptl_base_recv_request_t* recv_request,
struct mca_ptl_base_recv_frag_t* recv_frag struct mca_ptl_base_recv_frag_t* recv_frag
@ -181,6 +187,7 @@ struct mca_ptl_t {
mca_ptl_base_frag_return_fn_t ptl_frag_return; mca_ptl_base_frag_return_fn_t ptl_frag_return;
/* PTL->PML function table - filled in by PML at init */ /* PTL->PML function table - filled in by PML at init */
mca_ptl_base_match_fn_t ptl_match;
mca_ptl_base_send_progress_fn_t ptl_send_progress; mca_ptl_base_send_progress_fn_t ptl_send_progress;
mca_ptl_base_recv_progress_fn_t ptl_recv_progress; mca_ptl_base_recv_progress_fn_t ptl_recv_progress;
}; };

Просмотреть файл

@ -9,8 +9,8 @@
#include <sys/types.h> #include <sys/types.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <netinet/in.h> #include <netinet/in.h>
#include "lam/util/reactor.h"
#include "lam/mem/free_list.h" #include "lam/mem/free_list.h"
#include "lam/event/event.h"
#include "mca/mpi/pml/pml.h" #include "mca/mpi/pml/pml.h"
#include "mca/mpi/ptl/ptl.h" #include "mca/mpi/ptl/ptl.h"
@ -31,13 +31,14 @@ struct mca_ptl_tcp_module_1_0_0_t {
int tcp_free_list_num; /**< initial size of free lists */ int tcp_free_list_num; /**< initial size of free lists */
int tcp_free_list_max; /**< maximum size of free lists */ int tcp_free_list_max; /**< maximum size of free lists */
int tcp_free_list_inc; /**< number of elements to alloc when growing free lists */ int tcp_free_list_inc; /**< number of elements to alloc when growing free lists */
lam_reactor_t tcp_reactor;
lam_free_list_t tcp_send_requests; lam_free_list_t tcp_send_requests;
lam_free_list_t tcp_send_frags; lam_free_list_t tcp_send_frags;
lam_free_list_t tcp_recv_frags; lam_free_list_t tcp_recv_frags;
lam_list_t tcp_procs; lam_list_t tcp_procs;
lam_list_t tcp_acks; lam_list_t tcp_acks;
struct mca_ptl_tcp_proc_t* tcp_local; struct mca_ptl_tcp_proc_t* tcp_local;
lam_event_t tcp_send_event;
lam_event_t tcp_recv_event;
lam_mutex_t tcp_lock; lam_mutex_t tcp_lock;
}; };
typedef struct mca_ptl_tcp_module_1_0_0_t mca_ptl_tcp_module_1_0_0_t; typedef struct mca_ptl_tcp_module_1_0_0_t mca_ptl_tcp_module_1_0_0_t;

Просмотреть файл

@ -55,19 +55,11 @@ mca_ptl_tcp_module_1_0_0_t mca_ptl_tcp_module = {
/* /*
* data structure for receiving reactor callbacks * functions for receiving event callbacks
*/ */
static void mca_ptl_tcp_module_recv_handler(void*, int sd); static void mca_ptl_tcp_module_recv_handler(int, short, void*);
static void mca_ptl_tcp_module_send_handler(void*, int sd); static void mca_ptl_tcp_module_send_handler(int, short, void*);
static void mca_ptl_tcp_module_except_handler(void*, int sd);
static lam_reactor_listener_t mca_ptl_tcp_module_listener = {
mca_ptl_tcp_module_recv_handler,
mca_ptl_tcp_module_send_handler,
mca_ptl_tcp_module_except_handler,
};
/* /*
@ -102,7 +94,6 @@ static inline int mca_ptl_tcp_param_register_int(
int mca_ptl_tcp_module_open(void) int mca_ptl_tcp_module_open(void)
{ {
lam_mutex_init(&mca_ptl_tcp_module.tcp_lock); lam_mutex_init(&mca_ptl_tcp_module.tcp_lock);
OBJ_CONSTRUCT(&mca_ptl_tcp_module.tcp_reactor, lam_reactor_t);
OBJ_CONSTRUCT(&mca_ptl_tcp_module.tcp_procs, lam_list_t); OBJ_CONSTRUCT(&mca_ptl_tcp_module.tcp_procs, lam_list_t);
OBJ_CONSTRUCT(&mca_ptl_tcp_module.tcp_acks, lam_list_t); OBJ_CONSTRUCT(&mca_ptl_tcp_module.tcp_acks, lam_list_t);
OBJ_CONSTRUCT(&mca_ptl_tcp_module.tcp_send_requests, lam_free_list_t); OBJ_CONSTRUCT(&mca_ptl_tcp_module.tcp_send_requests, lam_free_list_t);
@ -138,7 +129,6 @@ int mca_ptl_tcp_module_close(void)
if (NULL != mca_ptl_tcp_module.tcp_ptls) if (NULL != mca_ptl_tcp_module.tcp_ptls)
free(mca_ptl_tcp_module.tcp_ptls); free(mca_ptl_tcp_module.tcp_ptls);
OBJ_DESTRUCT(&mca_ptl_tcp_module.tcp_reactor);
OBJ_DESTRUCT(&mca_ptl_tcp_module.tcp_procs); OBJ_DESTRUCT(&mca_ptl_tcp_module.tcp_procs);
OBJ_DESTRUCT(&mca_ptl_tcp_module.tcp_acks); OBJ_DESTRUCT(&mca_ptl_tcp_module.tcp_acks);
OBJ_DESTRUCT(&mca_ptl_tcp_module.tcp_send_requests); OBJ_DESTRUCT(&mca_ptl_tcp_module.tcp_send_requests);
@ -243,13 +233,7 @@ static int mca_ptl_tcp_module_create_listen(void)
mca_ptl_tcp_module.tcp_port = inaddr.sin_port; mca_ptl_tcp_module.tcp_port = inaddr.sin_port;
/* register listen port */ /* register listen port */
lam_reactor_insert( lam_event_add(&mca_ptl_tcp_module.tcp_recv_event, 0);
&mca_ptl_tcp_module.tcp_reactor,
mca_ptl_tcp_module.tcp_listen,
&mca_ptl_tcp_module_listener,
0,
LAM_REACTOR_NOTIFY_RECV|LAM_REACTOR_NOTIFY_EXCEPT);
return LAM_SUCCESS; return LAM_SUCCESS;
} }
@ -341,7 +325,7 @@ mca_ptl_t** mca_ptl_tcp_module_init(int *num_ptls,
void mca_ptl_tcp_module_progress(mca_ptl_base_tstamp_t tstamp) void mca_ptl_tcp_module_progress(mca_ptl_base_tstamp_t tstamp)
{ {
lam_reactor_poll(&mca_ptl_tcp_module.tcp_reactor); lam_event_loop(LAM_EVLOOP_NONBLOCK);
} }
@ -366,21 +350,18 @@ static void mca_ptl_tcp_module_accept(void)
} }
/* wait for receipt of peers process identifier to complete this connection */ /* wait for receipt of peers process identifier to complete this connection */
lam_reactor_insert( lam_event_t* event = malloc(sizeof(lam_event_t));
&mca_ptl_tcp_module.tcp_reactor, lam_event_set(event, sd, LAM_EV_READ|LAM_EV_PERSIST, mca_ptl_tcp_module_recv_handler, event);
sd, lam_event_add(event, 0);
&mca_ptl_tcp_module_listener,
0,
LAM_REACTOR_NOTIFY_RECV|LAM_REACTOR_NOTIFY_EXCEPT);
} }
} }
/* /*
* Called by reactor when there is data available on the registered * Event callback when there is data available on the registered
* socket to recv. * socket to recv.
*/ */
static void mca_ptl_tcp_module_recv_handler(void* user, int sd) static void mca_ptl_tcp_module_recv_handler(int sd, short flags, void* user)
{ {
void* guid; void* guid;
uint32_t size; uint32_t size;
@ -392,7 +373,8 @@ static void mca_ptl_tcp_module_recv_handler(void* user, int sd)
mca_ptl_tcp_module_accept(); mca_ptl_tcp_module_accept();
return; return;
} }
lam_reactor_remove(&mca_ptl_tcp_module.tcp_reactor, sd, LAM_REACTOR_NOTIFY_ALL); lam_event_del((lam_event_t*)user);
free(user);
/* recv the size of the process identifier */ /* recv the size of the process identifier */
int retval = recv(sd, &size, sizeof(size), 0); int retval = recv(sd, &size, sizeof(size), 0);
@ -444,16 +426,3 @@ static void mca_ptl_tcp_module_recv_handler(void* user, int sd)
} }
} }
static void mca_ptl_tcp_module_send_handler(void* user, int sd)
{
lam_output(0, "mca_ptl_tcp_module_send: received invalid event for descriptor(%d)", sd);
lam_reactor_remove(&mca_ptl_tcp_module.tcp_reactor, sd, LAM_REACTOR_NOTIFY_ALL);
}
static void mca_ptl_tcp_module_except_handler(void* user, int sd)
{
lam_output(0, "mca_ptl_tcp_module_except: received invalid event for descriptor(%d)", sd);
lam_reactor_remove(&mca_ptl_tcp_module.tcp_reactor, sd, LAM_REACTOR_NOTIFY_ALL);
}

Просмотреть файл

@ -18,9 +18,8 @@ static void mca_ptl_tcp_peer_destruct(mca_ptl_base_peer_t* ptl_peer);
static int mca_ptl_tcp_peer_start_connect(mca_ptl_base_peer_t*); static int mca_ptl_tcp_peer_start_connect(mca_ptl_base_peer_t*);
static void mca_ptl_tcp_peer_close_i(mca_ptl_base_peer_t*); static void mca_ptl_tcp_peer_close_i(mca_ptl_base_peer_t*);
static void mca_ptl_tcp_peer_connected(mca_ptl_base_peer_t*); static void mca_ptl_tcp_peer_connected(mca_ptl_base_peer_t*);
static void mca_ptl_tcp_peer_recv_handler(mca_ptl_base_peer_t*, int sd); static void mca_ptl_tcp_peer_recv_handler(int sd, short flags, void* user);
static void mca_ptl_tcp_peer_send_handler(mca_ptl_base_peer_t*, int sd); static void mca_ptl_tcp_peer_send_handler(int sd, short flags, void* user);
static void mca_ptl_tcp_peer_except_handler(mca_ptl_base_peer_t*, int sd);
lam_class_info_t mca_ptl_tcp_peer_t_class_info = { lam_class_info_t mca_ptl_tcp_peer_t_class_info = {
@ -30,14 +29,6 @@ lam_class_info_t mca_ptl_tcp_peer_t_class_info = {
(lam_destruct_t)mca_ptl_tcp_peer_destruct (lam_destruct_t)mca_ptl_tcp_peer_destruct
}; };
static lam_reactor_listener_t mca_ptl_tcp_peer_listener = {
(lam_rl_recv_handler_fn_t)mca_ptl_tcp_peer_recv_handler,
(lam_rl_send_handler_fn_t)mca_ptl_tcp_peer_send_handler,
(lam_rl_except_handler_fn_t)mca_ptl_tcp_peer_except_handler
};
/* /*
* Initialize state of the peer instance. * Initialize state of the peer instance.
*/ */
@ -51,6 +42,8 @@ static void mca_ptl_tcp_peer_construct(mca_ptl_base_peer_t* ptl_peer)
ptl_peer->peer_sd = -1; ptl_peer->peer_sd = -1;
ptl_peer->peer_send_frag = 0; ptl_peer->peer_send_frag = 0;
ptl_peer->peer_recv_frag = 0; ptl_peer->peer_recv_frag = 0;
ptl_peer->peer_send_event.ev_flags = 0;
ptl_peer->peer_recv_event.ev_flags = 0;
ptl_peer->peer_state = MCA_PTL_TCP_CLOSED; ptl_peer->peer_state = MCA_PTL_TCP_CLOSED;
ptl_peer->peer_retries = 0; ptl_peer->peer_retries = 0;
OBJ_CONSTRUCT(&ptl_peer->peer_frags, lam_list_t); OBJ_CONSTRUCT(&ptl_peer->peer_frags, lam_list_t);
@ -96,12 +89,7 @@ int mca_ptl_tcp_peer_send(mca_ptl_base_peer_t* ptl_peer, mca_ptl_tcp_send_frag_t
else { else {
if(mca_ptl_tcp_send_frag_handler(frag, ptl_peer->peer_sd) == false) { if(mca_ptl_tcp_send_frag_handler(frag, ptl_peer->peer_sd) == false) {
ptl_peer->peer_send_frag = frag; ptl_peer->peer_send_frag = frag;
rc = lam_reactor_insert( lam_event_add(&mca_ptl_tcp_module.tcp_send_event, 0);
&mca_ptl_tcp_module.tcp_reactor,
ptl_peer->peer_sd,
&mca_ptl_tcp_peer_listener,
ptl_peer,
LAM_REACTOR_NOTIFY_SEND);
} }
} }
break; break;
@ -155,7 +143,7 @@ void mca_ptl_tcp_peer_close(mca_ptl_base_peer_t* ptl_peer)
} }
/* /*
* Remove the socket descriptor from the reactor, close it, * Remove any event registrations associated with the socket
* and update the peer state to reflect the connection has * and update the peer state to reflect the connection has
* been closed. * been closed.
*/ */
@ -163,10 +151,8 @@ void mca_ptl_tcp_peer_close(mca_ptl_base_peer_t* ptl_peer)
static void mca_ptl_tcp_peer_close_i(mca_ptl_base_peer_t* ptl_peer) static void mca_ptl_tcp_peer_close_i(mca_ptl_base_peer_t* ptl_peer)
{ {
if(ptl_peer->peer_sd >= 0) { if(ptl_peer->peer_sd >= 0) {
lam_reactor_remove( lam_event_del(&ptl_peer->peer_recv_event);
&mca_ptl_tcp_module.tcp_reactor, lam_event_del(&ptl_peer->peer_send_event);
ptl_peer->peer_sd,
LAM_REACTOR_NOTIFY_ALL);
close(ptl_peer->peer_sd); close(ptl_peer->peer_sd);
ptl_peer->peer_sd = -1; ptl_peer->peer_sd = -1;
} }
@ -180,20 +166,27 @@ static void mca_ptl_tcp_peer_close_i(mca_ptl_base_peer_t* ptl_peer)
static void mca_ptl_tcp_peer_connected(mca_ptl_base_peer_t* ptl_peer) static void mca_ptl_tcp_peer_connected(mca_ptl_base_peer_t* ptl_peer)
{ {
int flags = LAM_REACTOR_NOTIFY_RECV|LAM_REACTOR_NOTIFY_EXCEPT;
ptl_peer->peer_state = MCA_PTL_TCP_CONNECTED; ptl_peer->peer_state = MCA_PTL_TCP_CONNECTED;
ptl_peer->peer_retries = 0; ptl_peer->peer_retries = 0;
if(lam_list_get_size(&ptl_peer->peer_frags) > 0) { if(lam_list_get_size(&ptl_peer->peer_frags) > 0) {
if(NULL == ptl_peer->peer_send_frag) if(NULL == ptl_peer->peer_send_frag)
ptl_peer->peer_send_frag = (mca_ptl_tcp_send_frag_t*)lam_list_remove_first(&ptl_peer->peer_frags); ptl_peer->peer_send_frag = (mca_ptl_tcp_send_frag_t*)
flags |= LAM_REACTOR_NOTIFY_SEND; lam_list_remove_first(&ptl_peer->peer_frags);
lam_event_add(&ptl_peer->peer_send_event, 0);
} }
lam_reactor_insert( lam_event_set(
&mca_ptl_tcp_module.tcp_reactor, &ptl_peer->peer_recv_event,
ptl_peer->peer_sd, ptl_peer->peer_sd,
&mca_ptl_tcp_peer_listener, LAM_EV_READ|LAM_EV_PERSIST,
ptl_peer, mca_ptl_tcp_peer_recv_handler,
flags); ptl_peer);
lam_event_set(
&ptl_peer->peer_send_event,
ptl_peer->peer_sd,
LAM_EV_WRITE|LAM_EV_PERSIST,
mca_ptl_tcp_peer_send_handler,
ptl_peer);
lam_event_add(&ptl_peer->peer_recv_event, 0);
} }
@ -313,8 +306,8 @@ static int mca_ptl_tcp_peer_send_connect_ack(mca_ptl_base_peer_t* ptl_peer)
/* /*
* Start a connection to the peer. This will likely not complete, * Start a connection to the peer. This will likely not complete,
* as the socket is set to non-blocking, so register with the reactor * as the socket is set to non-blocking, so register for event
* for notification of connect completion. On connection we send * notification of connect completion. On connection we send
* our globally unique process identifier to the peer and wait for * our globally unique process identifier to the peer and wait for
* the peers response. * the peers response.
*/ */
@ -344,12 +337,7 @@ static int mca_ptl_tcp_peer_start_connect(mca_ptl_base_peer_t* ptl_peer)
/* non-blocking so wait for completion */ /* non-blocking so wait for completion */
if(errno == EINPROGRESS) { if(errno == EINPROGRESS) {
ptl_peer->peer_state = MCA_PTL_TCP_CONNECTING; ptl_peer->peer_state = MCA_PTL_TCP_CONNECTING;
rc = lam_reactor_insert( lam_event_add(&ptl_peer->peer_send_event, 0);
&mca_ptl_tcp_module.tcp_reactor,
ptl_peer->peer_sd,
&mca_ptl_tcp_peer_listener,
ptl_peer,
LAM_REACTOR_NOTIFY_SEND|LAM_REACTOR_NOTIFY_EXCEPT);
return rc; return rc;
} }
mca_ptl_tcp_peer_close_i(ptl_peer); mca_ptl_tcp_peer_close_i(ptl_peer);
@ -360,12 +348,7 @@ static int mca_ptl_tcp_peer_start_connect(mca_ptl_base_peer_t* ptl_peer)
/* send our globally unique process identifier to the peer */ /* send our globally unique process identifier to the peer */
if((rc = mca_ptl_tcp_peer_send_connect_ack(ptl_peer)) == LAM_SUCCESS) { if((rc = mca_ptl_tcp_peer_send_connect_ack(ptl_peer)) == LAM_SUCCESS) {
ptl_peer->peer_state = MCA_PTL_TCP_CONNECT_ACK; ptl_peer->peer_state = MCA_PTL_TCP_CONNECT_ACK;
rc = lam_reactor_insert( lam_event_add(&ptl_peer->peer_recv_event, 0);
&mca_ptl_tcp_module.tcp_reactor,
ptl_peer->peer_sd,
&mca_ptl_tcp_peer_listener,
ptl_peer,
LAM_REACTOR_NOTIFY_RECV|LAM_REACTOR_NOTIFY_EXCEPT);
} else { } else {
mca_ptl_tcp_peer_close_i(ptl_peer); mca_ptl_tcp_peer_close_i(ptl_peer);
} }
@ -385,7 +368,7 @@ static void mca_ptl_tcp_peer_complete_connect(mca_ptl_base_peer_t* ptl_peer)
lam_socklen_t so_length = sizeof(so_error); lam_socklen_t so_length = sizeof(so_error);
/* unregister from receiving event notifications */ /* unregister from receiving event notifications */
lam_reactor_remove(&mca_ptl_tcp_module.tcp_reactor, ptl_peer->peer_sd, LAM_REACTOR_NOTIFY_ALL); lam_event_del(&ptl_peer->peer_send_event);
/* check connect completion status */ /* check connect completion status */
if(getsockopt(ptl_peer->peer_sd, SOL_SOCKET, SO_ERROR, &so_error, &so_length) < 0) { if(getsockopt(ptl_peer->peer_sd, SOL_SOCKET, SO_ERROR, &so_error, &so_length) < 0) {
@ -394,12 +377,7 @@ static void mca_ptl_tcp_peer_complete_connect(mca_ptl_base_peer_t* ptl_peer)
return; return;
} }
if(so_error == EINPROGRESS) { if(so_error == EINPROGRESS) {
lam_reactor_insert( lam_event_add(&ptl_peer->peer_send_event, 0);
&mca_ptl_tcp_module.tcp_reactor,
ptl_peer->peer_sd,
&mca_ptl_tcp_peer_listener,
ptl_peer,
LAM_REACTOR_NOTIFY_SEND|LAM_REACTOR_NOTIFY_EXCEPT);
return; return;
} }
if(so_error != 0) { if(so_error != 0) {
@ -410,12 +388,7 @@ static void mca_ptl_tcp_peer_complete_connect(mca_ptl_base_peer_t* ptl_peer)
if(mca_ptl_tcp_peer_send_connect_ack(ptl_peer) == LAM_SUCCESS) { if(mca_ptl_tcp_peer_send_connect_ack(ptl_peer) == LAM_SUCCESS) {
ptl_peer->peer_state = MCA_PTL_TCP_CONNECT_ACK; ptl_peer->peer_state = MCA_PTL_TCP_CONNECT_ACK;
lam_reactor_insert( lam_event_add(&ptl_peer->peer_recv_event, 0);
&mca_ptl_tcp_module.tcp_reactor,
ptl_peer->peer_sd,
&mca_ptl_tcp_peer_listener,
ptl_peer,
LAM_REACTOR_NOTIFY_RECV|LAM_REACTOR_NOTIFY_EXCEPT);
} else { } else {
mca_ptl_tcp_peer_close_i(ptl_peer); mca_ptl_tcp_peer_close_i(ptl_peer);
} }
@ -427,8 +400,9 @@ static void mca_ptl_tcp_peer_complete_connect(mca_ptl_base_peer_t* ptl_peer)
* of the socket and take the appropriate action. * of the socket and take the appropriate action.
*/ */
static void mca_ptl_tcp_peer_recv_handler(mca_ptl_base_peer_t* ptl_peer, int sd) static void mca_ptl_tcp_peer_recv_handler(int sd, short flags, void* user)
{ {
mca_ptl_base_peer_t* ptl_peer = user;
THREAD_LOCK(&ptl_peer->peer_lock); THREAD_LOCK(&ptl_peer->peer_lock);
switch(ptl_peer->peer_state) { switch(ptl_peer->peer_state) {
case MCA_PTL_TCP_CONNECT_ACK: case MCA_PTL_TCP_CONNECT_ACK:
@ -472,8 +446,9 @@ static void mca_ptl_tcp_peer_recv_handler(mca_ptl_base_peer_t* ptl_peer, int sd)
* of the socket and take the appropriate action. * of the socket and take the appropriate action.
*/ */
static void mca_ptl_tcp_peer_send_handler(mca_ptl_base_peer_t* ptl_peer, int sd) static void mca_ptl_tcp_peer_send_handler(int sd, short flags, void* user)
{ {
mca_ptl_tcp_peer_t* ptl_peer = user;
THREAD_LOCK(&ptl_peer->peer_lock); THREAD_LOCK(&ptl_peer->peer_lock);
switch(ptl_peer->peer_state) { switch(ptl_peer->peer_state) {
case MCA_PTL_TCP_CONNECTING: case MCA_PTL_TCP_CONNECTING:
@ -491,27 +466,17 @@ static void mca_ptl_tcp_peer_send_handler(mca_ptl_base_peer_t* ptl_peer, int sd)
/* if nothing else to do unregister for send event notifications */ /* if nothing else to do unregister for send event notifications */
if(NULL == ptl_peer->peer_send_frag) { if(NULL == ptl_peer->peer_send_frag) {
lam_reactor_remove(&mca_ptl_tcp_module.tcp_reactor, sd, LAM_REACTOR_NOTIFY_SEND); lam_event_del(&ptl_peer->peer_send_event);
} }
break; break;
default: default:
lam_output(0, "mca_ptl_tcp_peer_send_handler: invalid connection state (%d)", lam_output(0, "mca_ptl_tcp_peer_send_handler: invalid connection state (%d)",
ptl_peer->peer_state); ptl_peer->peer_state);
lam_reactor_remove(&mca_ptl_tcp_module.tcp_reactor, sd, LAM_REACTOR_NOTIFY_SEND); lam_event_del(&ptl_peer->peer_send_event);
break; break;
} }
THREAD_UNLOCK(&ptl_peer->peer_lock); THREAD_UNLOCK(&ptl_peer->peer_lock);
} }
/*
* A file descriptor is in an erroneous state. Close the connection
* and update the peers state.
*/
static void mca_ptl_tcp_peer_except_handler(mca_ptl_base_peer_t* ptl_peer, int sd)
{
lam_output(0, "mca_ptl_tcp_peer_except_handler: closing connection");
mca_ptl_tcp_peer_close_i(ptl_peer);
}

Просмотреть файл

@ -10,7 +10,7 @@
#include <sys/socket.h> #include <sys/socket.h>
#include <netinet/in.h> #include <netinet/in.h>
#include "lam/lfc/lam_list.h" #include "lam/lfc/lam_list.h"
#include "lam/util/reactor.h" #include "lam/event/event.h"
#include "mca/mpi/pml/pml.h" #include "mca/mpi/pml/pml.h"
#include "mca/mpi/ptl/ptl.h" #include "mca/mpi/ptl/ptl.h"
#include "ptl_tcp_recvfrag.h" #include "ptl_tcp_recvfrag.h"
@ -46,6 +46,8 @@ struct mca_ptl_base_peer_t {
size_t peer_retries; size_t peer_retries;
lam_list_t peer_frags; lam_list_t peer_frags;
lam_mutex_t peer_lock; lam_mutex_t peer_lock;
lam_event_t peer_send_event;
lam_event_t peer_recv_event;
}; };
typedef struct mca_ptl_base_peer_t mca_ptl_base_peer_t; typedef struct mca_ptl_base_peer_t mca_ptl_base_peer_t;

Просмотреть файл

@ -113,7 +113,7 @@ static bool mca_ptl_tcp_recv_frag_ack(mca_ptl_tcp_recv_frag_t* frag, int sd)
if (mca_ptl_tcp_recv_frag_header(frag, sd, sizeof(mca_ptl_base_ack_header_t)) == false) if (mca_ptl_tcp_recv_frag_header(frag, sd, sizeof(mca_ptl_base_ack_header_t)) == false)
return false; return false;
sendfrag = (mca_ptl_tcp_send_frag_t*)frag->frag_header.hdr_ack.hdr_src_ptr.pval; sendfrag = (mca_ptl_tcp_send_frag_t*)frag->frag_header.hdr_ack.hdr_src_ptr.pval;
mca_ptl_base_send_request_progress(sendfrag->super.frag_request, &sendfrag->super); sendfrag->frag_owner->ptl_send_progress(sendfrag->super.frag_request, &sendfrag->super);
/* don't return first fragment - it is returned along with the request */ /* don't return first fragment - it is returned along with the request */
return true; return true;
} }
@ -174,7 +174,7 @@ static bool mca_ptl_tcp_recv_frag_frag(mca_ptl_tcp_recv_frag_t* frag, int sd)
return false; return false;
/* indicate completion status */ /* indicate completion status */
mca_ptl_base_recv_request_progress(frag->super.frag_request, &frag->super); frag->frag_owner->ptl_recv_progress(frag->super.frag_request, &frag->super);
return true; return true;
} }
@ -296,7 +296,7 @@ void mca_ptl_tcp_recv_frag_process(mca_ptl_tcp_recv_frag_t* frag)
if(frag->frag_addr != frag->super.super.frag_addr) { if(frag->frag_addr != frag->super.super.frag_addr) {
memcpy(frag->super.super.frag_addr, frag->frag_addr, frag->super.super.frag_size); memcpy(frag->super.super.frag_addr, frag->frag_addr, frag->super.super.frag_size);
} }
mca_ptl_base_recv_request_progress(frag->super.frag_request, &frag->super); frag->frag_owner->ptl_recv_progress(frag->super.frag_request, &frag->super);
if(frag->frag_acked == true) { if(frag->frag_acked == true) {
mca_ptl_tcp_recv_frag_return(frag); mca_ptl_tcp_recv_frag_return(frag);
} }

Просмотреть файл

@ -100,7 +100,7 @@ void mca_ptl_tcp_send_frag_reinit(
/* /*
* The socket is setup as non-blocking, writes are handled asynchronously, * The socket is setup as non-blocking, writes are handled asynchronously,
* with callbacks from the reactor when the socket is ready for writes. * with event callbacks when the socket is ready for writes.
*/ */
bool mca_ptl_tcp_send_frag_handler(mca_ptl_tcp_send_frag_t* frag, int sd) bool mca_ptl_tcp_send_frag_handler(mca_ptl_tcp_send_frag_t* frag, int sd)