
Not as bad as this all may look. Tim and I made a significant change to the way we handle the startup of the oob, the seed, etc. We have made it backwards-compatible so that mpirun2 and singleton operations remain working. We had to adjust the name server and gpr as well, plus the process_info structure.
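For illustration, the selection test in each of these components now keys off whether a replica was assigned to the process rather than off the seed flag - a minimal sketch of the pattern, using the names that appear in the hunks below:

    /* new selection pattern (sketch): host a replica only when no
     * remote replica was assigned via process_info */
    if (NULL != ompi_process_info.ns_replica) {
        /* a replica was assigned elsewhere - select the proxy component */
    } else {
        /* no replica assigned - this process hosts the replica itself */
    }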

This also includes a checkpoint update to openmpi.c and ompid.c. I have re-enabled the ompid compile.

The latter raises an important point. The trunk compiles programs like ompid just fine under Linux, and it also does fine on OSX with dynamic libraries. However, we are seeing errors when compiling statically on OSX - the linker seems to have trouble resolving some variable names, even though its diagnostics show those variables as defined. So, a warning to Mac users: you may have to locally turn things off if you are trying to do static compiles. We ask, however, that you don't commit changes that turn things off for everyone else - instead, let's figure out why the static compile is having a problem and let everyone else continue to work.

Thanks
Ralph

This commit was SVN r2534.
This commit is contained in:
Ralph Castain 2004-09-08 03:59:06 +00:00
parent 8230d39f77
commit e8c36d02c9
29 changed files with 627 additions and 496 deletions

View file

@ -105,9 +105,9 @@ mca_gpr_base_module_t* mca_gpr_proxy_init(bool *allow_multi_user_threads, bool *
ompi_output(0, "gpr_proxy_init called");
}
/* If we're NOT the seed, then we want to be selected, so do all
/* If we are NOT to host a replica, then we want to be selected, so do all
the setup and return the module */
if (!ompi_process_info.seed) {
if (NULL != ompi_process_info.gpr_replica) {
if (mca_gpr_proxy_debug) {
ompi_output(0, "gpr_proxy_init: proxy selected");
@ -125,8 +125,11 @@ mca_gpr_base_module_t* mca_gpr_proxy_init(bool *allow_multi_user_threads, bool *
*allow_multi_user_threads = true;
*have_hidden_threads = false;
/* define the replica for us to use - for now, use only the seed */
mca_gpr_my_replica = ompi_name_server.create_process_name(0,0,0);
/* define the replica for us to use - get it from process_info */
mca_gpr_my_replica = ompi_name_server.copy_process_name(ompi_process_info.gpr_replica);
if (NULL == mca_gpr_my_replica) { /* can't function */
return NULL;
}
/* initialize the notify list */
OBJ_CONSTRUCT(&mca_gpr_proxy_notify_request_tracker, ompi_list_t);

View file

@ -306,10 +306,10 @@ mca_gpr_base_module_t *mca_gpr_replica_init(bool *allow_multi_user_threads, bool
{
/* ompi_output(0, "entered replica init"); */
/* If we're the seed, then we want to be selected, so do all the
/* If we are to host a replica, then we want to be selected, so do all the
setup and return the module */
if (ompi_process_info.seed) {
if (NULL == ompi_process_info.gpr_replica) {
int rc;
/* Return a module (choose an arbitrary, positive priority --

View file

@ -104,10 +104,10 @@ int mca_ns_proxy_close(void)
mca_ns_base_module_t* mca_ns_proxy_init(bool *allow_multi_user_threads, bool *have_hidden_threads, int *priority)
{
/* If we're NOT the seed, then we want to be selected, so do all
/* If we are NOT to host a replica, then we want to be selected, so do all
the setup and return the module */
/* ompi_output(mca_ns_base_output, "ns_proxy: entered init\n"); */
if (!ompi_process_info.seed) {
if (NULL != ompi_process_info.ns_replica) {
/* Return a module (choose an arbitrary, positive priority --
it's only relevant compared to other ns components). If
@ -123,8 +123,8 @@ mca_ns_base_module_t* mca_ns_proxy_init(bool *allow_multi_user_threads, bool *ha
/* define the replica for us to use */
/* default to seed for now */
mca_ns_my_replica = mca_ns_proxy.create_process_name(0,0,0);
if (NULL == mca_ns_my_replica) { /* couldn't create process name - can't operate */
mca_ns_my_replica = mca_ns_proxy.copy_process_name(ompi_process_info.ns_replica);
if (NULL == mca_ns_my_replica) { /* can't operate */
return NULL;
}

View file

@ -127,10 +127,10 @@ int mca_ns_replica_close(void)
mca_ns_base_module_t* mca_ns_replica_init(bool *allow_multi_user_threads, bool *have_hidden_threads, int *priority)
{
/* If we're the seed, then we want to be selected, so do all the
/* If we are to host a replica, then we want to be selected, so do all the
setup and return the module */
if (ompi_process_info.seed) {
if (NULL == ompi_process_info.ns_replica) {
int rc;
mca_ns_replica_last_used_cellid = 0;

View file

@ -19,17 +19,12 @@
*/
extern ompi_process_name_t mca_oob_name_any;
extern ompi_process_name_t mca_oob_name_seed;
extern ompi_process_name_t mca_oob_name_self;
/**
* The wildcard for recieves from any peer.
*/
#define MCA_OOB_NAME_ANY &mca_oob_name_any
/**
* The process name of the seed deamon
*/
#define MCA_OOB_NAME_SEED &mca_oob_name_seed
/**
* Process name of self
*/
@ -94,33 +89,27 @@ char* mca_oob_get_contact_info(void);
* Set the MCA parameter (OMPI_MCA_oob_base_seed) used by the OOB to
* bootstrap communication between peers.
*
* @param seed The contact information of the peer process obtained
* @param uri The contact information of the peer process obtained
* via a call to mca_oob_get_contact_info().
*
* Note that this routine currently just sets the MCA parameter - so
* this function must be called prior to mca_oob_base_init().
*/
int mca_oob_set_contact_info(const char*);
/**
* Returns a null terminated character string returning contact info
* for all supported OOB channels.
* Extract from the contact info the peer process identifier.
*
* @return
*
* Note that the caller is responsible for freeing the returned string.
* @param cinfo (IN) The contact information of the peer process.
* @param name (OUT) The peer process identifier.
* @param uris (OUT) Will return an array of uri strings corresponding
* to the peers exported protocols.
*
* Note the caller may pass NULL for the uris if they only wish to extact
* the process name.
*/
char* mca_oob_get_contact_info(void);
int mca_oob_parse_contact_info(const char* uri, ompi_process_name_t* peer, char*** uris);
/**
* Temporary routine to aid in debugging. Don't attempt to use TCP
* and/or register w/ the GPR if the contact info for the seed daemon
* is not available.
*/
bool mca_oob_has_seed(void);
/**
* Set the contact info for the seed daemon.

View file

@ -34,29 +34,38 @@ OBJ_CLASS_INSTANCE(
NULL
);
ompi_process_name_t mca_oob_name_seed = { 0, 0, 0 };
ompi_process_name_t mca_oob_name_self = { MCA_NS_BASE_CELLID_MAX, MCA_NS_BASE_JOBID_MAX, MCA_NS_BASE_VPID_MAX };
ompi_process_name_t mca_oob_name_self = { MCA_NS_BASE_CELLID_MAX, MCA_NS_BASE_JOBID_MAX, MCA_NS_BASE_VPID_MAX };
ompi_process_name_t mca_oob_name_any = { MCA_NS_BASE_CELLID_MAX, MCA_NS_BASE_JOBID_MAX, MCA_NS_BASE_VPID_MAX };
/**
* Parse contact info string into process name and list of uri strings.
*/
static int mca_oob_base_parse_contact_info(
char* contact_info,
int mca_oob_parse_contact_info(
const char* contact_info,
ompi_process_name_t* name,
char*** uri)
{
ompi_process_name_t* proc_name;
/* parse the process name */
char* cinfo = strdup(contact_info);
char* ptr = strchr(contact_info, ';');
if(NULL == ptr)
if(NULL == ptr) {
free(cinfo);
return OMPI_ERR_BAD_PARAM;
}
*ptr = '\0';
ptr++;
*name = *ns_base_convert_string_to_process_name(contact_info);
proc_name = ns_base_convert_string_to_process_name(contact_info);
*name = *proc_name;
free(proc_name);
/* parse the remainder of the string into an array of uris */
*uri = ompi_argv_split(ptr, ';');
if (NULL != uri) {
/* parse the remainder of the string into an array of uris */
*uri = ompi_argv_split(ptr, ';');
}
free(cinfo);
return OMPI_SUCCESS;
}
@ -74,9 +83,6 @@ int mca_oob_base_init(bool *user_threads, bool *hidden_threads)
mca_oob_base_component_t *component;
mca_oob_t *module;
extern ompi_list_t mca_oob_base_components;
int i, id;
char* seed;
char** uri = NULL;
mca_oob_t *s_module = NULL;
bool s_user_threads;
bool s_hidden_threads;
@ -85,21 +91,6 @@ int mca_oob_base_init(bool *user_threads, bool *hidden_threads)
char** include = ompi_argv_split(mca_oob_base_include, ',');
char** exclude = ompi_argv_split(mca_oob_base_exclude, ',');
/* setup seed daemons name and address */
id = mca_base_param_register_string("oob","base","seed",NULL,NULL);
mca_base_param_lookup_string(id,&seed);
if(seed == NULL) {
/* we are seed daemon */
mca_oob_name_seed = mca_oob_name_self;
} else {
/* resolve name of seed daemon */
mca_oob_base_parse_contact_info(seed,&mca_oob_name_seed, &uri);
if(NULL == uri || NULL == *uri) {
ompi_output(0, "mca_oob_base_init: unable to parse seed contact info.");
return OMPI_ERROR;
}
}
/* Traverse the list of available modules; call their init functions. */
for (item = ompi_list_get_first(&mca_oob_base_components);
item != ompi_list_get_end(&mca_oob_base_components);
@ -154,16 +145,6 @@ int mca_oob_base_init(bool *user_threads, bool *hidden_threads)
inited->oob_module = module;
ompi_list_append(&mca_oob_base_modules, &inited->super);
/* lookup contact info for this component */
if(NULL != uri) {
for(i=0; NULL != uri[i]; i++) {
if(strncmp(uri[i], component->oob_base.mca_component_name,
strlen(component->oob_base.mca_component_name)) == 0) {
module->oob_set_seed(uri[i]);
}
}
}
/* setup highest priority oob channel */
if(priority > s_priority) {
s_priority = priority;
@ -174,10 +155,6 @@ int mca_oob_base_init(bool *user_threads, bool *hidden_threads)
}
}
}
if(uri != NULL) {
ompi_argv_free(uri);
}
/* set the global variable to point to the first initialize module */
if(s_module == NULL) {
ompi_output(0, "mca_oob_base_init: no OOB modules available\n");
@ -221,23 +198,32 @@ char* mca_oob_get_contact_info()
* @param seed
*/
int mca_oob_set_contact_info(const char* seed)
int mca_oob_set_contact_info(const char* contact_info)
{
/* TSW - fix this - currently just stuff the parameter in the environment */
setenv("OMPI_MCA_oob_base_seed", seed, 1);
ompi_process_name_t name;
char** uri;
char** ptr;
int rc = mca_oob_parse_contact_info(contact_info, &name, &uri);
if(rc != OMPI_SUCCESS)
return rc;
for(ptr = uri; ptr != NULL && *ptr != NULL; ptr++) {
ompi_list_item_t* item;
for (item = ompi_list_get_first(&mca_oob_base_modules);
item != ompi_list_get_end(&mca_oob_base_modules);
item = ompi_list_get_next(item)) {
mca_oob_base_info_t* base = (mca_oob_base_info_t *) item;
if (strncmp(base->oob_component->oob_base.mca_component_name, *ptr,
strlen(base->oob_component->oob_base.mca_component_name)) == 0)
base->oob_module->oob_set_addr(&name, *ptr);
}
}
if(uri != NULL) {
ompi_argv_free(uri);
}
return OMPI_SUCCESS;
}
/**
* Are we or do we know how to get to the seed.
*/
bool mca_oob_has_seed(void)
{
return (getenv("OMPI_MCA_oob_base_seed") != NULL || ompi_process_info.seed);
}
/**
* Called to request the selected oob components to
@ -250,10 +236,6 @@ int mca_oob_base_module_init(void)
/* setup self to point to actual process name */
mca_oob_name_self = *mca_pcmclient.pcmclient_get_self();
if(ompi_process_info.seed) {
char* seed = mca_oob_get_contact_info();
mca_oob_set_contact_info(seed);
}
/* Initialize all modules after oob/gpr/ns have initialized */
for (item = ompi_list_get_first(&mca_oob_base_modules);
@ -265,4 +247,3 @@ int mca_oob_base_module_init(void)
}
return OMPI_SUCCESS;
}
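As a usage sketch of the revised contact-info API (hypothetical variable handling; the "<name>;<uri>;<uri>..." layout is what the parser above implies, and a tcp uri of the form "tcp://<host>:<port>" is an assumption):

    /* hypothetical usage of the contact info routines above */
    char *contact = mca_oob_get_contact_info();  /* caller must free */
    ompi_process_name_t peer;
    char **uris = NULL;
    if (OMPI_SUCCESS == mca_oob_parse_contact_info(contact, &peer, &uris)) {
        /* uris holds one string per exported protocol; pass NULL for
         * the last argument when only the process name is needed */
        ompi_argv_free(uris);
    }
    free(contact);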

View file

@ -38,44 +38,20 @@ int mca_oob_recv(ompi_process_name_t* peer, struct iovec *msg, int count, int* t
* iovec array without removing the message from the queue.
* @return OMPI error code (<0) on error or number of bytes actually received.
*/
int mca_oob_recv_packed (ompi_process_name_t* peer, ompi_buffer_t *buf, int* tag)
int mca_oob_recv_packed(ompi_process_name_t* peer, ompi_buffer_t *buf, int* tag)
{
/* ok, this routine is a bit of a cow */
/* the oob_recv actually needs the real target buffers in advance */
/* this forces a three stage method */
/* first we need to peek the size we will need */
/* then we allocate a buffer of the correct size and then */
/* we post a recv with the matching iov :) */
/* and we hope that someone inbtween has not posted a recv */
/* that matches. */
/* To avoid any RACE we NEED to change the OOB lowlevel to */
/* alloc the buffer for us.. as per original design. */
/* Or do locking on all recv posting between the peek and recv! GEF */
uint32_t insize;
int rc;
struct iovec msg[1];
ompi_buffer_t tmpbuf;
void *targetptr;
insize = mca_oob.oob_recv(peer, NULL, 0, tag, MCA_OOB_PEEK|MCA_OOB_TRUNC);
/* setup iov */
msg[0].iov_base = NULL;
msg[0].iov_len = 0;
if (OMPI_ERROR==insize) { return (rc); }
rc = mca_oob.oob_recv(peer, msg, 1, tag, MCA_OOB_ALLOC);
if(rc < 0)
return rc;
targetptr = (void*) malloc (insize);
if (!targetptr) { return (OMPI_ERROR); }
rc = ompi_buffer_init_preallocated (&tmpbuf, targetptr, insize);
if (OMPI_ERROR==rc) { return (rc); }
/* now update the IOV */
msg[0].iov_base = (char*) targetptr;
msg[0].iov_len = insize;
rc = mca_oob.oob_recv(peer, msg, 1, tag, 0);
if (OMPI_ERROR!=rc) *buf = tmpbuf;
return (rc);
/* initialize buffer */
return ompi_buffer_init_preallocated (buf, msg[0].iov_base, msg[0].iov_len);
}
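A hedged caller-side sketch of the simplified routine (peer_name is a placeholder, and releasing the buffer via ompi_buffer_free is an assumption):

    ompi_buffer_t buf;
    int tag = 0;  /* placeholder - must match the tag the sender used */
    int rc = mca_oob_recv_packed(&peer_name, &buf, &tag);
    if (rc >= 0) {
        /* unpack from buf, then release it (assumed: ompi_buffer_free) */
    }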

View file

@ -27,7 +27,7 @@ int mca_oob_cofs_module_fini(void);
/* stubs */
char* mca_oob_cofs_get_addr(void);
int mca_oob_cofs_set_seed(const char*);
int mca_oob_cofs_set_addr(const ompi_process_name_t*, const char*);
/**

View file

@ -42,7 +42,7 @@ mca_oob_base_component_1_0_0_t mca_oob_cofs_component = {
mca_oob_t mca_oob_cofs = {
mca_oob_cofs_get_addr,
mca_oob_cofs_set_seed,
mca_oob_cofs_set_addr,
mca_oob_cofs_send,
mca_oob_cofs_recv,
mca_oob_cofs_send_nb,
@ -62,7 +62,7 @@ char* mca_oob_cofs_get_addr(void)
return strdup("cofs://");
}
int mca_oob_cofs_set_seed(const char* addr)
int mca_oob_cofs_set_addr(const ompi_process_name_t* name, const char* addr)
{
return OMPI_SUCCESS;
}
@ -113,6 +113,11 @@ mca_oob_t* mca_oob_cofs_init(int* priority, bool *allow_multi_user_threads, bool
int mca_oob_cofs_module_init(void)
{
if(memcmp(&mca_oob_name_self, &mca_oob_name_any, sizeof(ompi_process_name_t)) == 0) {
mca_oob_name_self.cellid = 0;
mca_oob_name_self.jobid = 1;
mca_oob_name_self.vpid = 0;
}
return OMPI_SUCCESS;
}

View file

@ -45,12 +45,12 @@ typedef struct mca_oob_1_0_0_t mca_oob_t;
typedef char* (*mca_oob_base_module_get_addr_fn_t)(void);
/**
* Implementation of mca_oob_base_module_set_seed().
* Implementation of mca_oob_base_module_set_addr().
*
* @param addr Address of seed in component specific uri format.
*/
typedef int (*mca_oob_base_module_set_seed_fn_t)(const char* addr);
typedef int (*mca_oob_base_module_set_addr_fn_t)(const ompi_process_name_t*, const char* addr);
/**
* Implementation of mca_oob_send().
@ -151,7 +151,7 @@ typedef int (*mca_oob_base_module_fini_fn_t)(void);
*/
struct mca_oob_1_0_0_t {
mca_oob_base_module_get_addr_fn_t oob_get_addr;
mca_oob_base_module_set_seed_fn_t oob_set_seed;
mca_oob_base_module_set_addr_fn_t oob_set_addr;
mca_oob_base_module_send_fn_t oob_send;
mca_oob_base_module_recv_fn_t oob_recv;
mca_oob_base_module_send_nb_fn_t oob_send_nb;

View file

@ -59,7 +59,7 @@ mca_oob_tcp_component_t mca_oob_tcp_component = {
static mca_oob_t mca_oob_tcp = {
mca_oob_tcp_get_addr,
mca_oob_tcp_set_seed,
mca_oob_tcp_set_addr,
mca_oob_tcp_send,
mca_oob_tcp_recv,
mca_oob_tcp_send_nb,
@ -118,7 +118,6 @@ int mca_oob_tcp_component_open(void)
mca_oob_tcp_param_register_int("peer_retries", 60);
mca_oob_tcp_component.tcp_debug =
mca_oob_tcp_param_register_int("debug", 1);
memset(&mca_oob_tcp_component.tcp_seed_addr, 0, sizeof(mca_oob_tcp_component.tcp_seed_addr));
/* initialize state */
mca_oob_tcp_component.tcp_listen_sd = -1;
@ -267,11 +266,14 @@ static void mca_oob_tcp_recv_handler(int sd, short flags, void* user)
/* recv the process identifier */
while((rc = recv(sd, guid, sizeof(guid), 0)) != sizeof(guid)) {
if(rc >= 0) {
ompi_output(0, "[%d,%d,%d] mca_oob_tcp_recv_handler: peer closed connection",
OMPI_NAME_COMPONENTS(mca_oob_name_self));
close(sd);
return;
}
if(errno != EINTR) {
ompi_output(0, "[%d,%d,%d] mca_oob_tcp_recv_handler: recv() failed with errno=%d\n", errno);
ompi_output(0, "[%d,%d,%d] mca_oob_tcp_recv_handler: recv() failed with errno=%d\n",
OMPI_NAME_COMPONENTS(mca_oob_name_self), errno);
close(sd);
return;
}
@ -281,11 +283,13 @@ static void mca_oob_tcp_recv_handler(int sd, short flags, void* user)
/* now set socket up to be non-blocking */
if((flags = fcntl(sd, F_GETFL, 0)) < 0) {
ompi_output(0, "mca_oob_tcp_recv_handler: fcntl(F_GETFL) failed with errno=%d", errno);
ompi_output(0, "[%d,%d,%d] mca_oob_tcp_recv_handler: fcntl(F_GETFL) failed with errno=%d",
OMPI_NAME_COMPONENTS(mca_oob_name_self), errno);
} else {
flags |= O_NONBLOCK;
if(fcntl(sd, F_SETFL, flags) < 0) {
ompi_output(0, "mca_oob_tcp_recv_handler: fcntl(F_SETFL) failed with errno=%d", errno);
ompi_output(0, "[%d,%d,%d] mca_oob_tcp_recv_handler: fcntl(F_SETFL) failed with errno=%d",
OMPI_NAME_COMPONENTS(mca_oob_name_self), errno);
}
}
@ -301,12 +305,19 @@ static void mca_oob_tcp_recv_handler(int sd, short flags, void* user)
/* lookup the corresponding process */
peer = mca_oob_tcp_peer_lookup(guid);
if(NULL == peer) {
ompi_output(0, "mca_oob_tcp_recv_handler: unable to locate peer");
ompi_output(0, "[%d,%d,%d] mca_oob_tcp_recv_handler: unable to locate peer",
OMPI_NAME_COMPONENTS(mca_oob_name_self));
close(sd);
return;
}
/* is the peer instance willing to accept this connection */
if(mca_oob_tcp_peer_accept(peer, sd) == false) {
ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_recv_handler: "
"rejected connection from [%d,%d,%d] connection state %d",
OMPI_NAME_COMPONENTS(mca_oob_name_self),
OMPI_NAME_COMPONENTS(peer->peer_name),
OMPI_NAME_COMPONENTS(guid[0]),
peer->peer_state);
close(sd);
return;
}
@ -319,10 +330,6 @@ static void mca_oob_tcp_recv_handler(int sd, short flags, void* user)
*/
mca_oob_t* mca_oob_tcp_component_init(int* priority, bool *allow_multi_user_threads, bool *have_hidden_threads)
{
/* dont allow tcp to be selected if we dont know the seed */
if(mca_oob_has_seed() == false)
return NULL;
*priority = 1;
*allow_multi_user_threads = true;
*have_hidden_threads = OMPI_HAVE_THREADS;
@ -524,7 +531,7 @@ int mca_oob_tcp_init(void)
/* register synchro callback to receive notification when all processes have registered */
subscription = OBJ_NEW(mca_oob_tcp_subscription_t);
subscription->jobid = mca_oob_name_self.jobid;
ompi_list_append(&mca_oob_tcp_component.tcp_subscriptions, subscription);
ompi_list_append(&mca_oob_tcp_component.tcp_subscriptions, &subscription->item);
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
rc = ompi_registry.synchro(
@ -681,11 +688,11 @@ int mca_oob_tcp_parse_uri(const char* uri, struct sockaddr_in* inaddr)
/*
* Set address for the seed daemon. Note that this could be called multiple
* times if the seed daemon exports multiple addresses.
* Setup address in the cache. Note that this could be called multiple
* times if a given destination exports multiple addresses.
*/
int mca_oob_tcp_set_seed(const char* uri)
int mca_oob_tcp_set_addr(const ompi_process_name_t* name, const char* uri)
{
struct sockaddr_in inaddr;
mca_oob_tcp_addr_t* addr;
@ -694,10 +701,10 @@ int mca_oob_tcp_set_seed(const char* uri)
return rc;
OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
addr = (mca_oob_tcp_addr_t*)ompi_rb_tree_find(&mca_oob_tcp_component.tcp_peer_names, &mca_oob_name_seed);
addr = (mca_oob_tcp_addr_t*)ompi_rb_tree_find(&mca_oob_tcp_component.tcp_peer_names, (ompi_process_name_t*)name);
if(NULL == addr) {
addr = OBJ_NEW(mca_oob_tcp_addr_t);
addr->addr_name = mca_oob_name_seed;
addr->addr_name = *name;
ompi_rb_tree_insert(&mca_oob_tcp_component.tcp_peer_names, &addr->addr_name, addr);
}
rc = mca_oob_tcp_addr_insert(addr, &inaddr);

View file

@ -89,10 +89,10 @@ int mca_oob_tcp_process_name_compare(const ompi_process_name_t* n1, const ompi_p
char* mca_oob_tcp_get_addr(void);
/**
* Set address for the seed.
* Setup cached addresses for the peers.
*/
int mca_oob_tcp_set_seed(const char*);
int mca_oob_tcp_set_addr(const ompi_process_name_t*, const char*);
/**
* Similiar to unix writev(2).
@ -203,13 +203,12 @@ struct mca_oob_tcp_component_t {
mca_oob_base_component_1_0_0_t super; /**< base OOB component */
int tcp_listen_sd; /**< listen socket for incoming connection requests */
unsigned short tcp_listen_port; /**< listen port */
struct sockaddr_in tcp_seed_addr; /**< uri string of tcp peer address */
ompi_list_t tcp_subscriptions; /**< list of registry subscriptions */
ompi_list_t tcp_peer_list; /**< list of peers sorted in mru order */
ompi_rb_tree_t tcp_peer_tree; /**< tree of peers sorted by name */
ompi_rb_tree_t tcp_peer_names; /**< cache of peer contact info sorted by name */
ompi_free_list_t tcp_peer_free; /**< free list of peers */
size_t tcp_peer_limit; /**< max size of tcp peer cache */
int tcp_peer_limit; /**< max size of tcp peer cache */
int tcp_peer_retries; /**< max number of retries before declaring peer gone */
ompi_free_list_t tcp_msgs; /**< free list of messages */
ompi_event_t tcp_send_event; /**< event structure for sends */

View file

@ -7,6 +7,9 @@
static void mca_oob_tcp_msg_construct(mca_oob_tcp_msg_t*);
static void mca_oob_tcp_msg_destruct(mca_oob_tcp_msg_t*);
static void mca_oob_tcp_msg_ident(mca_oob_tcp_msg_t* msg, mca_oob_tcp_peer_t* peer);
static bool mca_oob_tcp_msg_recv(mca_oob_tcp_msg_t* msg, mca_oob_tcp_peer_t* peer);
static void mca_oob_tcp_msg_match(mca_oob_tcp_msg_t* msg, mca_oob_tcp_peer_t* peer);
OBJ_CLASS_INSTANCE(
@ -106,7 +109,10 @@ bool mca_oob_tcp_msg_send_handler(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_pee
else if (errno == EAGAIN)
return false;
else {
ompi_output(0, "mca_oob_tcp_msg_send_handler: bad return from writev. errno=%d", errno);
ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_msg_send_handler: writev failed with errno=%d",
OMPI_NAME_COMPONENTS(mca_oob_name_self),
OMPI_NAME_COMPONENTS(peer->peer_name),
errno);
mca_oob_tcp_peer_close(peer);
return false;
}
@ -138,6 +144,48 @@ bool mca_oob_tcp_msg_send_handler(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_pee
* @retval false if the whole message was not received
*/
bool mca_oob_tcp_msg_recv_handler(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_peer_t * peer)
{
/* has entire header been received */
if(msg->msg_rwptr == msg->msg_rwiov) {
if(mca_oob_tcp_msg_recv(msg, peer) == false)
return false;
/* allocate a buffer for the receive */
MCA_OOB_TCP_HDR_NTOH(&msg->msg_hdr);
if(msg->msg_hdr.msg_size > 0) {
msg->msg_rwbuf = malloc(msg->msg_hdr.msg_size);
if(NULL == msg->msg_rwbuf) {
ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_msg_recv_handler: malloc(%d) failed\n",
OMPI_NAME_COMPONENTS(mca_oob_name_self),
OMPI_NAME_COMPONENTS(peer->peer_name),
msg->msg_hdr.msg_size);
mca_oob_tcp_peer_close(peer);
return false;
}
msg->msg_rwiov[1].iov_base = msg->msg_rwbuf;
msg->msg_rwiov[1].iov_len = msg->msg_hdr.msg_size;
msg->msg_rwnum = 1;
}
}
/* do the right thing based on the message type */
switch(msg->msg_hdr.msg_type) {
case MCA_OOB_TCP_IDENT:
/* done - there is nothing else to receive */
return true;
case MCA_OOB_TCP_MSG:
/* finish receiving message */
return mca_oob_tcp_msg_recv(msg, peer);
default:
return true;
}
}
/**
* Process the current iovec
*/
static bool mca_oob_tcp_msg_recv(mca_oob_tcp_msg_t* msg, mca_oob_tcp_peer_t* peer)
{
int rc;
while(1) {
@ -148,12 +196,18 @@ bool mca_oob_tcp_msg_recv_handler(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_pee
else if (errno == EAGAIN)
return false;
else {
ompi_output(0, "mca_oob_tcp_msg_recv_handler: readv failed with errno=%d", errno);
ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_msg_recv: readv failed with errno=%d",
OMPI_NAME_COMPONENTS(mca_oob_name_self),
OMPI_NAME_COMPONENTS(peer->peer_name),
errno);
mca_oob_tcp_peer_close(peer);
return false;
}
} else if (rc == 0) {
ompi_output(0, "mca_oob_tcp_msg_recv_handler: read failedd - peer closed connection");
ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_msg_recv: peer closed connection",
OMPI_NAME_COMPONENTS(mca_oob_name_self),
OMPI_NAME_COMPONENTS(peer->peer_name),
errno);
mca_oob_tcp_peer_close(peer);
return false;
}
@ -175,6 +229,100 @@ bool mca_oob_tcp_msg_recv_handler(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_pee
}
}
/**
* Process a completed message.
*/
void mca_oob_tcp_msg_recv_complete(mca_oob_tcp_msg_t* msg, mca_oob_tcp_peer_t* peer)
{
switch(msg->msg_hdr.msg_type) {
case MCA_OOB_TCP_IDENT:
mca_oob_tcp_msg_ident(msg,peer);
break;
case MCA_OOB_TCP_MSG:
mca_oob_tcp_msg_match(msg,peer);
break;
default:
ompi_output(0, "[%d,%d,%d] mca_oob_tcp_msg_recv_complete: invalid message type: %d\n",
OMPI_NAME_COMPONENTS(mca_oob_name_self), msg->msg_hdr.msg_type);
MCA_OOB_TCP_MSG_RETURN(msg);
break;
}
}
/**
* Process an ident message.
*/
static void mca_oob_tcp_msg_ident(mca_oob_tcp_msg_t* msg, mca_oob_tcp_peer_t* peer)
{
ompi_process_name_t src = msg->msg_hdr.msg_src;
OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
if(mca_oob_tcp_process_name_compare(&peer->peer_name, &src) != 0) {
ompi_rb_tree_delete(&mca_oob_tcp_component.tcp_peer_tree, &peer->peer_name);
peer->peer_name = src;
ompi_rb_tree_insert(&mca_oob_tcp_component.tcp_peer_tree, &peer->peer_name, peer);
}
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
}
/*
* Progress a completed recv:
* (1) signal a posted recv as complete
* (2) queue an unexpected message in the recv list
*/
static void mca_oob_tcp_msg_match(mca_oob_tcp_msg_t* msg, mca_oob_tcp_peer_t* peer)
{
/* attempt to match unexpected message to a posted recv */
mca_oob_tcp_msg_t* post;
OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_match_lock);
post = mca_oob_tcp_msg_match_post(&peer->peer_name, msg->msg_hdr.msg_tag,true);
if(NULL != post) {
if(post->msg_flags & MCA_OOB_ALLOC) {
/* set the users iovec struct to point to pre-allocated buffer */
if(NULL == post->msg_uiov || 0 == post->msg_ucnt) {
post->msg_rc = OMPI_ERR_BAD_PARAM;
} else {
/* first iovec of recv message contains the header -
* subsequent contain user data
*/
post->msg_uiov[0].iov_base = msg->msg_rwiov[1].iov_base;
post->msg_uiov[0].iov_len = msg->msg_rwiov[1].iov_len;
msg->msg_rwbuf = NULL;
post->msg_rc = msg->msg_rwiov[1].iov_len;
}
} else {
/* copy msg data into posted recv */
post->msg_rc = mca_oob_tcp_msg_copy(msg, post->msg_uiov, post->msg_ucnt);
if(post->msg_flags & MCA_OOB_TRUNC) {
int i, size = 0;
for(i=1; i<msg->msg_rwcnt+1; i++)
size += msg->msg_rwiov[i].iov_len;
post->msg_rc = size;
}
}
if(post->msg_flags & MCA_OOB_PEEK) {
/* will need message for actual receive */
ompi_list_append(&mca_oob_tcp_component.tcp_msg_recv, &msg->super);
} else {
MCA_OOB_TCP_MSG_RETURN(msg);
}
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
mca_oob_tcp_msg_complete(post, &peer->peer_name);
} else {
ompi_list_append(&mca_oob_tcp_component.tcp_msg_recv, (ompi_list_item_t*)msg);
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
}
}
/*
* Called to copy the results of a message into user supplied iovec array.
* @param msg (IN) Message send that is in progress.
@ -185,7 +333,7 @@ bool mca_oob_tcp_msg_recv_handler(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_pee
int mca_oob_tcp_msg_copy(mca_oob_tcp_msg_t* msg, struct iovec* iov, int count)
{
int i;
struct iovec *src = msg->msg_rwiov;
struct iovec *src = msg->msg_rwiov+1;
struct iovec *dst = iov;
unsigned char* src_ptr = (unsigned char*)src->iov_base;
size_t src_len = src->iov_len;
@ -204,7 +352,7 @@ int mca_oob_tcp_msg_copy(mca_oob_tcp_msg_t* msg, struct iovec* iov, int count)
src_ptr += len;
src_len -= len;
if(src_len == 0) {
if(++src_cnt == count)
if(++src_cnt == msg->msg_rwcnt)
return rc;
src++;
src_ptr = (unsigned char*)src->iov_base;
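A note on the iovec layout this indexing relies on, reconstructed from the hunks above (illustrative only):

    /* receive-side iovec layout after this change:
     *   msg_rwiov[0]             -> message header (msg_hdr)
     *   msg_rwiov[1..msg_rwcnt]  -> payload
     * hence copies and TRUNC size sums skip slot 0 and start at index 1 */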

View file

@ -121,6 +121,13 @@ bool mca_oob_tcp_msg_send_handler(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_pee
bool mca_oob_tcp_msg_recv_handler(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_peer_t * peer);
/**
* The message has been completely received - so attempt to match
* against posted recvs.
*/
void mca_oob_tcp_msg_recv_complete(mca_oob_tcp_msg_t* msg, struct mca_oob_tcp_peer_t* peer);
/**
* Match name to a message that has been received asynchronously (unexpected).
*

View file

@ -29,7 +29,6 @@ static void mca_oob_tcp_peer_recv_handler(int sd, short flags, void* user);
static void mca_oob_tcp_peer_send_handler(int sd, short flags, void* user);
static void mca_oob_tcp_peer_timer_handler(int sd, short flags, void* user);
static void mca_oob_tcp_peer_dump(mca_oob_tcp_peer_t* peer, const char* msg);
static mca_oob_tcp_msg_t* mca_oob_tcp_peer_msg_start(mca_oob_tcp_peer_t* peer, mca_oob_tcp_hdr_t* hdr);
OBJ_CLASS_INSTANCE(
@ -148,7 +147,7 @@ int mca_oob_tcp_peer_send(mca_oob_tcp_peer_t* peer, mca_oob_tcp_msg_t* msg)
mca_oob_tcp_peer_t * mca_oob_tcp_peer_lookup(ompi_process_name_t* name)
{
int rc;
mca_oob_tcp_peer_t * peer, * old;
mca_oob_tcp_peer_t * peer, *old;
OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
peer = (mca_oob_tcp_peer_t*)ompi_rb_tree_find(&mca_oob_tcp_component.tcp_peer_tree,
@ -183,8 +182,8 @@ mca_oob_tcp_peer_t * mca_oob_tcp_peer_lookup(ompi_process_name_t* name)
/* if the peer list is over the maximum size, remove one unsed peer */
ompi_list_prepend(&mca_oob_tcp_component.tcp_peer_list, (ompi_list_item_t *) peer);
if(ompi_list_get_size(&mca_oob_tcp_component.tcp_peer_list) >
mca_oob_tcp_component.tcp_peer_limit) {
if(mca_oob_tcp_component.tcp_peer_limit > 0 &&
ompi_list_get_size(&mca_oob_tcp_component.tcp_peer_list) > mca_oob_tcp_component.tcp_peer_limit) {
old = (mca_oob_tcp_peer_t *)
ompi_list_get_last(&mca_oob_tcp_component.tcp_peer_list);
while(1) {
@ -226,7 +225,14 @@ static int mca_oob_tcp_peer_start_connect(mca_oob_tcp_peer_t* peer)
peer->peer_state = MCA_OOB_TCP_CONNECTING;
peer->peer_sd = socket(AF_INET, SOCK_STREAM, 0);
if (peer->peer_sd < 0) {
peer->peer_retries++;
struct timeval tv = { 1,0 };
ompi_output(0,
"[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_peer_start_connect: socket() failed with errno=%d\n",
OMPI_NAME_COMPONENTS(mca_oob_name_self),
OMPI_NAME_COMPONENTS(peer->peer_name),
errno);
mca_oob_tcp_peer_close(peer);
ompi_evtimer_add(&peer->peer_timer_event, &tv);
return OMPI_ERR_UNREACH;
}
@ -324,6 +330,10 @@ static void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_peer_t* peer)
return;
} else if (so_error == ECONNREFUSED) {
struct timeval tv = { 1,0 };
ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_peer_complete_connect: "
"connection refused - retrying\n",
OMPI_NAME_COMPONENTS(mca_oob_name_self),
OMPI_NAME_COMPONENTS(peer->peer_name));
mca_oob_tcp_peer_close(peer);
if(peer->peer_retries > mca_oob_tcp_component.tcp_peer_retries) {
return;
@ -356,6 +366,7 @@ static void mca_oob_tcp_peer_complete_connect(mca_oob_tcp_peer_t* peer)
*/
static void mca_oob_tcp_peer_connected(mca_oob_tcp_peer_t* peer)
{
ompi_event_del(&peer->peer_timer_event);
peer->peer_state = MCA_OOB_TCP_CONNECTED;
peer->peer_retries = 0;
if(ompi_list_get_size(&peer->peer_send_queue) > 0) {
@ -440,7 +451,11 @@ static int mca_oob_tcp_peer_recv_connect_ack(mca_oob_tcp_peer_t* peer)
/* compare the peers name to the expected value */
if(memcmp(&peer->peer_name, &guid[0], sizeof(ompi_process_name_t)) != 0) {
ompi_output(0, "mca_oob_tcp_peer_connect: received unexpected process identifier");
ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_peer_connect: "
"received unexpected process identifier [%d,%d,%d]\n",
OMPI_NAME_COMPONENTS(mca_oob_name_self),
OMPI_NAME_COMPONENTS(peer->peer_name),
OMPI_NAME_COMPONENTS(guid[0]));
mca_oob_tcp_peer_close(peer);
return OMPI_ERR_UNREACH;
}
@ -471,6 +486,11 @@ static int mca_oob_tcp_peer_recv_blocking(mca_oob_tcp_peer_t* peer, void* data,
/* remote closed connection */
if(retval == 0) {
ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_peer_recv_blocking: "
"peer closed connection: peer state %d",
OMPI_NAME_COMPONENTS(mca_oob_name_self),
OMPI_NAME_COMPONENTS(peer->peer_name),
peer->peer_state);
mca_oob_tcp_peer_close(peer);
return -1;
}
@ -518,76 +538,6 @@ static int mca_oob_tcp_peer_send_blocking(mca_oob_tcp_peer_t* peer, void* data,
return cnt;
}
/*
* Progress a completed recv:
* (1) signal a posted recv as complete
* (2) queue an unexpected message in the recv list
*/
static void mca_oob_tcp_peer_recv_progress(mca_oob_tcp_peer_t* peer, mca_oob_tcp_msg_t *msg)
{
/* was this a posted recv? */
if(msg->msg_type == MCA_OOB_TCP_POSTED) {
if(msg->msg_flags & MCA_OOB_ALLOC) {
/* set the users iovec struct to point to pre-allocated buffer */
if(NULL == msg->msg_uiov || 0 == msg->msg_ucnt) {
msg->msg_rc = OMPI_ERR_BAD_PARAM;
} else {
msg->msg_uiov[0].iov_base = msg->msg_rwiov->iov_base;
msg->msg_uiov[0].iov_len = msg->msg_rwiov->iov_len;
msg->msg_rwbuf = NULL;
msg->msg_rc = msg->msg_rwiov->iov_len;
}
}
mca_oob_tcp_msg_complete(msg, &peer->peer_name);
} else {
/* if not attempt to match unexpected message to a posted recv */
mca_oob_tcp_msg_t* post;
OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_match_lock);
post = mca_oob_tcp_msg_match_post(&peer->peer_name, msg->msg_hdr.msg_tag,true);
if(NULL != post) {
if(post->msg_flags & MCA_OOB_ALLOC) {
/* set the users iovec struct to point to pre-allocated buffer */
if(NULL == post->msg_uiov || 0 == post->msg_ucnt) {
post->msg_rc = OMPI_ERR_BAD_PARAM;
} else {
post->msg_uiov[0].iov_base = msg->msg_rwiov->iov_base;
post->msg_uiov[0].iov_len = msg->msg_rwiov->iov_len;
msg->msg_rwbuf = NULL;
post->msg_rc = msg->msg_rwiov->iov_len;
}
} else {
/* copy msg data into posted recv */
post->msg_rc = mca_oob_tcp_msg_copy(msg, post->msg_uiov, post->msg_ucnt);
if(post->msg_flags & MCA_OOB_TRUNC) {
int i, size = 0;
for(i=0; i<msg->msg_rwcnt; i++)
size += msg->msg_rwiov[i].iov_len;
post->msg_rc = size;
}
}
if(post->msg_flags & MCA_OOB_PEEK) {
/* will need message for actual receive */
ompi_list_append(&mca_oob_tcp_component.tcp_msg_recv, &msg->super);
} else {
MCA_OOB_TCP_MSG_RETURN(msg);
}
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
mca_oob_tcp_msg_complete(post, &peer->peer_name);
} else {
ompi_list_append(&mca_oob_tcp_component.tcp_msg_recv, (ompi_list_item_t*)msg);
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
}
}
}
int mca_oob_tcp_peer_send_ident(mca_oob_tcp_peer_t* peer)
{
@ -616,108 +566,6 @@ static void mca_oob_tcp_peer_recv_ident(mca_oob_tcp_peer_t* peer, mca_oob_tcp_hd
}
/*
* Start receiving a new message.
* (1) receive header
* (2) attempt to match posted receives
* (3) if a posted receive is available - receive into users buffer
* (4) otherwise, allocate a new message and buffer for receive
*/
static mca_oob_tcp_msg_t* mca_oob_tcp_peer_recv_start(mca_oob_tcp_peer_t* peer)
{
mca_oob_tcp_hdr_t hdr;
/* blocking receive of the message header */
if(mca_oob_tcp_peer_recv_blocking(peer, &hdr, sizeof(hdr)) == sizeof(hdr)) {
MCA_OOB_TCP_HDR_NTOH(&hdr);
if(mca_oob_tcp_component.tcp_debug > 2) {
ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_peer_recv_handler: src [%d,%d,%d] dst [%d,%d,%d] tag %d type %d\n",
OMPI_NAME_COMPONENTS(mca_oob_name_self),
OMPI_NAME_COMPONENTS(peer->peer_name),
OMPI_NAME_COMPONENTS(hdr.msg_src),
OMPI_NAME_COMPONENTS(hdr.msg_dst),
hdr.msg_tag,
hdr.msg_type);
}
switch(hdr.msg_type) {
case MCA_OOB_TCP_IDENT:
mca_oob_tcp_peer_recv_ident(peer, &hdr);
return NULL;
case MCA_OOB_TCP_MSG:
return mca_oob_tcp_peer_msg_start(peer,&hdr);
}
}
return NULL;
}
static mca_oob_tcp_msg_t* mca_oob_tcp_peer_msg_start(mca_oob_tcp_peer_t* peer, mca_oob_tcp_hdr_t* hdr)
{
mca_oob_tcp_msg_t* msg;
uint32_t size = hdr->msg_size;
/* attempt to match posted receive
* however - dont match message w/ peek attribute, as we need to
* queue the message anyway to match subsequent recv.
*/
OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_match_lock);
msg = mca_oob_tcp_msg_match_post(&peer->peer_name, hdr->msg_tag, false);
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
if(NULL != msg) {
uint32_t posted_size = 0;
int i;
/* setup buffer for receive */
for(i=0; i<msg->msg_ucnt; i++)
posted_size += msg->msg_uiov[i].iov_len;
/* allocate an additional buffer to receive entire message */
if(msg->msg_flags & MCA_OOB_ALLOC) {
msg->msg_rwiov = mca_oob_tcp_msg_iov_alloc(msg,1);
msg->msg_rwbuf = malloc(size);
msg->msg_rwiov[0].iov_base = msg->msg_rwbuf;
msg->msg_rwiov[0].iov_len = size;
msg->msg_rwcnt = msg->msg_rwnum = 1;
} else if (posted_size < size) {
uint32_t alloc_size = size - posted_size;
msg->msg_rwiov = mca_oob_tcp_msg_iov_alloc(msg,msg->msg_ucnt+1);
memcpy(msg->msg_rwiov, msg->msg_uiov, msg->msg_ucnt * sizeof(struct iovec));
msg->msg_rwbuf = malloc(alloc_size);
msg->msg_rwiov[msg->msg_ucnt].iov_base = msg->msg_rwbuf;
msg->msg_rwiov[msg->msg_ucnt].iov_len = alloc_size;
msg->msg_rwcnt = msg->msg_rwnum = msg->msg_ucnt+1;
} else {
msg->msg_rwiov = mca_oob_tcp_msg_iov_alloc(msg,msg->msg_ucnt);
memcpy(msg->msg_rwiov, msg->msg_uiov, msg->msg_ucnt * sizeof(struct iovec));
msg->msg_rwcnt = msg->msg_rwnum = msg->msg_ucnt;
}
} else {
/* allocate a new message along with buffer */
int rc;
MCA_OOB_TCP_MSG_ALLOC(msg, rc);
if(NULL == msg) {
return NULL;
}
msg->msg_type = MCA_OOB_TCP_UNEXPECTED;
msg->msg_rc = 0;
msg->msg_flags = 0;
msg->msg_peer = peer->peer_name;
msg->msg_rwiov = mca_oob_tcp_msg_iov_alloc(msg,1);
msg->msg_rwbuf = malloc(size);
msg->msg_rwiov->iov_base = msg->msg_rwbuf;
msg->msg_rwiov->iov_len = size;
msg->msg_rwcnt = msg->msg_rwnum = 1;
}
msg->msg_rwptr = msg->msg_rwiov;
msg->msg_hdr = *hdr;
return msg;
}
/*
* Dispatch to the appropriate action routine based on the state
* of the connection with the peer.
@ -735,15 +583,38 @@ static void mca_oob_tcp_peer_recv_handler(int sd, short flags, void* user)
}
case MCA_OOB_TCP_CONNECTED:
{
/* allocate a new message and setup for recv */
if(NULL == peer->peer_recv_msg) {
peer->peer_recv_msg = mca_oob_tcp_peer_recv_start(peer);
int rc;
mca_oob_tcp_msg_t* msg;
MCA_OOB_TCP_MSG_ALLOC(msg, rc);
if(NULL == msg) {
ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_peer_recv_handler: unable to allocate recv message\n",
OMPI_NAME_COMPONENTS(mca_oob_name_self),
OMPI_NAME_COMPONENTS(peer->peer_name));
return;
}
msg->msg_type = MCA_OOB_TCP_UNEXPECTED;
msg->msg_rc = 0;
msg->msg_flags = 0;
msg->msg_peer = peer->peer_name;
msg->msg_rwiov = mca_oob_tcp_msg_iov_alloc(msg,2);
msg->msg_rwbuf = NULL;
msg->msg_rwiov->iov_base = msg->msg_rwbuf;
msg->msg_rwiov->iov_len = 1;
msg->msg_rwcnt = msg->msg_rwnum = 1;
msg->msg_rwptr = msg->msg_rwiov;
msg->msg_rwiov[0].iov_base = &msg->msg_hdr;
msg->msg_rwiov[0].iov_len = sizeof(msg->msg_hdr);
peer->peer_recv_msg = msg;
}
if (peer->peer_recv_msg &&
mca_oob_tcp_msg_recv_handler(peer->peer_recv_msg, peer)) {
mca_oob_tcp_msg_t* msg = peer->peer_recv_msg;
peer->peer_recv_msg = NULL;
OMPI_THREAD_UNLOCK(&peer->peer_lock);
mca_oob_tcp_peer_recv_progress(peer, msg);
mca_oob_tcp_msg_recv_complete(msg, peer);
return;
}
break;
@ -870,6 +741,7 @@ bool mca_oob_tcp_peer_accept(mca_oob_tcp_peer_t* peer, int sd)
{
OMPI_THREAD_LOCK(&peer->peer_lock);
if ((peer->peer_state == MCA_OOB_TCP_CLOSED) ||
(peer->peer_state == MCA_OOB_TCP_RESOLVE) ||
(peer->peer_state != MCA_OOB_TCP_CONNECTED &&
mca_oob_tcp_process_name_compare(&peer->peer_name, MCA_OOB_NAME_SELF) < 0)) {
@ -880,6 +752,7 @@ bool mca_oob_tcp_peer_accept(mca_oob_tcp_peer_t* peer, int sd)
mca_oob_tcp_peer_event_init(peer);
if(mca_oob_tcp_peer_send_connect_ack(peer) != OMPI_SUCCESS) {
ompi_output(0, "[%d,%d,%d]-[%d,%d,%d] mca_oob_tcp_peer_accept: mca_oob_tcp_peer_send_blocking failed\n");
mca_oob_tcp_peer_close(peer);
OMPI_THREAD_UNLOCK(&peer->peer_lock);
return false;
@ -906,7 +779,8 @@ void mca_oob_tcp_peer_resolved(mca_oob_tcp_peer_t* peer, mca_oob_tcp_addr_t* add
{
OMPI_THREAD_LOCK(&peer->peer_lock);
peer->peer_addr = addr;
if(peer->peer_state == MCA_OOB_TCP_RESOLVE) {
if((peer->peer_state == MCA_OOB_TCP_RESOLVE) ||
(peer->peer_state == MCA_OOB_TCP_CLOSED && ompi_list_get_size(&peer->peer_send_queue))) {
mca_oob_tcp_peer_start_connect(peer);
}
OMPI_THREAD_UNLOCK(&peer->peer_lock);
@ -921,7 +795,8 @@ static void mca_oob_tcp_peer_timer_handler(int sd, short flags, void* user)
/* start the connection to the peer */
mca_oob_tcp_peer_t* peer = (mca_oob_tcp_peer_t*)user;
OMPI_THREAD_LOCK(&peer->peer_lock);
mca_oob_tcp_peer_start_connect(peer);
if(peer->peer_state == MCA_OOB_TCP_CLOSED)
mca_oob_tcp_peer_start_connect(peer);
OMPI_THREAD_UNLOCK(&peer->peer_lock);
}

View file

@ -51,8 +51,8 @@ int mca_oob_tcp_recv(
if(NULL == iov || 0 == count) {
return OMPI_ERR_BAD_PARAM;
}
iov[0].iov_base = msg->msg_rwiov->iov_base;
iov[0].iov_len = msg->msg_rwiov->iov_len;
iov[0].iov_base = msg->msg_rwiov[1].iov_base;
iov[0].iov_len = msg->msg_rwiov[1].iov_len;
msg->msg_rwbuf = NULL;
} else {
@ -61,7 +61,8 @@ int mca_oob_tcp_recv(
rc = mca_oob_tcp_msg_copy(msg, iov, count);
if(rc >= 0 && MCA_OOB_TRUNC & flags) {
rc = 0;
for(i=0; i<msg->msg_rwcnt; i++)
/* skip first iovec element which is the header */
for(i=1; i<msg->msg_rwcnt+1; i++)
rc += msg->msg_rwiov[i].iov_len;
}
if(MCA_OOB_PEEK & flags) {
@ -153,17 +154,30 @@ int mca_oob_tcp_recv_nb(
if(msg->msg_rc < 0)
return msg->msg_rc;
/* if we are just doing peek, return bytes without dequeing message */
rc = mca_oob_tcp_msg_copy(msg, iov, count);
if(rc >= 0 && MCA_OOB_TRUNC & flags) {
rc = 0;
for(i=0; i<msg->msg_rwcnt; i++)
rc += msg->msg_rwiov[i].iov_len;
}
if(MCA_OOB_PEEK & flags) {
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
cbfunc(rc, &msg->msg_peer, iov, count, tag, cbdata);
return 0;
/* if we are returning an allocated buffer - just take it from the message */
if(flags & MCA_OOB_ALLOC) {
if(NULL == iov || 0 == count) {
return OMPI_ERR_BAD_PARAM;
}
iov[0].iov_base = msg->msg_rwiov[1].iov_base;
iov[0].iov_len = msg->msg_rwiov[1].iov_len;
msg->msg_rwbuf = NULL;
} else {
/* if we are just doing peek, return bytes without dequeing message */
rc = mca_oob_tcp_msg_copy(msg, iov, count);
if(rc >= 0 && MCA_OOB_TRUNC & flags) {
rc = 0;
for(i=1; i<msg->msg_rwcnt+1; i++)
rc += msg->msg_rwiov[i].iov_len;
}
if(MCA_OOB_PEEK & flags) {
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_match_lock);
cbfunc(rc, &msg->msg_peer, iov, count, tag, cbdata);
return 0;
}
}
/* otherwise dequeue the message and return to free list */

View file

@ -39,6 +39,10 @@
#include "mca/topo/base/base.h"
#include "mca/io/io.h"
#include "mca/io/base/base.h"
#include "mca/oob/base/base.h"
#include "mca/ns/base/base.h"
#include "runtime/runtime.h"
/*
@ -60,6 +64,8 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
bool have_hidden_threads;
ompi_proc_t** procs;
size_t nprocs;
ompi_cmd_line_t *cmd_line=NULL;
char *contact=NULL, **tmp, *nsreplica=NULL, *gprreplica=NULL;
char *error;
/* Save command line parameters */
@ -86,10 +92,52 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
/* Join the run-time environment */
allow_multi_user_threads = true;
have_hidden_threads = false;
if ((OMPI_SUCCESS != (ret = ompi_rte_init_stage1(&allow_multi_user_threads,
&have_hidden_threads))) ||
(OMPI_SUCCESS != (ret = ompi_rte_init_stage2(&allow_multi_user_threads,
&have_hidden_threads)))) {
if (OMPI_SUCCESS != (ret = ompi_rte_init_stage1(&allow_multi_user_threads,
&have_hidden_threads))) {
return ret;
}
/* setup rte command line arguments */
cmd_line = OBJ_NEW(ompi_cmd_line_t);
ompi_rte_cmd_line_setup(cmd_line);
/* parse the rte command line arguments */
if (OMPI_SUCCESS != ompi_cmd_line_parse(cmd_line, true, argc, argv)) {
exit(ret);
}
if (ompi_cmd_line_is_taken(cmd_line, "initcontact")) {
if (NULL == (contact = ompi_cmd_line_get_param(cmd_line, "initcontact", 0, 0))) {
return OMPI_ERROR;
}
mca_oob_set_contact_info(contact);
}
if (ompi_cmd_line_is_taken(cmd_line, "nsreplica")) {
if (NULL == (nsreplica = ompi_cmd_line_get_param(cmd_line, "nsreplica", 0, 0))) {
return OMPI_ERROR;
}
mca_oob_set_contact_info(nsreplica);
ompi_process_info.ns_replica = ns_base_create_process_name(0,0,0); /* allocate a space */
mca_oob_parse_contact_info(nsreplica, ompi_process_info.ns_replica, NULL);
} else {
ompi_process_info.ns_replica = NULL;
}
if (ompi_cmd_line_is_taken(cmd_line, "gprreplica")) {
if (NULL == (gprreplica = ompi_cmd_line_get_param(cmd_line, "gprreplica", 0, 0))) {
return OMPI_ERROR;
}
mca_oob_set_contact_info(gprreplica);
ompi_process_info.gpr_replica = ns_base_create_process_name(0,0,0); /* allocate a space */
mca_oob_parse_contact_info(gprreplica, ompi_process_info.gpr_replica, NULL);
} else {
ompi_process_info.gpr_replica = NULL;
}
/* start the rest of the rte */
if (OMPI_SUCCESS != (ret = ompi_rte_init_stage2(&allow_multi_user_threads,
&have_hidden_threads))) {
error = "mca_rte_init() failed";
goto error;
}
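For illustration, a process launched under this scheme would see options of the following shape on its command line (angle-bracket values are placeholders; mpirun2 in this same commit appends exactly these options, all set to the seed's contact string from mca_oob_get_contact_info()):

    <program> <args...> -initcontact "<contact>" -nsreplica "<contact>" -gprreplica "<contact>"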

View file

@ -57,15 +57,15 @@ void ompi_rte_cmd_line_setup(ompi_cmd_line_t *cmd_line)
'\0', "hostfile", "hostfile", 1,
"Hostfile for this universe");
ompi_cmd_line_make_opt3(cmd_line, /* read in ompi_rte_parse_daemon_cmd_line */
'\0', "nameserver", "nameserver", 0,
"Setup a name server replica");
ompi_cmd_line_make_opt3(cmd_line, /* read in ompi_rte_parse_daemon_cmd_line */
'\0', "registry", "registry", 0,
"Setup a GPR replica");
ompi_cmd_line_make_opt3(cmd_line, /* read in ompi_rte_parse_cmd_line */
'\0', "initcontact", "initcontact", 1,
"Initial oob contact info");
ompi_cmd_line_make_opt3(cmd_line, /* read in ompi_rte_parse_cmd_line */
'\0', "nsreplica", "nsreplica", 1,
"OOB contact info for name server replica assigned to this process");
ompi_cmd_line_make_opt3(cmd_line, /* read in ompi_rte_parse_cmd_line */
'\0', "gprreplica", "gprreplica", 1,
"OOB contact info for GPR replica assigned to this process");
}

View file

@ -131,7 +131,19 @@ int ompi_rte_init_stage2(bool *allow_multi_user_threads, bool *have_hidden_threa
/* session directory */
jobid_str = ompi_name_server.get_jobid_string(ompi_process_info.name);
procid_str = ompi_name_server.get_vpid_string(ompi_process_info.name);
if (OMPI_ERROR == ompi_session_dir(true,
if (ompi_rte_debug_flag) {
ompi_output(0, "[%d,%d,%d] setting up session dir with", ompi_process_info.name->cellid, ompi_process_info.name->jobid, ompi_process_info.name->vpid);
if (NULL != ompi_process_info.tmpdir_base) {
ompi_output(0, "\ttmpdir %s", ompi_process_info.tmpdir_base);
}
ompi_output(0, "\tuniverse %s", ompi_process_info.my_universe);
ompi_output(0, "\tuser %s", ompi_system_info.user);
ompi_output(0, "\thost %s", ompi_system_info.nodename);
ompi_output(0, "\tjobid %s", jobid_str);
ompi_output(0, "\tprocid %s", procid_str);
}
if (OMPI_ERROR == ompi_session_dir(true,
ompi_process_info.tmpdir_base,
ompi_system_info.user,
ompi_system_info.nodename, NULL,

View file

@ -27,7 +27,7 @@ static ompi_condition_t ompi_rte_condition;
static bool ompi_rte_job_started = false;
static bool ompi_rte_job_finished = false;
extern char* mca_oob_base_include;
/*
* Update the registry with an entry for this process.
@ -36,7 +36,7 @@ static bool ompi_rte_job_finished = false;
int ompi_rte_register(void)
{
/* temporarily disable this if dont know seed - e.g. using cofs */
if(mca_oob_has_seed()) {
if(NULL == mca_oob_base_include) {
ompi_buffer_t buffer;
char segment[32];
char *jobid = ompi_name_server.get_jobid_string(ompi_process_info.name);
@ -85,7 +85,7 @@ int ompi_rte_register(void)
int ompi_rte_unregister(void)
{
/* temporarily disable this if dont know seed - e.g. using cofs */
if(mca_oob_has_seed()) {
if(NULL == mca_oob_base_include) {
char segment[32];
char *jobid = ompi_name_server.get_jobid_string(ompi_process_info.name);
char *keys[2];

View file

@ -11,6 +11,8 @@
#include <string.h>
#include "mca/oob/base/base.h"
#include "util/output.h"
#include "util/cmd_line.h"
#include "util/sys_info.h"
@ -20,7 +22,9 @@
void ompi_rte_parse_cmd_line(ompi_cmd_line_t *cmd_line)
{
char *universe, *tmp;
char *universe, *initcontact, *nsreplica, *gprreplica, *tmp;
ompi_output(0, "parsing command line");
/* get universe name and store it, if user specified it */
/* otherwise, stick with default name */
@ -28,10 +32,11 @@ void ompi_rte_parse_cmd_line(ompi_cmd_line_t *cmd_line)
if (ompi_cmd_line_is_taken(cmd_line, "universe") ||
ompi_cmd_line_is_taken(cmd_line, "u")) {
if (NULL == ompi_cmd_line_get_param(cmd_line, "universe", 0, 0)) {
fprintf(stderr, "error retrieving universe name - please report error to bugs@open-mpi.org\n");
exit(1);
ompi_output(0, "error retrieving universe name - please report error to bugs@open-mpi.org\n");
return;
}
universe = strdup(ompi_cmd_line_get_param(cmd_line, "universe", 0, 0));
ompi_output(0, "got universe name %s", universe);
if (NULL != (tmp = strchr(universe, ':'))) { /* name contains remote host */
@ -54,15 +59,57 @@ void ompi_rte_parse_cmd_line(ompi_cmd_line_t *cmd_line)
}
/* copy the universe name into the process_info structure */
ompi_process_info.my_universe = strdup(ompi_universe_info.name);
ompi_output(0, "my universe name is %s", ompi_process_info.my_universe);
/* get the temporary directory name for the session directory, if provided on command line */
if (ompi_cmd_line_is_taken(cmd_line, "tmpdir")) {
if (NULL == ompi_cmd_line_get_param(cmd_line, "tmpdir", 0, 0)) {
fprintf(stderr, "error retrieving tmpdir name - please report error to bugs@open-mpi.org\n");
exit(1);
ompi_output(0, "error retrieving tmpdir name - please report error to bugs@open-mpi.org\n");
return;
}
ompi_process_info.tmpdir_base = strdup(ompi_cmd_line_get_param(cmd_line, "tmpdir", 0, 0));
} else {
ompi_process_info.tmpdir_base = NULL;
}
/* get initial contact info */
if (ompi_cmd_line_is_taken(cmd_line, "initcontact")) {
if (NULL == ompi_cmd_line_get_param(cmd_line, "initcontact", 0, 0)) {
ompi_output(0, "error retrieving initial contact info - please report error to bugs@open-mpi.org");
return;
}
initcontact = strdup(ompi_cmd_line_get_param(cmd_line, "initcontact", 0, 0));
mca_oob_set_contact_info(initcontact);
}
/* see if name server replica provided */
if (ompi_cmd_line_is_taken(cmd_line, "nsreplica")) {
if (NULL == ompi_cmd_line_get_param(cmd_line, "nsreplica", 0, 0)) {
ompi_output(0, "error retrieving name server replica - please report error to bugs@open-mpi.org");
return;
}
nsreplica = strdup(ompi_cmd_line_get_param(cmd_line, "nsreplica", 0, 0));
if (0 != strcmp(nsreplica, initcontact)) {
mca_oob_set_contact_info(nsreplica);
}
mca_oob_parse_contact_info(nsreplica, ompi_process_info.ns_replica, NULL);
} else {
ompi_process_info.ns_replica = NULL;
}
/* see if GPR replica provided */
if (ompi_cmd_line_is_taken(cmd_line, "gprreplica")) {
if (NULL == ompi_cmd_line_get_param(cmd_line, "gprreplica", 0, 0)) {
ompi_output(0, "error retrieving GPR replica - please report error to bugs@open-mpi.org");
return;
}
gprreplica = strdup(ompi_cmd_line_get_param(cmd_line, "gprreplica", 0, 0));
if (0 != strcmp(nsreplica, gprreplica) &&
0 != strcmp(initcontact, gprreplica)) { /* check to see if different */
mca_oob_set_contact_info(gprreplica);
}
mca_oob_parse_contact_info(gprreplica, ompi_process_info.gpr_replica, NULL);
} else {
ompi_process_info.gpr_replica = NULL;
}
}

View file

@ -16,11 +16,22 @@
#include "util/sys_info.h"
#include "util/proc_info.h"
#include "mca/ns/base/base.h"
#include "runtime/runtime.h"
void ompi_rte_parse_daemon_cmd_line(ompi_cmd_line_t *cmd_line)
{
/* see if I'm the seed */
if (ompi_cmd_line_is_taken(cmd_line, "seed")) {
ompi_process_info.seed = true;
ompi_process_info.name = ns_base_create_process_name(0,0,0);
} else {
ompi_process_info.seed = false;
ompi_process_info.name = NULL;
}
/* see if I'm a probe */
if (ompi_cmd_line_is_taken(cmd_line, "probe")) {
ompi_universe_info.probe = true;
@ -28,20 +39,6 @@ void ompi_rte_parse_daemon_cmd_line(ompi_cmd_line_t *cmd_line)
ompi_universe_info.probe = false;
}
/* should I start a name server replica? */
if (ompi_cmd_line_is_taken(cmd_line, "nameserver")) {
ompi_universe_info.ns_replica = true;
} else {
ompi_universe_info.ns_replica = false;
}
/* should I start a registry replica? */
if (ompi_cmd_line_is_taken(cmd_line, "registry")) {
ompi_universe_info.gpr_replica = true;
} else {
ompi_universe_info.gpr_replica = false;
}
/* get desired universe scope, if specified */
if (ompi_cmd_line_is_taken(cmd_line, "scope")) {
if (NULL == ompi_cmd_line_get_param(cmd_line, "scope", 0, 0)) {

View file

@ -20,6 +20,7 @@ mpirun2_LDADD = \
$(LIBMPI_EXTRA_LIBS) \
$(LIBOMPI_EXTRA_LIBS)
mpirun2_LDFLAGS = \
$(libs) \
$(LIBMPI_EXTRA_LDFLAGS) \
$(LIBOMPI_EXTRA_LDFLAGS)
mpirun2_DEPENDENCIES = \

View file

@ -11,6 +11,8 @@
#include "mca/pcm/base/base.h"
#include "runtime/runtime.h"
#include "mca/base/base.h"
#include "util/argv.h"
#include "mca/oob/base/base.h"
#include "util/cmd_line.h"
#include "include/constants.h"
@ -44,6 +46,7 @@ main(int argc, char *argv[])
int num_procs = 1;
ompi_rte_node_schedule_t *sched;
char cwd[MAXPATHLEN];
char *my_contact_info, *tmp;
/*
* Intialize our Open MPI environment
@ -70,7 +73,7 @@ main(int argc, char *argv[])
ompi_cmd_line_make_opt3(cmd_line, 'n', "np", "np", 1,
"Number of processes to start");
ompi_cmd_line_make_opt3(cmd_line, '\0', "hostfile", "hostfile", 1,
"Host description file");
"Host description file");
if (OMPI_SUCCESS != ompi_cmd_line_parse(cmd_line, true, argc, argv) ||
ompi_cmd_line_is_taken(cmd_line, "help") ||
@ -96,7 +99,8 @@ main(int argc, char *argv[])
*
*/
ompi_process_info.seed = true;
setenv("OMPI_MCA_oob_base_include", "tcp", 1);
ompi_process_info.ns_replica = NULL;
ompi_process_info.gpr_replica = NULL;
/*
* Start the Open MPI Run Time Environment
@ -114,6 +118,9 @@ main(int argc, char *argv[])
return ret;
}
/* init proc info - assigns name, among other things */
ompi_proc_info();
/*
* Prep for starting a new job
*/
@ -136,6 +143,14 @@ main(int argc, char *argv[])
sched = OBJ_NEW(ompi_rte_node_schedule_t);
ompi_list_append(&schedlist, (ompi_list_item_t*) sched);
ompi_cmd_line_get_tail(cmd_line, &(sched->argc), &(sched->argv));
/* set initial contact info */
my_contact_info = mca_oob_get_contact_info();
ompi_argv_append(&(sched->argc), &(sched->argv), "-initcontact");
ompi_argv_append(&(sched->argc), &(sched->argv), my_contact_info);
ompi_argv_append(&(sched->argc), &(sched->argv), "-nsreplica");
ompi_argv_append(&(sched->argc), &(sched->argv), my_contact_info);
ompi_argv_append(&(sched->argc), &(sched->argv), "-gprreplica");
ompi_argv_append(&(sched->argc), &(sched->argv), my_contact_info);
mca_pcm_base_build_base_env(environ, &(sched->env));
getcwd(cwd, MAXPATHLEN);
sched->cwd = strdup(cwd);

View file

@ -25,7 +25,7 @@ AM_CPPFLAGS = \
libs = $(top_builddir)/src/libmpi.la
bin_PROGRAMS =
bin_PROGRAMS = ompid
ompid_SOURCES = \
ompid.h \
ompid.c

View file

@ -13,7 +13,6 @@
#include <errno.h>
#include "runtime/runtime.h"
#include "runtime/universe_connect.h"
#include "util/output.h"
#include "util/sys_info.h"
#include "util/cmd_line.h"
@ -38,17 +37,14 @@ const char *type_base = "base";
int main(int argc, char *argv[])
{
int ret = 0;
int i;
char **tmp;
int ret = 0, i;
bool multi_thread = false;
bool hidden_thread = false;
bool allow_multi_user_threads = false;
bool have_hidden_threads = false;
ompi_cmd_line_t *mca_cmd_line=NULL;
/* require tcp oob */
setenv("OMPI_MCA_oob_base_include", "tcp", 1);
for (i=0; i<argc; i++) {
ompi_output(0, "i %d argv %s", i, argv[i]);
}
/*
* Intialize the Open MPI environment
@ -59,16 +55,25 @@ int main(int argc, char *argv[])
return ret;
}
/* Open up the MCA */
if (OMPI_SUCCESS != (ret = mca_base_open())) {
/* JMS show_help */
printf("show_help: ompi_mpi_init failed in mca_base_open\n");
return ret;
}
/* Join the run-time environment */
allow_multi_user_threads = true;
have_hidden_threads = false;
if (OMPI_SUCCESS != (ret = ompi_rte_init_stage1(&allow_multi_user_threads,
&have_hidden_threads))) {
return ret;
}
/* get the system info */
ompi_sys_info();
ompi_output(0, "HEY - YOU CALLED ME");
tmp = argv;
for (i=0; i<argc; i++) {
ompi_output(0, "\tompid args: %d %s", i,*tmp);
tmp++;
}
/* setup to read common command line options that span all Open MPI programs */
if (OMPI_SUCCESS != (ret = ompi_common_cmd_line_init(argc, argv))) {
exit(ret);
@ -112,70 +117,77 @@ int main(int argc, char *argv[])
/* parse the cmd_line for rte options - provides the universe name
* and temp directory base, if provided by user. Both loaded into
* ompi_universe_info and ompi_process_info structures as specified
*/
* and temp directory base, if provided by user. Both loaded into
* ompi_universe_info and ompi_process_info structures as specified
* Also provides name server and gpr replicas, if provided, and the
* initial contact info for the "i'm alive" callback.
*/
ompi_rte_parse_cmd_line(cmd_line);
/* parse the cmd_line for daemon options - gets all the options relating
* specifically to seed behavior, in case i'm a seed, but also gets
* options about scripts and hostfiles that might be of use to me
*/
ompi_rte_parse_daemon_cmd_line(cmd_line);
/* start the rest of the rte */
if (OMPI_SUCCESS != (ret = ompi_rte_init_stage2(&allow_multi_user_threads,
&have_hidden_threads))) {
/* JMS show_help */
printf("show_help: ompi_mpi_init failed in mca_rte_init\n");
return ret;
}
/* if (OMPI_SUCCESS != (ret = mca_base_open())) { */
/* /\* JMS show_help *\/ */
/* printf("show_help: mca_base_open failed\n"); */
/* return ret; */
/* } */
ompi_output(0, "HEY - I DID IT");
/* /\* Execute the desired action(s) *\/ */
/* if (ompi_cmd_line_is_taken(ompi_common_cmd_line, "version")) { */
/* printf ("ompid (Open MPI Daemon) version: 0\n"); */
/* } */
/* /\* setup universe session directory *\/ */
/* if (OMPI_SUCCESS != ompi_session_dir(true, tmpdir, ompi_system_info.user, ompi_system_info.nodename, NULL, */
/* ompi_universe.name, NULL, NULL)) { /\* couldn't create session dir - error *\/ */
/* fprintf(stderr, "could not create universe session directory tree - please report error to bugs@open-mpi.org\n"); */
/* exit(1); */
/* } */
/* /\* If there is a seed argument, this is the magic */
/* * seed daemon. */
/* *\/ */
/* if ( ompi_cmd_line_is_taken(cmd_line, "seed")) { */
/* ompi_process_info.seed = true; */
/* ompi_process_info.my_universe = strdup(ompi_universe.name); */
/* } */
/* /\* convert myself to be a daemon *\/ */
/* if (OMPI_SUCCESS != ompi_daemon_init(ompi_process_info.universe_session_dir)) { */
/* fprintf(stderr, "could not convert to daemon - please report error to bugs@open-mpi.org\n"); */
/* exit(1); */
/* } */
/* /\* before calling anything else we need to call rte init *\/ */
/* if (OMPI_SUCCESS != ompi_rte_init(&multi_thread, &hidden_thread)) { */
/* printf("ompid: ompi_rte_init failed\n"); */
/* /\* Do a partial clean-up. This needs to be reviewed */
/* * at a later date to make certain we are not */
/* * missing something */
/* *\/ */
/* ompi_rte_finalize(); */
/* OBJ_RELEASE(cmd_line); */
/* mca_base_close(); */
/* return 1; */
/* } */
/* /\* */
/* * if seed, call open functions of comm frameworks (oob, socket, etc.) to */
/* * get contact info. write contact info into universe session directory */
/* /\* get OOB contact info *\/ */
/* ompi_universe.oob_contact_info = mca_oob_get_contact_info(); */
@ -205,19 +217,19 @@ int main(int argc, char *argv[])
/* * as file "contact-info" so others can find us. */
/* *\/ */
/* /\* Add in the calls to initialize the services *\/ */
/* /\* Add the section for the event loop... *\/ */
/* /\* All done *\/ */
/* /\* Close services *\/ */
/* OBJ_RELEASE(cmd_line); */
/* mca_base_close(); */
ompi_finalize();
return 0;
}


@ -175,13 +175,11 @@ int main(int argc, char **argv)
* including universe name and tmpdir_base
*/
seed_argv = NULL;
seed_argc = 0;
ompi_argv_append(&seed_argc, &seed_argv, "ompid");
ompi_argv_append(&seed_argc, &seed_argv, "-seed");
ompi_argv_append(&seed_argc, &seed_argv, "-nameserver");
ompi_argv_append(&seed_argc, &seed_argv, "-registry");
asprintf(&tmp, "-scope %s", ompi_universe_info.scope);
ompi_argv_append(&seed_argc, &seed_argv, tmp);
free(tmp);
ompi_argv_append(&seed_argc, &seed_argv, "-scope");
ompi_argv_append(&seed_argc, &seed_argv, ompi_universe_info.scope);
if (ompi_universe_info.persistence) {
ompi_argv_append(&seed_argc, &seed_argv, "-persistent");
}
@ -189,44 +187,29 @@ int main(int argc, char **argv)
ompi_argv_append(&seed_argc, &seed_argv, "-webserver");
}
if (NULL != ompi_universe_info.scriptfile) {
asprintf(&tmp, "-script %s", ompi_universe_info.scriptfile);
ompi_argv_append(&seed_argc, &seed_argv, tmp);
free(tmp);
ompi_argv_append(&seed_argc, &seed_argv, "-script");
ompi_argv_append(&seed_argc, &seed_argv, ompi_universe_info.scriptfile);
}
if (NULL != ompi_universe_info.hostfile) {
asprintf(&tmp, "-hostfile %s", ompi_universe_info.hostfile);
ompi_argv_append(&seed_argc, &seed_argv, tmp);
free(tmp);
ompi_argv_append(&seed_argc, &seed_argv, "-hostfile");
ompi_argv_append(&seed_argc, &seed_argv, ompi_universe_info.hostfile);
}
/* provide my contact info */
contact_info = mca_oob_get_contact_info();
asprintf(&tmp, "-initcontact %s", contact_info);
ompi_argv_append(&seed_argc, &seed_argv, tmp);
free(contact_info);
free(tmp);
ompi_argv_append(&seed_argc, &seed_argv, "-initcontact");
ompi_argv_append(&seed_argc, &seed_argv, contact_info);
/* add options for universe name and tmpdir_base, if provided */
asprintf(&tmp, "-universe %s", ompi_universe_info.name);
ompi_argv_append(&seed_argc, &seed_argv, tmp);
free(tmp);
ompi_argv_append(&seed_argc, &seed_argv, "-universe");
ompi_argv_append(&seed_argc, &seed_argv, ompi_universe_info.name);
if (NULL != ompi_process_info.tmpdir_base) {
asprintf(&tmp, "-tmpdir %s", ompi_process_info.tmpdir_base);
ompi_argv_append(&seed_argc, &seed_argv, tmp);
free(tmp);
ompi_argv_append(&seed_argc, &seed_argv, "-tmpdir");
ompi_argv_append(&seed_argc, &seed_argv, ompi_process_info.tmpdir_base);
}
/* mca_pcm_base_build_base_env(environ, &(sched->env)); */
/* fprintf(stderr, "getting ready to disgorge\n"); */
/* tmp2 = seed_argv; */
/* for (i=0; i<seed_argc; i++) { */
/* fprintf(stderr, "i %d %s\n", i, *tmp2); */
/* tmp2++; */
/* } */
/*
* spawn the local seed
*/
if (0 > execv("../ompid/.libs/ompid", seed_argv)) {
if (0 > execvp("ompid", seed_argv)) {
fprintf(stderr, "unable to exec daemon - please report error to bugs@open-mpi.org\n");
fprintf(stderr, "errno: %s\n", strerror(errno));
exit(1);
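
Two notes on the exec change above. First, the move from asprintf()-built strings to paired ompi_argv_append() calls matters because exec hands each argv element to the child as a single argument with no word-splitting, so "-scope <value>" as one string would reach the daemon's parser as a single malformed token. Second, execvp() searches the PATH instead of relying on the old hard-coded build-tree location. A small illustration with a hypothetical scope value:

/* Illustration only - "private" is a made-up scope value. */
char *wrong[] = { "ompid", "-scope private", NULL };     /* parser sees one token  */
char *right[] = { "ompid", "-scope", "private", NULL };  /* parser sees flag+value */
execvp("ompid", right);   /* searches PATH; execv() needed an explicit path */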


@ -22,6 +22,8 @@ ompi_proc_info_t ompi_process_info = {
/* .pid = */ 0,
/* .name = */ NULL,
/* .seed = */ false,
/* .ns_replica = */ NULL,
/* .gpr_replica = */ NULL,
/* .my_universe = */ "default-universe",
/* .tmpdir_base = */ NULL,
/* .top_session_dir = */ NULL,


@ -14,6 +14,10 @@
#include "mca/ns/ns.h"
#ifdef __cplusplus
extern "C" {
#endif
/**
* Process information structure
*
@ -31,6 +35,8 @@ struct ompi_proc_info_t {
pid_t pid; /**< Local process ID for this process */
ompi_process_name_t *name; /**< Process name structure */
bool seed; /**< Indicate whether or not this is the seed daemon */
ompi_process_name_t *ns_replica; /**< Name of my name server replica (NULL=>me) */
ompi_process_name_t *gpr_replica; /**< Name of my registry replica (NULL=>me) */
char *my_universe; /**< Name of the universe to which this process belongs */
char *tmpdir_base; /**< Base directory of the session dir tree */
char *top_session_dir; /**< Top-most directory of the session tree */
@ -82,3 +88,7 @@ extern ompi_proc_info_t ompi_process_info;
*/
int ompi_proc_info(void);
#ifdef __cplusplus
}
#endif
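
The (NULL=>me) convention documented above gives callers a one-line test for whether this process hosts a service itself or should forward requests to a named replica. A minimal sketch of that pattern - host_local_ns() and forward_ns_request() are hypothetical helpers, not part of this commit:

/* Sketch only; the helpers below are hypothetical. */
if (NULL == ompi_process_info.ns_replica) {
    host_local_ns();                                   /* no replica named - this process is it */
} else {
    forward_ns_request(ompi_process_info.ns_replica);  /* send requests to the named replica */
}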