1
1

orte/iof: Generalize the fix related to always-ready fds

Reference: https://bugzilla.kernel.org/show_bug.cgi?id=15272.
Work with both stdin/stdout fds that are known to be always
ready using libevent timers.
Such fds can not be effectively used with non-blocking I/O
functions like epoll, poll, select:
- for poll/select the event will be triggered immediately;
- for epoll `epoll_ctl` will reject an attempt to add this
fd to the working set.

Reference: http://www.wangafu.net/~nickm/libevent-book/Ref4_event.html
Libevent suggests to use timers over event_active for the
reasons provided by the link above.

Signed-off-by: Artem Polyakov <artpol84@gmail.com>
Этот коммит содержится в:
Artem Polyakov 2017-06-29 14:15:51 +07:00
родитель d9ad918a14
Коммит 374c824a5c
8 изменённых файлов: 174 добавлений и 147 удалений

Просмотреть файл

@ -55,6 +55,7 @@
#include "orte/runtime/orte_globals.h"
#include "orte/mca/rml/rml_types.h"
#include "orte/util/threads.h"
#include "orte/mca/errmgr/errmgr.h"
BEGIN_C_DECLS
@ -88,6 +89,7 @@ typedef struct {
bool pending;
bool always_writable;
opal_event_t *ev;
struct timeval tv;
int fd;
opal_list_t outputs;
} orte_iof_write_event_t;
@ -109,9 +111,11 @@ typedef struct {
opal_object_t super;
struct orte_iof_proc_t *proc;
opal_event_t *ev;
struct timeval tv;
int fd;
orte_iof_tag_t tag;
bool active;
bool always_readable;
orte_iof_sink_t *sink;
} orte_iof_read_event_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_read_event_t);
@ -145,64 +149,120 @@ struct orte_iof_base_t {
};
typedef struct orte_iof_base_t orte_iof_base_t;
/* Write event macro's */
static inline bool
orte_iof_base_fd_always_ready(int fd)
{
return opal_fd_is_regular(fd) ||
(opal_fd_is_chardev(fd) && !isatty(fd)) ||
opal_fd_is_blkdev(fd);
}
#define ORTE_IOF_SINK_BLOCKSIZE (1024)
#define ORTE_IOF_SINK_ACTIVATE(wev) \
do { \
struct timeval *tv = NULL; \
wev->pending = true; \
ORTE_POST_OBJECT(wev); \
if (wev->always_writable) { \
/* Regular is always write ready. Use timer to activate */ \
tv = &wev->tv; \
} \
if (opal_event_add(wev->ev, tv)) { \
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); \
} \
} while(0);
/* define an output "sink", adding it to the provided
* endpoint list for this proc */
#define ORTE_IOF_SINK_DEFINE(snk, nm, fid, tg, wrthndlr) \
do { \
orte_iof_sink_t *ep; \
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, \
"defining endpt: file %s line %d fd %d",\
__FILE__, __LINE__, (fid))); \
ep = OBJ_NEW(orte_iof_sink_t); \
ep->name.jobid = (nm)->jobid; \
ep->name.vpid = (nm)->vpid; \
ep->tag = (tg); \
if (0 <= (fid)) { \
ep->wev->fd = (fid); \
ep->wev->always_writable = opal_fd_is_regular(fid) || \
opal_fd_is_chardev(fid) || \
opal_fd_is_blkdev(fid); \
opal_event_set(orte_event_base, \
ep->wev->ev, ep->wev->fd, \
OPAL_EV_WRITE, \
wrthndlr, ep); \
opal_event_set_priority(ep->wev->ev, ORTE_MSG_PRI); \
} \
*(snk) = ep; \
ORTE_POST_OBJECT(ep); \
#define ORTE_IOF_SINK_DEFINE(snk, nm, fid, tg, wrthndlr) \
do { \
orte_iof_sink_t *ep; \
OPAL_OUTPUT_VERBOSE((1, \
orte_iof_base_framework.framework_output, \
"defining endpt: file %s line %d fd %d", \
__FILE__, __LINE__, (fid))); \
ep = OBJ_NEW(orte_iof_sink_t); \
ep->name.jobid = (nm)->jobid; \
ep->name.vpid = (nm)->vpid; \
ep->tag = (tg); \
if (0 <= (fid)) { \
ep->wev->fd = (fid); \
ep->wev->always_writable = \
orte_iof_base_fd_always_ready(fid); \
if(ep->wev->always_writable) { \
opal_event_evtimer_set(orte_event_base, \
ep->wev->ev, wrthndlr, ep); \
} else { \
opal_event_set(orte_event_base, \
ep->wev->ev, ep->wev->fd, \
OPAL_EV_WRITE, \
wrthndlr, ep); \
} \
opal_event_set_priority(ep->wev->ev, ORTE_MSG_PRI); \
} \
*(snk) = ep; \
ORTE_POST_OBJECT(ep); \
} while(0);
/* Read event macro's */
#define ORTE_IOF_READ_ADDEV(rev) \
do { \
struct timeval *tv = NULL; \
if (rev->always_readable) { \
tv = &rev->tv; \
} \
if (opal_event_add(rev->ev, tv)) { \
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); \
} \
} while(0);
#define ORTE_IOF_READ_ACTIVATE(rev) \
do { \
rev->active = true; \
ORTE_POST_OBJECT(rev); \
ORTE_IOF_READ_ADDEV(rev); \
} while(0);
/* add list of structs that has name of proc + orte_iof_tag_t - when
* defining a read event, search list for proc, add flag to the tag.
* when closing a read fd, find proc on list and zero out that flag
* when all flags = 0, then iof is complete - set message event to
* daemon processor indicating proc iof is terminated
*/
#define ORTE_IOF_READ_EVENT(rv, p, fid, tg, cbfunc, actv) \
do { \
orte_iof_read_event_t *rev; \
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output, \
"%s defining read event for %s: %s %d", \
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \
ORTE_NAME_PRINT(&(p)->name), \
__FILE__, __LINE__)); \
rev = OBJ_NEW(orte_iof_read_event_t); \
OBJ_RETAIN((p)); \
rev->proc = (struct orte_iof_proc_t*)(p); \
rev->tag = (tg); \
rev->fd = (fid); \
*(rv) = rev; \
opal_event_set(orte_event_base, \
rev->ev, (fid), \
OPAL_EV_READ, \
(cbfunc), rev); \
opal_event_set_priority(rev->ev, ORTE_MSG_PRI); \
if ((actv)) { \
rev->active = true; \
ORTE_POST_OBJECT(rev); \
opal_event_add(rev->ev, 0); \
} \
#define ORTE_IOF_READ_EVENT(rv, p, fid, tg, cbfunc, actv) \
do { \
orte_iof_read_event_t *rev; \
OPAL_OUTPUT_VERBOSE((1, \
orte_iof_base_framework.framework_output, \
"%s defining read event for %s: %s %d", \
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \
ORTE_NAME_PRINT(&(p)->name), \
__FILE__, __LINE__)); \
rev = OBJ_NEW(orte_iof_read_event_t); \
OBJ_RETAIN((p)); \
rev->proc = (struct orte_iof_proc_t*)(p); \
rev->tag = (tg); \
rev->fd = (fid); \
rev->always_readable = orte_iof_base_fd_always_ready(fid); \
*(rv) = rev; \
if(rev->always_readable) { \
opal_event_evtimer_set(orte_event_base, \
rev->ev, (cbfunc), rev); \
} else { \
opal_event_set(orte_event_base, \
rev->ev, (fid), \
OPAL_EV_READ, \
(cbfunc), rev); \
} \
opal_event_set_priority(rev->ev, ORTE_MSG_PRI); \
if ((actv)) { \
ORTE_IOF_READ_ACTIVATE(rev) \
} \
} while(0);

Просмотреть файл

@ -270,6 +270,8 @@ static void orte_iof_base_read_event_construct(orte_iof_read_event_t* rev)
rev->active = false;
rev->ev = opal_event_alloc();
rev->sink = NULL;
rev->tv.tv_sec = 0;
rev->tv.tv_usec = 0;
}
static void orte_iof_base_read_event_destruct(orte_iof_read_event_t* rev)
{
@ -303,6 +305,8 @@ static void orte_iof_base_write_event_construct(orte_iof_write_event_t* wev)
wev->fd = -1;
OBJ_CONSTRUCT(&wev->outputs, opal_list_t);
wev->ev = opal_event_alloc();
wev->tv.tv_sec = 0;
wev->tv.tv_usec = 0;
}
static void orte_iof_base_write_event_destruct(orte_iof_write_event_t* wev)
{

Просмотреть файл

@ -260,22 +260,11 @@ int orte_iof_base_write_output(const orte_process_name_t *name, orte_iof_tag_t s
/* is the write event issued? */
if (!channel->pending) {
int rc = -1;
/* issue it */
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s write:output adding write event",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
channel->pending = true;
ORTE_POST_OBJECT(channel);
if (channel->always_writable) {
/* Regular is always write ready. Activate the handler. */
opal_event_active (channel->ev, OPAL_EV_WRITE, 1);
} else {
rc = opal_event_add(channel->ev, 0);
if (rc) {
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
}
}
ORTE_IOF_SINK_ACTIVATE(channel);
}
return num_buffered;
@ -307,8 +296,7 @@ void orte_iof_base_static_dump_output(orte_iof_read_event_t *rev)
}
}
#define ORTE_IOF_REGULARF_BLOCK (1024)
void orte_iof_base_write_handler(int fd, short event, void *cbdata)
void orte_iof_base_write_handler(int _fd, short event, void *cbdata)
{
orte_iof_sink_t *sink = (orte_iof_sink_t*)cbdata;
orte_iof_write_event_t *wev = sink->wev;
@ -344,11 +332,7 @@ void orte_iof_base_write_handler(int fd, short event, void *cbdata)
/* leave the write event running so it will call us again
* when the fd is ready.
*/
if(wev->always_writable){
/* Schedule another event */
opal_event_active (wev->ev, OPAL_EV_WRITE, 1);
}
return;
goto NEXT_CALL;
}
/* otherwise, something bad happened so all we can do is abort
* this attempt
@ -371,29 +355,23 @@ void orte_iof_base_write_handler(int fd, short event, void *cbdata)
/* leave the write event running so it will call us again
* when the fd is ready
*/
if(wev->always_writable){
/* Schedule another event */
opal_event_active (wev->ev, OPAL_EV_WRITE, 1);
}
return;
goto NEXT_CALL;
}
OBJ_RELEASE(output);
total_written += num_written;
if(wev->always_writable && (ORTE_IOF_REGULARF_BLOCK <= total_written)){
if(wev->always_writable && (ORTE_IOF_SINK_BLOCKSIZE <= total_written)){
/* If this is a regular file it will never tell us it will block
* Write no more than ORTE_IOF_REGULARF_BLOCK at a time allowing
* other fds to progress
*/
opal_event_active (wev->ev, OPAL_EV_WRITE, 1);
return;
goto NEXT_CALL;
}
}
ABORT:
if (!wev->always_writable){
opal_event_del(wev->ev);
}
wev->pending = false;
ORTE_POST_OBJECT(wev);
return;
NEXT_CALL:
ORTE_IOF_SINK_ACTIVATE(wev);
}

Просмотреть файл

@ -16,6 +16,7 @@
* Copyright (c) 2014-2017 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2016-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2017 Mellanox Technologies. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -214,16 +215,10 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
}
}
}
proct->revstdout->active = true;
ORTE_POST_OBJECT(proct->revstdout);
opal_event_add(proct->revstdout->ev, 0);
proct->revstderr->active = true;
ORTE_POST_OBJECT(proct->revstderr);
opal_event_add(proct->revstderr->ev, 0);
proct->revstddiag->active = true;
ORTE_POST_OBJECT(proct->revstddiag);
opal_event_add(proct->revstddiag->ev, 0);
}
ORTE_IOF_READ_ACTIVATE(proct->revstdout);
ORTE_IOF_READ_ACTIVATE(proct->revstderr);
ORTE_IOF_READ_ACTIVATE(proct->revstddiag);
}
return ORTE_SUCCESS;
}
@ -302,9 +297,7 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
* but may delay its activation
*/
if (!(src_tag & ORTE_IOF_STDIN) || orte_iof_hnp_stdin_check(fd)) {
mca_iof_hnp_component.stdinev->active = true;
ORTE_POST_OBJECT(proct->revstdout);
opal_event_add(mca_iof_hnp_component.stdinev->ev, 0);
ORTE_IOF_READ_ACTIVATE(mca_iof_hnp_component.stdinev);
}
} else {
/* if we are not looking at a tty, just setup a read event
@ -518,7 +511,7 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
orte_iof_write_event_t *wev = sink->wev;
opal_list_item_t *item;
orte_iof_write_output_t *output;
int num_written;
int num_written, total_written = 0;
ORTE_ACQUIRE_OBJECT(sink);
@ -545,12 +538,7 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
OPAL_OUTPUT_VERBOSE((20, orte_iof_base_framework.framework_output,
"%s iof:hnp closing fd %d on write event due to zero bytes output",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wev->fd));
OBJ_RELEASE(wev);
sink->wev = NULL;
/* just leave - we don't want to restart the
* read event!
*/
return;
goto finish;
}
num_written = write(wev->fd, output->data, output->numbytes);
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
@ -564,10 +552,7 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
/* leave the write event running so it will call us again
* when the fd is ready.
*/
wev->pending = true;
ORTE_POST_OBJECT(wev);
opal_event_add(wev->ev, 0);
goto CHECK;
goto re_enter;
}
/* otherwise, something bad happened so all we can do is declare an
* error and abort
@ -576,9 +561,7 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
OPAL_OUTPUT_VERBOSE((20, orte_iof_base_framework.framework_output,
"%s iof:hnp closing fd %d on write event due to negative bytes written",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), wev->fd));
OBJ_RELEASE(wev);
sink->wev = NULL;
return;
goto finish;
} else if (num_written < output->numbytes) {
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s hnp:stdin:write:handler incomplete write %d - adjusting data",
@ -590,15 +573,19 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
/* leave the write event running so it will call us again
* when the fd is ready.
*/
wev->pending = true;
ORTE_POST_OBJECT(wev);
opal_event_add(wev->ev, 0);
goto CHECK;
goto re_enter;
}
OBJ_RELEASE(output);
}
CHECK:
total_written += num_written;
if ((ORTE_IOF_SINK_BLOCKSIZE <= total_written) && wev->always_writable) {
goto re_enter;
}
}
goto check;
re_enter:
ORTE_IOF_SINK_ACTIVATE(wev);
check:
if (NULL != mca_iof_hnp_component.stdinev &&
!orte_abnormal_term_ordered &&
!mca_iof_hnp_component.stdinev->active) {
@ -618,11 +605,14 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
/* restart the read */
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"restarting read event"));
mca_iof_hnp_component.stdinev->active = true;
ORTE_POST_OBJECT(mca_iof_hnp_component.stdinev);
opal_event_add(mca_iof_hnp_component.stdinev->ev, 0);
ORTE_IOF_READ_ACTIVATE(mca_iof_hnp_component.stdinev);
}
}
return;
finish:
OBJ_RELEASE(wev);
sink->wev = NULL;
return;
}
static int hnp_output(const orte_process_name_t* peer,

Просмотреть файл

@ -13,6 +13,7 @@
* Copyright (c) 2011-2013 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2017 Mellanox Technologies. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -54,9 +55,7 @@ static void restart_stdin(int fd, short event, void *cbdata)
if (NULL != mca_iof_hnp_component.stdinev &&
!orte_job_term_ordered &&
!mca_iof_hnp_component.stdinev->active) {
mca_iof_hnp_component.stdinev->active = true;
ORTE_POST_OBJECT(mca_iof_hnp_component.stdinev);
opal_event_add(mca_iof_hnp_component.stdinev->ev, 0);
ORTE_IOF_READ_ACTIVATE(mca_iof_hnp_component.stdinev);
}
/* if this was a timer callback, then release the timer */
@ -85,9 +84,9 @@ void orte_iof_hnp_stdin_cb(int fd, short event, void *cbdata)
should_process = orte_iof_hnp_stdin_check(0);
if (should_process) {
mca_iof_hnp_component.stdinev->active = true;
opal_event_add(mca_iof_hnp_component.stdinev->ev, 0);
ORTE_IOF_READ_ACTIVATE(mca_iof_hnp_component.stdinev);
} else {
opal_event_del(mca_iof_hnp_component.stdinev->ev);
mca_iof_hnp_component.stdinev->active = false;
}
@ -109,6 +108,11 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
ORTE_ACQUIRE_OBJECT(rev);
/* As we may use timer events, fd can be bogus (-1)
* use the right one here
*/
fd = rev->fd;
/* read up to the fragment size */
numbytes = read(fd, data, sizeof(data));
@ -123,7 +127,7 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
/* non-blocking, retry */
if (EAGAIN == errno || EINTR == errno) {
opal_event_add(rev->ev, 0);
ORTE_IOF_READ_ACTIVATE(rev);
return;
}
@ -303,8 +307,6 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
}
/* re-add the event */
ORTE_POST_OBJECT(rev);
opal_event_add(rev->ev, 0);
ORTE_IOF_READ_ACTIVATE(rev);
return;
}

Просмотреть файл

@ -13,6 +13,7 @@
* Copyright (c) 2011-2013 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2017 Mellanox Technologies. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -81,9 +82,7 @@ void orte_iof_hnp_recv(int status, orte_process_name_t* sender,
if (NULL != mca_iof_hnp_component.stdinev &&
!orte_job_term_ordered &&
!mca_iof_hnp_component.stdinev->active) {
mca_iof_hnp_component.stdinev->active = true;
ORTE_POST_OBJECT(mca_iof_hnp_component.stdinev);
opal_event_add(mca_iof_hnp_component.stdinev->ev, 0);
ORTE_IOF_READ_ACTIVATE(mca_iof_hnp_component.stdinev);
}
goto CLEAN_RETURN;
} else if (ORTE_IOF_XOFF & stream) {

Просмотреть файл

@ -13,6 +13,7 @@
* Copyright (c) 2011-2013 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2016-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2017 Mellanox Technologies. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -190,15 +191,9 @@ SETUP:
* been defined!
*/
if (NULL != proct->revstdout && NULL != proct->revstderr && NULL != proct->revstddiag) {
proct->revstdout->active = true;
ORTE_POST_OBJECT(proct->revstdout);
opal_event_add(proct->revstdout->ev, 0);
proct->revstderr->active = true;
ORTE_POST_OBJECT(proct->revstderr);
opal_event_add(proct->revstderr->ev, 0);
proct->revstddiag->active = true;
ORTE_POST_OBJECT(proct->revstddiag);
opal_event_add(proct->revstddiag->ev, 0);
ORTE_IOF_READ_ACTIVATE(proct->revstdout);
ORTE_IOF_READ_ACTIVATE(proct->revstderr);
ORTE_IOF_READ_ACTIVATE(proct->revstddiag);
}
return ORTE_SUCCESS;
}
@ -363,7 +358,7 @@ static int orted_ft_event(int state)
return ORTE_ERR_NOT_IMPLEMENTED;
}
static void stdin_write_handler(int fd, short event, void *cbdata)
static void stdin_write_handler(int _fd, short event, void *cbdata)
{
orte_iof_sink_t *sink = (orte_iof_sink_t*)cbdata;
orte_iof_write_event_t *wev = sink->wev;
@ -405,9 +400,7 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
/* leave the write event running so it will call us again
* when the fd is ready.
*/
wev->pending = true;
ORTE_POST_OBJECT(wev);
opal_event_add(wev->ev, 0);
ORTE_IOF_SINK_ACTIVATE(wev);
goto CHECK;
}
/* otherwise, something bad happened so all we can do is declare an
@ -436,9 +429,7 @@ static void stdin_write_handler(int fd, short event, void *cbdata)
/* leave the write event running so it will call us again
* when the fd is ready.
*/
wev->pending = true;
ORTE_POST_OBJECT(wev);
opal_event_add(wev->ev, 0);
ORTE_IOF_SINK_ACTIVATE(wev);
goto CHECK;
}
OBJ_RELEASE(output);

Просмотреть файл

@ -55,6 +55,11 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
ORTE_ACQUIRE_OBJECT(rev);
/* As we may use timer events, fd can be bogus (-1)
* use the right one here
*/
fd = rev->fd;
/* read up to the fragment size */
#if !defined(__WINDOWS__)
numbytes = read(fd, data, sizeof(data));
@ -83,7 +88,7 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
/* either we have a connection error or it was a non-blocking read */
if (EAGAIN == errno || EINTR == errno) {
/* non-blocking, retry */
opal_event_add(rev->ev, 0);
ORTE_IOF_READ_ACTIVATE(rev);
return;
}
@ -103,8 +108,7 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
}
if (!proct->copy) {
/* re-add the event */
ORTE_POST_OBJECT(rev);
opal_event_add(rev->ev, 0);
ORTE_IOF_READ_ACTIVATE(rev);
return;
}
@ -141,8 +145,7 @@ void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
orte_rml_send_callback, NULL);
/* re-add the event */
ORTE_POST_OBJECT(rev);
opal_event_add(rev->ev, 0);
ORTE_IOF_READ_ACTIVATE(rev);
return;