1
1

Checkpoint the console and daemon.

Folks - communication with the daemon appears to be unreliable at the moment, and we are trying to track down the cause. In the meantime, please be patient when experimenting with it.

This commit was SVN r2633.
Этот коммит содержится в:
Ralph Castain 2004-09-13 16:51:53 +00:00
родитель 70231457ce
Коммит a14ee7eb48
4 изменённых файлов: 117 добавлений и 35 удалений

Просмотреть файл

@ -156,6 +156,7 @@ int ompi_rte_universe_exists()
/* ...and ping to verify it's alive */ /* ...and ping to verify it's alive */
ping_success = false; ping_success = false;
for (i=0; i<5 && !ping_success; i++) { for (i=0; i<5 && !ping_success; i++) {
ompi_output(0, "univ_exists: attempting ping number %d", i);
if (OMPI_SUCCESS == mca_oob_ping(&proc, &ompi_rte_ping_wait)) { if (OMPI_SUCCESS == mca_oob_ping(&proc, &ompi_rte_ping_wait)) {
ping_success = true; ping_success = true;
} }

Просмотреть файл

@ -21,20 +21,26 @@
#include "tools/ompid/ompid.h" #include "tools/ompid/ompid.h"
static void ompi_console_recv(int status, ompi_process_name_t* sender, #define OMPI_CONSOLE_MAX_LINE_LENGTH 1024
ompi_buffer_t buffer, int tag,
void* cbdata); static void ompi_console_recv(void);
static char *ompi_getinputline(void);
static void ompi_console_sendcmd(ompi_daemon_cmd_flag_t usercmd);
int main(int argc, char *argv[]) int main(int argc, char *argv[])
{ {
int ret, recv_tag; int ret;
ompi_cmd_line_t *cmd_line; ompi_cmd_line_t *cmd_line;
bool allow_multi_user_threads = false; bool allow_multi_user_threads = false;
bool have_hidden_threads = false; bool have_hidden_threads = false;
ompi_buffer_t cmd; bool exit_cmd;
ompi_daemon_cmd_flag_t command; char *usercmd, *str_response;
ompi_buffer_t buffer;
ompi_process_name_t seed={0,0,0}; ompi_process_name_t seed={0,0,0};
int recv_tag;
/* /*
* Intialize the Open MPI environment * Intialize the Open MPI environment
@ -163,19 +169,37 @@ int main(int argc, char *argv[])
return ret; return ret;
} }
/* register the console callback function */ /* /\* register the console callback function *\/ */
ret = mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_DAEMON, 0, ompi_console_recv, NULL); /* ret = mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_DAEMON, 0, ompi_console_recv, NULL); */
if(ret != OMPI_SUCCESS && ret != OMPI_ERR_NOT_IMPLEMENTED) { /* if(ret != OMPI_SUCCESS && ret != OMPI_ERR_NOT_IMPLEMENTED) { */
printf("daemon callback not registered: error code %d", ret); /* printf("daemon callback not registered: error code %d", ret); */
return ret; /* return ret; */
} /* } */
fprintf (stderr, "issuing exit cmd\n"); exit_cmd = false;
ompi_buffer_init(&cmd, 0); while (!exit_cmd) {
command = OMPI_DAEMON_EXIT_CMD; printf("ompiconsole> ");
recv_tag = MCA_OOB_TAG_DAEMON; usercmd = ompi_getinputline();
ompi_pack(cmd, &command, 1, OMPI_DAEMON_OOB_PACK_CMD); if (0 == strncmp(usercmd, "exit", strlen("exit"))) {
mca_oob_send_packed(&seed, cmd, MCA_OOB_TAG_DAEMON, 0); exit_cmd = true;
ompi_console_sendcmd(OMPI_DAEMON_EXIT_CMD);
} else if (0 == strncmp(usercmd, "contactinfo", strlen("contactinfo"))) {
ompi_console_sendcmd(OMPI_DAEMON_CONTACT_QUERY_CMD);
if (0 > mca_oob_recv_packed(&seed, &buffer, &recv_tag)) {
printf("****got a bad response\n");
} else {
if (0 > ompi_unpack_string(buffer, &str_response)) {
printf("****couldn't decode answer\n");
} else {
printf(str_response);
printf("\n");
}
}
} else {
printf("huh???\n");
}
}
ompi_rte_finalize(); ompi_rte_finalize();
mca_base_close(); mca_base_close();
@ -183,22 +207,38 @@ int main(int argc, char *argv[])
return 0; return 0;
} }
static void ompi_console_recv(int status, ompi_process_name_t* sender,
ompi_buffer_t buffer, int tag, static void ompi_console_sendcmd(ompi_daemon_cmd_flag_t usercmd)
void* cbdata)
{ {
ompi_buffer_t cmd;
ompi_daemon_cmd_flag_t command; ompi_daemon_cmd_flag_t command;
int recv_tag;
ompi_process_name_t seed={0,0,0};
ompi_buffer_init(&cmd, 0);
command = usercmd;
recv_tag = MCA_OOB_TAG_DAEMON;
ompi_pack(cmd, &command, 1, OMPI_DAEMON_OOB_PACK_CMD);
mca_oob_send_packed(&seed, cmd, MCA_OOB_TAG_DAEMON, 0);
ompi_buffer_free(cmd);
}
static void ompi_console_recv(void)
{
int32_t num_bytes, i; int32_t num_bytes, i;
uint8_t *outbytes; uint8_t *outbytes;
ompi_buffer_t buffer;
ompi_process_name_t seed={0,0,0};
int recv_tag;
printf("console - message received from [%d,%d,%d]\n", sender->cellid,
sender->jobid, sender->vpid);
if (OMPI_SUCCESS != ompi_unpack(buffer, &num_bytes, 1, OMPI_INT32)) { if (OMPI_SUCCESS != ompi_unpack(buffer, &num_bytes, 1, OMPI_INT32)) {
printf("\terror unpacking number of bytes\n"); printf("\terror unpacking number of bytes\n");
return; return;
} }
if (0 < num_bytes) {
outbytes = (uint8_t*)malloc(num_bytes); outbytes = (uint8_t*)malloc(num_bytes);
if (OMPI_SUCCESS != ompi_unpack(buffer, &outbytes, num_bytes, OMPI_BYTE)) { if (OMPI_SUCCESS != ompi_unpack(buffer, &outbytes, num_bytes, OMPI_BYTE)) {
@ -206,11 +246,32 @@ static void ompi_console_recv(int status, ompi_process_name_t* sender,
return; return;
} }
fprintf(stderr, "unpacked the bytes\n");
for (i=0; i<num_bytes; i++) { for (i=0; i<num_bytes; i++) {
printf("%c", outbytes[i]); printf("%c", outbytes[i]);
} }
free(outbytes); free(outbytes);
} else {
printf("got zero bytes back\n");
}
ompi_buffer_free(buffer); ompi_buffer_free(buffer);
return; return;
} }
/*
 * Read one line of user input from stdin.
 *
 * Returns a freshly allocated copy (made with strdup) of the line with its
 * trailing newline, if any, removed.  The caller owns the returned buffer
 * and must free() it.
 *
 * Returns NULL when fgets() reports end-of-file or a read error, or when the
 * copy cannot be allocated, so callers must check the result before using it.
 *
 * A line longer than OMPI_CONSOLE_MAX_LINE_LENGTH - 1 characters is returned
 * in pieces across successive calls.
 */
static char *ompi_getinputline(void)
{
    char input[OMPI_CONSOLE_MAX_LINE_LENGTH];
    size_t len;

    if (NULL == fgets(input, OMPI_CONSOLE_MAX_LINE_LENGTH, stdin)) {
        return NULL;
    }

    /*
     * fgets() keeps the newline only when one was actually read.  The final
     * line of the input, or a line truncated at the buffer limit, has none,
     * and a leading NUL byte yields a zero-length string.  Strip the last
     * character only when it really is a newline, so no data is lost and
     * input[-1] is never written.
     */
    len = strlen(input);
    if (0 < len && '\n' == input[len - 1]) {
        input[len - 1] = '\0';
    }

    return strdup(input);
}

Просмотреть файл

@ -312,6 +312,9 @@ static void ompi_daemon_recv(int status, ompi_process_name_t* sender,
{ {
ompi_buffer_t answer; ompi_buffer_t answer;
ompi_daemon_cmd_flag_t command; ompi_daemon_cmd_flag_t command;
int ret;
int32_t str_len;
char *contact_info;
OMPI_THREAD_LOCK(&ompi_daemon_mutex); OMPI_THREAD_LOCK(&ompi_daemon_mutex);
@ -333,8 +336,24 @@ static void ompi_daemon_recv(int status, ompi_process_name_t* sender,
ompi_daemon_exit_condition = true; ompi_daemon_exit_condition = true;
ompi_condition_signal(&ompi_daemon_condition); ompi_condition_signal(&ompi_daemon_condition);
} else if (OMPI_DAEMON_HEARTBEAT_CMD == command) { /**** CONTACT QUERY COMMAND ****/
/* send back an "i'm alive" message */ } else if (OMPI_DAEMON_CONTACT_QUERY_CMD == command) {
/* send back contact info */
contact_info = mca_oob_get_contact_info();
if (NULL != contact_info) {
if (OMPI_SUCCESS != ompi_pack_string(answer, contact_info)) {
/* RHC -- not sure what to do if this fails */
}
if (0 > (ret = mca_oob_send_packed(sender, answer, tag, 0))) {
if (ompi_daemon_debug) {
ompi_output(0, "ompid_recv: send failed with return %d", ret);
}
}
}
} }
RETURN_ERROR: RETURN_ERROR:

Просмотреть файл

@ -18,6 +18,7 @@
#define OMPI_DAEMON_HOSTFILE_CMD 0x01 #define OMPI_DAEMON_HOSTFILE_CMD 0x01
#define OMPI_DAEMON_SCRIPTFILE_CMD 0x02 #define OMPI_DAEMON_SCRIPTFILE_CMD 0x02
#define OMPI_DAEMON_CONTACT_QUERY_CMD 0x03
#define OMPI_DAEMON_HEARTBEAT_CMD 0xfe #define OMPI_DAEMON_HEARTBEAT_CMD 0xfe
#define OMPI_DAEMON_EXIT_CMD 0xff #define OMPI_DAEMON_EXIT_CMD 0xff