1
1

A bunch of minor changes, mostly adding diagnostics. Just wanted to checkpoint so I can start fresh since there now seem to be problems in the tree with mpirun2.

Fixed ompid so it reissues the non-blocking receive - should now be close to ready for primetime. Fixed some logic in the svc framework that wasn't checking properly for action flags. 

This commit was SVN r2660.
Этот коммит содержится в:
Ralph Castain 2004-09-14 14:21:04 +00:00
родитель 6582ca7c0a
Коммит 069682e046
11 изменённых файлов: 148 добавлений и 65 удалений

Просмотреть файл

@ -1,6 +1,7 @@
/*
* $HEADER$
*/
/** @file **/
#ifndef MCA_SVC_BASE_H
#define MCA_SVC_BASE_H

Просмотреть файл

@ -101,9 +101,9 @@ static void mca_svc_sched_registry_callback(
&proc_name);
/* do the right thing based on the trigger type */
switch(msg->trig_action) {
case OMPI_REGISTRY_NOTIFY_MODIFICATION:
case OMPI_REGISTRY_NOTIFY_ADD_ENTRY:
if ((OMPI_REGISTRY_NOTIFY_MODIFICATION & msg->trig_action) ||
(OMPI_REGISTRY_NOTIFY_ADD_ENTRY & msg->trig_action) ||
(OMPI_REGISTRY_NOTIFY_PRE_EXISTING & msg->trig_action)) {
/* create or modify the corresponding daemon entry */
if(node == NULL) {
node = OBJ_NEW(mca_svc_sched_node_t);
@ -112,8 +112,7 @@ static void mca_svc_sched_registry_callback(
ompi_list_append(&mca_svc_sched_component.sched_node_list, &node->node_item);
}
mca_svc_sched_node_set(node,hostname,contact_info,proc_slots);
break;
case OMPI_REGISTRY_NOTIFY_DELETE_ENTRY:
} else if (OMPI_REGISTRY_NOTIFY_DELETE_ENTRY & msg->trig_action) {
/* delete the corresponding deamon entry */
if(node != NULL) {
ompi_list_item_t* next;
@ -123,8 +122,7 @@ static void mca_svc_sched_registry_callback(
if(mca_svc_sched_component.sched_node_next == node)
mca_svc_sched_component.sched_node_next = (mca_svc_sched_node_t*)next;
}
break;
}
}
/* cleanup */
if(hostname != NULL)
@ -157,7 +155,7 @@ int mca_svc_sched_module_init(mca_svc_base_module_t* module)
OMPI_REGISTRY_NOTIFY_ADD_ENTRY|
OMPI_REGISTRY_NOTIFY_DELETE_ENTRY|
OMPI_REGISTRY_NOTIFY_PRE_EXISTING,
"vm", /* segment */
"ompi-vm", /* segment */
NULL, /* keys */
mca_svc_sched_registry_callback,
NULL);

Просмотреть файл

@ -1,6 +1,7 @@
/*
* $HEADER$
*/
/** @file **/
#ifndef MCA_SVC_H
#define MCA_SVC_H

Просмотреть файл

@ -134,6 +134,7 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
goto error;
}
ompi_output(0, "proc sessiondir(1): %s", ompi_process_info.proc_session_dir);
/*
* Register my process info with my replica.
*/
@ -142,6 +143,8 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
goto error;
}
ompi_output(0, "proc sessiondir(2): %s", ompi_process_info.proc_session_dir);
/* finalize the rte startup */
if (OMPI_SUCCESS != (ret = ompi_rte_init_finalstage(&allow_multi_user_threads,
&have_hidden_threads))) {
@ -149,6 +152,8 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
goto error;
}
ompi_output(0, "proc sessiondir(3): %s", ompi_process_info.proc_session_dir);
/* Once we've joined the RTE, see if any MCA parameters were
passed to the MPI level */
@ -156,6 +161,7 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
error = "mca_mpi_register_params() failed";
goto error;
}
ompi_output(0, "proc sessiondir(4): %s", ompi_process_info.proc_session_dir);
/* initialize ompi procs */
if (OMPI_SUCCESS != (ret = ompi_proc_init())) {
@ -335,6 +341,7 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
}
/* All done */
ompi_output(0, "proc sessiondir(end): %s", ompi_process_info.proc_session_dir);
ompi_mpi_initialized = true;
ompi_mpi_finalized = false;

Просмотреть файл

@ -29,6 +29,7 @@ libruntime_la_SOURCES = \
ompi_rte_llm.c \
ompi_rte_monitor.c \
ompi_rte_cmd_line_setup.c \
ompi_vm_register.c \
universe_exists.c \
ompi_rte_parse_environ.c \
ompi_rte_parse_cmd_line.c \

Просмотреть файл

@ -1,3 +1,76 @@
/*
* $HEADER$
*/
/** @file **/
#include "ompi_config.h"
#include <stdio.h>
#include "include/constants.h"
#include "util/output.h"
#include "util/sys_info.h"
#include "util/proc_info.h"
#include "util/pack.h"
#include "mca/oob/base/base.h"
#include "mca/ns/base/base.h"
#include "mca/gpr/base/base.h"
#include "runtime/runtime.h"
int ompi_vm_register()
{
ompi_buffer_t buffer;
int ret_code;
int32_t num;
char *keys[2];
if (OMPI_SUCCESS != ompi_buffer_init(&buffer, 0)) {
ret_code = OMPI_ERROR;
return OMPI_ERROR;
}
if (OMPI_SUCCESS != ompi_pack_string(buffer, ompi_system_info.nodename)) {
ret_code = OMPI_ERROR;
goto ERROR;
}
if (OMPI_SUCCESS != ompi_pack(buffer, ompi_process_info.name, 1, OMPI_NAME)) {
ret_code = OMPI_ERROR;
goto ERROR;
}
if (OMPI_SUCCESS != ompi_pack_string(buffer, mca_oob_get_contact_info())) {
ret_code = OMPI_ERROR;
goto ERROR;
}
if (0 == strncmp(ompi_system_info.sysname, "Darwin", strlen("Darwin"))) {
num = 1;
} else if (0 == strncmp(ompi_system_info.sysname, "Linux", strlen("Linux"))) {
/* get it from proc/cpuinfo */
num = 1;
} else {
num = 1;
}
if (OMPI_SUCCESS != ompi_pack(buffer, &num, 1, OMPI_INT32)) {
ret_code = OMPI_ERROR;
goto ERROR;
}
keys[0] = ompi_name_server.get_proc_name_string(ompi_process_info.name);
keys[1] = NULL;
ret_code = ompi_registry.put(OMPI_REGISTRY_XAND, "ompi-vm", keys, buffer, sizeof(buffer));
ERROR:
ompi_buffer_free(buffer);
return ret_code;
}
/* under windows, there is a SystemInfo structure that contains following info:
@ -33,3 +106,4 @@ Private m_typSystemInfo As SYSTEM_INFO
Not sure what to do about Mac yet...
*/

Просмотреть файл

@ -304,6 +304,13 @@ extern "C" {
*/
void ompi_rte_parse_environ(void);
/**
* Register a daemon on the virtual machine segment.
*/
int ompi_vm_register(void);
#ifdef __cplusplus
}
#endif

Просмотреть файл

@ -170,6 +170,8 @@ int ompi_rte_universe_exists()
/* set the my_universe field */
ompi_process_info.my_universe = strdup(ompi_universe_info.name);
ompi_process_info.ns_replica = ns_base_copy_process_name(&proc);
ompi_process_info.gpr_replica = ns_base_copy_process_name(&proc);
/* request ns_replica and gpr_replica info for this process
* only request info required - check ns_found/gpr_found

Просмотреть файл

@ -18,13 +18,12 @@
#include "mca/base/base.h"
#include "mca/oob/base/base.h"
#include "mca/ns/base/base.h"
#include "mca/gpr/base/base.h"
#include "tools/ompid/ompid.h"
#define OMPI_CONSOLE_MAX_LINE_LENGTH 1024
static void ompi_console_recv(void);
static char *ompi_getinputline(void);
static void ompi_console_sendcmd(ompi_daemon_cmd_flag_t usercmd);
@ -32,6 +31,14 @@ static void ompi_console_sendcmd(ompi_daemon_cmd_flag_t usercmd);
int main(int argc, char *argv[])
{
ompi_list_t *list;
ompi_list_item_t *item;
ompi_registry_value_t *value;
char *hostname, *contact_info;
ompi_process_name_t proc_name;
int32_t proc_slots;
mca_ns_base_jobid_t jobid;
mca_ns_base_vpid_t vpid;
int ret;
ompi_cmd_line_t *cmd_line;
bool allow_multi_user_threads = false;
@ -51,11 +58,6 @@ int main(int argc, char *argv[])
return ret;
}
/* get the system info and setup defaults */
ompi_sys_info();
ompi_universe_info.host = strdup(ompi_system_info.nodename);
ompi_universe_info.uid = strdup(ompi_system_info.user);
/* setup to read common command line options that span all Open MPI programs */
cmd_line = OBJ_NEW(ompi_cmd_line_t);
@ -153,23 +155,23 @@ int main(int argc, char *argv[])
return ret;
}
/***** SET MY NAME *****/
if (NULL == ompi_process_info.name) { /* don't overwrite an existing name */
if (ompi_process_info.seed) {
ompi_process_info.name = ompi_name_server.create_process_name(0, 0, 0);
} else {
ompi_process_info.name = ompi_rte_get_self();
}
}
/* finalize the rte startup */
/* finalize the rte startup */
if (OMPI_SUCCESS != (ret = ompi_rte_init_finalstage(&allow_multi_user_threads,
&have_hidden_threads))) {
printf("failed to finalize the rte startup\n");
return ret;
}
/* /\* register the console callback function *\/ */
/***** SET MY NAME *****/
/* jobid = ompi_name_server.create_jobid(); */
/* vpid = ompi_name_server.reserve_range(jobid, 1); */
/* ompi_process_info.name = ompi_name_server.create_process_name(0, jobid, vpid); */
/* fprintf(stderr, "my name: [%d,%d,%d]\n", ompi_process_info.name->cellid, */
/* ompi_process_info.name->jobid, ompi_process_info.name->vpid); */
/* /\* register the console callback function *\/ */
/* ret = mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_DAEMON, 0, ompi_console_recv, NULL); */
/* if(ret != OMPI_SUCCESS && ret != OMPI_ERR_NOT_IMPLEMENTED) { */
/* printf("daemon callback not registered: error code %d", ret); */
@ -196,6 +198,24 @@ int main(int argc, char *argv[])
printf("\n");
}
}
} else if (0 == strncmp(usercmd, "dumpvm", strlen("dumpvm"))) {
fprintf(stderr, "getting vm list\n");
list = ompi_registry.get(OMPI_REGISTRY_OR, "ompi-vm", NULL);
fprintf(stderr, "got vm list: length %d\n", (int)ompi_list_get_size(list));
for (item = ompi_list_get_first(list);
item != ompi_list_get_end(list);
item = ompi_list_get_next(item)) {
value = (ompi_registry_value_t*)item;
buffer = (ompi_buffer_t)value->object;
ompi_unpack_string(buffer, &hostname);
ompi_unpack(buffer, &proc_name, 1, OMPI_NAME);
ompi_unpack_string(buffer, &contact_info);
ompi_unpack(buffer, &proc_slots, 1, OMPI_INT32);
printf("host: %s\n", hostname);
printf("proc: [%d,%d,%d]\n", proc_name.cellid, proc_name.jobid, proc_name.vpid);
printf("cont: %s\n", contact_info);
printf("slot: %d\n\n", proc_slots);
}
} else {
printf("huh???\n");
}
@ -223,44 +243,6 @@ static void ompi_console_sendcmd(ompi_daemon_cmd_flag_t usercmd)
ompi_buffer_free(cmd);
}
static void ompi_console_recv(void)
{
int32_t num_bytes, i;
uint8_t *outbytes;
ompi_buffer_t buffer;
ompi_process_name_t seed={0,0,0};
int recv_tag;
if (OMPI_SUCCESS != ompi_unpack(buffer, &num_bytes, 1, OMPI_INT32)) {
printf("\terror unpacking number of bytes\n");
return;
}
if (0 < num_bytes) {
outbytes = (uint8_t*)malloc(num_bytes);
if (OMPI_SUCCESS != ompi_unpack(buffer, &outbytes, num_bytes, OMPI_BYTE)) {
printf("\terror unpacking number of bytes\n");
return;
}
fprintf(stderr, "unpacked the bytes\n");
for (i=0; i<num_bytes; i++) {
printf("%c", outbytes[i]);
}
free(outbytes);
} else {
printf("got zero bytes back\n");
}
ompi_buffer_free(buffer);
return;
}
char *ompi_getinputline()
{
char *ret, *buff;

Просмотреть файл

@ -256,7 +256,7 @@ int main(int argc, char *argv[])
/* register this node on the virtual machine */
/* ompi_vm_register(); */
ompi_vm_register();
if (ompi_daemon_debug) {
ompi_output(0, "[%d,%d,%d] ompid: issuing callback", ompi_process_info.name->cellid,
@ -357,6 +357,12 @@ static void ompi_daemon_recv(int status, ompi_process_name_t* sender,
}
RETURN_ERROR:
/* reissue the non-blocking receive */
ret = mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_DAEMON, 0, ompi_daemon_recv, NULL);
if(ret != OMPI_SUCCESS && ret != OMPI_ERR_NOT_IMPLEMENTED) {
ompi_output(0, "daemon callback not registered: error code %d", ret);
return;
}
OMPI_THREAD_UNLOCK(&ompi_daemon_mutex);
return;

Просмотреть файл

@ -12,6 +12,8 @@
#include "ompi_config.h"
#include "include/constants.h"
#include "util/output.h"
#include "util/sys_info.h"
ompi_sys_info_t ompi_system_info = {
@ -64,6 +66,8 @@ int ompi_sys_info(void)
ompi_system_info.release = strdup(sys_info.release);
ompi_system_info.version = strdup(sys_info.version);
ompi_system_info.machine = strdup(sys_info.machine);
ompi_output(0, "sysname: %s", sys_info.sysname);
}
if (NULL != (path_name = getcwd(NULL, 0))) {