
first cut at support for threaded case - latency is bad

This commit was SVN r3730.
This commit is contained in:
Tim Woodall 2004-12-07 15:38:01 +00:00
parent d8e1f396a7
commit 448c6632ab
5 changed files with 135 additions and 22 deletions
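For context on the mechanism this commit adds: each process creates a named FIFO under its job session directory and registers it for read events, and a peer writes a single byte into that FIFO whenever it queues a fragment, waking the (threaded) receiver. The following is a minimal, standalone sketch of the receiving side only, written against plain POSIX; the /tmp path, the sm_progress() stub, the poll() loop standing in for the ompi_event_* machinery, and the error handling are all illustrative assumptions, not the Open MPI code itself.

#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <poll.h>
#include <stdio.h>
#include <sys/stat.h>
#include <unistd.h>

/* Hypothetical stand-in for mca_ptl_sm_component_progress(). */
void sm_progress(void) { /* drain the shared-memory queues here */ }

int main(void)
{
    char path[PATH_MAX];
    int fd;

    /* Each process publishes one FIFO named after its id so peers can
     * find it; the directory and naming scheme here are illustrative. */
    snprintf(path, sizeof(path), "/tmp/sm_fifo.%d", (int)getpid());
    if (mkfifo(path, 0660) < 0 && errno != EEXIST) {
        perror("mkfifo");
        return 1;
    }

    /* O_RDWR keeps a writer on the FIFO at all times, so open() does not
     * block waiting for a peer and read() never returns end-of-file. */
    fd = open(path, O_RDWR);
    if (fd < 0) {
        perror("open");
        return 1;
    }

    /* poll() stands in for a persistent OMPI_EV_READ event: each one-byte
     * "doorbell" written by a peer wakes us; drain it and run progress. */
    for (;;) {
        struct pollfd pfd = { .fd = fd, .events = POLLIN, .revents = 0 };
        unsigned char cmd;

        if (poll(&pfd, 1, -1) < 0) {
            if (errno == EINTR)
                continue;
            perror("poll");
            break;
        }
        if ((pfd.revents & POLLIN) &&
            read(fd, &cmd, sizeof(cmd)) == sizeof(cmd))
            sm_progress();
    }

    close(fd);
    unlink(path);
    return 0;
}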

View File

@@ -17,6 +17,9 @@
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include "util/output.h"
#include "util/if.h"
@@ -28,6 +31,7 @@
#include "mca/ptl/base/ptl_base_sendfrag.h"
#include "mca/ptl/base/ptl_base_recvfrag.h"
#include "mca/base/mca_base_module_exchange.h"
#include "mca/oob/base/base.h"
#include "mca/common/sm/common_sm_mmap.h"
#include "ptl_sm.h"
#include "util/sys_info.h"
@@ -112,9 +116,6 @@ int mca_ptl_sm_add_procs_same_base_addr(
ompi_proc_t* my_proc; /* pointer to caller's proc structure */
mca_ptl_sm_t *ptl_sm;
ompi_fifo_t *my_fifos;
/*
volatile ompi_fifo_t **fifo_tmp;
*/
ompi_fifo_t * volatile *fifo_tmp;
bool same_sm_base;
ssize_t diff;
@@ -181,22 +182,34 @@ int mca_ptl_sm_add_procs_same_base_addr(
if( len == my_len ) {
if( 0 == strncmp(ompi_system_info.nodename,
(char *)(sm_proc_info[proc]),len) ) {
struct mca_ptl_base_peer_t *peer = peers[proc];
#if OMPI_HAVE_THREADS == 1
char path[PATH_MAX];
int flags;
#endif
/* initialize the peers information */
peers[proc]=malloc(sizeof(struct mca_ptl_base_peer_t));
if( NULL == peers[proc] ){
peer = peers[proc]=malloc(sizeof(struct mca_ptl_base_peer_t));
if( NULL == peer ){
return_code=OMPI_ERR_OUT_OF_RESOURCE;
goto CLEANUP;
}
peers[proc]->peer_smp_rank=n_local_procs+
peer->peer_smp_rank=n_local_procs+
mca_ptl_sm_component.num_smp_procs;
n_local_procs++;
/* */
#if OMPI_HAVE_THREADS == 1
sprintf(path, "%s/sm_fifo.%d", ompi_process_info.job_session_dir,
procs[proc]->proc_name.vpid);
peer->fifo_fd = open(path, O_WRONLY);
if(peer->fifo_fd < 0) {
ompi_output(0, "mca_ptl_sm_add_procs: open(%s) failed with errno=%d\n", path, errno);
goto CLEANUP;
}
#endif
mca_ptl_sm_component.sm_proc_connect[proc]=SM_CONNECTED;
}
}
}
/* make sure that my_smp_rank has been defined */
@@ -205,14 +218,6 @@ int mca_ptl_sm_add_procs_same_base_addr(
goto CLEANUP;
}
/* set local proc's smp rank in the peers structure for
* rapid access */
for( proc=0 ; proc < nprocs; proc++ ) {
if(NULL != peers[proc] ) {
peers[proc]->my_smp_rank=mca_ptl_sm_component.my_smp_rank;
}
}
/* see if need to allocate space for extra procs */
if( 0 > mca_ptl_sm_component.sm_max_procs ) {
/* no limit */
@@ -237,6 +242,24 @@ int mca_ptl_sm_add_procs_same_base_addr(
goto CLEANUP;
}
/* create a list of peers */
mca_ptl_sm_component.sm_peers=(struct mca_ptl_base_peer_t**)
malloc(n_to_allocate*sizeof(struct mca_ptl_base_peer_t*));
if(NULL == mca_ptl_sm_component.sm_peers ) {
return_code=OMPI_ERR_OUT_OF_RESOURCE;
goto CLEANUP;
}
/* set local proc's smp rank in the peers structure for
* rapid access */
for( proc=0 ; proc < nprocs; proc++ ) {
struct mca_ptl_base_peer_t* peer = peers[proc];
if(NULL != peer) {
mca_ptl_sm_component.sm_peers[peer->peer_smp_rank] = peer;
peer->my_smp_rank=mca_ptl_sm_component.my_smp_rank;
}
}
/* Allocate Shared Memory PTL process coordination
* data structure. This will reside in shared memory */
@@ -768,6 +791,7 @@ int mca_ptl_sm_send(
return_status=ompi_fifo_write_to_head_same_base_addr(sm_request->req_frag,
send_fifo, mca_ptl_sm_component.sm_mpool);
if( 0 <= return_status ) {
MCA_PTL_SM_SIGNAL_PEER(ptl_peer);
return_status=OMPI_SUCCESS;
}
@@ -892,8 +916,9 @@ int mca_ptl_sm_send_continue(
return_status=ompi_fifo_write_to_head_same_base_addr(send_frag,
send_fifo, mca_ptl_sm_component.sm_mpool);
if( 0 <= return_status ) {
MCA_PTL_SM_SIGNAL_PEER(ptl_peer);
return_status=OMPI_SUCCESS;
}
}
/* release thread lock */
if( ompi_using_threads() ) {

View File

@@ -23,12 +23,14 @@
#include <netinet/in.h>
#include "class/ompi_free_list.h"
#include "class/ompi_bitmap.h"
#include "class/ompi_fifo.h"
#include "event/event.h"
#include "mca/pml/pml.h"
#include "mca/ptl/ptl.h"
#include "mca/mpool/mpool.h"
#include "mca/common/sm/common_sm_mmap.h"
#include "class/ompi_fifo.h"
#include "mca/ptl/sm/src/ptl_sm_peer.h"
#if defined(c_plusplus) || defined(__cplusplus)
extern "C" {
#endif
@@ -118,6 +120,13 @@ struct mca_ptl_sm_component_t {
ompi_mutex_t sm_pending_ack_lock;
ompi_list_t sm_pending_ack; /**< list of fragments that need to be
acked */
struct mca_ptl_base_peer_t **sm_peers;
#if OMPI_HAVE_THREADS == 1
char sm_fifo_path[PATH_MAX]; /**< path to fifo used to signal this process */
int sm_fifo_fd; /**< file descriptor corresponding to opened fifo */
ompi_event_t sm_fifo_event; /**< event for callbacks on fifo activity */
#endif
};
typedef struct mca_ptl_sm_component_t mca_ptl_sm_component_t;
extern mca_ptl_sm_component_t mca_ptl_sm_component;
@@ -408,6 +417,23 @@ typedef struct mca_ptl_sm_exchange{
char host_name[MCA_PTL_SM_MAX_HOSTNAME_LEN];
}mca_ptl_sm_exchange_t;
#if OMPI_HAVE_THREADS == 1
void mca_ptl_sm_component_event_handler(int sd, short flags, void* user);
#endif
#if OMPI_HAVE_THREADS == 1
#define MCA_PTL_SM_SIGNAL_PEER(peer) \
{ \
unsigned char cmd = 0; \
if(write(peer->fifo_fd, &cmd, sizeof(cmd)) != sizeof(cmd)) { \
ompi_output(0, "mca_ptl_sm_send: write fifo failed: errno=%d\n", errno); \
} \
}
#else
#define MCA_PTL_SM_SIGNAL_PEER(peer)
#endif
#if defined(c_plusplus) || defined(__cplusplus)
}
#endif
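
The MCA_PTL_SM_SIGNAL_PEER macro above is the sender side of that doorbell: after placing a fragment on the shared-memory FIFO, the sender writes one byte into the peer's named pipe. A hedged sketch of the same idea in plain POSIX follows; sm_open_doorbell(), sm_signal_peer(), the path buffer size, and the error handling are invented for illustration.

#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

/* Open the peer's doorbell FIFO once, when the connection is set up
 * (this mirrors the open() added to mca_ptl_sm_add_procs_same_base_addr);
 * the path convention here is only an assumption. */
int sm_open_doorbell(const char *session_dir, int peer_vpid)
{
    char path[256];
    snprintf(path, sizeof(path), "%s/sm_fifo.%d", session_dir, peer_vpid);
    return open(path, O_WRONLY); /* blocks until the peer's read side exists */
}

/* Ring the doorbell after enqueueing a fragment: the byte's value does not
 * matter, its arrival is what wakes the peer's event loop. */
void sm_signal_peer(int fifo_fd)
{
    unsigned char cmd = 0;
    if (write(fifo_fd, &cmd, sizeof(cmd)) != sizeof(cmd))
        fprintf(stderr, "sm_signal_peer: write failed, errno=%d\n", errno);
}

In the patch itself the write end is the peer->fifo_fd opened in mca_ptl_sm_add_procs_same_base_addr, and the read end is the component's sm_fifo_fd registered with the event library during component initialization.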

View File

@@ -26,6 +26,7 @@
#include "util/argv.h"
#include "util/output.h"
#include "util/sys_info.h"
#include "util/proc_info.h"
#include "mca/pml/pml.h"
#include "mca/ptl/ptl.h"
#include "mca/pml/base/pml_base_sendreq.h"
@@ -33,6 +34,7 @@
#include "mca/base/mca_base_module_exchange.h"
#include "mca/ptl/sm/src/ptl_sm.h"
#include "mca/mpool/base/base.h"
#include "mca/oob/base/base.h"
#include "ptl_sm.h"
#include "ptl_sm_sendreq.h"
#include "ptl_sm_sendfrag.h"
@@ -47,7 +49,6 @@
static int mca_ptl_sm_component_exchange(void);
/*
* Shared Memory (SM) component instance.
*/
@@ -207,6 +208,16 @@ int mca_ptl_sm_component_close(void)
unlink(mca_ptl_sm_component.mmap_file->map_path);
}
#if OMPI_HAVE_THREADS == 1
/* close/cleanup fifo created for event notification */
if(mca_ptl_sm_component.sm_fifo_fd >= 0) {
ompi_event_del(&mca_ptl_sm_component.sm_fifo_event);
close(mca_ptl_sm_component.sm_fifo_fd);
unlink(mca_ptl_sm_component.sm_fifo_path);
}
#endif
CLEANUP:
/* return */
@@ -237,9 +248,34 @@ mca_ptl_base_module_t** mca_ptl_sm_component_init(
/* publish shared memory parameters with the MCA framework */
if(mca_ptl_sm_component_exchange() != OMPI_SUCCESS)
return 0;
return NULL;
/* allocate the Shared Memory PTL. Only one is being allocated */
#if OMPI_HAVE_THREADS == 1
/* create a named pipe to receive events */
sprintf(mca_ptl_sm_component.sm_fifo_path,
"%s/sm_fifo.%d", ompi_process_info.job_session_dir,
mca_oob_name_self.vpid);
if(mkfifo(mca_ptl_sm_component.sm_fifo_path, 0660) < 0) {
ompi_output(0, "mca_ptl_sm_component_init: mkfifo failed with errno=%d\n",errno);
return NULL;
}
mca_ptl_sm_component.sm_fifo_fd = open(mca_ptl_sm_component.sm_fifo_path, O_RDWR);
if(mca_ptl_sm_component.sm_fifo_fd < 0) {
ompi_output(0, "mca_ptl_sm_component_init: open(%s) failed with errno=%d\n",
mca_ptl_sm_component.sm_fifo_path, errno);
return NULL;
}
memset(&mca_ptl_sm_component.sm_fifo_event, 0, sizeof(ompi_event_t));
ompi_event_set(&mca_ptl_sm_component.sm_fifo_event,
mca_ptl_sm_component.sm_fifo_fd,
OMPI_EV_READ|OMPI_EV_PERSIST,
mca_ptl_sm_component_event_handler,
NULL);
ompi_event_add(&mca_ptl_sm_component.sm_fifo_event, NULL);
#endif
/* allocate the Shared Memory PTL */
*num_ptls = 2;
ptls = malloc((*num_ptls)*sizeof(mca_ptl_base_module_t*));
if(NULL == ptls)
@@ -295,6 +331,21 @@ int mca_ptl_sm_component_control(int param, void* value, size_t size)
* SM component progress.
*/
#if OMPI_HAVE_THREADS == 1
void mca_ptl_sm_component_event_handler(int sd, short flags, void* user)
{
unsigned char cmd;
int rc;
mca_ptl_sm_component_progress(0);
if(read(sd, &cmd, sizeof(cmd)) != sizeof(cmd)) {
ompi_output(0, "mca_ptl_sm_component_event_handler: read failed, errno=%d\n",
errno);
ompi_event_del(&mca_ptl_sm_component.sm_fifo_event);
}
}
#endif
int mca_ptl_sm_component_progress(mca_ptl_tstamp_t tstamp)
{
/* local variables */
@@ -571,6 +622,7 @@ int mca_ptl_sm_component_progress(mca_ptl_tstamp_t tstamp)
ompi_list_prepend(&(mca_ptl_sm_component.sm_pending_ack),item);
break;
}
MCA_PTL_SM_SIGNAL_PEER(mca_ptl_sm_component.sm_peers[header_ptr->queue_index]);
/* get next fragment to ack */
item = ompi_list_remove_first(&(mca_ptl_sm_component.sm_pending_ack));
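
For readers unfamiliar with the event calls above: ompi_event_set()/ompi_event_add() appear to wrap an embedded libevent and register a persistent read callback on the FIFO descriptor, so the handler runs each time a doorbell byte arrives. A rough equivalent against stock libevent 2.x is sketched below; that API postdates this code and is an assumption used only to show the pattern, with sm_progress() and run_fifo_loop() as hypothetical names.

#include <event2/event.h>
#include <stdio.h>
#include <unistd.h>

/* Hypothetical progress hook standing in for mca_ptl_sm_component_progress(). */
void sm_progress(void) { }

/* Same shape as mca_ptl_sm_component_event_handler(): run progress, then
 * consume the one-byte doorbell so the descriptor stops polling readable. */
void fifo_event_handler(evutil_socket_t fd, short events, void *arg)
{
    unsigned char cmd;
    (void)events;
    (void)arg;
    sm_progress();
    if (read(fd, &cmd, sizeof(cmd)) != sizeof(cmd))
        perror("fifo read");
}

/* Register the FIFO descriptor for persistent read events and run the loop. */
int run_fifo_loop(int fifo_fd)
{
    struct event_base *base = event_base_new();
    struct event *ev;

    if (base == NULL)
        return -1;
    ev = event_new(base, fifo_fd, EV_READ | EV_PERSIST, fifo_event_handler, NULL);
    if (ev == NULL || event_add(ev, NULL) != 0) {
        if (ev != NULL)
            event_free(ev);
        event_base_free(base);
        return -1;
    }
    event_base_dispatch(base); /* blocks; a real PTL would run this in a progress thread */
    event_free(ev);
    event_base_free(base);
    return 0;
}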

View File

@@ -17,6 +17,10 @@
#ifndef MCA_PTL_SM_PEER_H
#define MCA_PTL_SM_PEER_H
#if OMPI_HAVE_THREADS == 1
#include "event/event.h"
#endif
/**
* An abstraction that represents a connection to a peer process.
* An instance of mca_ptl_base_peer_t is associated w/ each process
@@ -28,7 +32,9 @@ struct mca_ptl_base_peer_t {
* SMP specific data structures. */
int peer_smp_rank; /**< My peer's SMP process rank. Used for accessing
* SMP specific data structures. */
#if OMPI_HAVE_THREADS == 1
int fifo_fd; /**< pipe/fifo used to signal peer that data is queued */
#endif
};
#endif

View File

@@ -16,6 +16,7 @@
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include "util/output.h"
@@ -162,8 +163,11 @@ void mca_ptl_sm_matched(
ompi_list_append(&(mca_ptl_sm_component.sm_pending_ack),
(ompi_list_item_t *)sm_frag_desc);
OMPI_THREAD_UNLOCK(&(mca_ptl_sm_component.sm_pending_ack_lock));
} else {
MCA_PTL_SM_SIGNAL_PEER(mca_ptl_sm_component.sm_peers[peer_local_smp_rank]);
}
/* return */
return;
}