cleanup - fixed compile errors - sorry!!
This commit was SVN r420.
Этот коммит содержится в:
родитель
b6b4bcd69c
Коммит
67be30928a
@ -6,6 +6,9 @@
|
||||
#ifndef _LAM_IF_UTIL_
|
||||
#define _LAM_IF_UTIL_
|
||||
|
||||
#include <sys/types.h>
|
||||
#include <sys/socket.h>
|
||||
|
||||
/**
|
||||
* Lookup an interface by name and return its primary address.
|
||||
*
|
||||
|
@ -59,16 +59,16 @@ extern lam_array_t mca_base_params;
|
||||
#if defined(c_plusplus) || defined(__cplusplus)
|
||||
extern "C" {
|
||||
#endif
|
||||
int mca_base_param_register_int(char *type_name, char *module_name,
|
||||
char *param_name, char *mca_param_name,
|
||||
int mca_base_param_register_int(const char *type_name, const char *module_name,
|
||||
const char *param_name, const char *mca_param_name,
|
||||
int default_value);
|
||||
int mca_base_param_register_string(char *type_name, char *module_name,
|
||||
char *param_name,
|
||||
char *mca_param_name,
|
||||
char *default_value);
|
||||
int mca_base_param_register_string(const char *type_name, const char *module_name,
|
||||
const char *param_name,
|
||||
const char *mca_param_name,
|
||||
const char *default_value);
|
||||
int mca_base_param_lookup_int(int index, int *value);
|
||||
int mca_base_param_lookup_string(int index, char **value);
|
||||
int mca_base_param_find(char *type, char *module, char *param);
|
||||
int mca_base_param_find(const char *type, const char *module, const char *param);
|
||||
int mca_base_param_finalize(void);
|
||||
|
||||
#if 0
|
||||
|
@ -20,6 +20,8 @@
|
||||
*/
|
||||
|
||||
struct mca_ptl_t;
|
||||
struct mca_ptl_addr_t;
|
||||
|
||||
|
||||
typedef enum {
|
||||
MCA_PML_BASE_SEND_STANDARD,
|
||||
@ -126,6 +128,16 @@ typedef int (*mca_pml_base_wait_fn_t)(
|
||||
lam_status_public_t* status
|
||||
);
|
||||
|
||||
/**
|
||||
* PTL->PML Upcall from PTL to PML to add themself to proc array
|
||||
*/
|
||||
|
||||
typedef int (*mca_ptl_pml_add_proc_fn_t)(
|
||||
struct lam_proc_t* proc,
|
||||
struct mca_ptl_t* ptl,
|
||||
struct mca_ptl_addr_t*
|
||||
);
|
||||
|
||||
|
||||
/**
|
||||
* PML instance interface functions.
|
||||
@ -133,22 +145,25 @@ typedef int (*mca_pml_base_wait_fn_t)(
|
||||
|
||||
struct mca_pml_t {
|
||||
|
||||
/* downcalls from MCA to PML */
|
||||
mca_pml_base_add_comm_fn_t pml_add_comm;
|
||||
mca_pml_base_del_comm_fn_t pml_del_comm;
|
||||
mca_pml_base_add_procs_fn_t pml_add_procs;
|
||||
mca_pml_base_del_procs_fn_t pml_del_procs;
|
||||
mca_pml_base_add_ptls_fn_t pml_add_ptls;
|
||||
mca_pml_base_fini_fn_t pml_fini;
|
||||
mca_pml_base_progress_fn_t pml_progress;
|
||||
|
||||
/* downcalls from MPI to PML */
|
||||
mca_pml_base_irecv_init_fn_t pml_irecv_init;
|
||||
mca_pml_base_irecv_fn_t pml_irecv;
|
||||
mca_pml_base_isend_init_fn_t pml_isend_init;
|
||||
mca_pml_base_isend_fn_t pml_isend;
|
||||
mca_pml_base_progress_fn_t pml_progress;
|
||||
mca_pml_base_start_fn_t pml_start;
|
||||
mca_pml_base_test_fn_t pml_test;
|
||||
mca_pml_base_wait_fn_t pml_wait;
|
||||
|
||||
/* upcalls from PTL to PML */
|
||||
mca_ptl_pml_add_proc_fn_t ptl_pml_add_proc;
|
||||
};
|
||||
typedef struct mca_pml_t mca_pml_t;
|
||||
|
||||
|
@ -15,39 +15,40 @@ lam_class_info_t mca_pml_teg_proc_cls = {
|
||||
};
|
||||
|
||||
|
||||
void mca_ptl_array_init(mca_ptl_array_t* ptl_array)
|
||||
void mca_ptl_array_init(mca_ptl_array_t* array)
|
||||
{
|
||||
SUPER_INIT(ptl_array, &lam_object_cls);
|
||||
ptl_array->ptl_array = 0;
|
||||
ptl_array->ptl_size = 0;
|
||||
ptl_array->ptl_index = 0;
|
||||
ptl_array->ptl_reserve = 0;
|
||||
SUPER_INIT(array, &lam_object_cls);
|
||||
array->ptl_procs = 0;
|
||||
array->ptl_size = 0;
|
||||
array->ptl_index = 0;
|
||||
array->ptl_reserve = 0;
|
||||
}
|
||||
|
||||
|
||||
void mca_ptl_array_destroy(mca_ptl_array_t* ptl_array)
|
||||
void mca_ptl_array_destroy(mca_ptl_array_t* array)
|
||||
{
|
||||
if(ptl_array->ptl_array != 0)
|
||||
LAM_FREE(ptl_array->ptl_array);
|
||||
SUPER_DESTROY(ptl_array, &lam_object_cls);
|
||||
if(array->ptl_procs != 0)
|
||||
LAM_FREE(array->ptl_procs);
|
||||
SUPER_DESTROY(array, &lam_object_cls);
|
||||
}
|
||||
|
||||
|
||||
int mca_ptl_array_reserve(mca_ptl_array_t* ptl_array, size_t size)
|
||||
int mca_ptl_array_reserve(mca_ptl_array_t* array, size_t size)
|
||||
{
|
||||
mca_ptl_info_t *array;
|
||||
if(ptl_array->ptl_reserve >= size)
|
||||
mca_ptl_proc_t *procs;
|
||||
if(array->ptl_reserve >= size)
|
||||
return LAM_SUCCESS;
|
||||
|
||||
array = (mca_ptl_info_t*)LAM_MALLOC(sizeof(mca_ptl_array_t)*size);
|
||||
procs = (mca_ptl_proc_t*)LAM_MALLOC(sizeof(mca_ptl_array_t)*size);
|
||||
if(array == 0)
|
||||
return LAM_ERR_OUT_OF_RESOURCE;
|
||||
if(ptl_array->ptl_size) {
|
||||
memcpy(ptl_array->ptl_array, array, ptl_array->ptl_size * sizeof(mca_ptl_info_t));
|
||||
LAM_FREE(ptl_array->ptl_array);
|
||||
if(array->ptl_size) {
|
||||
memcpy(procs, array->ptl_procs, array->ptl_size * sizeof(mca_ptl_proc_t));
|
||||
LAM_FREE(array->ptl_procs);
|
||||
}
|
||||
memset(ptl_array->ptl_array+(size-ptl_array->ptl_size), 0, (size-ptl_array->ptl_size)*sizeof(mca_ptl_info_t));
|
||||
ptl_array->ptl_reserve = size;
|
||||
array->ptl_procs = procs;
|
||||
array->ptl_reserve = size;
|
||||
memset(array->ptl_procs+(size-array->ptl_size), 0, (size-array->ptl_size)*sizeof(mca_ptl_proc_t));
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -5,19 +5,21 @@
|
||||
#ifndef LAM_PTL_ARRAY_H
|
||||
#define LAM_PTL_ARRAY_H
|
||||
|
||||
#include "lam/util/output.h"
|
||||
#include "mca/mpi/ptl/ptl.h"
|
||||
|
||||
extern lam_class_info_t mca_ptl_array_cls;
|
||||
|
||||
struct mca_ptl_info_t {
|
||||
double ptl_weight; /* PTL weight for scheduling */
|
||||
mca_ptl_t *ptl; /* PTL implementation */
|
||||
struct mca_ptl_proc_t {
|
||||
double ptl_weight; /* PTL weight for scheduling */
|
||||
struct mca_ptl_addr_t* ptl_addr; /* PTL addressing info */
|
||||
mca_ptl_t *ptl; /* PTL implementation */
|
||||
};
|
||||
typedef struct mca_ptl_info_t mca_ptl_info_t;
|
||||
typedef struct mca_ptl_proc_t mca_ptl_proc_t;
|
||||
|
||||
struct mca_ptl_array_t {
|
||||
lam_object_t super;
|
||||
mca_ptl_info_t* ptl_array; /* array of PTL info */
|
||||
mca_ptl_proc_t* ptl_procs; /* array of ptl procs */
|
||||
size_t ptl_size; /* number available */
|
||||
size_t ptl_reserve;
|
||||
size_t ptl_index; /* last used index*/
|
||||
@ -29,6 +31,11 @@ void mca_ptl_array_init(mca_ptl_array_t*);
|
||||
void mca_ptl_array_destroy(mca_ptl_array_t*);
|
||||
int mca_ptl_array_reserve(mca_ptl_array_t*, size_t);
|
||||
|
||||
static inline size_t mca_ptl_array_get_size(mca_ptl_array_t* array)
|
||||
{
|
||||
return array->ptl_size;
|
||||
}
|
||||
|
||||
static inline void mca_ptl_array_set_size(mca_ptl_array_t* array, size_t size)
|
||||
{
|
||||
if(array->ptl_size > array->ptl_reserve)
|
||||
@ -36,20 +43,30 @@ static inline void mca_ptl_array_set_size(mca_ptl_array_t* array, size_t size)
|
||||
array->ptl_size = size;
|
||||
}
|
||||
|
||||
static inline mca_ptl_info_t* mca_ptl_array_get_next(mca_ptl_array_t* ptl_array)
|
||||
static inline mca_ptl_proc_t* mca_ptl_array_insert(mca_ptl_array_t* array)
|
||||
{
|
||||
mca_ptl_info_t* ptl_info = &ptl_array->ptl_array[ptl_array->ptl_index++];
|
||||
if(ptl_array->ptl_index == ptl_array->ptl_size)
|
||||
ptl_array->ptl_index = 0;
|
||||
return ptl_info;
|
||||
#if LAM_ENABLE_DEBUG
|
||||
if(array->ptl_size >= array->ptl_reserve) {
|
||||
lam_output(0, "mca_ptl_array_insert: invalid array index %d >= %d",
|
||||
array->ptl_size, array->ptl_reserve);
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
return &array->ptl_procs[array->ptl_size++];
|
||||
}
|
||||
|
||||
static inline mca_ptl_t* mca_ptl_array_get_next_ptl(mca_ptl_array_t* ptl_array)
|
||||
static inline mca_ptl_proc_t* mca_ptl_array_get_next(mca_ptl_array_t* array)
|
||||
{
|
||||
mca_ptl_info_t* ptl_info = &ptl_array->ptl_array[ptl_array->ptl_index++];
|
||||
if(ptl_array->ptl_index == ptl_array->ptl_size)
|
||||
ptl_array->ptl_index = 0;
|
||||
return ptl_info->ptl;
|
||||
#if LAM_ENABLE_DEBUG
|
||||
if(array->ptl_size == 0) {
|
||||
lam_output(0, "mca_ptl_array_get_next: invalid array size");
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
mca_ptl_proc_t* ptl_proc = &array->ptl_procs[array->ptl_index++];
|
||||
if(array->ptl_index == array->ptl_size)
|
||||
array->ptl_index = 0;
|
||||
return ptl_proc;
|
||||
}
|
||||
|
||||
|
||||
|
@ -18,13 +18,12 @@ mca_pml_teg_t mca_pml_teg = {
|
||||
mca_pml_teg_del_comm,
|
||||
mca_pml_teg_add_procs,
|
||||
mca_pml_teg_del_procs,
|
||||
mca_pml_teg_add_ptls,
|
||||
mca_pml_teg_fini,
|
||||
mca_pml_teg_progress,
|
||||
mca_pml_teg_irecv_init,
|
||||
mca_pml_teg_irecv,
|
||||
mca_pml_teg_isend_init,
|
||||
mca_pml_teg_isend,
|
||||
mca_pml_teg_progress,
|
||||
mca_pml_teg_start,
|
||||
mca_pml_teg_test,
|
||||
mca_pml_teg_wait,
|
||||
@ -50,6 +49,7 @@ int mca_pml_teg_del_comm(lam_communicator_t* comm)
|
||||
|
||||
int mca_pml_teg_add_ptls(struct mca_ptl_t** ptls, size_t nptls)
|
||||
{
|
||||
/* sort the ptls by exclusivity */
|
||||
mca_pml_teg.teg_ptls = ptls;
|
||||
mca_pml_teg.teg_num_ptls = nptls;
|
||||
return LAM_SUCCESS;
|
||||
@ -58,13 +58,21 @@ int mca_pml_teg_add_ptls(struct mca_ptl_t** ptls, size_t nptls)
|
||||
int mca_pml_teg_add_procs(lam_proc_t** procs, size_t nprocs)
|
||||
{
|
||||
size_t i;
|
||||
/* initialize each proc */
|
||||
for(i=0; i<nprocs; i++) {
|
||||
lam_proc_t *proc = procs[i];
|
||||
if(proc->proc_pml == 0) {
|
||||
size_t p;
|
||||
|
||||
for(p=0; p<nprocs; p++) {
|
||||
lam_proc_t *proc = procs[p];
|
||||
|
||||
/* initialize each proc */
|
||||
mca_pml_proc_t* proc_pml = proc->proc_pml;
|
||||
if(proc_pml == 0) {
|
||||
|
||||
/* allocate pml specific proc data */
|
||||
mca_pml_proc_t* proc_pml = (mca_pml_proc_t*)LAM_MALLOC(sizeof(mca_pml_proc_t));
|
||||
proc_pml = (mca_pml_proc_t*)LAM_MALLOC(sizeof(mca_pml_proc_t));
|
||||
if(NULL == proc_pml) {
|
||||
lam_output(0, "mca_pml_teg_add_procs: unable to allocate resources");
|
||||
return LAM_ERR_OUT_OF_RESOURCE;
|
||||
}
|
||||
mca_pml_teg_proc_init(proc_pml);
|
||||
|
||||
/* preallocate space in array for max number of ptls */
|
||||
@ -73,18 +81,36 @@ int mca_pml_teg_add_procs(lam_proc_t** procs, size_t nprocs)
|
||||
proc_pml->proc_lam = proc;
|
||||
proc->proc_pml = proc_pml;
|
||||
}
|
||||
}
|
||||
|
||||
/* allow each ptl to add itself to proc array */
|
||||
for(i=0; i<mca_pml_teg.teg_num_ptls; i++) {
|
||||
mca_ptl_t* ptl = mca_pml_teg.teg_ptls[i];
|
||||
ptl->ptl_add_procs(ptl, procs, nprocs);
|
||||
}
|
||||
/* allow each ptl to register with the proc */
|
||||
for(i=0; i<mca_pml_teg.teg_num_ptls; i++) {
|
||||
mca_ptl_t* ptl = mca_pml_teg.teg_ptls[i];
|
||||
|
||||
/* if the ptl can reach the destination proc it will return
|
||||
* addressing information that will be cached on the proc
|
||||
*/
|
||||
struct mca_ptl_addr_t* ptl_addr;
|
||||
int rc = ptl->ptl_add_proc(ptl, proc, &ptl_addr);
|
||||
if(rc == LAM_SUCCESS) {
|
||||
|
||||
/* compute a weighting factor for each ptl */
|
||||
for(i=0; i<nprocs; i++) {
|
||||
lam_proc_t *proc = procs[i];
|
||||
mca_pml_proc_t* proc_pml = proc->proc_pml;
|
||||
/* cache the ptl on the proc */
|
||||
mca_ptl_proc_t* ptl_proc = mca_ptl_array_insert(&proc_pml->proc_ptl_next);
|
||||
ptl_proc->ptl = ptl;
|
||||
ptl_proc->ptl_addr = ptl_addr;
|
||||
ptl_proc->ptl_weight = 0;
|
||||
|
||||
/* if this ptl supports exclusive access then don't allow
|
||||
* subsequent ptls to register
|
||||
*/
|
||||
if(ptl->ptl_exclusive)
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/* compute a weighting factor for each ptl */
|
||||
for(i=0; i<mca_ptl_array_get_size(&proc_pml->proc_ptl_next); i++) {
|
||||
|
||||
}
|
||||
}
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
@ -105,9 +131,10 @@ int mca_pml_teg_del_procs(lam_proc_t** procs, size_t nprocs)
|
||||
}
|
||||
}
|
||||
#endif
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
||||
int mca_pml_teg_module_fini(void)
|
||||
int mca_pml_teg_fini(void)
|
||||
{
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
@ -19,15 +19,17 @@ static inline int mca_pml_teg_send_request_alloc(
|
||||
mca_ptl_base_send_request_t** sendreq)
|
||||
{
|
||||
mca_pml_proc_t *proc = mca_pml_teg_proc_lookup_remote(comm,dst);
|
||||
mca_ptl_proc_t* ptl_proc;
|
||||
mca_ptl_t* ptl;
|
||||
|
||||
THREAD_SCOPED_LOCK(&proc->proc_lock,
|
||||
(ptl = mca_ptl_array_get_next_ptl(&proc->proc_ptl_first)));
|
||||
(ptl_proc = mca_ptl_array_get_next(&proc->proc_ptl_first)));
|
||||
ptl = ptl_proc->ptl;
|
||||
|
||||
int rc = ptl->ptl_request_alloc(ptl,sendreq);
|
||||
if(rc != LAM_SUCCESS)
|
||||
return rc;
|
||||
(*sendreq)->req_owner = ptl;
|
||||
(*sendreq)->req_owner = ptl_proc;
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
||||
@ -35,7 +37,8 @@ static inline int mca_pml_teg_send_request_alloc(
|
||||
static inline int mca_pml_teg_send_request_start(
|
||||
mca_ptl_base_send_request_t* req)
|
||||
{
|
||||
mca_ptl_t* ptl = req->req_owner;
|
||||
mca_ptl_proc_t* ptl_proc = req->req_owner;
|
||||
mca_ptl_t* ptl = ptl_proc->ptl;
|
||||
size_t first_fragment_size = ptl->ptl_first_frag_size;
|
||||
int rc;
|
||||
bool complete;
|
||||
@ -43,7 +46,7 @@ static inline int mca_pml_teg_send_request_start(
|
||||
// start the first fragment
|
||||
if(req->req_length < first_fragment_size)
|
||||
first_fragment_size = req->req_length;
|
||||
rc = ptl->ptl_send(ptl, req, first_fragment_size, &complete);
|
||||
rc = ptl->ptl_send(ptl, ptl_proc->ptl_addr, req, first_fragment_size, &complete);
|
||||
if(rc != LAM_SUCCESS)
|
||||
return rc;
|
||||
|
||||
|
@ -36,7 +36,7 @@ struct mca_ptl_base_send_request_t {
|
||||
/* queue of fragments that are waiting to be acknowledged */
|
||||
mca_ptl_base_queue_t req_unacked_frags;
|
||||
/* PTL that allocated this descriptor */
|
||||
struct mca_ptl_t* req_owner;
|
||||
struct mca_ptl_proc_t* req_owner;
|
||||
};
|
||||
typedef struct mca_ptl_base_send_request_t mca_ptl_base_send_request_t;
|
||||
|
||||
|
@ -21,6 +21,7 @@
|
||||
*/
|
||||
|
||||
struct mca_ptl_t;
|
||||
struct mca_ptl_addr_t;
|
||||
struct mca_ptl_base_fragment_t;
|
||||
struct mca_ptl_base_send_request_t;
|
||||
|
||||
@ -82,10 +83,10 @@ typedef struct mca_ptl_base_module_1_0_0_t mca_ptl_base_module_t;
|
||||
* @param nprocs (IN)
|
||||
* @return
|
||||
*/
|
||||
typedef int (*mca_ptl_base_add_procs_fn_t)(
|
||||
typedef int (*mca_ptl_base_add_proc_fn_t)(
|
||||
struct mca_ptl_t* ptl,
|
||||
struct lam_proc_t** procs,
|
||||
size_t nprocs
|
||||
struct lam_proc_t* proc,
|
||||
struct mca_ptl_addr_t**
|
||||
);
|
||||
|
||||
/**
|
||||
@ -112,6 +113,7 @@ typedef int (*mca_ptl_base_request_alloc_fn_t)(
|
||||
|
||||
typedef int (*mca_ptl_base_send_fn_t)(
|
||||
struct mca_ptl_t* ptl,
|
||||
struct mca_ptl_addr_t* ptl_addr,
|
||||
struct mca_ptl_base_send_request_t* send_request,
|
||||
size_t size,
|
||||
bool* complete
|
||||
@ -125,7 +127,7 @@ struct mca_ptl_t {
|
||||
|
||||
/* PTL common attributes */
|
||||
mca_ptl_base_module_t* ptl_module;
|
||||
bool ptl_exclusive; /**< indicates this PTL should be used exclusively */
|
||||
int ptl_exclusive; /**< indicates this PTL should be used exclusively */
|
||||
size_t ptl_first_frag_size; /**< maximum size of first fragment */
|
||||
size_t ptl_min_frag_size; /**< threshold below which the PTL will not fragment */
|
||||
size_t ptl_max_frag_size; /**< maximum fragment size supported by the PTL */
|
||||
@ -133,7 +135,7 @@ struct mca_ptl_t {
|
||||
uint64_t ptl_bandwidth; /**< bandwidth (bytes/sec) supported by each endpoint */
|
||||
|
||||
/* PTL function table */
|
||||
mca_ptl_base_add_procs_fn_t ptl_add_procs;
|
||||
mca_ptl_base_add_proc_fn_t ptl_add_proc;
|
||||
mca_ptl_base_fini_fn_t ptl_fini;
|
||||
mca_ptl_base_send_fn_t ptl_send;
|
||||
mca_ptl_base_request_alloc_fn_t ptl_request_alloc;
|
||||
|
@ -14,5 +14,6 @@ noinst_LTLIBRARIES = libmca_ptl_tcp.la
|
||||
libmca_ptl_tcp_la_SOURCES = \
|
||||
ptl_tcp.c \
|
||||
ptl_tcp.h \
|
||||
ptl_tcp_init.c \
|
||||
ptl_tcp_module.c \
|
||||
ptl_tcp_send.c
|
||||
|
@ -4,6 +4,7 @@
|
||||
|
||||
#include "lam/mem/malloc.h"
|
||||
#include "lam/util/output.h"
|
||||
#include "lam/util/if.h"
|
||||
#include "mca/mpi/pml/pml.h"
|
||||
#include "mca/mpi/ptl/ptl.h"
|
||||
#include "ptl_tcp.h"
|
||||
@ -18,7 +19,7 @@ mca_ptl_tcp_t mca_ptl_tcp = {
|
||||
0, /* ptl_frag_max_size */
|
||||
0, /* ptl_latency */
|
||||
0, /* ptl_andwidth */
|
||||
mca_ptl_tcp_add_procs,
|
||||
mca_ptl_tcp_add_proc,
|
||||
mca_ptl_tcp_fini,
|
||||
mca_ptl_tcp_send,
|
||||
mca_ptl_tcp_request_alloc
|
||||
@ -38,8 +39,13 @@ int mca_ptl_tcp_create(int if_index)
|
||||
|
||||
/* initialize the ptl */
|
||||
ptl->tcp_ifindex = if_index;
|
||||
lam_ifindextoaddr(if_index, &ptl->tcp_addr, sizeof(ptl->tcp_addr));
|
||||
|
||||
lam_ifindextoaddr(if_index, (struct sockaddr*)&ptl->tcp_addr, sizeof(ptl->tcp_addr));
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int mca_ptl_tcp_add_proc(struct mca_ptl_t* ptl, struct lam_proc_t *procs, struct mca_ptl_addr_t** ptl_addr)
|
||||
{
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -71,10 +71,10 @@ extern int mca_ptl_tcp_fini(
|
||||
struct mca_ptl_t* ptl
|
||||
);
|
||||
|
||||
extern int mca_ptl_tcp_add_procs(
|
||||
extern int mca_ptl_tcp_add_proc(
|
||||
struct mca_ptl_t* ptl,
|
||||
struct lam_proc_t **procs,
|
||||
size_t nprocs
|
||||
struct lam_proc_t *procs,
|
||||
struct mca_ptl_addr_t** addr
|
||||
);
|
||||
|
||||
extern int mca_ptl_tcp_request_alloc(
|
||||
@ -84,6 +84,7 @@ extern int mca_ptl_tcp_request_alloc(
|
||||
|
||||
extern int mca_ptl_tcp_send(
|
||||
struct mca_ptl_t* ptl,
|
||||
struct mca_ptl_addr_t* ptl_addr,
|
||||
struct mca_ptl_base_send_request_t*,
|
||||
size_t size,
|
||||
bool* complete
|
||||
|
@ -3,10 +3,13 @@
|
||||
*/
|
||||
#include <errno.h>
|
||||
#include "lam/constants.h"
|
||||
#include "lam/util/if.h"
|
||||
#include "lam/util/argv.h"
|
||||
#include "lam/util/output.h"
|
||||
#include "lam/mem/malloc.h"
|
||||
#include "mca/mpi/pml/pml.h"
|
||||
#include "mca/mpi/ptl/ptl.h"
|
||||
#include "mca/lam/base/param.h"
|
||||
#include "mca/lam/base/module_exchange.h"
|
||||
#include "ptl_tcp.h"
|
||||
|
||||
@ -76,8 +79,10 @@ static inline int mca_ptl_tcp_param_register_int(
|
||||
const char* param_name,
|
||||
int default_value)
|
||||
{
|
||||
int id = mca_base_param_register_string("ptl","tcp",param_name,NULL,default_value);
|
||||
return mca_base_param_lookup_int(id);
|
||||
int id = mca_base_param_register_int("ptl","tcp",param_name,NULL,default_value);
|
||||
int param_value = default_value;
|
||||
mca_base_param_lookup_int(id,¶m_value);
|
||||
return param_value;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -95,11 +100,11 @@ int mca_ptl_tcp_module_open(void)
|
||||
mca_ptl_tcp.super.ptl_exclusive =
|
||||
mca_ptl_tcp_param_register_int("exclusive", 0);
|
||||
mca_ptl_tcp.super.ptl_first_frag_size =
|
||||
mca_ptl_tcp_param_register_uint32("first-frag-size", 16*1024);
|
||||
mca_ptl_tcp_param_register_int("first-frag-size", 16*1024);
|
||||
mca_ptl_tcp.super.ptl_min_frag_size =
|
||||
mca_ptl_tcp_param_register_uint32("min-frag-size", 64*1024);
|
||||
mca_ptl_tcp_param_register_int("min-frag-size", 64*1024);
|
||||
mca_ptl_tcp.super.ptl_max_frag_size =
|
||||
mca_ptl_tcp_param_register_uint32("max-frag-size", -1);
|
||||
mca_ptl_tcp_param_register_int("max-frag-size", -1);
|
||||
return LAM_SUCCESS;
|
||||
}
|
||||
|
||||
@ -153,8 +158,7 @@ static int mca_ptl_tcp_module_create_instances(void)
|
||||
* a PTL for each interface that was not excluded.
|
||||
*/
|
||||
exclude = lam_argv_split(mca_ptl_tcp_module.tcp_if_exclude,'\'');
|
||||
for(if_index = lam_ifbegin(); if_index >= 0; if_index = lam_ifnext()) {
|
||||
int argc;
|
||||
for(if_index = lam_ifbegin(); if_index >= 0; if_index = lam_ifnext(if_index)) {
|
||||
char if_name[32];
|
||||
lam_ifindextoname(if_index, if_name, sizeof(if_name));
|
||||
|
||||
@ -271,20 +275,68 @@ mca_ptl_t** mca_ptl_tcp_module_init(int* num_ptls, int* thread_min, int* thread_
|
||||
return (mca_ptl_t**)mca_ptl_tcp_module.tcp_ptls;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* All TCP progress is handled via an event loop based on select. Events
|
||||
* are dispatched to the appropriate callbacks as file descriptors become
|
||||
* available for read/write.
|
||||
*/
|
||||
|
||||
void mca_ptl_tcp_module_progress(mca_ptl_base_tstamp_t tstamp)
|
||||
{
|
||||
lam_reactor_poll(&mca_ptl_tcp_module.tcp_reactor);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Called by mca_ptl_tcp_module_recv() when the TCP listen
|
||||
* socket has pending connection requests. Accept incoming
|
||||
* requests and queue for completion of the connection handshake.
|
||||
* We wait for the peer to send a 4 byte global process ID(rank)
|
||||
* to complete the connection.
|
||||
*/
|
||||
|
||||
static void mca_ptl_tcp_module_accept(void)
|
||||
{
|
||||
#if defined(__linux__)
|
||||
socklen_t addrlen = sizeof(struct sockaddr_in);
|
||||
#else
|
||||
int addrlen = sizeof(struct sockaddr_in);
|
||||
#endif
|
||||
while(true) {
|
||||
struct sockaddr_in addr;
|
||||
int sd = accept(mca_ptl_tcp_module.tcp_listen, (struct sockaddr*)&addr, &addrlen);
|
||||
if(sd < 0) {
|
||||
if(errno == EINTR)
|
||||
continue;
|
||||
if(errno != EAGAIN || errno != EWOULDBLOCK)
|
||||
lam_output(0, "mca_ptl_tcp_module_accept: accept() failed with errno %d.", errno);
|
||||
return;
|
||||
}
|
||||
|
||||
/* wait for receipt of data to complete the connect */
|
||||
lam_reactor_insert(
|
||||
&mca_ptl_tcp_module.tcp_reactor,
|
||||
sd,
|
||||
&mca_ptl_tcp_module_listener,
|
||||
0,
|
||||
LAM_NOTIFY_RECV|LAM_NOTIFY_EXCEPT);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Called by reactor when registered socket is ready to read.
|
||||
*/
|
||||
static void mca_ptl_tcp_module_recv(int sd, void* user)
|
||||
{
|
||||
mca_ptl_tcp_module_accept();
|
||||
/* accept new connections on the listen socket */
|
||||
if(mca_ptl_tcp_module.tcp_listen == sd) {
|
||||
mca_ptl_tcp_module_accept();
|
||||
return;
|
||||
}
|
||||
lam_reactor_remove(&mca_ptl_tcp_module.tcp_reactor, sd, LAM_NOTIFY_ALL);
|
||||
|
||||
}
|
||||
|
||||
static void mca_ptl_tcp_module_send(int sd, void* user)
|
||||
|
@ -5,6 +5,7 @@
|
||||
|
||||
int mca_ptl_tcp_send(
|
||||
struct mca_ptl_t* ptl,
|
||||
struct mca_ptl_addr_t* ptl_addr,
|
||||
struct mca_ptl_base_send_request_t* sendreq,
|
||||
size_t size,
|
||||
bool* complete)
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user