1
1
- when eof is reached at orterun, send a 0 byte message to peer indicating eof
- on receipt of zero byte message - close corresponding file descriptor associated with the endpoint
- require setup ptys for stdin and stdout so that stdin can be closed independently of stdout

This commit was SVN r8264.
Этот коммит содержится в:
Tim Woodall 2005-11-28 14:58:53 +00:00
родитель 55a9fbefd8
Коммит 943e6f0cd5
3 изменённых файлов: 94 добавлений и 74 удалений

Просмотреть файл

@ -33,7 +33,6 @@
static void orte_iof_base_endpoint_construct(orte_iof_base_endpoint_t* endpoint)
{
endpoint->ep_mode = 0;
endpoint->ep_state = ORTE_IOF_EP_CLOSED;
endpoint->ep_seq = 0;
endpoint->ep_ack = 0;
endpoint->ep_fd = -1;
@ -119,24 +118,24 @@ static void orte_iof_base_endpoint_read_handler(int fd, short flags, void *cbdat
/* read up to the fragment size */
rc = read(fd, frag->frag_data, sizeof(frag->frag_data));
if(rc <= 0) {
/* non-blocking */
if(rc < 0 && errno == EAGAIN) {
if(rc < 0) {
/* non-blocking, retry */
if(errno == EAGAIN || errno == EINTR) {
ORTE_IOF_BASE_FRAG_RETURN(frag);
OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);
return;
/* error on the connection */
} else {
orte_iof_base_endpoint_closed(endpoint);
}
/* peer has closed the connection */
orte_iof_base_endpoint_closed(endpoint);
OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);
rc = 0;
return;
} else if (rc == 0) {
/* peer has closed connection - send 0 byte message to subscribers */
orte_iof_base_endpoint_closed(endpoint);
}
/* Do not append the fragment before we know that we have some data (even 0 bytes it's OK) */
frag->frag_owner = endpoint;
opal_list_append(&endpoint->ep_frags, &frag->super);
frag->frag_iov[1].iov_len = frag->frag_len = rc;
/* fill in the header */
@ -183,7 +182,17 @@ static void orte_iof_base_endpoint_write_handler(int sd, short flags, void *user
OPAL_THREAD_LOCK(&orte_iof_base.iof_lock);
while(opal_list_get_size(&endpoint->ep_frags)) {
orte_iof_base_frag_t* frag = (orte_iof_base_frag_t*)opal_list_get_first(&endpoint->ep_frags);
int rc = write(endpoint->ep_fd, frag->frag_ptr, frag->frag_len);
int rc;
/* close connection on zero byte message */
if(frag->frag_len == 0) {
orte_iof_base_endpoint_closed(endpoint);
OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);
return;
}
/* progress pending messages */
rc = write(endpoint->ep_fd, frag->frag_ptr, frag->frag_len);
if(rc < 0) {
if(errno == EAGAIN)
break;
@ -256,12 +265,12 @@ int orte_iof_base_endpoint_create(
if((endpoint = orte_iof_base_endpoint_lookup(proc,mode,tag)) != NULL) {
OBJ_RELEASE(endpoint);
OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);
return OMPI_SUCCESS;
return ORTE_SUCCESS;
}
endpoint = OBJ_NEW(orte_iof_base_endpoint_t);
if(NULL == endpoint) {
OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);
return OMPI_ERR_OUT_OF_RESOURCE;
return ORTE_ERR_OUT_OF_RESOURCE;
}
endpoint->ep_name = *proc;
endpoint->ep_mode = mode;
@ -298,7 +307,7 @@ int orte_iof_base_endpoint_create(
break;
default:
opal_output(0, "orte_iof_base_endpoint_create: invalid mode %d\n", mode);
return OMPI_ERR_BAD_PARAM;
return ORTE_ERR_BAD_PARAM;
}
opal_list_append(&orte_iof_base.iof_endpoints, &endpoint->super);
@ -334,46 +343,34 @@ int orte_iof_base_endpoint_delete(
}
/*
*
*/
int orte_iof_base_endpoint_close(orte_iof_base_endpoint_t* endpoint)
{
opal_list_item_t* item;
endpoint->ep_state = ORTE_IOF_EP_CLOSING;
switch(endpoint->ep_mode) {
case ORTE_IOF_SOURCE:
opal_event_del(&endpoint->ep_event);
if(endpoint->ep_seq == endpoint->ep_ack) {
endpoint->ep_state = ORTE_IOF_EP_CLOSED;
}
break;
case ORTE_IOF_SINK:
if(opal_list_get_size(&endpoint->ep_frags) == 0) {
endpoint->ep_state = ORTE_IOF_EP_CLOSED;
}
break;
}
/* Since we detached from the event handler we need to make sure all the
* datastructres are stable, so we don't hang in orte_iof_base_flush()
*/
if(endpoint->ep_ack != endpoint->ep_seq) {
endpoint->ep_ack = endpoint->ep_seq;
}
while(NULL != (item = opal_list_remove_first(&endpoint->ep_frags))) {
ORTE_IOF_BASE_FRAG_RETURN(item)
}
return ORTE_SUCCESS;
}
/*
* Peer has gone away - cleanup and signal SOH monitor.
* Connection has gone away - cleanup and signal SOH monitor.
*/
void orte_iof_base_endpoint_closed(orte_iof_base_endpoint_t* endpoint)
{
orte_iof_base_endpoint_close(endpoint);
opal_list_item_t* item;
/* remove any event handlers */
switch(endpoint->ep_mode) {
case ORTE_IOF_SOURCE:
opal_event_del(&endpoint->ep_event);
break;
case ORTE_IOF_SINK:
if(opal_list_get_size(&endpoint->ep_frags)) {
opal_event_del(&endpoint->ep_event);
}
break;
}
/* close associated file descriptor */
close(endpoint->ep_fd);
endpoint->ep_fd = -1;
/* Can't complete any pending operations so cleanup all datastructures */
endpoint->ep_ack = endpoint->ep_seq;
while(NULL != (item = opal_list_remove_first(&endpoint->ep_frags))) {
ORTE_IOF_BASE_FRAG_RETURN(item)
}
}
/*
@ -421,13 +418,13 @@ int orte_iof_base_endpoint_forward(
int rc = 0;
if(endpoint->ep_mode != ORTE_IOF_SINK) {
return OMPI_ERR_BAD_PARAM;
return ORTE_ERR_BAD_PARAM;
}
/* allocate and initialize a fragment */
ORTE_IOF_BASE_FRAG_ALLOC(frag, rc);
if(NULL == frag) {
return OMPI_ERR_OUT_OF_RESOURCE;
return ORTE_ERR_OUT_OF_RESOURCE;
}
OPAL_THREAD_LOCK(&orte_iof_base.iof_lock);
@ -449,10 +446,18 @@ int orte_iof_base_endpoint_forward(
}
if(endpoint->ep_fd >= 0) {
/* try to write w/out copying data */
if(opal_list_get_size(&endpoint->ep_frags) == 0) {
if(len == 0) {
ORTE_IOF_BASE_FRAG_RETURN(frag);
orte_iof_base_endpoint_closed(endpoint);
OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);
return ORTE_SUCCESS;
}
rc = write(endpoint->ep_fd,data,len);
if(rc < 0 && (errno != EAGAIN && errno != EINTR)) {
ORTE_IOF_BASE_FRAG_RETURN(frag);
orte_iof_base_endpoint_closed(endpoint);
OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);
return ORTE_SUCCESS;
@ -460,8 +465,11 @@ int orte_iof_base_endpoint_forward(
}
frag->frag_len = len - rc;
if(frag->frag_len > 0) {
/* handle incomplete write */
if(frag->frag_len > 0 || len == 0) {
/* handle incomplete write - also queue up 0 byte message
* and recognize this as a request to close the descriptor
* when all pending operations complete
*/
frag->frag_ptr = frag->frag_data;
memcpy(frag->frag_ptr, data+rc, frag->frag_len);
opal_list_append(&endpoint->ep_frags, &frag->super);
@ -503,7 +511,7 @@ int orte_iof_base_callback_create(
endpoint = OBJ_NEW(orte_iof_base_endpoint_t);
if(NULL == endpoint) {
OPAL_THREAD_UNLOCK(&orte_iof_base.iof_lock);
return OMPI_ERR_OUT_OF_RESOURCE;
return ORTE_ERR_OUT_OF_RESOURCE;
}
endpoint->ep_name = *proc;
endpoint->ep_mode = ORTE_IOF_SINK;

Просмотреть файл

@ -7,11 +7,6 @@
#include "mca/iof/iof.h"
#include "mca/iof/base/iof_base_header.h"
enum {
ORTE_IOF_EP_OPEN,
ORTE_IOF_EP_CLOSING,
ORTE_IOF_EP_CLOSED
};
/**
* Structure store callbacks
@ -36,7 +31,6 @@ struct orte_iof_base_endpoint_t {
orte_process_name_t ep_name;
int ep_tag;
int ep_fd;
int ep_state;
uint32_t ep_seq;
uint32_t ep_ack;
opal_event_t ep_event;

Просмотреть файл

@ -77,6 +77,8 @@ orte_iof_base_setup_prefork(orte_iof_base_io_conf_t *opts)
if (opts->usepty) {
ret = openpty(&(opts->p_stdout[0]), &(opts->p_stdout[1]),
NULL, NULL, NULL);
ret = openpty(&(opts->p_stdin[0]), &(opts->p_stdin[1]),
NULL, NULL, NULL);
} else {
ret = -1;
}
@ -116,30 +118,30 @@ orte_iof_base_setup_child(orte_iof_base_io_conf_t *opts)
{
int ret;
if (! opts->usepty) {
if (!opts->usepty) {
close(opts->p_stdout[0]);
close(opts->p_stdin[1]);
}
close(opts->p_stderr[0]);
if (opts->usepty) {
if (opts->connect_stdin) {
#ifndef WIN32
/* disable echo */
/* disable new-line translation */
struct termios term_attrs;
if (tcgetattr(opts->p_stdout[1], &term_attrs) < 0) {
if (tcgetattr(opts->p_stdin[0], &term_attrs) < 0) {
return ORTE_ERROR;
}
term_attrs.c_lflag &= ~ (ECHO | ECHOE | ECHOK |
ECHOCTL | ECHOKE | ECHONL);
if (tcsetattr(opts->p_stdout[1], TCSANOW, &term_attrs) == -1) {
term_attrs.c_iflag &= ~ (ICRNL | INLCR | ISTRIP | INPCK | IXON);
term_attrs.c_oflag &= ~ (OCRNL | ONLCR);
if (tcsetattr(opts->p_stdin[0], TCSANOW, &term_attrs) == -1) {
return ORTE_ERROR;
}
/* and connect the pty to stdin */
ret = dup2(opts->p_stdout[1], fileno(stdin));
if (ret < 0) return ORTE_ERROR;
#endif
/* and connect the pty to stdin */
ret = dup2(opts->p_stdin[0], fileno(stdin));
if (ret < 0) return ORTE_ERROR;
} else {
int fd;
/* connect input to /dev/null */
@ -149,6 +151,22 @@ orte_iof_base_setup_child(orte_iof_base_io_conf_t *opts)
close(fd);
}
}
#ifndef WIN32
{
/* disable echo */
struct termios term_attrs;
if (tcgetattr(opts->p_stdout[1], &term_attrs) < 0) {
return ORTE_ERROR;
}
term_attrs.c_lflag &= ~ (ECHO | ECHOE | ECHOK |
ECHOCTL | ECHOKE | ECHONL);
term_attrs.c_iflag &= ~ (ICRNL | INLCR | ISTRIP | INPCK | IXON);
term_attrs.c_oflag &= ~ (OCRNL | ONLCR);
if (tcsetattr(opts->p_stdout[1], TCSANOW, &term_attrs) == -1) {
return ORTE_ERROR;
}
}
#endif
ret = dup2(opts->p_stdout[1], fileno(stdout));
if (ret < 0) return ORTE_ERROR;
@ -200,9 +218,9 @@ orte_iof_base_setup_parent(const orte_process_name_t* name,
/* connect stdin endpoint */
if (opts->connect_stdin) {
/* and connect the pty to stdin */
ret = orte_iof.iof_publish(name, ORTE_IOF_SINK,
ORTE_IOF_STDIN, opts->usepty ?
opts->p_stdout[0] : opts->p_stdin[1]);
ORTE_IOF_STDIN, opts->p_stdin[1]);
if(ORTE_SUCCESS != ret) {
ORTE_ERROR_LOG(ret);
return ret;