Change to use the new opal_fd_*() functions.
This commit was SVN r23451.
Этот коммит содержится в:
родитель
ab5fc1b570
Коммит
3031b59cfe
@ -36,6 +36,7 @@
|
|||||||
#include "opal/class/opal_list.h"
|
#include "opal/class/opal_list.h"
|
||||||
#include "opal/event/event.h"
|
#include "opal/event/event.h"
|
||||||
#include "opal/util/output.h"
|
#include "opal/util/output.h"
|
||||||
|
#include "opal/util/fd.h"
|
||||||
|
|
||||||
#include "ompi/constants.h"
|
#include "ompi/constants.h"
|
||||||
|
|
||||||
@ -118,53 +119,6 @@ static size_t waiting_for_ack_from_main_thread = 0;
|
|||||||
static opal_list_t pending_to_main_thread;
|
static opal_list_t pending_to_main_thread;
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Simple loop over reading from a fd
|
|
||||||
*/
|
|
||||||
static int read_fd(int fd, int len, void *buffer)
|
|
||||||
{
|
|
||||||
int rc;
|
|
||||||
char *b = buffer;
|
|
||||||
|
|
||||||
while (len > 0) {
|
|
||||||
rc = read(fd, b, len);
|
|
||||||
if (rc < 0 && EAGAIN == errno) {
|
|
||||||
continue;
|
|
||||||
} else if (rc > 0) {
|
|
||||||
len -= rc;
|
|
||||||
b += rc;
|
|
||||||
} else {
|
|
||||||
return OMPI_ERROR;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return OMPI_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Simple loop over writing to an fd
|
|
||||||
*/
|
|
||||||
static int write_fd(int fd, int len, void *buffer)
|
|
||||||
{
|
|
||||||
int rc;
|
|
||||||
char *b = buffer;
|
|
||||||
|
|
||||||
while (len > 0) {
|
|
||||||
rc = write(fd, b, len);
|
|
||||||
if (rc < 0 && EAGAIN == errno) {
|
|
||||||
continue;
|
|
||||||
} else if (rc > 0) {
|
|
||||||
len -= rc;
|
|
||||||
b += rc;
|
|
||||||
} else {
|
|
||||||
return OMPI_ERROR;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return OMPI_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Write a command to the main thread, or queue it up if the pipe is full
|
* Write a command to the main thread, or queue it up if the pipe is full
|
||||||
*/
|
*/
|
||||||
@ -188,7 +142,7 @@ static int write_to_main_thread(cmd_t *cmd)
|
|||||||
opal_list_append(&pending_to_main_thread, &(cli->super));
|
opal_list_append(&pending_to_main_thread, &(cli->super));
|
||||||
} else {
|
} else {
|
||||||
OPAL_OUTPUT((-1, "fd: writing to main thread"));
|
OPAL_OUTPUT((-1, "fd: writing to main thread"));
|
||||||
write_fd(pipe_to_main_thread[1], cmd_size, cmd);
|
opal_fd_write(pipe_to_main_thread[1], cmd_size, cmd);
|
||||||
++waiting_for_ack_from_main_thread;
|
++waiting_for_ack_from_main_thread;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -256,7 +210,7 @@ static int service_pipe_cmd_call_function(cmd_t *cmd)
|
|||||||
/* Now ACK that we ran the function */
|
/* Now ACK that we ran the function */
|
||||||
memset(&local_cmd, 0, cmd_size);
|
memset(&local_cmd, 0, cmd_size);
|
||||||
local_cmd.pc_cmd = ACK_RAN_FUNCTION;
|
local_cmd.pc_cmd = ACK_RAN_FUNCTION;
|
||||||
write_fd(pipe_to_main_thread[1], cmd_size, &local_cmd);
|
opal_fd_write(pipe_to_main_thread[1], cmd_size, &local_cmd);
|
||||||
|
|
||||||
/* Done */
|
/* Done */
|
||||||
return OMPI_SUCCESS;
|
return OMPI_SUCCESS;
|
||||||
@ -331,7 +285,7 @@ static int main_pipe_cmd_call_function(cmd_t *cmd)
|
|||||||
/* Now ACK that we ran the function */
|
/* Now ACK that we ran the function */
|
||||||
memset(&local_cmd, 0, cmd_size);
|
memset(&local_cmd, 0, cmd_size);
|
||||||
local_cmd.pc_cmd = ACK_RAN_FUNCTION;
|
local_cmd.pc_cmd = ACK_RAN_FUNCTION;
|
||||||
write_fd(pipe_to_service_thread[1], cmd_size, &local_cmd);
|
opal_fd_write(pipe_to_service_thread[1], cmd_size, &local_cmd);
|
||||||
|
|
||||||
/* Done */
|
/* Done */
|
||||||
return OMPI_SUCCESS;
|
return OMPI_SUCCESS;
|
||||||
@ -347,7 +301,7 @@ static bool service_pipe_cmd(void)
|
|||||||
cmd_t cmd;
|
cmd_t cmd;
|
||||||
cmd_list_item_t *cli;
|
cmd_list_item_t *cli;
|
||||||
|
|
||||||
read_fd(pipe_to_service_thread[0], cmd_size, &cmd);
|
opal_fd_read(pipe_to_service_thread[0], cmd_size, &cmd);
|
||||||
switch (cmd.pc_cmd) {
|
switch (cmd.pc_cmd) {
|
||||||
case CMD_ADD_FD:
|
case CMD_ADD_FD:
|
||||||
OPAL_OUTPUT((-1, "fd service thread: CMD_ADD_FD"));
|
OPAL_OUTPUT((-1, "fd service thread: CMD_ADD_FD"));
|
||||||
@ -385,7 +339,7 @@ static bool service_pipe_cmd(void)
|
|||||||
cli = (cmd_list_item_t*) opal_list_remove_first(&pending_to_main_thread);
|
cli = (cmd_list_item_t*) opal_list_remove_first(&pending_to_main_thread);
|
||||||
if (NULL != cli) {
|
if (NULL != cli) {
|
||||||
OPAL_OUTPUT((-1, "sending queued up cmd function to main thread"));
|
OPAL_OUTPUT((-1, "sending queued up cmd function to main thread"));
|
||||||
write_fd(pipe_to_main_thread[1], cmd_size, &(cli->cli_cmd));
|
opal_fd_write(pipe_to_main_thread[1], cmd_size, &(cli->cli_cmd));
|
||||||
OBJ_RELEASE(cli);
|
OBJ_RELEASE(cli);
|
||||||
} else {
|
} else {
|
||||||
--waiting_for_ack_from_main_thread;
|
--waiting_for_ack_from_main_thread;
|
||||||
@ -482,7 +436,7 @@ static void main_thread_event_callback(int fd, short event, void *context)
|
|||||||
cmd_t cmd;
|
cmd_t cmd;
|
||||||
|
|
||||||
OPAL_OUTPUT((-1, "main thread -- reading command"));
|
OPAL_OUTPUT((-1, "main thread -- reading command"));
|
||||||
read_fd(pipe_to_main_thread[0], cmd_size, &cmd);
|
opal_fd_read(pipe_to_main_thread[0], cmd_size, &cmd);
|
||||||
switch (cmd.pc_cmd) {
|
switch (cmd.pc_cmd) {
|
||||||
case CMD_CALL_FUNCTION:
|
case CMD_CALL_FUNCTION:
|
||||||
OPAL_OUTPUT((-1, "fd main thread: calling command"));
|
OPAL_OUTPUT((-1, "fd main thread: calling command"));
|
||||||
@ -576,7 +530,7 @@ int ompi_btl_openib_fd_monitor(int fd, int flags,
|
|||||||
if (OPAL_HAVE_THREADS) {
|
if (OPAL_HAVE_THREADS) {
|
||||||
/* For the threaded version, write a command down the pipe */
|
/* For the threaded version, write a command down the pipe */
|
||||||
OPAL_OUTPUT((-1, "main thread sending monitor fd %d", fd));
|
OPAL_OUTPUT((-1, "main thread sending monitor fd %d", fd));
|
||||||
write_fd(pipe_to_service_thread[1], cmd_size, &cmd);
|
opal_fd_write(pipe_to_service_thread[1], cmd_size, &cmd);
|
||||||
} else {
|
} else {
|
||||||
/* Otherwise, add it directly */
|
/* Otherwise, add it directly */
|
||||||
service_pipe_cmd_add_fd(true, &cmd);
|
service_pipe_cmd_add_fd(true, &cmd);
|
||||||
@ -609,7 +563,7 @@ int ompi_btl_openib_fd_unmonitor(int fd,
|
|||||||
if (OPAL_HAVE_THREADS) {
|
if (OPAL_HAVE_THREADS) {
|
||||||
/* For the threaded version, write a command down the pipe */
|
/* For the threaded version, write a command down the pipe */
|
||||||
OPAL_OUTPUT((-1, "main thread sending unmonitor fd %d", fd));
|
OPAL_OUTPUT((-1, "main thread sending unmonitor fd %d", fd));
|
||||||
write_fd(pipe_to_service_thread[1], cmd_size, &cmd);
|
opal_fd_write(pipe_to_service_thread[1], cmd_size, &cmd);
|
||||||
} else {
|
} else {
|
||||||
/* Otherwise, remove it directly */
|
/* Otherwise, remove it directly */
|
||||||
service_pipe_cmd_remove_fd(&cmd);
|
service_pipe_cmd_remove_fd(&cmd);
|
||||||
@ -635,7 +589,7 @@ int ompi_btl_openib_fd_run_in_service(ompi_btl_openib_fd_main_callback_fn_t *cal
|
|||||||
if (OPAL_HAVE_THREADS) {
|
if (OPAL_HAVE_THREADS) {
|
||||||
/* For the threaded version, write a command down the pipe */
|
/* For the threaded version, write a command down the pipe */
|
||||||
OPAL_OUTPUT((-1, "main thread sending 'run in service'"));
|
OPAL_OUTPUT((-1, "main thread sending 'run in service'"));
|
||||||
write_fd(pipe_to_service_thread[1], cmd_size, &cmd);
|
opal_fd_write(pipe_to_service_thread[1], cmd_size, &cmd);
|
||||||
} else {
|
} else {
|
||||||
/* Otherwise, run it directly */
|
/* Otherwise, run it directly */
|
||||||
callback(context);
|
callback(context);
|
||||||
@ -712,7 +666,7 @@ int ompi_btl_openib_fd_finalize(void)
|
|||||||
opal_event_del(&main_thread_event);
|
opal_event_del(&main_thread_event);
|
||||||
memset(&cmd, 0, cmd_size);
|
memset(&cmd, 0, cmd_size);
|
||||||
cmd.pc_cmd = CMD_TIME_TO_QUIT;
|
cmd.pc_cmd = CMD_TIME_TO_QUIT;
|
||||||
write_fd(pipe_to_service_thread[1], cmd_size, &cmd);
|
opal_fd_write(pipe_to_service_thread[1], cmd_size, &cmd);
|
||||||
|
|
||||||
pthread_join(thread, NULL);
|
pthread_join(thread, NULL);
|
||||||
opal_atomic_rmb();
|
opal_atomic_rmb();
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user