1
1

Check the return status when we forward stdin and remove the recipient when they are no longer alive

This commit was SVN r22786.
Этот коммит содержится в:
Ralph Castain 2010-03-05 13:41:28 +00:00
родитель 577eef1491
Коммит b2e24693c4
3 изменённых файлов: 29 добавлений и 16 удалений

Просмотреть файл

@ -79,10 +79,10 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata);
void orte_iof_hnp_stdin_cb(int fd, short event, void *cbdata); void orte_iof_hnp_stdin_cb(int fd, short event, void *cbdata);
bool orte_iof_hnp_stdin_check(int fd); bool orte_iof_hnp_stdin_check(int fd);
void orte_iof_hnp_send_data_to_endpoint(orte_process_name_t *host, int orte_iof_hnp_send_data_to_endpoint(orte_process_name_t *host,
orte_process_name_t *target, orte_process_name_t *target,
orte_iof_tag_t tag, orte_iof_tag_t tag,
unsigned char *data, int numbytes); unsigned char *data, int numbytes);
END_C_DECLS END_C_DECLS

Просмотреть файл

@ -85,7 +85,7 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
orte_iof_read_event_t *rev = (orte_iof_read_event_t*)cbdata; orte_iof_read_event_t *rev = (orte_iof_read_event_t*)cbdata;
unsigned char data[ORTE_IOF_BASE_MSG_MAX]; unsigned char data[ORTE_IOF_BASE_MSG_MAX];
int32_t numbytes; int32_t numbytes;
opal_list_item_t *item; opal_list_item_t *item, *prev_item;
orte_iof_proc_t *proct; orte_iof_proc_t *proct;
int rc; int rc;
@ -182,7 +182,15 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
* sent - this will tell the daemon to close * sent - this will tell the daemon to close
* the fd for stdin to that proc * the fd for stdin to that proc
*/ */
orte_iof_hnp_send_data_to_endpoint(&sink->daemon, &sink->name, ORTE_IOF_STDIN, data, numbytes); if( ORTE_SUCCESS != (rc = orte_iof_hnp_send_data_to_endpoint(&sink->daemon, &sink->name, ORTE_IOF_STDIN, data, numbytes))) {
/* if the addressee is unknown, remove the sink from the list */
if( ORTE_ERR_ADDRESSEE_UNKNOWN == rc ) {
prev_item = opal_list_get_prev(item);
opal_list_remove_item(&mca_iof_hnp_component.sinks, item);
OBJ_RELEASE(item);
item = prev_item;
}
}
} }
} }
/* if num_bytes was zero, then we need to terminate the event */ /* if num_bytes was zero, then we need to terminate the event */

Просмотреть файл

@ -51,10 +51,10 @@ static void send_cb(int status, orte_process_name_t *peer,
OBJ_RELEASE(buf); OBJ_RELEASE(buf);
} }
void orte_iof_hnp_send_data_to_endpoint(orte_process_name_t *host, int orte_iof_hnp_send_data_to_endpoint(orte_process_name_t *host,
orte_process_name_t *target, orte_process_name_t *target,
orte_iof_tag_t tag, orte_iof_tag_t tag,
unsigned char *data, int numbytes) unsigned char *data, int numbytes)
{ {
opal_buffer_t *buf; opal_buffer_t *buf;
int rc; int rc;
@ -67,7 +67,7 @@ void orte_iof_hnp_send_data_to_endpoint(orte_process_name_t *host,
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_IOF_TAG))) { if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &tag, 1, ORTE_IOF_TAG))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf); OBJ_RELEASE(buf);
return; return rc;
} }
/* pack the name of the target - this is either the intended /* pack the name of the target - this is either the intended
* recipient (if the tag is stdin and we are sending to a daemon), * recipient (if the tag is stdin and we are sending to a daemon),
@ -76,7 +76,7 @@ void orte_iof_hnp_send_data_to_endpoint(orte_process_name_t *host,
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, target, 1, ORTE_NAME))) { if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, target, 1, ORTE_NAME))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf); OBJ_RELEASE(buf);
return; return rc;
} }
/* if data is NULL, then we are done */ /* if data is NULL, then we are done */
@ -85,7 +85,7 @@ void orte_iof_hnp_send_data_to_endpoint(orte_process_name_t *host,
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, data, numbytes, OPAL_BYTE))) { if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, data, numbytes, OPAL_BYTE))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf); OBJ_RELEASE(buf);
return; return rc;
} }
} }
@ -95,12 +95,17 @@ void orte_iof_hnp_send_data_to_endpoint(orte_process_name_t *host,
/* xcast this to everyone - the local daemons will know how to handle it */ /* xcast this to everyone - the local daemons will know how to handle it */
orte_grpcomm.xcast(ORTE_PROC_MY_NAME->jobid, buf, ORTE_RML_TAG_IOF_PROXY); orte_grpcomm.xcast(ORTE_PROC_MY_NAME->jobid, buf, ORTE_RML_TAG_IOF_PROXY);
OBJ_RELEASE(buf); OBJ_RELEASE(buf);
return; return ORTE_SUCCESS;
} }
/* send the buffer to the host - this is either a daemon or /* send the buffer to the host - this is either a daemon or
* a tool that requested IOF * a tool that requested IOF
*/ */
orte_rml.send_buffer_nb(host, buf, ORTE_RML_TAG_IOF_PROXY, if( ORTE_SUCCESS != (rc = orte_rml.send_buffer_nb(host, buf, ORTE_RML_TAG_IOF_PROXY,
0, send_cb, NULL); 0, send_cb, NULL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
return ORTE_SUCCESS;
} }