
- integration of gpr/ns/oob w/ mpirun2

This commit was SVN r2344.
This commit is contained in:
Tim Woodall 2004-08-28 01:15:19 +00:00
parent 38ad4dcf1f
commit 16d250b376
33 changed files with 530 additions and 144 deletions

View file

@ -126,6 +126,7 @@ static void ompi_event_queue_insert(struct ompi_event *, int);
static void ompi_event_queue_remove(struct ompi_event *, int);
static void ompi_timeout_process(void);
int ompi_event_haveevents(void);
bool ompi_event_progress_thread(void);
static RB_HEAD(ompi_event_tree, ompi_event) ompi_timetree;
static struct ompi_event_list ompi_activequeue;
@ -141,6 +142,15 @@ static int ompi_event_pipe[2];
static int ompi_event_pipe_signalled;
#endif
bool ompi_event_progress_thread(void)
{
#if OMPI_HAVE_THREADS
return ompi_thread_self(&ompi_event_thread);
#else
return false;
#endif
}
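This predicate lets blocking code ask whether it is running inside the event dispatch thread, so it can keep dispatching events instead of sleeping on a condition variable. A minimal sketch of the intended pattern, assuming a hypothetical wait helper (mca_oob_tcp_msg_wait later in this diff uses the event-loop branch):

/* Sketch (hypothetical helper): wait for a flag that is set from an
 * event callback, without deadlocking the event thread itself. */
static void wait_for_flag(volatile bool *complete,
                          ompi_mutex_t *lock, ompi_condition_t *cond)
{
    if (ompi_event_progress_thread()) {
        /* we are the event thread - keep dispatching events */
        while (*complete == false)
            ompi_event_loop(OMPI_EVLOOP_ONCE);
    } else {
        /* another thread - sleep until a callback signals us */
        ompi_mutex_lock(lock);
        while (*complete == false)
            ompi_condition_wait(cond, lock);
        ompi_mutex_unlock(lock);
    }
}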
static int
compare(struct ompi_event *a, struct ompi_event *b)
{

View file

@ -146,6 +146,7 @@ int ompi_event_init(void);
int ompi_event_fini(void);
int ompi_event_dispatch(void);
int ompi_event_loop(int);
bool ompi_event_progress_thread(void);
#define ompi_evtimer_add(ev, tv) ompi_event_add(ev, tv)
#define ompi_evtimer_set(ev, cb, arg) ompi_event_set(ev, -1, 0, cb, arg)

View file

@ -26,7 +26,7 @@ enum {
OMPI_ERR_NOT_FOUND = -16,
OMPI_ERR_BUFFER = -17, /* equivalent to MPI_ERR_BUFFER */
OMPI_ERR_REQUEST = -18, /* equivalent to MPI_ERR_REQUEST */
OMPI_EXISTS = -19 /* indicates that the specified object already exists */
OMPI_EXISTS = -20 /* indicates that the specified object already exists */
};
#endif /* OMPI_CONSTANTS_H */

View file

@ -55,7 +55,7 @@ SMPLOCK "cmpxchgl %1,%2 \n\
setz %%al \n\
movzbl %%al,%0 \n"
: "+a" (ret)
: "r" (newval), "m" (*addr)
: "a" (oldval), "r" (newval), "m" (*addr)
: "memory");
return (ret == oldval);
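For context, a sketch of how a cmpxchg-based compare-and-set is typically consumed; the wrapper name ompi_atomic_cmpset_32 is an assumption, not taken from this diff:

/* Sketch: spin until the lock word atomically changes from 0 to 1.
 * Assumes the primitive returns non-zero when the swap succeeded,
 * i.e. when *addr still held oldval. */
static void spinlock_acquire(volatile uint32_t *lock)
{
    while (!ompi_atomic_cmpset_32(lock, 0, 1))
        ;   /* another CPU holds the lock - retry */
}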

View file

@ -48,6 +48,9 @@ extern ompi_process_name_t mca_oob_name_self;
#define MCA_OOB_TAG_NS 1
#define MCA_OOB_TAG_GPR 2
#define MCA_OOB_TAG_GPR_NOTIFY 3
#define MCA_OOB_TAG_RTE 4
#define MCA_OOB_TAG_EXEC 5
/*
@ -112,6 +115,24 @@ int mca_oob_set_contact_info(const char*);
char* mca_oob_get_contact_info(void);
/**
* Temporary routine to aid in debugging. Don't attempt to use TCP
* and/or register w/ the GPR if the contact info for the seed daemon
* is not available.
*/
bool mca_oob_has_seed(void);
/**
* Set the contact info for the seed daemon.
*
* Note that this can also be passed to the application as an
* MCA parameter (OMPI_MCA_oob_base_seed). The contact info (of the seed)
* must currently be set before calling mca_oob_base_init().
*/
int mca_oob_set_contact_info(const char*);
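A sketch of the startup ordering this comment implies - the seed contact info must be known, from the environment or this call, before the framework initializes (the URI string here is hypothetical):

/* Sketch: supply seed contact info, then bring up the OOB framework. */
bool user_threads = true, hidden_threads = false;
mca_oob_set_contact_info("tcp://10.0.0.1:5000");   /* hypothetical URI */
mca_oob_base_init(&user_threads, &hidden_threads);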
/**
* Similar to unix writev(2).
*

View file

@ -71,13 +71,16 @@ int mca_oob_base_init(bool *user_threads, bool *hidden_threads)
{
ompi_list_item_t *item;
mca_base_component_list_item_t *cli;
mca_oob_base_info_t * first;
mca_oob_base_component_t *component;
mca_oob_t *module;
extern ompi_list_t mca_oob_base_components;
int i, id;
char* seed;
char** uri = NULL;
mca_oob_t *s_module = NULL;
bool s_user_threads;
bool s_hidden_threads;
int s_priority = -1;
char** include = ompi_argv_split(mca_oob_base_include, ',');
char** exclude = ompi_argv_split(mca_oob_base_exclude, ',');
@ -141,24 +144,33 @@ int mca_oob_base_init(bool *user_threads, bool *hidden_threads)
if (NULL == component->oob_init) {
ompi_output_verbose(10, mca_oob_base_output, "mca_oob_base_init: no init function; ignoring component");
} else {
module = component->oob_init(user_threads, hidden_threads);
if (NULL == module) {
ompi_output_verbose(10, mca_oob_base_output, "mca_oob_base_init: oob_init returned failure");
} else {
inited = OBJ_NEW(mca_oob_base_info_t);
inited->oob_component = component;
inited->oob_module = module;
ompi_list_append(&mca_oob_base_modules, &inited->super);
int priority = -1;
bool u_threads;
bool h_threads;
module = component->oob_init(&priority, &u_threads, &h_threads);
if (NULL != module) {
inited = OBJ_NEW(mca_oob_base_info_t);
inited->oob_component = component;
inited->oob_module = module;
ompi_list_append(&mca_oob_base_modules, &inited->super);
/* lookup contact info for this component */
if(NULL != uri) {
for(i=0; NULL != uri[i]; i++) {
if(strncmp(uri[i], component->oob_base.mca_component_name,
strlen(component->oob_base.mca_component_name)) == 0) {
module->oob_set_seed(uri[i]);
}
}
}
/* setup highest priority oob channel */
if(priority > s_priority) {
s_priority = priority;
s_module = module;
s_user_threads = u_threads;
s_hidden_threads = h_threads;
}
}
}
}
@ -167,15 +179,15 @@ int mca_oob_base_init(bool *user_threads, bool *hidden_threads)
}
/* set the global variable to point to the first initialize module */
if (0 < ompi_list_get_size(&mca_oob_base_modules)) {
first = (mca_oob_base_info_t *) ompi_list_get_first(&mca_oob_base_modules);
mca_oob = *first->oob_module;
return OMPI_SUCCESS;
} else {
printf("No OOB modules available!\n");
fflush(stdout);
if(s_module == NULL) {
ompi_output(0, "mca_oob_base_init: no OOB modules available\n");
return OMPI_ERROR;
}
}
mca_oob = *s_module;
*user_threads &= s_user_threads;
*hidden_threads |= s_hidden_threads;
return OMPI_SUCCESS;
}
@ -216,6 +228,17 @@ int mca_oob_set_contact_info(const char* seed)
return OMPI_SUCCESS;
}
/**
* Are we the seed, or do we know how to get to it?
*/
bool mca_oob_has_seed(void)
{
return (getenv("OMPI_MCA_oob_base_seed") != NULL || ompi_process_info.seed);
}
/**
* Called to request the selected oob components to
* register their address with the seed daemon.
@ -226,8 +249,10 @@ int mca_oob_base_module_init(void)
ompi_list_item_t* item;
/* setup self to point to actual process name */
if(ompi_name_server.compare(OMPI_NS_CMP_ALL, &mca_oob_name_self, &mca_oob_name_any) == 0) {
mca_oob_name_self = *mca_pcmclient.pcmclient_get_self();
if(ompi_process_info.seed) {
char* seed = mca_oob_get_contact_info();
mca_oob_set_contact_info(seed);
}
/* Initialize all modules after oob/gpr/ns have initialized */
@ -240,5 +265,4 @@ int mca_oob_base_module_init(void)
}
return OMPI_SUCCESS;
}
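Under the new selection scheme above, each component's init reports a priority and may opt out by returning NULL; the base keeps the highest-priority module and folds its thread flags into the caller's. A hedged sketch of a conforming component init (the component name and values are hypothetical):

/* Sketch: a hypothetical oob component under the new init contract. */
static mca_oob_t* my_oob_init(int *priority,
                              bool *allow_multi_user_threads,
                              bool *have_hidden_threads)
{
    if (!my_transport_usable())     /* e.g. seed address unknown */
        return NULL;                /* opt out of selection */
    *priority = 1;                  /* highest priority wins */
    *allow_multi_user_threads = true;
    *have_hidden_threads = false;
    return &my_oob_module;          /* hypothetical module struct */
}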

View file

@ -50,7 +50,7 @@ int mca_oob_base_open(void)
mca_base_param_lookup_string(
mca_base_param_register_string("oob","base","include",NULL,NULL), &mca_oob_base_include);
mca_base_param_lookup_string(
mca_base_param_register_string("oob","base","exclude",NULL,"tcp"), &mca_oob_base_exclude);
mca_base_param_register_string("oob","base","exclude",NULL,NULL), &mca_oob_base_exclude);
/* All done */
return OMPI_SUCCESS;

View file

@ -21,7 +21,7 @@ int mca_oob_cofs_close(void);
/*
* Startup / Shutdown
*/
mca_oob_t* mca_oob_cofs_init(bool *allow_multi_user_threads, bool *have_hidden_threads);
mca_oob_t* mca_oob_cofs_init(int* priority, bool *allow_multi_user_threads, bool *have_hidden_threads);
int mca_oob_cofs_module_init(void);
int mca_oob_cofs_module_fini(void);

View file

@ -67,13 +67,15 @@ int mca_oob_cofs_set_seed(const char* addr)
return OMPI_SUCCESS;
}
mca_oob_t* mca_oob_cofs_init(bool *allow_multi_user_threads, bool *have_hidden_threads)
mca_oob_t* mca_oob_cofs_init(int* priority, bool *allow_multi_user_threads, bool *have_hidden_threads)
{
int len;
char *tmp;
FILE *fp;
*allow_multi_user_threads &= true;
*priority = 0;
*allow_multi_user_threads = true;
*have_hidden_threads = false;
/*
* See if we can write in our directory...

View file

@ -164,6 +164,7 @@ struct mca_oob_1_0_0_t {
* OOB Component
*/
typedef mca_oob_t* (*mca_oob_base_component_init_fn_t)(
int *priority,
bool *allow_multi_user_threads,
bool *have_hidden_threads);

View file

@ -233,7 +233,7 @@ static int mca_oob_tcp_create_listen(void)
static void mca_oob_tcp_recv_handler(int sd, short flags, void* user)
{
ompi_process_name_t name;
ompi_process_name_t guid[2];
mca_oob_tcp_peer_t* peer;
int rc;
@ -246,14 +246,15 @@ static void mca_oob_tcp_recv_handler(int sd, short flags, void* user)
free(user);
/* recv the process identifier */
rc = recv(sd, &name, sizeof(name), 0);
if(rc != sizeof(name)) {
rc = recv(sd, guid, sizeof(guid), 0);
if(rc != sizeof(guid)) {
ompi_output(0, "mca_oob_tcp_recv_handler: recv() return value %d != %d, errno = %d",
rc, sizeof(name), errno);
rc, sizeof(guid), errno);
close(sd);
return;
}
OMPI_PROCESS_NAME_NTOH(name);
OMPI_PROCESS_NAME_NTOH(guid[0]);
OMPI_PROCESS_NAME_NTOH(guid[1]);
/* now set socket up to be non-blocking */
if((flags = fcntl(sd, F_GETFL, 0)) < 0) {
@ -268,14 +269,14 @@ static void mca_oob_tcp_recv_handler(int sd, short flags, void* user)
/* check for wildcard name - if this is true - we allocate a name from the name server
* and return to the peer
*/
if(mca_oob_tcp_process_name_compare(&name, MCA_OOB_NAME_ANY) == 0) {
name.jobid = ompi_name_server.create_jobid();
name.vpid = ompi_name_server.reserve_range(name.jobid,1);
ompi_name_server.assign_cellid_to_process(&name);
if(mca_oob_tcp_process_name_compare(guid, MCA_OOB_NAME_ANY) == 0) {
guid->jobid = ompi_name_server.create_jobid();
guid->vpid = ompi_name_server.reserve_range(guid->jobid,1);
ompi_name_server.assign_cellid_to_process(guid);
}
/* lookup the corresponding process */
peer = mca_oob_tcp_peer_lookup(&name, true);
peer = mca_oob_tcp_peer_lookup(guid, true);
if(NULL == peer) {
ompi_output(0, "mca_oob_tcp_recv_handler: unable to locate peer");
close(sd);
@ -294,8 +295,16 @@ static void mca_oob_tcp_recv_handler(int sd, short flags, void* user)
* (1) initialize static resources
* (2) create listen socket
*/
mca_oob_t* mca_oob_tcp_component_init(bool *allow_multi_user_threads, bool *have_hidden_threads)
mca_oob_t* mca_oob_tcp_component_init(int* priority, bool *allow_multi_user_threads, bool *have_hidden_threads)
{
/* don't allow tcp to be selected if we don't know the seed */
if(mca_oob_has_seed() == false)
return NULL;
*priority = 1;
*allow_multi_user_threads = true;
*have_hidden_threads = OMPI_HAVE_THREADS;
/* initialize data structures */
ompi_rb_tree_init(&mca_oob_tcp_component.tcp_peer_tree, (ompi_rb_tree_comp_fn_t)mca_oob_tcp_process_name_compare);
@ -340,11 +349,6 @@ int mca_oob_tcp_init(void)
char *addr;
int rc;
/* setup self to point to actual process name */
if(mca_oob_tcp_process_name_compare(&mca_oob_name_self, &mca_oob_name_any) == 0) {
mca_oob_name_self = *mca_pcmclient.pcmclient_get_self();
}
/* put contact info in registry */
keys[0] = "tcp";
keys[1] = ompi_name_server.get_proc_name_string(&mca_oob_name_self);

View file

@ -30,7 +30,7 @@ extern "C" {
*/
int mca_oob_tcp_component_open(void);
int mca_oob_tcp_component_close(void);
mca_oob_t* mca_oob_tcp_component_init(bool *allow_multi_user_threads, bool *have_hidden_threads);
mca_oob_t* mca_oob_tcp_component_init(int* priority, bool *allow_multi_user_threads, bool *have_hidden_threads);
/**
* Hook function to allow the selected oob components

View file

@ -40,11 +40,8 @@ static void mca_oob_tcp_msg_destruct(mca_oob_tcp_msg_t* msg)
int mca_oob_tcp_msg_wait(mca_oob_tcp_msg_t* msg, int* rc)
{
/* wait for message to complete */
ompi_mutex_lock(&msg->msg_lock);
while(msg->msg_complete == false) {
ompi_condition_wait(&msg->msg_condition, &msg->msg_lock);
}
ompi_mutex_unlock(&msg->msg_lock);
while(msg->msg_complete == false)
ompi_event_loop(OMPI_EVLOOP_ONCE);
/* return status */
if(NULL != rc) {
@ -128,16 +125,20 @@ bool mca_oob_tcp_msg_recv_handler(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_pee
int rc;
while(1) {
rc = readv(peer->peer_sd, msg->msg_rwptr, msg->msg_rwnum);
if(rc <= 0) {
if(rc < 0) {
if(errno == EINTR)
continue;
else if (errno == EAGAIN)
return false;
else {
ompi_output(0, "mca_oob_tcp_msg_recv_handler: bad return from writev. errno=%d", errno);
ompi_output(0, "mca_oob_tcp_msg_recv_handler: readv failed with errno=%d", errno);
mca_oob_tcp_peer_close(peer);
return false;
}
} else if (rc == 0) {
ompi_output(0, "mca_oob_tcp_msg_recv_handler: read failed - peer closed connection");
mca_oob_tcp_peer_close(peer);
return false;
}
do {

View file

@ -26,6 +26,7 @@ static int mca_oob_tcp_peer_recv_blocking(mca_oob_tcp_peer_t* peer, void* data,
static int mca_oob_tcp_peer_send_blocking(mca_oob_tcp_peer_t* peer, void* data, size_t size);
static void mca_oob_tcp_peer_recv_handler(int sd, short flags, void* user);
static void mca_oob_tcp_peer_send_handler(int sd, short flags, void* user);
static void mca_oob_tcp_peer_timer_handler(int sd, short flags, void* user);
static void mca_oob_tcp_peer_dump(mca_oob_tcp_peer_t* peer, const char* msg);
@ -48,6 +49,8 @@ static void mca_oob_tcp_peer_construct(mca_oob_tcp_peer_t* peer)
{
OBJ_CONSTRUCT(&(peer->peer_send_queue), ompi_list_t);
OBJ_CONSTRUCT(&(peer->peer_lock), ompi_mutex_t);
memset(&peer->peer_timer_event, 0, sizeof(peer->peer_timer_event));
ompi_evtimer_set(&peer->peer_timer_event, mca_oob_tcp_peer_timer_handler, peer);
}
/*
@ -102,8 +105,9 @@ int mca_oob_tcp_peer_send(mca_oob_tcp_peer_t* peer, mca_oob_tcp_msg_t* msg)
* queue the message and start the connection to the peer
*/
ompi_list_append(&peer->peer_send_queue, (ompi_list_item_t*)msg);
if(peer->peer_state == MCA_OOB_TCP_CLOSED)
if(peer->peer_state == MCA_OOB_TCP_CLOSED) {
rc = mca_oob_tcp_peer_start_connect(peer);
}
break;
case MCA_OOB_TCP_FAILED:
rc = OMPI_ERR_UNREACH;
@ -153,6 +157,7 @@ mca_oob_tcp_peer_t * mca_oob_tcp_peer_lookup(ompi_process_name_t* name, bool get
return peer;
}
/* allocate from free list */
MCA_OOB_TCP_PEER_ALLOC(peer, rc);
if(NULL == peer) {
if(get_lock) {
@ -169,6 +174,7 @@ mca_oob_tcp_peer_t * mca_oob_tcp_peer_lookup(ompi_process_name_t* name, bool get
peer->peer_send_msg = NULL;
peer->peer_retries = 0;
/* add to lookup table */
if(OMPI_SUCCESS != ompi_rb_tree_insert(&mca_oob_tcp_component.tcp_peer_tree, &peer->peer_name, peer)) {
MCA_OOB_TCP_PEER_RETURN(peer);
if(get_lock) {
@ -237,8 +243,13 @@ static int mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer)
/* resolve the peer address */
if ((rc = mca_oob_tcp_peer_name_lookup(peer)) != OMPI_SUCCESS) {
struct timeval tv = { 1, 0 };
mca_oob_tcp_peer_close(peer);
return OMPI_ERR_UNREACH;
if(peer->peer_retries > mca_oob_tcp_component.tcp_peer_retries) {
return OMPI_ERR_UNREACH;
}
ompi_evtimer_add(&peer->peer_timer_event, &tv);
return OMPI_SUCCESS;
}
/* start the connect - will likely fail with EINPROGRESS */
@ -289,14 +300,12 @@ static void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_peer_t* peer)
ompi_event_add(&peer->peer_send_event, 0);
return;
} else if (so_error == ECONNREFUSED) {
if(peer->peer_retries++ > mca_oob_tcp_component.tcp_peer_retries) {
ompi_output(0, "mca_oob_tcp_peer_complete_connect: unable to contact peer after %d retries\n", peer->peer_retries);
mca_oob_tcp_peer_close(peer);
struct timeval tv = { 1,0 };
mca_oob_tcp_peer_close(peer);
if(peer->peer_retries > mca_oob_tcp_component.tcp_peer_retries) {
return;
}
mca_oob_tcp_peer_close(peer);
sleep(1);
mca_oob_tcp_peer_start_connect(peer);
ompi_evtimer_add(&peer->peer_timer_event, &tv);
return;
} else if(so_error != 0) {
ompi_output(0, "mca_oob_tcp_peer_complete_connect: connect() failed with errno=%d\n", so_error);
@ -336,6 +345,17 @@ static void mca_oob_tcp_peer_connected(mca_oob_tcp_peer_t* peer)
*/
void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t* peer)
{
/* if we have given up, clean up any pending messages */
if(peer->peer_retries++ > mca_oob_tcp_component.tcp_peer_retries) {
mca_oob_tcp_msg_t *msg = peer->peer_send_msg;
while(msg != NULL) {
msg->msg_rc = OMPI_ERR_UNREACH;
mca_oob_tcp_msg_complete(msg, &peer->peer_name);
msg = (mca_oob_tcp_msg_t*)ompi_list_remove_first(&peer->peer_send_queue);
}
peer->peer_send_msg = NULL;
}
if(peer->peer_state != MCA_OOB_TCP_CLOSED &&
peer->peer_sd >= 0) {
ompi_event_del(&peer->peer_recv_event);
@ -344,7 +364,6 @@ void mca_oob_tcp_peer_close(mca_oob_tcp_peer_t* peer)
peer->peer_sd = -1;
}
peer->peer_state = MCA_OOB_TCP_CLOSED;
peer->peer_retries++;
}
/*
@ -396,7 +415,7 @@ static int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* peer)
/* connected */
mca_oob_tcp_peer_connected(peer);
#if OMPI_ENABLE_DEBUG
#if OMPI_ENABLE_DEBUG && 0
mca_oob_tcp_peer_dump(peer, "connected");
#endif
return OMPI_SUCCESS;
@ -415,7 +434,6 @@ static int mca_oob_tcp_peer_recv_blocking(mca_oob_tcp_peer_t* peer, void* data,
/* remote closed connection */
if(retval == 0) {
ompi_output(0, "mca_oob_tcp_peer_recv_blocking: remote connection closed");
mca_oob_tcp_peer_close(peer);
return -1;
}
@ -537,7 +555,7 @@ static void mca_oob_tcp_peer_recv_progress(mca_oob_tcp_peer_t* peer, mca_oob_tcp
* (3) if a posted receive is available - receive into users buffer
* (4) otherwise, allocate a new message and buffer for receive
*/
static void mca_oob_tcp_peer_recv_start(mca_oob_tcp_peer_t* peer)
static mca_oob_tcp_msg_t* mca_oob_tcp_peer_recv_start(mca_oob_tcp_peer_t* peer)
{
mca_oob_tcp_msg_t* msg;
mca_oob_tcp_hdr_t hdr;
@ -545,7 +563,7 @@ static void mca_oob_tcp_peer_recv_start(mca_oob_tcp_peer_t* peer)
/* blocking receive of the message header */
if(mca_oob_tcp_peer_recv_blocking(peer, &hdr, sizeof(hdr)) != sizeof(hdr))
return;
return NULL;
size = ntohl(hdr.msg_size);
/* attempt to match posted receive
@ -589,7 +607,7 @@ static void mca_oob_tcp_peer_recv_start(mca_oob_tcp_peer_t* peer)
int rc;
MCA_OOB_TCP_MSG_ALLOC(msg, rc);
if(NULL == msg) {
return;
return NULL;
}
msg->msg_type = MCA_OOB_TCP_UNEXPECTED;
msg->msg_rc = 0;
@ -604,14 +622,8 @@ static void mca_oob_tcp_peer_recv_start(mca_oob_tcp_peer_t* peer)
msg->msg_rwptr = msg->msg_rwiov;
msg->msg_hdr = hdr;
return msg;
/* if receive of message data completed - queue the receive message */
if(mca_oob_tcp_msg_recv_handler(msg, peer)) {
mca_oob_tcp_peer_recv_progress(peer, msg);
} else {
/* continue processing until complete */
peer->peer_recv_msg = msg;
}
}
@ -633,10 +645,15 @@ static void mca_oob_tcp_peer_recv_handler(int sd, short flags, void* user)
case MCA_OOB_TCP_CONNECTED:
{
if(NULL == peer->peer_recv_msg) {
mca_oob_tcp_peer_recv_start(peer);
} else if (mca_oob_tcp_msg_recv_handler(peer->peer_recv_msg, peer)) {
mca_oob_tcp_peer_recv_progress(peer, peer->peer_recv_msg);
peer->peer_recv_msg = mca_oob_tcp_peer_recv_start(peer);
}
if (peer->peer_recv_msg &&
mca_oob_tcp_msg_recv_handler(peer->peer_recv_msg, peer)) {
mca_oob_tcp_msg_t* msg = peer->peer_recv_msg;
peer->peer_recv_msg = NULL;
OMPI_THREAD_UNLOCK(&peer->peer_lock);
mca_oob_tcp_peer_recv_progress(peer, msg);
return;
}
break;
}
@ -768,7 +785,7 @@ bool mca_oob_tcp_peer_accept(mca_oob_tcp_peer_t* peer, int sd)
}
ompi_event_add(&peer->peer_recv_event, 0);
mca_oob_tcp_peer_connected(peer);
#if OMPI_ENABLE_DEBUG
#if OMPI_ENABLE_DEBUG && 0
mca_oob_tcp_peer_dump(peer, "accepted");
#endif
OMPI_THREAD_UNLOCK(&peer->peer_lock);
@ -799,13 +816,15 @@ int mca_oob_tcp_peer_name_lookup(mca_oob_tcp_peer_t* peer)
keys[1] = ompi_name_server.get_proc_name_string(&peer->peer_name);
keys[2] = NULL;
items = ompi_registry.get(OMPI_REGISTRY_AND, "oob", keys);
if(items == NULL || ompi_list_get_size(items) == 0)
if(items == NULL || ompi_list_get_size(items) == 0) {
return OMPI_ERR_UNREACH;
}
/* unpack the results into a uri string */
item = (ompi_registry_value_t*)ompi_list_remove_first(items);
if((uri = item->object) == NULL)
if((uri = item->object) == NULL) {
return OMPI_ERR_UNREACH;
}
/* validate the result */
if(mca_oob_tcp_parse_uri(uri, &peer->peer_addr) != OMPI_SUCCESS) {
@ -817,4 +836,12 @@ int mca_oob_tcp_peer_name_lookup(mca_oob_tcp_peer_t* peer)
}
}
/*
* Callback on timeout - retry connection attempt.
*/
static void mca_oob_tcp_peer_timer_handler(int sd, short flags, void* user)
{
mca_oob_tcp_peer_start_connect((mca_oob_tcp_peer_t*)user);
}
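The retry path above replaces the old sleep(1) with a one-shot event timer, so the event loop keeps running while the connection attempt backs off. Condensed, the pattern from this file is:

/* Sketch: arm a 1-second retry instead of blocking in sleep().
 * peer_timer_event was bound to mca_oob_tcp_peer_timer_handler in
 * the peer constructor via ompi_evtimer_set(). */
struct timeval tv = { 1, 0 };
mca_oob_tcp_peer_close(peer);                    /* tear down the socket */
ompi_evtimer_add(&peer->peer_timer_event, &tv);  /* retry via event loop */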

View file

@ -41,6 +41,7 @@ struct mca_oob_tcp_peer_t {
int peer_sd; /**< socket descriptor of the connection */
ompi_event_t peer_send_event; /**< registration with event thread for send events */
ompi_event_t peer_recv_event; /**< registration with event thread for recv events */
ompi_event_t peer_timer_event; /**< used for timer callback */
ompi_mutex_t peer_lock; /**< protect critical data structures */
ompi_list_t peer_send_queue; /**< list of messages to send */
mca_oob_tcp_msg_t *peer_send_msg; /**< current send in progress */

View file

@ -24,6 +24,7 @@
#include "mca/pcm/base/base.h"
#include "mca/pcm/rsh/src/pcm_rsh.h"
#include "runtime/runtime_types.h"
#include "event/event.h"
#include "util/output.h"
#include "util/argv.h"
#include "util/numtostr.h"
@ -387,10 +388,24 @@ internal_spawn_proc(int jobid, ompi_rte_node_schedule_t *sched,
ret = OMPI_SUCCESS;
proc_cleanup:
/* TSW - this needs to be fixed - however, ssh is not exiting - and for
* now this at least gives us stdout/stderr.
*/
/* Wait for the command to exit. */
do {
if (waitpid(pid, &status, 0) < 0) {
ret = OMPI_ERROR;
#if OMPI_HAVE_THREADS
int rc = waitpid(pid, &status, 0);
#else
int rc = waitpid(pid, &status, WNOHANG);
if(rc == 0) {
ompi_event_loop(OMPI_EVLOOP_ONCE);
}
#endif
if (rc < 0) {
ret = OMPI_ERROR;
break;
}
} while (!WIFEXITED(status));

View file

@ -20,12 +20,43 @@
#include "mca/pml/base/static-components.h"
static int mca_pml_base_progress(void)
{
return OMPI_SUCCESS;
}
/*
* Global variables
*/
int mca_pml_base_output = -1;
mca_pml_base_module_t mca_pml;
mca_pml_base_module_t mca_pml = {
NULL,
NULL,
NULL,
NULL,
mca_pml_base_progress,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL
};
ompi_list_t mca_pml_base_components_available;
mca_pml_base_component_t mca_pml_base_selected_component;

View file

@ -66,6 +66,8 @@ int mca_pml_teg_component_open(void)
OBJ_CONSTRUCT(&mca_pml_teg.teg_send_requests, ompi_free_list_t);
OBJ_CONSTRUCT(&mca_pml_teg.teg_recv_requests, ompi_free_list_t);
OBJ_CONSTRUCT(&mca_pml_teg.teg_procs, ompi_list_t);
OBJ_CONSTRUCT(&mca_pml_teg.teg_request_lock, ompi_mutex_t);
OBJ_CONSTRUCT(&mca_pml_teg.teg_request_cond, ompi_condition_t);
OBJ_CONSTRUCT(teg_null, mca_pml_base_request_t);
teg_null->req_type = MCA_PML_REQUEST_NULL;
@ -125,6 +127,8 @@ int mca_pml_teg_component_close(void)
if(NULL != mca_pml_teg.teg_ptl_components) {
free(mca_pml_teg.teg_ptl_components);
}
OBJ_DESTRUCT(&mca_pml_teg.teg_request_lock);
OBJ_DESTRUCT(&mca_pml_teg.teg_request_cond);
OBJ_DESTRUCT(&mca_pml_teg.teg_send_requests);
OBJ_DESTRUCT(&mca_pml_teg.teg_recv_requests);
OBJ_DESTRUCT(&mca_pml_teg.teg_procs);
@ -140,11 +144,11 @@ mca_pml_base_module_t* mca_pml_teg_component_init(int* priority,
*priority = 0;
*have_hidden_threads = false;
OBJ_CONSTRUCT(&mca_pml_teg.teg_lock, ompi_mutex_t);
mca_pml_teg.teg_ptl_components = NULL;
mca_pml_teg.teg_num_ptl_components = 0;
mca_pml_teg.teg_request_waiting = 0;
/* recv requests */
ompi_free_list_init(
@ -156,11 +160,6 @@ mca_pml_base_module_t* mca_pml_teg_component_init(int* priority,
mca_pml_teg.teg_free_list_inc,
NULL);
/* request completion */
OBJ_CONSTRUCT(&mca_pml_teg.teg_request_lock, ompi_mutex_t);
OBJ_CONSTRUCT(&mca_pml_teg.teg_request_cond, ompi_condition_t);
mca_pml_teg.teg_request_waiting = 0;
/* buffered send */
if(mca_pml_base_bsend_init(allow_multi_user_threads) != OMPI_SUCCESS) {
ompi_output(0, "mca_pml_teg_component_init: mca_pml_bsend_init failed\n");

View file

@ -14,19 +14,20 @@ noinst_LTLIBRARIES = libruntime.la
headers = \
runtime.h \
runtime_types.h \
universe_connect.h \
ompi_progress.h
libruntime_la_SOURCES = \
$(headers) \
ompi_abort.c \
ompi_finalize.c \
ompi_init.c \
ompi_universe.c \
ompi_progress.c \
ompi_rte_finalize.c \
ompi_rte_init.c \
ompi_rte_llm.c \
ompi_rte_monitor.c \
ompi_rte_pcm.c
# Conditionally install the header files

View file

@ -47,11 +47,11 @@ int ompi_init(int argc, char *argv[])
if (!ompi_output_init())
return OMPI_ERROR;
/* For the moment, the OMPI library is not multi-threaded. MPI_INIT
may reset this value later, but for now, we say that we are not
using threads. */
/* If threads are supported - assume that we are using threads - and reset
* otherwise.
*/
ompi_set_using_threads(false);
ompi_set_using_threads(OMPI_HAVE_THREADS);
/* For malloc debugging */

View file

@ -9,6 +9,7 @@
#include "include/constants.h"
#include "runtime/runtime.h"
#include "util/output.h"
#include "util/proc_info.h"
#include "threads/mutex.h"
#include "mca/llm/base/base.h"
#include "mca/pcm/base/base.h"
@ -27,6 +28,7 @@
*/
int ompi_rte_finalize(void)
{
ompi_rte_unregister();
mca_oob_base_close();
mca_pcm_base_close();
mca_llm_base_close();

View file

@ -92,13 +92,9 @@ int ompi_rte_init(bool *allow_multi_user_threads, bool *have_hidden_threads)
*allow_multi_user_threads = true;
*have_hidden_threads = false;
ompi_output(0, "entered rte_init");
/*
* Out of Band Messaging
*/
ompi_output(0, "starting oob");
if (OMPI_SUCCESS != (ret = mca_oob_base_open())) {
/* JMS show_help */
printf("show_help: ompi_rte_init failed in oob_base_open\n");
@ -116,8 +112,6 @@ int ompi_rte_init(bool *allow_multi_user_threads, bool *have_hidden_threads)
/*
* Name Server
*/
ompi_output(0, "starting name server");
if (OMPI_SUCCESS != (ret = mca_ns_base_open())) {
/* JMS show_help */
printf("show_help: ompi_rte_init failed in ns_base_open\n");
@ -135,8 +129,6 @@ int ompi_rte_init(bool *allow_multi_user_threads, bool *have_hidden_threads)
/*
* Process Control and Monitoring Client
*/
ompi_output(0, "starting pcm-client");
if (OMPI_SUCCESS != (ret = mca_pcmclient_base_open())) {
/* JMS show_help */
printf("show_help: ompi_rte_init failed in pcmclient_base_open\n");
@ -151,8 +143,6 @@ int ompi_rte_init(bool *allow_multi_user_threads, bool *have_hidden_threads)
*allow_multi_user_threads &= user_threads;
*have_hidden_threads |= hidden_threads;
ompi_output(0, "starting llm");
/*
* Allocation code - open only. pcm will init if needed
*/
@ -162,8 +152,6 @@ int ompi_rte_init(bool *allow_multi_user_threads, bool *have_hidden_threads)
return ret;
}
ompi_output(0, "starting pcm");
/*
* Process Control and Monitoring
*/
@ -181,8 +169,6 @@ int ompi_rte_init(bool *allow_multi_user_threads, bool *have_hidden_threads)
*allow_multi_user_threads &= user_threads;
*have_hidden_threads |= hidden_threads;
ompi_output(0, "starting gpr");
/*
* Registry
*/
@ -200,16 +186,12 @@ int ompi_rte_init(bool *allow_multi_user_threads, bool *have_hidden_threads)
*allow_multi_user_threads &= user_threads;
*have_hidden_threads |= hidden_threads;
ompi_output(0, "calling proc_info");
/*
* Fill in the various important structures
*/
/* proc structure startup */
ompi_proc_info();
ompi_output(0, "doing session_dir");
/* session directory */
jobid_str = ompi_name_server.get_jobid_string(ompi_process_info.name);
procid_str = ompi_name_server.get_vpid_string(ompi_process_info.name);
@ -224,6 +206,14 @@ int ompi_rte_init(bool *allow_multi_user_threads, bool *have_hidden_threads)
return OMPI_ERROR;
}
/*
* Register process info w/ seed daemon.
*/
if (OMPI_SUCCESS != (ret = ompi_rte_register())) {
ompi_output(0, "ompi_rte_init: failed in ompi_rte_register()\n");
return ret;
}
/*
* Call back into OOB to allow it to do any final initialization
* (e.g. put contact info in the registry).

src/runtime/ompi_rte_monitor.c (new file, 192 lines)
View file

@ -0,0 +1,192 @@
/*
* $HEADER$
*/
/** @file **/
#include "ompi_config.h"
#include "include/constants.h"
#include "util/proc_info.h"
#include "util/sys_info.h"
#include "runtime/runtime.h"
#include "util/output.h"
#include "event/event.h"
#include "threads/mutex.h"
#include "threads/condition.h"
#include "mca/oob/base/base.h"
#include "mca/oob/oob.h"
#include "mca/ns/ns.h"
#include "mca/ns/base/base.h"
#include "mca/gpr/gpr.h"
#include "mca/gpr/base/base.h"
static ompi_mutex_t ompi_rte_mutex;
static ompi_condition_t ompi_rte_condition;
static bool ompi_rte_job_started = false;
static bool ompi_rte_job_finished = false;
/*
* Update the registry with an entry for this process.
*/
int ompi_rte_register(void)
{
/* temporarily disable this if we don't know the seed - e.g. using cofs */
if(mca_oob_has_seed()) {
ompi_buffer_t buffer;
char segment[32];
char *jobid = ompi_name_server.get_jobid_string(ompi_process_info.name);
char *keys[2];
void *addr;
int rc,size;
/* setup keys and segment for this job */
sprintf(segment, "job-%s", jobid);
keys[0] = ompi_name_server.get_proc_name_string(ompi_process_info.name);
keys[1] = NULL;
free(jobid);
/* setup packed buffer of proc info - may expand as needed */
ompi_buffer_init(&buffer, 128);
ompi_pack(buffer, &ompi_process_info.pid, 1, OMPI_INT32);
ompi_pack_string(buffer, ompi_system_info.nodename);
/* peek the buffer and resulting size */
ompi_buffer_get(buffer, &addr, &size);
rc = ompi_registry.put(OMPI_REGISTRY_OVERWRITE, segment, keys, addr, size);
ompi_buffer_free(buffer);
return rc;
}
return OMPI_SUCCESS;
}
/*
* Unregister process info.
*/
int ompi_rte_unregister(void)
{
/* temporarily disable this if we don't know the seed - e.g. using cofs */
if(mca_oob_has_seed()) {
char segment[32];
char *jobid = ompi_name_server.get_jobid_string(ompi_process_info.name);
char *keys[2];
int rc;
/* setup keys and segment for this job */
sprintf(segment, "job-%s", jobid);
free(jobid);
keys[0] = ompi_name_server.get_proc_name_string(ompi_process_info.name);
keys[1] = NULL;
rc = ompi_registry.delete_object(OMPI_REGISTRY_OVERWRITE, segment, keys);
free(keys[0]);
return rc;
}
return OMPI_SUCCESS;
}
/*
* Change state as processes register/unregister. Note that we could save
* the list of registrations - and use the host/pid for cleanup later.
*/
static void ompi_rte_registered(ompi_registry_notify_message_t* match, void* cbdata)
{
OMPI_THREAD_LOCK(&ompi_rte_mutex);
ompi_rte_job_started = true;
ompi_condition_signal(&ompi_rte_condition);
OMPI_THREAD_UNLOCK(&ompi_rte_mutex);
}
static void ompi_rte_unregistered(ompi_registry_notify_message_t* match, void* cbdata)
{
OMPI_THREAD_LOCK(&ompi_rte_mutex);
ompi_rte_job_finished = true;
ompi_condition_signal(&ompi_rte_condition);
OMPI_THREAD_UNLOCK(&ompi_rte_mutex);
}
int ompi_rte_notify(mca_ns_base_jobid_t jobid, int num_procs)
{
char segment[32];
int rc;
/* setup segment for this job */
sprintf(segment, "job-%d", jobid);
/* register for a callback when all processes on this jobid have
* registered their process info
*/
rc = ompi_registry.synchro(
OMPI_REGISTRY_SYNCHRO_MODE_ASCENDING|OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT,
OMPI_REGISTRY_OR,
segment,
NULL,
num_procs,
ompi_rte_registered,
NULL);
if(rc != OMPI_SUCCESS)
return rc;
/* register for a callback when all processes on this jobid have
* unregistered their process info
*/
rc = ompi_registry.synchro(
OMPI_REGISTRY_SYNCHRO_MODE_DESCENDING|OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT,
OMPI_REGISTRY_OR,
segment,
NULL,
0,
ompi_rte_unregistered,
NULL);
return rc;
}
/**
* TSW - This is a temporary solution - that only handles graceful
* shutdown....
*/
int ompi_rte_monitor(void)
{
struct timeval tv;
struct timespec ts;
OBJ_CONSTRUCT(&ompi_rte_mutex, ompi_mutex_t);
OBJ_CONSTRUCT(&ompi_rte_condition, ompi_condition_t);
/* block until a timeout occurs or all processes have registered */
gettimeofday(&tv, NULL);
ts.tv_sec = tv.tv_sec + 30;
ts.tv_nsec = 0;
OMPI_THREAD_LOCK(&ompi_rte_mutex);
if(ompi_rte_job_started == false) {
ompi_condition_timedwait(&ompi_rte_condition, &ompi_rte_mutex, &ts);
if(ompi_rte_job_started == false) {
ompi_mutex_unlock(&ompi_rte_mutex);
return OMPI_ERROR;
}
}
/* wait for all processes to complete */
while(ompi_rte_job_finished == false) {
ompi_condition_wait(&ompi_rte_condition, &ompi_rte_mutex);
}
OMPI_THREAD_UNLOCK(&ompi_rte_mutex);
return OMPI_SUCCESS;
}
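mpirun2, later in this diff, drives these entry points roughly as follows (a condensed sketch, error handling omitted):

/* Sketch: registry-based job monitoring from the launcher's side. */
new_jobid = ompi_name_server.create_jobid();
ompi_rte_notify(new_jobid, num_procs);   /* arm the registered/unregistered
                                            synchro callbacks for this job */
/* ... allocate resources and spawn num_procs processes ... */
ompi_rte_monitor();   /* block until all procs register, then until
                         they all unregister (graceful shutdown only) */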

View file

@ -139,6 +139,25 @@ extern "C" {
*/
int ompi_rte_get_peers(ompi_process_name_t **peers, size_t *npeers);
/**
* Setup process info in the registry.
*/
int ompi_rte_register(void);
/**
* Monitor a job - currently implemented by monitoring process
* registration/deregistration to/from the GPR.
*/
int ompi_rte_notify(mca_ns_base_jobid_t job, int num_procs);
int ompi_rte_monitor(void);
/**
* Remove process registration.
*/
int ompi_rte_unregister(void);
/**
* Kill a specific process in this cell

View file

@ -9,6 +9,7 @@
#include "runtime/ompi_progress.h"
struct ompi_condition_t {
ompi_object_t super;
volatile int c_waiting;
volatile int c_signaled;
};
@ -28,9 +29,7 @@ static inline int ompi_condition_wait(ompi_condition_t *c, ompi_mutex_t *m)
}
} else {
while (c->c_signaled == 0) {
ompi_mutex_unlock(m);
ompi_progress();
ompi_mutex_lock(m);
}
}
c->c_signaled--;

View file

@ -104,6 +104,10 @@ int ompi_thread_join(ompi_thread_t *t, void **thr_return)
return (rc == 0) ? OMPI_SUCCESS : OMPI_ERROR;
}
bool ompi_thread_self(ompi_thread_t *t)
{
return t->t_handle == pthread_self();
}
#else
@ -119,4 +123,9 @@ int ompi_thread_join(ompi_thread_t *t, void **thr_return)
return OMPI_ERROR;
}
bool ompi_thread_self(ompi_thread_t *t)
{
return true;
}
#endif

View file

@ -32,7 +32,8 @@ typedef struct ompi_thread_t ompi_thread_t;
OBJ_CLASS_DECLARATION(ompi_thread_t);
int ompi_thread_start(ompi_thread_t *);
int ompi_thread_join(ompi_thread_t *, void **thread_return);
bool ompi_thread_self(ompi_thread_t*);
#endif /* OMPI_THREAD_H */

View file

@ -5,7 +5,9 @@
#include "ompi_config.h"
#include "util/proc_info.h"
#include "mca/ns/ns.h"
#include "mca/ns/base/base.h"
#include "mca/pcm/base/base.h"
#include "runtime/runtime.h"
#include "mca/base/base.h"
@ -89,6 +91,13 @@ main(int argc, char *argv[])
printf("num_procs: %d\n", num_procs);
}
/*
* TSW - temporarily force to be a seed - and to use tcp oob.
*
*/
ompi_process_info.seed = true;
setenv("OMPI_MCA_oob_base_include", "tcp", 1);
/*
* Start the Open MPI Run Time Environment
*/
@ -104,13 +113,12 @@ main(int argc, char *argv[])
return ret;
}
/*
* Prep for starting a new job
*/
/* BWB - ompi_rte_get_new_jobid() */
new_jobid = getpid();
new_jobid = ompi_name_server.create_jobid();
/* BWB - fix jobid, procs, and nodes */
nodelist = ompi_rte_allocate_resources(new_jobid, 0, num_procs);
@ -142,6 +150,7 @@ main(int argc, char *argv[])
* register the monitor
*/
ompi_rte_notify(new_jobid,num_procs);
/*
* spawn procs
@ -151,9 +160,13 @@ main(int argc, char *argv[])
return -1;
}
/*
* wait for the job to complete
*/
ompi_rte_monitor();
/*
* - ompi_rte_monitor()
* - ompi_rte_kill_job()
*/
@ -167,6 +180,6 @@ main(int argc, char *argv[])
ompi_finalize();
OBJ_DESTRUCT(&schedlist);
return 0;
}

View file

@ -25,6 +25,7 @@
#include "util/session_dir.h"
#include "util/printf.h"
#include "util/daemon_init.h"
#include "event/event.h"
#include "util/universe_setup_file_io.h"
#include "mca/base/base.h"
#include "mca/oob/base/base.h"
@ -51,7 +52,7 @@ int main(int argc, char **argv)
char *tmpdir = NULL;
char *universe = NULL;
char *tmp, *universe_name, *remote_host, *remote_uid;
char *script_file, *socket_contact_info, *oob_contact_info;
char *script_file;
char *contact_file;
int ret;
bool persistent, silent, script, webserver;
@ -92,7 +93,7 @@ int main(int argc, char **argv)
}
/* setup the rte command line arguments */
cmd_line = ompi_cmd_line_create();
cmd_line = OBJ_NEW(ompi_cmd_line_t);
ompi_cmd_line_make_opt(cmd_line, 's', "seed", 0,
"Set the daemon seed to true.");
@ -116,7 +117,7 @@ int main(int argc, char **argv)
/*
* setup mca command line arguments
*/
mca_cmd_line = ompi_cmd_line_create();
mca_cmd_line = OBJ_NEW(ompi_cmd_line_t);
if (OMPI_SUCCESS != (ret = mca_base_cmd_line_setup(mca_cmd_line))) {
/* BWB show_help */
printf("show_help: mca_base_cmd_line_setup failed\n");
@ -245,10 +246,12 @@ int main(int argc, char **argv)
ompi_process_info.seed = true;
ompi_process_info.my_universe = strdup(ompi_universe.name);
#if 0
if (OMPI_SUCCESS != daemon_init(ompi_process_info.universe_session_dir)) {
fprintf(stderr, "could not convert to daemon - please report error to bugs@open-mpi.org\n");
exit(1);
}
#endif
/*
* Start the Open MPI Run Time Environment
@ -266,10 +269,10 @@ int main(int argc, char **argv)
}
/* get OOB contact info */
oob_contact_info = mca_oob_get_contact_info();
ompi_universe.oob_contact_info = mca_oob_get_contact_info();
/* get Web contact info */
socket_contact_info = strdup("dum.add.for.tst");
ompi_universe.socket_contact_info = strdup("dum.add.for.tst");
/* save all pertinent info in universe file */
contact_file = ompi_os_path(false, ompi_process_info.universe_session_dir,
@ -282,6 +285,8 @@ int main(int argc, char **argv)
/* put info on the registry */
/* event loop */
ompi_event_loop(0);
}
}
/* spawn console process */

View file

@ -97,8 +97,10 @@ static int ompi_ifinit(void)
}
if ((ifr->ifr_flags & IFF_UP) == 0)
continue;
#if 0
if ((ifr->ifr_flags & IFF_LOOPBACK) != 0)
continue;
#endif
strcpy(intf.if_name, ifr->ifr_name);
intf.if_flags = ifr->ifr_flags;

View file

@ -253,6 +253,19 @@ ompi_buffer_internal_t* bptr;
return (OMPI_SUCCESS);
}
int ompi_buffer_get(ompi_buffer_t buffer, void** baseptr, int *size)
{
/* check that buffer is not null */
if (!buffer) { return (OMPI_ERROR); }
/* deref and pass back */
if (baseptr) { *baseptr = buffer->base_ptr; }
if (size) { *size = buffer->len; }
return (OMPI_SUCCESS);
}
/**
* This function frees a given buffer
* If the buffer has data still, it is lost

View file

@ -115,6 +115,15 @@ extern "C" {
void** baseptr, void** dataptr, void** fromptr);
/**
* This function returns the base pointer and size of the
* packed region - suitable for use by oob_send/gpr_put/etc.
* Note the memory is still owned by the buffer.
*
*/
int ompi_buffer_get(ompi_buffer_t buffer, void** base, int* size);
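A sketch of the pack-then-peek pattern this accessor enables, mirroring ompi_rte_register() earlier in this diff (segment and keys as set up there):

/* Sketch: pack process info, then hand the raw region to the
 * registry without copying; the buffer still owns the memory. */
ompi_buffer_t buffer;
void *addr;
int rc, size;
ompi_buffer_init(&buffer, 128);
ompi_pack(buffer, &ompi_process_info.pid, 1, OMPI_INT32);
ompi_buffer_get(buffer, &addr, &size);
rc = ompi_registry.put(OMPI_REGISTRY_OVERWRITE, segment, keys, addr, size);
ompi_buffer_free(buffer);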
/**
* This function frees a given buffer
* If the buffer has data still, it is lost

View file

@ -11,12 +11,6 @@ noinst_PROGRAMS = \
atomic_SOURCES = atomic.c
atomic_LDADD = \
$(top_builddir)/src/util/output.lo \
$(top_builddir)/src/class/ompi_object.lo \
$(top_builddir)/src/util/malloc.lo \
$(top_builddir)/src/threads/mutex.lo \
$(top_builddir)/src/threads/mutex_pthread.lo \
$(top_builddir)/src/threads/mutex_spinlock.lo \
$(top_builddir)/src/util/libutil.la \
$(top_builddir)/src/libmpi.la \
$(top_builddir)/test/support/libsupport.la
atomic_DEPENDENCIES = $(atomic_LDADD)