1
1

- changed PTL ptl_add_proc/ptl_del_proc interface to accept an array of procs for more scalable startup

- resolved several threaded race conditions
- stubs for shared memory ptl

This commit was SVN r1186.
Этот коммит содержится в:
Tim Woodall 2004-06-03 22:13:01 +00:00
родитель bc9ac9b7b1
Коммит 9361dbf767
37 изменённых файлов: 1278 добавлений и 147 удалений

Просмотреть файл

@ -303,7 +303,9 @@ lam_event_loop(int flags)
struct timeval tv;
int res, done;
THREAD_LOCK(&lam_event_lock);
if(lam_using_threads()) {
THREAD_LOCK(&lam_event_lock);
}
/* Calculate the initial events that we are waiting for */
if (lam_evsel->recalc && lam_evsel->recalc(lam_evbase, 0) == -1) {

Просмотреть файл

@ -135,6 +135,14 @@ int mca_pml_teg_control(int param, void* value, size_t size)
int mca_pml_teg_add_procs(lam_proc_t** procs, size_t nprocs)
{
size_t p;
lam_bitmap_t reachable;
int rc;
OBJ_CONSTRUCT(&reachable, lam_bitmap_t);
rc = lam_bitmap_init(&reachable, 1);
if(LAM_SUCCESS != rc)
return rc;
for(p=0; p<nprocs; p++) {
lam_proc_t *proc = procs[p];
double total_bandwidth = 0;
@ -163,14 +171,19 @@ int mca_pml_teg_add_procs(lam_proc_t** procs, size_t nprocs)
/* attempt to add the proc to each ptl */
for(p_index = 0; p_index < mca_pml_teg.teg_num_ptls; p_index++) {
mca_ptl_t* ptl = mca_pml_teg.teg_ptls[p_index];
struct mca_ptl_base_peer_t* ptl_peer;
lam_bitmap_clear_all_bits(&reachable);
/* if the ptl can reach the destination proc it will return
* addressing information that will be cached on the proc, if it
* cannot reach the proc - but another peer
*/
struct mca_ptl_base_peer_t* ptl_peer;
int rc = ptl->ptl_add_proc(ptl, proc, &ptl_peer);
if(rc == LAM_SUCCESS) {
rc = ptl->ptl_add_procs(ptl, 1, &proc, &ptl_peer, &reachable);
if(LAM_SUCCESS != rc)
return rc;
if(lam_bitmap_is_set_bit(&reachable, 0)) {
/* cache the ptl on the proc */
mca_ptl_proc_t* ptl_proc = mca_ptl_array_insert(&proc_pml->proc_ptl_next);
@ -248,6 +261,7 @@ int mca_pml_teg_add_procs(lam_proc_t** procs, size_t nprocs)
int mca_pml_teg_del_procs(lam_proc_t** procs, size_t nprocs)
{
size_t p;
int rc;
for(p = 0; p < nprocs; p++) {
lam_proc_t *proc = procs[p];
mca_pml_proc_t* proc_pml = proc->proc_pml;
@ -260,7 +274,9 @@ int mca_pml_teg_del_procs(lam_proc_t** procs, size_t nprocs)
mca_ptl_proc_t* ptl_proc = mca_ptl_array_get_index(&proc_pml->proc_ptl_first, f_index);
mca_ptl_t* ptl = ptl_proc->ptl;
ptl->ptl_del_proc(ptl,proc,ptl_proc->ptl_peer);
rc = ptl->ptl_del_procs(ptl,1,&proc,&ptl_proc->ptl_peer);
if(LAM_SUCCESS != rc)
return rc;
/* remove this from next array so that we dont call it twice w/
* the same address pointer
@ -280,8 +296,11 @@ int mca_pml_teg_del_procs(lam_proc_t** procs, size_t nprocs)
for(n_index = 0; n_index < n_size; n_index++) {
mca_ptl_proc_t* ptl_proc = mca_ptl_array_get_index(&proc_pml->proc_ptl_first, n_index);
mca_ptl_t* ptl = ptl_proc->ptl;
if (ptl != 0)
ptl->ptl_del_proc(ptl,proc,ptl_proc->ptl_peer);
if (ptl != 0) {
rc = ptl->ptl_del_procs(ptl,1,&proc,&ptl_proc->ptl_peer);
if(LAM_SUCCESS != rc)
return rc;
}
}
/* do any required cleanup */

Просмотреть файл

@ -87,7 +87,7 @@ int mca_pml_teg_send(
mca_pml_base_send_mode_t sendmode,
lam_communicator_t* comm)
{
int rc, index;
int rc;
mca_ptl_base_send_request_t* sendreq;
MCA_PML_TEG_SEND_REQUEST_ALLOC(comm,dst,sendreq,rc);
#if MCA_PML_TEG_STATISTICS

Просмотреть файл

@ -75,7 +75,7 @@ void mca_pml_teg_send_request_progress(
{
bool first_frag;
THREAD_LOCK(&mca_pml_teg.teg_request_lock);
first_frag = (req->req_bytes_sent == 0);
first_frag = (req->req_bytes_sent == 0 && req->req_bytes_packed > 0);
req->req_bytes_sent += frag->super.frag_size;
if (req->req_bytes_sent >= req->req_bytes_packed) {
req->super.req_pml_done = true;
@ -96,7 +96,7 @@ void mca_pml_teg_send_request_progress(
THREAD_UNLOCK(&mca_pml_teg.teg_request_lock);
/* if first fragment - shedule remaining fragments */
if(first_frag == 1) {
if(first_frag == true) {
mca_pml_teg_send_request_schedule(req);
}
}

Просмотреть файл

@ -42,6 +42,7 @@ static inline int mca_pml_teg_send_request_start(
{
mca_ptl_t* ptl = req->req_owner;
size_t first_fragment_size = ptl->ptl_first_frag_size;
size_t offset = req->req_offset;
int flags, rc;
/* start the first fragment */
@ -54,10 +55,9 @@ static inline int mca_pml_teg_send_request_start(
flags = MCA_PTL_FLAGS_ACK_MATCHED;
}
rc = ptl->ptl_put(ptl, req->req_peer, req, 0, &first_fragment_size, flags);
rc = ptl->ptl_put(ptl, req->req_peer, req, offset, first_fragment_size, flags);
if(rc != LAM_SUCCESS)
return rc;
req->req_offset += first_fragment_size;
return LAM_SUCCESS;
}

Просмотреть файл

@ -57,7 +57,6 @@ struct mca_ptl_base_match_header_t {
int32_t hdr_tag; /**< user tag */
uint32_t hdr_msg_length; /**< message length */
mca_ptl_sequence_t hdr_msg_seq; /**< message sequence number */
lam_ptr_t hdr_src_ptr;
};
typedef struct mca_ptl_base_match_header_t mca_ptl_base_match_header_t;

Просмотреть файл

@ -255,12 +255,14 @@ typedef int (*mca_ptl_base_finalize_fn_t)(
/**
* PML->PTL notification of change in the process list.
*
* @param ptl (IN) PTL instance
* @param proc (IN) Peer process
* @param peer (OUT) Peer addressing information.
* @return Status indicates wether PTL is reachable.
* @param ptl (IN) PTL instance
* @param nprocs (IN) Number of processes
* @param procs (IN) Set of processes
* @param peer (OUT) Set of (optional) mca_ptl_base_peer_t instances returned by PTL.
* @param reachable (IN/OUT) Bitmask indicating set of peer processes that are reachable by this PTL.
* @return LAM_SUCCESS or error status on failure.
*
* The mca_ptl_base_add_proc_fn_t() is called by the PML to determine
* The mca_ptl_base_add_procs_fn_t() is called by the PML to determine
* the set of PTLs that should be used to reach the specified process.
* A return value of LAM_SUCCESS indicates the PTL should be added to the
* set used to reach the proc. The peers addressing information may be
@ -268,28 +270,32 @@ typedef int (*mca_ptl_base_finalize_fn_t)(
* The PTL may optionally return a pointer to a mca_ptl_base_peer_t data
* structure, to cache peer addressing or connection information.
*/
typedef int (*mca_ptl_base_add_proc_fn_t)(
typedef int (*mca_ptl_base_add_procs_fn_t)(
struct mca_ptl_t* ptl,
struct lam_proc_t* proc,
struct mca_ptl_base_peer_t** peer
size_t nprocs,
struct lam_proc_t** procs,
struct mca_ptl_base_peer_t** peer,
lam_bitmap_t* reachable
);
/**
* PML->PTL notification of change in the process list.
*
* @param ptl (IN) PTL instance
* @param proc (IN) Peer process
* @param peer (IN) Peer addressing information.
* @param nprocs (IN) Number of processes
* @param proc (IN) Set of processes
* @param peer (IN) Set of peer addressing information.
* @return Status indicating if cleanup was successful
*
* If the process list shrinks, the PML will notify the PTL of the
* change. Peer addressing information cached by the PML is provided
* for cleanup by the PTL.
*/
typedef int (*mca_ptl_base_del_proc_fn_t)(
typedef int (*mca_ptl_base_del_procs_fn_t)(
struct mca_ptl_t* ptl,
struct lam_proc_t* proc,
struct mca_ptl_base_peer_t*
size_t nprocs,
struct lam_proc_t** procs,
struct mca_ptl_base_peer_t**
);
/**
@ -350,7 +356,7 @@ typedef int (*mca_ptl_base_put_fn_t)(
struct mca_ptl_base_peer_t* ptl_base_peer,
struct mca_ptl_base_send_request_t* request,
size_t offset,
size_t *size,
size_t size,
int flags
);
@ -376,7 +382,7 @@ typedef int (*mca_ptl_base_get_fn_t)(
struct mca_ptl_base_peer_t* ptl_base_peer,
struct mca_ptl_base_recv_request_t* request,
size_t offset,
size_t *size,
size_t size,
int flags
);
@ -458,8 +464,8 @@ struct mca_ptl_t {
uint32_t ptl_flags; /**< flags (put/get...) */
/* PML->PTL function table */
mca_ptl_base_add_proc_fn_t ptl_add_proc;
mca_ptl_base_del_proc_fn_t ptl_del_proc;
mca_ptl_base_add_procs_fn_t ptl_add_procs;
mca_ptl_base_del_procs_fn_t ptl_del_procs;
mca_ptl_base_finalize_fn_t ptl_finalize;
mca_ptl_base_put_fn_t ptl_put;
mca_ptl_base_get_fn_t ptl_get;

0
src/mca/ptl/sm/.lam_ignore Обычный файл
Просмотреть файл

36
src/mca/ptl/sm/Makefile.am Обычный файл
Просмотреть файл

@ -0,0 +1,36 @@
#
# $HEADER$
#
# Use the top-level LAM Makefile.options
include $(top_lam_srcdir)/config/Makefile.options
SUBDIRS = src
EXTRA_DIST = VERSION
# According to the MCA spec, we have to make the output library here
# in the top-level directory, and it has to be named
# liblam_ssi_coll_lam_basic.la
if LAM_BUILD_ptl_sm_LOADABLE_MODULE
module_noinst =
module_install = mca_ptl_sm.la
else
module_noinst = libmca_ptl_sm.la
module_install =
endif
mcamoduledir = $(libdir)/lam
mcamodule_LTLIBRARIES = $(module_install)
mca_ptl_sm_la_SOURCES =
mca_ptl_sm_la_LIBADD = \
src/libmca_ptl_sm.la \
$(LIBLAM_LA)
mca_ptl_sm_la_LDFLAGS = -module -avoid-version
noinst_LTLIBRARIES = $(module_noinst)
libmca_ptl_sm_la_SOURCES =
libmca_ptl_sm_la_LIBADD = src/libmca_ptl_sm.la
libmca_ptl_sm_la_LDFLAGS = -module -avoid-version

10
src/mca/ptl/sm/configure.params Обычный файл
Просмотреть файл

@ -0,0 +1,10 @@
# -*- shell-script -*-
#
# $HEADER$
#
# Specific to this module
PARAM_INIT_FILE=src/ptl_sm.c
PARAM_CONFIG_HEADER_FILE="src/sm_config.h"
PARAM_CONFIG_FILES="Makefile src/Makefile"

22
src/mca/ptl/sm/src/Makefile.am Обычный файл
Просмотреть файл

@ -0,0 +1,22 @@
# -*- makefile -*-
#
# $HEADER$
#
include $(top_lam_srcdir)/config/Makefile.options
AM_CPPFLAGS = \
-I$(top_lam_builddir)/src/include \
-I$(top_lam_srcdir)/src \
-I$(top_lam_srcdir)/src/include
noinst_LTLIBRARIES = libmca_ptl_sm.la
libmca_ptl_sm_la_SOURCES = \
ptl_sm.c \
ptl_sm.h \
ptl_sm_mmap.c \
ptl_sm_mmap.h \
ptl_sm_module.c \
ptl_sm_sendreq.c \
ptl_sm_sendfrag.c \
ptl_sm_recvfrag.c

109
src/mca/ptl/sm/src/ptl_sm.c Обычный файл
Просмотреть файл

@ -0,0 +1,109 @@
/*
* $HEADER$
*/
#include <string.h>
#include "util/output.h"
#include "util/if.h"
#include "mca/pml/pml.h"
#include "mca/ptl/ptl.h"
#include "mca/ptl/base/ptl_base_header.h"
#include "mca/ptl/base/ptl_base_sendreq.h"
#include "mca/ptl/base/ptl_base_sendfrag.h"
#include "mca/ptl/base/ptl_base_recvreq.h"
#include "mca/ptl/base/ptl_base_recvfrag.h"
#include "mca/base/mca_base_module_exchange.h"
#include "ptl_sm.h"
mca_ptl_sm_t mca_ptl_sm = {
{
&mca_ptl_sm_module.super,
0, /* ptl_exclusivity */
0, /* ptl_latency */
0, /* ptl_andwidth */
0, /* ptl_frag_first_size */
0, /* ptl_frag_min_size */
0, /* ptl_frag_max_size */
MCA_PTL_PUT, /* ptl flags */
mca_ptl_sm_add_procs,
mca_ptl_sm_del_procs,
mca_ptl_sm_finalize,
mca_ptl_sm_send,
NULL,
mca_ptl_sm_matched,
mca_ptl_sm_request_alloc,
mca_ptl_sm_request_return
}
};
int mca_ptl_sm_add_procs(
struct mca_ptl_t* ptl,
size_t nprocs,
struct lam_proc_t **procs,
struct mca_ptl_base_peer_t **peers,
lam_bitmap_t* reachability)
{
return LAM_SUCCESS;
}
int mca_ptl_sm_del_procs(
struct mca_ptl_t* ptl,
size_t nprocs,
struct lam_proc_t **procs,
struct mca_ptl_base_peer_t **peers)
{
return LAM_SUCCESS;
}
int mca_ptl_sm_finalize(struct mca_ptl_t* ptl)
{
return LAM_SUCCESS;
}
int mca_ptl_sm_request_alloc(struct mca_ptl_t* ptl, struct mca_ptl_base_send_request_t** request)
{
return LAM_SUCCESS;
}
void mca_ptl_sm_request_return(struct mca_ptl_t* ptl, struct mca_ptl_base_send_request_t* request)
{
}
/*
* Initiate a send. If this is the first fragment, use the fragment
* descriptor allocated with the send requests, otherwise obtain
* one from the free list. Initialize the fragment and foward
* on to the peer.
*/
int mca_ptl_sm_send(
struct mca_ptl_t* ptl,
struct mca_ptl_base_peer_t* ptl_peer,
struct mca_ptl_base_send_request_t* sendreq,
size_t offset,
size_t *size,
int flags)
{
return LAM_SUCCESS;
}
/*
* A posted receive has been matched - if required send an
* ack back to the peer and process the fragment.
*/
void mca_ptl_sm_matched(
mca_ptl_t* ptl,
mca_ptl_base_recv_frag_t* frag)
{
}

198
src/mca/ptl/sm/src/ptl_sm.h Обычный файл
Просмотреть файл

@ -0,0 +1,198 @@
/*
* $HEADER$
*/
/**
* @file
*/
#ifndef MCA_PTL_SM_H
#define MCA_PTL_SM_H
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include "mem/free_list.h"
#include "mca/pml/pml.h"
#include "mca/ptl/ptl.h"
#include "ptl_sm_mmap.h"
/**
* Shared Memory (SM) PTL module.
*/
struct mca_ptl_sm_module_1_0_0_t {
mca_ptl_base_module_1_0_0_t super; /**< base PTL module */
int sm_min_alloc; /**< min size of shared memory allocation */
int sm_max_alloc; /**< max size of shared memory allocation */
int sm_free_list_num; /**< initial size of free lists */
int sm_free_list_max; /**< maximum size of free lists */
int sm_free_list_inc; /**< number of elements to alloc when growing free lists */
void* sm_base_addr; /**< base address of mmaped region */
lam_free_list_t sm_send_requests; /**< free list of sm send requests -- sendreq + sendfrag */
lam_free_list_t sm_send_frags; /**< free list of sm send fragments */
lam_free_list_t sm_recv_frags; /**< free list of sm recv fragments */
lam_allocator_t sm_allocator; /**< shared memory allocator */
char sm_mmap_file[PATH_MAX]; /**< full path to backing file */
mca_ptl_sm_mmap_t *sm_mmap;
lam_mutex_t sm_lock;
};
typedef struct mca_ptl_sm_module_1_0_0_t mca_ptl_sm_module_1_0_0_t;
typedef struct mca_ptl_sm_module_1_0_0_t mca_ptl_sm_module_t;
extern mca_ptl_sm_module_1_0_0_t mca_ptl_sm_module;
/**
* Register shared memory module parameters with the MCA framework
*/
extern int mca_ptl_sm_module_open(void);
/**
* Any final cleanup before being unloaded.
*/
extern int mca_ptl_sm_module_close(void);
/**
* SM module initialization.
*
* @param num_ptls (OUT) Number of PTLs returned in PTL array.
* @param allow_multi_user_threads (OUT) Flag indicating wether PTL supports user threads (TRUE)
* @param have_hidden_threads (OUT) Flag indicating wether PTL uses threads (TRUE)
*
*/
extern mca_ptl_t** mca_ptl_sm_module_init(
int *num_ptls,
bool *allow_multi_user_threads,
bool *have_hidden_threads
);
/**
* shared memory module control.
*/
extern int mca_ptl_sm_module_control(
int param,
void* value,
size_t size
);
/**
* shared memory module progress.
*/
extern int mca_ptl_sm_module_progress(
mca_ptl_tstamp_t tstamp
);
/**
* SM PTL Interface
*/
struct mca_ptl_sm_t {
mca_ptl_t super; /**< base PTL interface */
};
typedef struct mca_ptl_sm_t mca_ptl_sm_t;
extern mca_ptl_sm_t mca_ptl_sm;
/**
* Cleanup any resources held by the PTL.
*
* @param ptl PTL instance.
* @return LAM_SUCCESS or error status on failure.
*/
extern int mca_ptl_sm_finalize(
struct mca_ptl_t* ptl
);
/**
* PML->PTL notification of change in the process list.
*
* @param ptl (IN)
* @param proc (IN)
* @param peer (OUT)
* @return LAM_SUCCESS or error status on failure.
*
*/
extern int mca_ptl_sm_add_procs(
struct mca_ptl_t* ptl,
size_t nprocs,
struct lam_proc_t **procs,
struct mca_ptl_base_peer_t** peers,
lam_bitmap_t* reachability
);
/**
* PML->PTL notification of change in the process list.
*
* @param ptl (IN) PTL instance
* @param proc (IN) Peer process
* @param peer (IN) Peer addressing information.
* @return Status indicating if cleanup was successful
*
*/
extern int mca_ptl_sm_del_procs(
struct mca_ptl_t* ptl,
size_t nprocs,
struct lam_proc_t **procs,
struct mca_ptl_base_peer_t **peers
);
/**
* PML->PTL Allocate a send request from the PTL modules free list.
*
* @param ptl (IN) PTL instance
* @param request (OUT) Pointer to allocated request.
* @return Status indicating if allocation was successful.
*
*/
extern int mca_ptl_sm_request_alloc(
struct mca_ptl_t* ptl,
struct mca_ptl_base_send_request_t**
);
/**
* PML->PTL Return a send request to the PTL modules free list.
*
* @param ptl (IN) PTL instance
* @param request (IN) Pointer to allocated request.
*
*/
extern void mca_ptl_sm_request_return(
struct mca_ptl_t* ptl,
struct mca_ptl_base_send_request_t*
);
/**
* PML->PTL Notification that a receive fragment has been matched.
*
* @param ptl (IN) PTL instance
* @param recv_frag (IN) Receive fragment
*
*/
extern void mca_ptl_sm_matched(
struct mca_ptl_t* ptl,
struct mca_ptl_base_recv_frag_t* frag
);
/**
* PML->PTL Initiate a send of the specified size.
*
* @param ptl (IN) PTL instance
* @param ptl_base_peer (IN) PTL peer addressing
* @param send_request (IN/OUT) Send request (allocated by PML via mca_ptl_base_request_alloc_fn_t)
* @param size (IN) Number of bytes PML is requesting PTL to deliver
* @param flags (IN) Flags that should be passed to the peer via the message header.
* @param request (OUT) LAM_SUCCESS if the PTL was able to queue one or more fragments
*/
extern int mca_ptl_sm_send(
struct mca_ptl_t* ptl,
struct mca_ptl_base_peer_t* ptl_peer,
struct mca_ptl_base_send_request_t*,
size_t offset,
size_t *size,
int flags
);
#endif

11
src/mca/ptl/sm/src/ptl_sm_ctl.h Обычный файл
Просмотреть файл

@ -0,0 +1,11 @@
#ifndef _MCA_PTL_SM_CTL_H_
#define _MCA_PTL_SM_CTL_H_
struct mca_ptl_sm_ctrl {
size_t fifo_offset;
};
#endif

135
src/mca/ptl/sm/src/ptl_sm_mmap.c Обычный файл
Просмотреть файл

@ -0,0 +1,135 @@
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
#include <time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/errno.h>
#include <sys/mman.h>
#include "constants.h"
#include "mca/pcm/pcm.h"
#include "ptl_sm.h"
#include "ptl_sm_mmap.h"
OBJ_CLASS_INSTANCE(
mca_ptl_sm_mmap_t,
lam_object_t,
NULL,
NULL
);
static mca_ptl_sm_mmap_t* mca_ptl_sm_mmap_open(size_t size)
{
mca_ptl_sm_segment_t* seg;
mca_ptl_sm_mmap_t* map;
int fd = -1;
while(fd < 0) {
struct timespec ts;
fd = shm_open(mca_ptl_sm_module.sm_mmap_file, O_CREAT|O_RDWR, 0000);
if(fd < 0 && errno != EACCES) {
lam_output(0, "mca_ptl_sm_mmap_open: open failed with errno=%d\n", errno);
return NULL;
}
ts.tv_sec = 0;
ts.tv_nsec = 500000;
nanosleep(&ts,NULL);
}
/* map the file and initialize segment state */
seg = mmap(NULL, size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
if(NULL == seg) {
lam_output(0, "mca_ptl_sm_module_mmap: mmap failed with errno=%d\n", errno);
return NULL;
}
close(fd);
fprintf(stderr, "mapped at %08x", (unsigned int)seg);
map = OBJ_NEW(mca_ptl_sm_mmap_t);
map->sm_segment = seg;
map->sm_addr = (unsigned char*)(seg + 1);
map->sm_size = seg->seg_size;
return map;
}
mca_ptl_sm_mmap_t* mca_ptl_sm_mmap_init(size_t size)
{
static int segnum = 0;
lam_job_handle_t job_handle = mca_pcm.pcm_handle_get();
char hostname[64];
int fd;
mca_ptl_sm_segment_t* seg;
mca_ptl_sm_mmap_t* map;
gethostname(hostname, sizeof(hostname));
sprintf(mca_ptl_sm_module.sm_mmap_file, "/%s.%s.%d", hostname, job_handle, segnum++);
fd = shm_open(mca_ptl_sm_module.sm_mmap_file, O_CREAT|O_RDWR, 0000);
if(fd < 0) {
if(errno == EACCES)
return mca_ptl_sm_mmap_open(size);
lam_output(0, "mca_ptl_sm_module_mmap: open failed with errno=%d\n", errno);
return NULL;
}
/* truncate the file to the requested size */
if(ftruncate(fd, size) != 0) {
lam_output(0, "mca_ptl_sm_module_mmap: ftruncate failed with errno=%d\n", errno);
return NULL;
}
/* map the file and initialize segment state */
seg = mmap(NULL, size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
if(NULL == seg) {
lam_output(0, "mca_ptl_sm_module_mmap: mmap failed with errno=%d\n", errno);
return NULL;
}
fprintf(stderr, "mapped at %08x", (unsigned int)seg);
spinunlock(&seg->seg_lock);
seg->seg_offset = 0;
seg->seg_size = size;
map = OBJ_NEW(mca_ptl_sm_mmap_t);
map->sm_segment = seg;
map->sm_addr = (unsigned char*)(seg + 1);
map->sm_size = size;
/* enable access by other processes on this host */
if(fchmod(fd, 0600) != 0) {
lam_output(0, "mca_ptl_sm_module_mmap: fchmod failed with errno=%d\n", errno);
OBJ_RELEASE(map);
close(fd);
return NULL;
}
close(fd);
return map;
}
void* mca_ptl_sm_mmap_alloc(lam_allocator_t* allocator, size_t size)
{
mca_ptl_sm_mmap_t* map = mca_ptl_sm_module.sm_mmap;
mca_ptl_sm_segment_t* seg = map->sm_segment;
void* addr;
spinlock(&seg->seg_lock);
addr = map->sm_addr + seg->seg_offset;
seg->seg_offset += size;
spinunlock(&seg->seg_lock);
return addr;
}
void mca_ptl_sm_mmap_free(lam_allocator_t* allocator, void* ptr)
{
/* empty for now */
}

35
src/mca/ptl/sm/src/ptl_sm_mmap.h Обычный файл
Просмотреть файл

@ -0,0 +1,35 @@
#ifndef _PTL_SM_MMAP_H_
#define _PTL_SM_MMAP_H_
#include "lfc/lam_object.h"
#include "mem/allocator.h"
#include "os/atomic.h"
struct mca_ptl_sm_segment {
lam_lock_data_t seg_lock;
size_t seg_offset;
size_t seg_size;
};
typedef struct mca_ptl_sm_segment mca_ptl_sm_segment_t;
struct mca_ptl_sm_mmap {
lam_object_t sm_base;
mca_ptl_sm_segment_t* sm_segment;
unsigned char* sm_addr;
size_t sm_size;
};
typedef struct mca_ptl_sm_mmap mca_ptl_sm_mmap_t;
OBJ_CLASS_DECLARATION(mca_ptl_sm_mmap_t);
mca_ptl_sm_mmap_t* mca_ptl_sm_mmap_init(size_t size);
void* mca_ptl_sm_mmap_alloc(lam_allocator_t*, size_t size);
void mca_ptl_sm_mmap_free(lam_allocator_t*, void* alloc);
#endif

232
src/mca/ptl/sm/src/ptl_sm_module.c Обычный файл
Просмотреть файл

@ -0,0 +1,232 @@
/*
* $HEADER$
*/
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/mman.h>
#include "constants.h"
#include "event/event.h"
#include "util/if.h"
#include "util/argv.h"
#include "util/output.h"
#include "mca/pml/pml.h"
#include "mca/ptl/ptl.h"
#include "mca/ptl/base/ptl_base_sendreq.h"
#include "mca/base/mca_base_param.h"
#include "mca/base/mca_base_module_exchange.h"
#include "ptl_sm.h"
#include "ptl_sm_sendreq.h"
#include "ptl_sm_sendfrag.h"
#include "ptl_sm_recvfrag.h"
/*
* Local utility functions.
*/
static int mca_ptl_sm_module_exchange(void);
/*
* Shared Memory (SM) module instance.
*/
mca_ptl_sm_module_1_0_0_t mca_ptl_sm_module = {
{
/* First, the mca_base_module_t struct containing meta information
about the module itself */
{
/* Indicate that we are a pml v1.0.0 module (which also implies a
specific MCA version) */
MCA_PTL_BASE_VERSION_1_0_0,
"sm", /* MCA module name */
1, /* MCA module major version */
0, /* MCA module minor version */
0, /* MCA module release version */
mca_ptl_sm_module_open, /* module open */
mca_ptl_sm_module_close /* module close */
},
/* Next the MCA v1.0.0 module meta data */
{
/* Whether the module is checkpointable or not */
false
},
mca_ptl_sm_module_init,
mca_ptl_sm_module_control,
mca_ptl_sm_module_progress,
}
};
/*
* utility routines for parameter registration
*/
static inline char* mca_ptl_sm_param_register_string(
const char* param_name,
const char* default_value)
{
char *param_value;
int id = mca_base_param_register_string("ptl","sm",param_name,NULL,default_value);
mca_base_param_lookup_string(id, &param_value);
return param_value;
}
static inline int mca_ptl_sm_param_register_int(
const char* param_name,
int default_value)
{
int id = mca_base_param_register_int("ptl","sm",param_name,NULL,default_value);
int param_value = default_value;
mca_base_param_lookup_int(id,&param_value);
return param_value;
}
/*
* Called by MCA framework to open the module, registers
* module parameters.
*/
int mca_ptl_sm_module_open(void)
{
/* register SM module parameters */
mca_ptl_sm_module.sm_min_alloc =
mca_ptl_sm_param_register_int("min_alloc", 64*1024*1024);
mca_ptl_sm_module.sm_max_alloc =
mca_ptl_sm_param_register_int("max_alloc", 512*1024*1024);
mca_ptl_sm_module.sm_base_addr = (void*)
mca_ptl_sm_param_register_int("base_addr", 0x7fffffff);
mca_ptl_sm_module.sm_free_list_num =
mca_ptl_sm_param_register_int("free_list_num", 256);
mca_ptl_sm_module.sm_free_list_max =
mca_ptl_sm_param_register_int("free_list_max", -1);
mca_ptl_sm_module.sm_free_list_inc =
mca_ptl_sm_param_register_int("free_list_inc", 256);
/* initialize objects */
OBJ_CONSTRUCT(&mca_ptl_sm_module.sm_lock, lam_mutex_t);
OBJ_CONSTRUCT(&mca_ptl_sm_module.sm_allocator, lam_allocator_t);
OBJ_CONSTRUCT(&mca_ptl_sm_module.sm_send_requests, lam_free_list_t);
OBJ_CONSTRUCT(&mca_ptl_sm_module.sm_send_frags, lam_free_list_t);
OBJ_CONSTRUCT(&mca_ptl_sm_module.sm_recv_frags, lam_free_list_t);
mca_ptl_sm_module.sm_allocator.alc_alloc_fn = mca_ptl_sm_mmap_alloc;
mca_ptl_sm_module.sm_allocator.alc_free_fn = mca_ptl_sm_mmap_free;
/* initialize state */
mca_ptl_sm_module.sm_mmap = NULL;
return LAM_SUCCESS;
}
/*
* module cleanup - sanity checking of queue lengths
*/
int mca_ptl_sm_module_close(void)
{
OBJ_DESTRUCT(&mca_ptl_sm_module.sm_lock);
OBJ_DESTRUCT(&mca_ptl_sm_module.sm_send_requests);
OBJ_DESTRUCT(&mca_ptl_sm_module.sm_send_frags);
OBJ_DESTRUCT(&mca_ptl_sm_module.sm_recv_frags);
return LAM_SUCCESS;
}
/*
* SM module initialization
*/
mca_ptl_t** mca_ptl_sm_module_init(
int *num_ptls,
bool *allow_multi_user_threads,
bool *have_hidden_threads)
{
mca_ptl_t **ptls = NULL;
*num_ptls = 0;
*allow_multi_user_threads = true;
*have_hidden_threads = LAM_HAVE_THREADS;
/* allocate a block of shared memory */
mca_ptl_sm_module.sm_mmap = mca_ptl_sm_mmap_init(mca_ptl_sm_module.sm_min_alloc);
if(NULL == mca_ptl_sm_module.sm_mmap)
return NULL;
/* initialize free lists */
lam_free_list_init(&mca_ptl_sm_module.sm_send_requests,
sizeof(mca_ptl_sm_send_request_t),
OBJ_CLASS(mca_ptl_sm_send_request_t),
mca_ptl_sm_module.sm_free_list_num,
mca_ptl_sm_module.sm_free_list_max,
mca_ptl_sm_module.sm_free_list_inc,
&mca_ptl_sm_module.sm_allocator); /* use shared-memory allocator */
lam_free_list_init(&mca_ptl_sm_module.sm_recv_frags,
sizeof(mca_ptl_sm_recv_frag_t),
OBJ_CLASS(mca_ptl_sm_recv_frag_t),
mca_ptl_sm_module.sm_free_list_num,
mca_ptl_sm_module.sm_free_list_max,
mca_ptl_sm_module.sm_free_list_inc,
&mca_ptl_sm_module.sm_allocator); /* use default allocator */
/* publish shared memory parameters with the MCA framework */
if(mca_ptl_sm_module_exchange() != LAM_SUCCESS)
return 0;
ptls = malloc(sizeof(mca_ptl_t*));
if(NULL == ptls)
return NULL;
*ptls = &mca_ptl_sm.super;
*num_ptls = 1;
return ptls;
}
/*
* SM module control
*/
int mca_ptl_sm_module_control(int param, void* value, size_t size)
{
switch(param) {
case MCA_PTL_ENABLE:
break;
default:
break;
}
return LAM_SUCCESS;
}
/*
* SM module progress.
*/
int mca_ptl_sm_module_progress(mca_ptl_tstamp_t tstamp)
{
return LAM_SUCCESS;
}
/*
*
*/
static int mca_ptl_sm_module_exchange()
{
return LAM_SUCCESS;
}

39
src/mca/ptl/sm/src/ptl_sm_recvfrag.c Обычный файл
Просмотреть файл

@ -0,0 +1,39 @@
/*
* $HEADER$
*/
#include <unistd.h>
#include <sys/types.h>
#include <sys/errno.h>
#include "mca/ptl/base/ptl_base_sendreq.h"
#include "ptl_sm.h"
#include "ptl_sm_recvfrag.h"
static void mca_ptl_sm_recv_frag_construct(mca_ptl_sm_recv_frag_t* frag);
static void mca_ptl_sm_recv_frag_destruct(mca_ptl_sm_recv_frag_t* frag);
OBJ_CLASS_INSTANCE(
mca_ptl_sm_recv_frag_t,
mca_ptl_base_recv_frag_t,
mca_ptl_sm_recv_frag_construct,
mca_ptl_sm_recv_frag_destruct
);
/*
* shared memory recv fragment constructor
*/
static void mca_ptl_sm_recv_frag_construct(mca_ptl_sm_recv_frag_t* frag)
{
}
/*
* shared memory recv fragment destructor
*/
static void mca_ptl_sm_recv_frag_destruct(mca_ptl_sm_recv_frag_t* frag)
{
}

31
src/mca/ptl/sm/src/ptl_sm_recvfrag.h Обычный файл
Просмотреть файл

@ -0,0 +1,31 @@
/*
* $HEADER$
*/
/**
* @file
*/
#ifndef MCA_PTL_SM_RECV_FRAG_H
#define MCA_PTL_SM_RECV_FRAG_H
#include <string.h>
#include <sys/types.h>
#include "os/atomic.h"
#include "mca/ptl/ptl.h"
#include "mca/ptl/base/ptl_base_recvfrag.h"
#include "ptl_sm.h"
OBJ_CLASS_DECLARATION(mca_ptl_sm_recv_frag_t);
/**
* shared memory received fragment derived type.
*/
struct mca_ptl_sm_recv_frag_t {
mca_ptl_base_recv_frag_t super; /**< base receive fragment descriptor */
};
typedef struct mca_ptl_sm_recv_frag_t mca_ptl_sm_recv_frag_t;
#endif

37
src/mca/ptl/sm/src/ptl_sm_sendfrag.c Обычный файл
Просмотреть файл

@ -0,0 +1,37 @@
/*
* $HEADER$
*/
#include <unistd.h>
#include <sys/types.h>
#include <sys/errno.h>
#include "types.h"
#include "datatype/datatype.h"
#include "mca/ptl/base/ptl_base_sendreq.h"
#include "ptl_sm.h"
#include "ptl_sm_sendfrag.h"
static void mca_ptl_sm_send_frag_construct(mca_ptl_sm_send_frag_t* frag);
static void mca_ptl_sm_send_frag_destruct(mca_ptl_sm_send_frag_t* frag);
OBJ_CLASS_INSTANCE(
mca_ptl_sm_send_frag_t,
mca_ptl_base_send_frag_t,
mca_ptl_sm_send_frag_construct,
mca_ptl_sm_send_frag_destruct);
/*
* Placeholders for send fragment constructor/destructors.
*/
static void mca_ptl_sm_send_frag_construct(mca_ptl_sm_send_frag_t* frag)
{
}
static void mca_ptl_sm_send_frag_destruct(mca_ptl_sm_send_frag_t* frag)
{
}

54
src/mca/ptl/sm/src/ptl_sm_sendfrag.h Обычный файл
Просмотреть файл

@ -0,0 +1,54 @@
/*
* $HEADER$
*/
/**
* @file
*/
#ifndef MCA_PTL_SM_SEND_FRAG_H
#define MCA_PTL_SM_SEND_FRAG_H
#include <sys/types.h>
#include "lam_config.h"
#include "mca/ptl/base/ptl_base_sendreq.h"
#include "mca/ptl/base/ptl_base_sendfrag.h"
#include "ptl_sm.h"
OBJ_CLASS_DECLARATION(mca_ptl_sm_send_frag_t);
/**
* shared memory send fragment derived type.
*/
struct mca_ptl_sm_send_frag_t {
mca_ptl_base_send_frag_t super; /**< base send fragment descriptor */
};
typedef struct mca_ptl_sm_send_frag_t mca_ptl_sm_send_frag_t;
#define MCA_PTL_SM_SEND_FRAG_ALLOC(item, rc) \
LAM_FREE_LIST_GET(&mca_ptl_sm_module.sm_send_frags, item, rc);
/**
* Initialize a fragment descriptor.
*
* frag (IN) Fragment
* peer (IN) PTL peer addressing information
* request (IN) Send request
* offset (IN) Current offset into packed buffer
* size (IN/OUT) Requested size / actual size returned
* flags (IN)
*/
int mca_ptl_sm_send_frag_init(
mca_ptl_sm_send_frag_t*,
struct mca_ptl_base_peer_t*,
struct mca_ptl_base_send_request_t*,
size_t offset,
size_t* size,
int flags);
#endif

35
src/mca/ptl/sm/src/ptl_sm_sendreq.c Обычный файл
Просмотреть файл

@ -0,0 +1,35 @@
/*
* $HEADER$
*/
#include <unistd.h>
#include <sys/types.h>
#include <sys/errno.h>
#include "types.h"
#include "mca/ptl/base/ptl_base_sendreq.h"
#include "ptl_sm.h"
#include "ptl_sm_sendreq.h"
static void mca_ptl_sm_send_request_construct(mca_ptl_sm_send_request_t*);
static void mca_ptl_sm_send_request_destruct(mca_ptl_sm_send_request_t*);
OBJ_CLASS_INSTANCE(
mca_ptl_sm_send_request_t,
mca_ptl_base_send_request_t,
mca_ptl_sm_send_request_construct,
mca_ptl_sm_send_request_destruct
);
void mca_ptl_sm_send_request_construct(mca_ptl_sm_send_request_t* request)
{
OBJ_CONSTRUCT(&request->req_frag, mca_ptl_sm_send_frag_t);
}
void mca_ptl_sm_send_request_destruct(mca_ptl_sm_send_request_t* request)
{
OBJ_DESTRUCT(&request->req_frag);
}

33
src/mca/ptl/sm/src/ptl_sm_sendreq.h Обычный файл
Просмотреть файл

@ -0,0 +1,33 @@
/*
* $HEADER$
*/
/**
* @file
*/
#ifndef MCA_PTL_SM_SEND_REQUEST_H
#define MCA_PTL_SM_SEND_REQUEST_H
#include <sys/types.h>
#include "lam_config.h"
#include "mca/ptl/base/ptl_base_sendreq.h"
#include "ptl_sm_sendfrag.h"
OBJ_CLASS_DECLARATION(mca_ptl_sm_send_request_t);
/**
* Shared Memory (SM) send request derived type. The send request contains both the
* base send request, and space for the first send fragment descriptor.
* This avoids the overhead of a second allocation for the initial send
* fragment on every send request.
*/
struct mca_ptl_sm_send_request_t {
mca_ptl_base_send_request_t super;
mca_ptl_sm_send_frag_t req_frag; /* first fragment */
};
typedef struct mca_ptl_sm_send_request_t mca_ptl_sm_send_request_t;
#endif

Просмотреть файл

@ -7,7 +7,6 @@ include $(top_lam_srcdir)/config/Makefile.options
AM_CPPFLAGS = \
-I$(top_lam_builddir)/src/include \
-I$(top_lam_builddir)/src/lam/event \
-I$(top_lam_srcdir)/src \
-I$(top_lam_srcdir)/src/include

Просмотреть файл

@ -31,8 +31,8 @@ mca_ptl_tcp_t mca_ptl_tcp = {
0, /* ptl_frag_min_size */
0, /* ptl_frag_max_size */
MCA_PTL_PUT, /* ptl flags */
mca_ptl_tcp_add_proc,
mca_ptl_tcp_del_proc,
mca_ptl_tcp_add_procs,
mca_ptl_tcp_del_procs,
mca_ptl_tcp_finalize,
mca_ptl_tcp_send,
NULL,
@ -43,50 +43,62 @@ mca_ptl_tcp_t mca_ptl_tcp = {
};
int mca_ptl_tcp_add_proc(struct mca_ptl_t* ptl, struct lam_proc_t *lam_proc, struct mca_ptl_base_peer_t** peer_ret)
int mca_ptl_tcp_add_procs(
struct mca_ptl_t* ptl,
size_t nprocs,
struct lam_proc_t **lam_procs,
struct mca_ptl_base_peer_t** peers,
lam_bitmap_t* reachable)
{
mca_ptl_tcp_proc_t* ptl_proc = mca_ptl_tcp_proc_create(lam_proc);
mca_ptl_base_peer_t* ptl_peer;
int rc;
size_t i;
for(i=0; i<nprocs; i++) {
struct lam_proc_t *lam_proc = lam_procs[i];
mca_ptl_tcp_proc_t* ptl_proc = mca_ptl_tcp_proc_create(lam_proc);
mca_ptl_base_peer_t* ptl_peer;
int rc;
if(NULL == ptl_proc)
return LAM_ERR_OUT_OF_RESOURCE;
if(NULL == ptl_proc)
return LAM_ERR_OUT_OF_RESOURCE;
/*
* Check to make sure that the peer has at least as many interface addresses
* exported as we are trying to use. If not, then don't bind this PTL instance
* to the proc.
*/
THREAD_LOCK(&ptl_proc->proc_lock);
if(ptl_proc->proc_addr_count == ptl_proc->proc_peer_count) {
THREAD_UNLOCK(&ptl_proc->proc_lock);
return LAM_ERR_UNREACH;
}
/*
* Check to make sure that the peer has at least as many interface addresses
* exported as we are trying to use. If not, then don't bind this PTL instance
* to the proc.
*/
THREAD_LOCK(&ptl_proc->proc_lock);
if(ptl_proc->proc_addr_count == ptl_proc->proc_peer_count) {
THREAD_UNLOCK(&ptl_proc->proc_lock);
return LAM_ERR_UNREACH;
}
/* The ptl_proc datastructure is shared by all TCP PTL instances that are trying
* to reach this destination. Cache the peer instance on the ptl_proc.
*/
ptl_peer = OBJ_NEW(mca_ptl_tcp_peer_t);
if(NULL == ptl_peer) {
/* The ptl_proc datastructure is shared by all TCP PTL instances that are trying
* to reach this destination. Cache the peer instance on the ptl_proc.
*/
ptl_peer = OBJ_NEW(mca_ptl_tcp_peer_t);
if(NULL == ptl_peer) {
THREAD_UNLOCK(&ptl_proc->proc_lock);
return LAM_ERR_OUT_OF_RESOURCE;
}
ptl_peer->peer_ptl = (mca_ptl_tcp_t*)ptl;
rc = mca_ptl_tcp_proc_insert(ptl_proc, ptl_peer);
if(rc != LAM_SUCCESS) {
OBJ_RELEASE(ptl_peer);
THREAD_UNLOCK(&ptl_proc->proc_lock);
return rc;
}
lam_bitmap_set_bit(reachable, i);
THREAD_UNLOCK(&ptl_proc->proc_lock);
return LAM_ERR_OUT_OF_RESOURCE;
peers[i] = ptl_peer;
}
ptl_peer->peer_ptl = (mca_ptl_tcp_t*)ptl;
rc = mca_ptl_tcp_proc_insert(ptl_proc, ptl_peer);
if(rc != LAM_SUCCESS) {
OBJ_RELEASE(ptl_peer);
THREAD_UNLOCK(&ptl_proc->proc_lock);
return rc;
}
THREAD_UNLOCK(&ptl_proc->proc_lock);
*peer_ret = ptl_peer;
return LAM_SUCCESS;
}
int mca_ptl_tcp_del_proc(struct mca_ptl_t* ptl, struct lam_proc_t *proc, struct mca_ptl_base_peer_t* ptl_peer)
int mca_ptl_tcp_del_procs(struct mca_ptl_t* ptl, size_t nprocs, struct lam_proc_t **procs, struct mca_ptl_base_peer_t ** peers)
{
OBJ_RELEASE(ptl_peer);
size_t i;
for(i=0; i<nprocs; i++) {
OBJ_RELEASE(peers[i]);
}
return LAM_SUCCESS;
}
@ -157,7 +169,7 @@ int mca_ptl_tcp_send(
struct mca_ptl_base_peer_t* ptl_peer,
struct mca_ptl_base_send_request_t* sendreq,
size_t offset,
size_t *size,
size_t size,
int flags)
{
mca_ptl_tcp_send_frag_t* sendfrag;
@ -170,9 +182,13 @@ int mca_ptl_tcp_send(
if(NULL == (sendfrag = (mca_ptl_tcp_send_frag_t*)item))
return rc;
}
rc = mca_ptl_tcp_send_frag_init(sendfrag, ptl_peer, sendreq, offset, size, flags);
rc = mca_ptl_tcp_send_frag_init(sendfrag, ptl_peer, sendreq, offset, &size, flags);
if(rc != LAM_SUCCESS)
return rc;
/* must update the offset after actual fragment size is determined -- and very important --
* before attempting to send the fragment
*/
sendreq->req_offset += size;
return mca_ptl_tcp_peer_send(ptl_peer, sendfrag);
}

Просмотреть файл

@ -131,16 +131,20 @@ extern int mca_ptl_tcp_finalize(
* PML->PTL notification of change in the process list.
*
* @param ptl (IN)
* @param proc (IN)
* @param peer (OUT)
* @param nprocs (IN) Number of processes
* @param procs (IN) Set of processes
* @param peers (OUT) Set of (optional) peer addressing info.
* @param peers (IN/OUT) Set of processes that are reachable via this PTL.
* @return LAM_SUCCESS or error status on failure.
*
*/
extern int mca_ptl_tcp_add_proc(
extern int mca_ptl_tcp_add_procs(
struct mca_ptl_t* ptl,
struct lam_proc_t *proc,
struct mca_ptl_base_peer_t** peer
size_t nprocs,
struct lam_proc_t **procs,
struct mca_ptl_base_peer_t** peers,
lam_bitmap_t* reachable
);
@ -148,15 +152,17 @@ extern int mca_ptl_tcp_add_proc(
* PML->PTL notification of change in the process list.
*
* @param ptl (IN) PTL instance
* @param proc (IN) Peer process
* @param peer (IN) Peer addressing information.
* @param nproc (IN) Number of processes.
* @param procs (IN) Set of processes.
* @param peers (IN) Set of peer data structures.
* @return Status indicating if cleanup was successful
*
*/
extern int mca_ptl_tcp_del_proc(
extern int mca_ptl_tcp_del_procs(
struct mca_ptl_t* ptl,
struct lam_proc_t *procs,
struct mca_ptl_base_peer_t* addr
size_t nprocs,
struct lam_proc_t **procs,
struct mca_ptl_base_peer_t** peers
);
/**
@ -211,7 +217,7 @@ extern int mca_ptl_tcp_send(
struct mca_ptl_base_peer_t* ptl_peer,
struct mca_ptl_base_send_request_t*,
size_t offset,
size_t *size,
size_t size,
int flags
);

Просмотреть файл

@ -139,6 +139,10 @@ int mca_ptl_tcp_module_open(void)
return LAM_SUCCESS;
}
/*
* module cleanup - sanity checking of queue lengths
*/
int mca_ptl_tcp_module_close(void)
{
if (mca_ptl_tcp_module.tcp_send_requests.fl_num_allocated !=
@ -211,8 +215,6 @@ static int mca_ptl_tcp_create(int if_index, const char* if_name)
#endif
return LAM_SUCCESS;
}
/*
* Create a TCP PTL instance for either:
@ -383,6 +385,10 @@ mca_ptl_t** mca_ptl_tcp_module_init(int *num_ptls,
*allow_multi_user_threads = true;
*have_hidden_threads = LAM_HAVE_THREADS;
/* need to set lam_using_threads() as lam_event_init() will spawn a thread if supported */
if(LAM_HAVE_THREADS)
lam_set_using_threads(true);
if((rc = lam_event_init()) != LAM_SUCCESS) {
lam_output(0, "mca_ptl_tcp_module_init: unable to initialize event dispatch thread: %d\n", rc);
return NULL;

Просмотреть файл

@ -27,9 +27,6 @@ static void mca_ptl_tcp_peer_recv_handler(int sd, short flags, void* user);
static void mca_ptl_tcp_peer_send_handler(int sd, short flags, void* user);
#define PROGRESS_THREAD_LOCK THREAD_LOCK
#define PROGRESS_THREAD_UNLOCK THREAD_UNLOCK
lam_class_t mca_ptl_tcp_peer_t_class = {
"mca_tcp_ptl_peer_t",
@ -113,6 +110,10 @@ static void mca_ptl_tcp_peer_dump(mca_ptl_base_peer_t* ptl_peer, const char* msg
lam_output(0, buff);
}
/*
* Initialize events to be used by the peer instance for TCP select/poll callbacks.
*/
static inline void mca_ptl_tcp_peer_event_init(mca_ptl_base_peer_t* ptl_peer, int sd)
{
lam_event_set(
@ -235,7 +236,7 @@ bool mca_ptl_tcp_peer_accept(mca_ptl_base_peer_t* ptl_peer, struct sockaddr_in*
{
mca_ptl_tcp_addr_t* ptl_addr;
mca_ptl_tcp_proc_t* this_proc = mca_ptl_tcp_proc_local();
PROGRESS_THREAD_LOCK(&ptl_peer->peer_recv_lock);
THREAD_LOCK(&ptl_peer->peer_recv_lock);
THREAD_LOCK(&ptl_peer->peer_send_lock);
if((ptl_addr = ptl_peer->peer_addr) != NULL &&
ptl_addr->addr_inet.s_addr == addr->sin_addr.s_addr) {
@ -248,7 +249,7 @@ bool mca_ptl_tcp_peer_accept(mca_ptl_base_peer_t* ptl_peer, struct sockaddr_in*
if(mca_ptl_tcp_peer_send_connect_ack(ptl_peer) != LAM_SUCCESS) {
mca_ptl_tcp_peer_close(ptl_peer);
THREAD_UNLOCK(&ptl_peer->peer_send_lock);
PROGRESS_THREAD_UNLOCK(&ptl_peer->peer_recv_lock);
THREAD_UNLOCK(&ptl_peer->peer_recv_lock);
return false;
}
mca_ptl_tcp_peer_event_init(ptl_peer, sd);
@ -258,12 +259,12 @@ bool mca_ptl_tcp_peer_accept(mca_ptl_base_peer_t* ptl_peer, struct sockaddr_in*
mca_ptl_tcp_peer_dump(ptl_peer, "accepted");
#endif
THREAD_UNLOCK(&ptl_peer->peer_send_lock);
PROGRESS_THREAD_UNLOCK(&ptl_peer->peer_recv_lock);
THREAD_UNLOCK(&ptl_peer->peer_recv_lock);
return true;
}
}
THREAD_UNLOCK(&ptl_peer->peer_send_lock);
PROGRESS_THREAD_UNLOCK(&ptl_peer->peer_recv_lock);
THREAD_UNLOCK(&ptl_peer->peer_recv_lock);
return false;
}
@ -287,7 +288,8 @@ void mca_ptl_tcp_peer_close(mca_ptl_base_peer_t* ptl_peer)
}
/*
*
* Setup peer state to reflect that connection has been established,
* and start any pending sends.
*/
static void mca_ptl_tcp_peer_connected(mca_ptl_base_peer_t* ptl_peer)
@ -521,7 +523,7 @@ static void mca_ptl_tcp_peer_complete_connect(mca_ptl_base_peer_t* ptl_peer)
static void mca_ptl_tcp_peer_recv_handler(int sd, short flags, void* user)
{
mca_ptl_base_peer_t* ptl_peer = user;
PROGRESS_THREAD_LOCK(&ptl_peer->peer_recv_lock);
THREAD_LOCK(&ptl_peer->peer_recv_lock);
switch(ptl_peer->peer_state) {
case MCA_PTL_TCP_CONNECT_ACK:
{
@ -535,7 +537,7 @@ static void mca_ptl_tcp_peer_recv_handler(int sd, short flags, void* user)
int rc;
MCA_PTL_TCP_RECV_FRAG_ALLOC(recv_frag, rc);
if(NULL == recv_frag) {
PROGRESS_THREAD_UNLOCK(&ptl_peer->peer_recv_lock);
THREAD_UNLOCK(&ptl_peer->peer_recv_lock);
return;
}
mca_ptl_tcp_recv_frag_init(recv_frag, ptl_peer);
@ -555,7 +557,7 @@ static void mca_ptl_tcp_peer_recv_handler(int sd, short flags, void* user)
break;
}
}
PROGRESS_THREAD_UNLOCK(&ptl_peer->peer_recv_lock);
THREAD_UNLOCK(&ptl_peer->peer_recv_lock);
}

Просмотреть файл

@ -17,6 +17,9 @@
#include "ptl_tcp_recvfrag.h"
#include "ptl_tcp_sendfrag.h"
/**
* State of TCP peer connection.
*/
typedef enum {
MCA_PTL_TCP_CLOSED,

Просмотреть файл

@ -23,6 +23,10 @@ lam_class_t mca_ptl_tcp_proc_t_class = {
};
/**
* Initialize tcp proc instance
*/
void mca_ptl_tcp_proc_construct(mca_ptl_tcp_proc_t* proc)
{
proc->proc_lam = 0;
@ -39,6 +43,10 @@ void mca_ptl_tcp_proc_construct(mca_ptl_tcp_proc_t* proc)
}
/*
* Cleanup tcp proc instance
*/
void mca_ptl_tcp_proc_destruct(mca_ptl_tcp_proc_t* proc)
{
/* remove from list of all proc instances */

Просмотреть файл

@ -41,6 +41,10 @@ mca_ptl_tcp_proc_t* mca_ptl_tcp_proc_create(lam_proc_t* lam_proc);
mca_ptl_tcp_proc_t* mca_ptl_tcp_proc_lookup(void *guid, size_t size);
/**
* Inlined function to return local TCP proc instance.
*/
static inline mca_ptl_tcp_proc_t* mca_ptl_tcp_proc_local(void)
{
if(NULL == mca_ptl_tcp_module.tcp_local)

Просмотреть файл

@ -30,16 +30,28 @@ lam_class_t mca_ptl_tcp_recv_frag_t_class = {
};
/*
* TCP fragment constructor
*/
static void mca_ptl_tcp_recv_frag_construct(mca_ptl_tcp_recv_frag_t* frag)
{
}
/*
* TCP fragment destructor
*/
static void mca_ptl_tcp_recv_frag_destruct(mca_ptl_tcp_recv_frag_t* frag)
{
}
/*
* Initialize a TCP receive fragment for a specific peer.
*/
void mca_ptl_tcp_recv_frag_init(mca_ptl_tcp_recv_frag_t* frag, mca_ptl_base_peer_t* peer)
{
frag->super.super.frag_owner = &peer->peer_ptl->super;
@ -54,6 +66,10 @@ void mca_ptl_tcp_recv_frag_init(mca_ptl_tcp_recv_frag_t* frag, mca_ptl_base_peer
frag->frag_progressed = 0;
}
/*
* Callback from event library when socket has data available
* for receive.
*/
bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t* frag, int sd)
{
@ -77,6 +93,9 @@ bool mca_ptl_tcp_recv_frag_handler(mca_ptl_tcp_recv_frag_t* frag, int sd)
}
}
/*
* Receive fragment header
*/
static bool mca_ptl_tcp_recv_frag_header(mca_ptl_tcp_recv_frag_t* frag, int sd, size_t size)
{
@ -112,6 +131,10 @@ static bool mca_ptl_tcp_recv_frag_header(mca_ptl_tcp_recv_frag_t* frag, int sd,
}
/*
* Receive and process an ack.
*/
static bool mca_ptl_tcp_recv_frag_ack(mca_ptl_tcp_recv_frag_t* frag, int sd)
{
mca_ptl_tcp_send_frag_t* sendfrag;
@ -125,6 +148,10 @@ static bool mca_ptl_tcp_recv_frag_ack(mca_ptl_tcp_recv_frag_t* frag, int sd)
}
/*
* Receive and process a match request - first fragment.
*/
static bool mca_ptl_tcp_recv_frag_match(mca_ptl_tcp_recv_frag_t* frag, int sd)
{
/* first pass through - attempt a match */
@ -162,6 +189,10 @@ static bool mca_ptl_tcp_recv_frag_match(mca_ptl_tcp_recv_frag_t* frag, int sd)
}
/*
* Receive and process 2nd+ fragments of a multi-fragment message.
*/
static bool mca_ptl_tcp_recv_frag_frag(mca_ptl_tcp_recv_frag_t* frag, int sd)
{
/* get request from header */

Просмотреть файл

@ -94,46 +94,38 @@ static inline void mca_ptl_tcp_recv_frag_matched(mca_ptl_tcp_recv_frag_t* frag)
static inline void mca_ptl_tcp_recv_frag_progress(mca_ptl_tcp_recv_frag_t* frag)
{
if((frag)->frag_msg_cnt >= (frag)->super.super.frag_header.hdr_frag.hdr_frag_length) {
mca_ptl_base_recv_request_t* request = (frag)->super.frag_request;
/* make sure this only happens once for threaded case */
if(lam_using_threads()) {
if(fetchNset(&(frag)->frag_progressed, 1) == 1)
return;
} else {
if((frag)->frag_progressed == 1)
return;
(frag)->frag_progressed = 1;
}
if(fetchNset(&frag->frag_progressed, 1) == 0) {
mca_ptl_base_recv_request_t* request = (frag)->super.frag_request;
if((frag)->super.frag_is_buffered) {
mca_ptl_base_match_header_t* header = &(frag)->super.super.frag_header.hdr_match;
if((frag)->super.frag_is_buffered) {
mca_ptl_base_match_header_t* header = &(frag)->super.super.frag_header.hdr_match;
/*
* Initialize convertor and use it to unpack data
*/
struct iovec iov;
lam_proc_t *proc =
lam_comm_peer_lookup(request->super.req_comm, request->super.req_peer);
lam_convertor_copy(proc->proc_convertor, &(frag)->super.super.frag_convertor);
lam_convertor_init_for_recv(
&(frag)->super.super.frag_convertor, /* convertor */
0, /* flags */
request->super.req_datatype, /* datatype */
request->super.req_count, /* count elements */
request->super.req_addr, /* users buffer */
header->hdr_frag.hdr_frag_offset); /* offset in bytes into packed buffer */
/*
* Initialize convertor and use it to unpack data
*/
struct iovec iov;
lam_proc_t *proc =
lam_comm_peer_lookup(request->super.req_comm, request->super.req_peer);
lam_convertor_copy(proc->proc_convertor, &(frag)->super.super.frag_convertor);
lam_convertor_init_for_recv(
&(frag)->super.super.frag_convertor, /* convertor */
0, /* flags */
request->super.req_datatype, /* datatype */
request->super.req_count, /* count elements */
request->super.req_addr, /* users buffer */
header->hdr_frag.hdr_frag_offset); /* offset in bytes into packed buffer */
iov.iov_base = (frag)->super.super.frag_addr;
iov.iov_len = (frag)->super.super.frag_size;
lam_convertor_unpack(&(frag)->super.super.frag_convertor, &iov, 1);
}
iov.iov_base = (frag)->super.super.frag_addr;
iov.iov_len = (frag)->super.super.frag_size;
lam_convertor_unpack(&(frag)->super.super.frag_convertor, &iov, 1);
}
/* progress the request */
(frag)->super.super.frag_owner->ptl_recv_progress(request, &(frag)->super);
if((frag)->frag_ack_pending == false) {
mca_ptl_tcp_recv_frag_return((frag)->super.super.frag_owner, (frag));
}
/* progress the request */
(frag)->super.super.frag_owner->ptl_recv_progress(request, &(frag)->super);
if((frag)->frag_ack_pending == false) {
mca_ptl_tcp_recv_frag_return((frag)->super.super.frag_owner, (frag));
}
}
}
}

Просмотреть файл

@ -29,6 +29,9 @@ lam_class_t mca_ptl_tcp_send_frag_t_class = {
(lam_destruct_t)mca_ptl_tcp_send_frag_destruct
};
/*
* Placeholders for send fragment constructor/destructors.
*/
static void mca_ptl_tcp_send_frag_construct(mca_ptl_tcp_send_frag_t* frag)
{

Просмотреть файл

@ -10,6 +10,7 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include "os/atomic.h"
#include "lam_config.h"
#include "mca/ptl/base/ptl_base_sendreq.h"
#include "mca/ptl/base/ptl_base_sendfrag.h"
@ -40,6 +41,17 @@ typedef struct mca_ptl_tcp_send_frag_t mca_ptl_tcp_send_frag_t;
bool mca_ptl_tcp_send_frag_handler(mca_ptl_tcp_send_frag_t*, int sd);
/**
* Initialize a fragment descriptor.
*
* frag (IN) Fragment
* peer (IN) PTL peer addressing information
* request (IN) Send request
* offset (IN) Current offset into packed buffer
* size (IN/OUT) Requested size / actual size returned
* flags (IN)
*/
int mca_ptl_tcp_send_frag_init(
mca_ptl_tcp_send_frag_t*,
struct mca_ptl_base_peer_t*,
@ -49,7 +61,7 @@ int mca_ptl_tcp_send_frag_init(
int flags);
/*
/**
* For fragments that require an acknowledgment, this routine will be called
* twice, once when the send completes, and again when the acknowledgment is
* returned. Only the last caller should update the request status, so we
@ -72,21 +84,18 @@ static inline void mca_ptl_tcp_send_frag_progress(mca_ptl_tcp_send_frag_t* frag)
mca_ptl_base_send_request_matched(request))) {
/* make sure this only happens once in threaded case */
if (lam_using_threads() && fetchNset(&frag->frag_progressed, 1) == 1) {
return;
} else {
frag->frag_progressed = 1;
}
if(fetchNset(&frag->frag_progressed,1) == 0) {
/* update request status */
frag->super.super.frag_owner->ptl_send_progress(request, &frag->super);
/* update request status */
frag->super.super.frag_owner->ptl_send_progress(request, &frag->super);
/* the first fragment is allocated with the request,
* all others need to be returned to free list
*/
if(frag->super.super.frag_header.hdr_frag.hdr_frag_offset != 0)
mca_ptl_tcp_send_frag_return(frag->super.super.frag_owner, frag);
}
/* the first fragment is allocated with the request,
* all others need to be returned to free list
*/
if(frag->super.super.frag_header.hdr_frag.hdr_frag_offset != 0)
mca_ptl_tcp_send_frag_return(frag->super.super.frag_owner, frag);
}
}
}

Просмотреть файл

@ -106,10 +106,6 @@ int lam_init(int argc, char *argv[])
lam_malloc_init();
/* Initialize event handling */
lam_event_init();
/* Other things that we'll probably need:
- session directory setup

Просмотреть файл

@ -8,6 +8,7 @@ AM_CPPFLAGS = -I$(top_srcdir)/test/support -DLAM_ENABLE_DEBUG_OVERRIDE=1
noinst_PROGRAMS = \
lam_bitmap \
lam_registry \
lam_hash_table \
lam_list \
lam_value_array \
@ -37,6 +38,18 @@ lam_list_LDADD = \
$(top_builddir)/test/support/libsupport.la
lam_list_DEPENDENCIES = $(lam_list_LDADD)
lam_registry_SOURCES = registry.c
lam_registry_LDADD = \
$(top_builddir)/src/lfc/lam_list.lo \
$(top_builddir)/src/lfc/lam_object.lo \
$(top_builddir)/src/mem/malloc.lo \
$(top_builddir)/src/util/output.lo \
$(top_builddir)/src/threads/mutex.lo \
$(top_builddir)/src/threads/mutex_pthread.lo \
$(top_builddir)/src/threads/mutex_spinlock.lo \
$(top_builddir)/test/support/libsupport.la
lam_registry_DEPENDENCIES = $(lam_list_LDADD)
lam_hash_table_SOURCES = lam_hash_table.c
lam_hash_table_LDADD = \
$(top_builddir)/src/lfc/lam_hash_table.lo \