1
1

added api to flush pending output

This commit was SVN r3993.
Этот коммит содержится в:
Tim Woodall 2005-01-14 00:11:24 +00:00
родитель ed13778705
Коммит 4ec9581542
7 изменённых файлов: 107 добавлений и 60 удалений

Просмотреть файл

@ -32,6 +32,7 @@ libmca_iof_base_la_SOURCES = \
$(headers) \
iof_base_close.c \
iof_base_open.c \
iof_base_flush.c \
iof_base_endpoint.c \
iof_base_fragment.c \
iof_base_select.c

Просмотреть файл

@ -38,6 +38,7 @@ struct mca_iof_base_t {
ompi_list_t iof_endpoints;
ompi_mutex_t iof_lock;
ompi_condition_t iof_condition;
size_t iof_waiting;
ompi_free_list_t iof_fragments;
size_t iof_window_size;
ompi_process_name_t* iof_service;
@ -49,6 +50,7 @@ typedef struct mca_iof_base_t mca_iof_base_t;
int mca_iof_base_open(void);
int mca_iof_base_close(void);
int mca_iof_base_select(bool* allow_multi_user_threads, bool* have_hidden_threads);
int mca_iof_base_flush(void);
extern mca_iof_base_t mca_iof_base;

Просмотреть файл

@ -25,59 +25,12 @@
#include "mca/iof/base/iof_base_endpoint.h"
static void mca_iof_base_timer_cb(int fd, short flags, void *cbdata)
{
int *flushed = (int*)cbdata;
OMPI_THREAD_LOCK(&mca_iof_base.iof_lock);
*flushed = 1;
ompi_condition_signal(&mca_iof_base.iof_condition);
OMPI_THREAD_UNLOCK(&mca_iof_base.iof_lock);
}
int mca_iof_base_close(void)
{
ompi_list_item_t* item;
ompi_event_t ev;
struct timeval tv = { 0, 0 };
int flushed = 0;
size_t closed = 0;
/* flush any pending output */
fflush(NULL);
/* wait until event loop has been progressed at least once */
OMPI_THREAD_LOCK(&mca_iof_base.iof_lock);
ompi_evtimer_set(&ev, mca_iof_base_timer_cb, &flushed);
ompi_event_add(&ev, &tv);
while(flushed == 0)
ompi_condition_wait(&mca_iof_base.iof_condition, &mca_iof_base.iof_lock);
/* attempt to close all of the endpoints */
item = ompi_list_get_first(&mca_iof_base.iof_endpoints);
while(item != ompi_list_get_end(&mca_iof_base.iof_endpoints)) {
ompi_list_item_t* next = ompi_list_get_next(item);
mca_iof_base_endpoint_t* endpoint = (mca_iof_base_endpoint_t*)item;
mca_iof_base_endpoint_close(endpoint);
item = next;
}
/* wait for all to flush output and change to closed state */
while(closed != ompi_list_get_size(&mca_iof_base.iof_endpoints)) {
closed = 0;
for(item = ompi_list_get_first(&mca_iof_base.iof_endpoints);
item != ompi_list_get_end(&mca_iof_base.iof_endpoints);
item = ompi_list_get_next(item)) {
mca_iof_base_endpoint_t* endpoint = (mca_iof_base_endpoint_t*)item;
if(endpoint->ep_state == MCA_IOF_EP_CLOSED) {
closed++;
}
}
if(closed != ompi_list_get_size(&mca_iof_base.iof_endpoints)) {
ompi_condition_wait(&mca_iof_base.iof_condition, &mca_iof_base.iof_lock);
}
}
OMPI_THREAD_UNLOCK(&mca_iof_base.iof_lock);
/* flush all pending output */
mca_iof_base_flush();
/* shutdown any remaining opened components */
if (0 != ompi_list_get_size(&mca_iof_base.iof_components_opened)) {

Просмотреть файл

@ -135,9 +135,6 @@ static void mca_iof_base_endpoint_read_handler(int fd, short flags, void *cbdata
static void mca_iof_base_endpoint_write_handler(int sd, short flags, void *user)
{
mca_iof_base_endpoint_t* endpoint = (mca_iof_base_endpoint_t*)user;
ompi_list_t completed;
ompi_process_name_t last = mca_oob_name_any;
OBJ_CONSTRUCT(&completed, ompi_list_t);
/*
* step through the list of queued fragments and attempt to write
@ -167,6 +164,9 @@ static void mca_iof_base_endpoint_write_handler(int sd, short flags, void *user)
/* is there anything left to write? */
if(ompi_list_get_size(&endpoint->ep_frags) == 0) {
ompi_event_del(&endpoint->ep_event);
if(mca_iof_base.iof_waiting) {
ompi_condition_signal(&mca_iof_base.iof_condition);
}
}
OMPI_THREAD_UNLOCK(&mca_iof_base.iof_lock);
}
@ -401,15 +401,13 @@ int mca_iof_base_endpoint_ack(
window_open =
MCA_IOF_BASE_SEQDIFF(endpoint->ep_seq,endpoint->ep_ack) < mca_iof_base.iof_window_size;
/* if we are shutting down - cleanup endpoint */
if(endpoint->ep_state == MCA_IOF_EP_CLOSING) {
if(endpoint->ep_seq == endpoint->ep_ack) {
endpoint->ep_state = MCA_IOF_EP_CLOSED;
ompi_condition_signal(&mca_iof_base.iof_condition);
}
/* someone is waiting on all output to be flushed */
if(mca_iof_base.iof_waiting && endpoint->ep_seq == endpoint->ep_ack) {
ompi_condition_signal(&mca_iof_base.iof_condition);
}
/* otherwise check to see if we need to reenable forwarding */
} else if(window_closed && window_open) {
/* check to see if we need to reenable forwarding */
if(window_closed && window_open) {
ompi_event_add(&endpoint->ep_event, 0);
}
OMPI_THREAD_UNLOCK(&mca_iof_base.iof_lock);

Просмотреть файл

@ -113,5 +113,15 @@ int mca_iof_base_endpoint_ack(
mca_iof_base_endpoint_t* endpoint,
uint32_t seq);
/**
* Check for pending I/O
*/
static inline bool mca_iof_base_endpoint_pending(
mca_iof_base_endpoint_t* endpoint)
{
return ompi_list_get_size(&endpoint->ep_frags) || (endpoint->ep_seq != endpoint->ep_ack);
}
#endif

82
src/mca/iof/base/iof_base_flush.c Обычный файл
Просмотреть файл

@ -0,0 +1,82 @@
#include "ompi_config.h"
#include <stdlib.h>
#include <string.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <errno.h>
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_SYS_FCNTL_H
#include <sys/fcntl.h>
#endif
#include <netinet/in.h>
#include "util/output.h"
#include "mca/oob/base/base.h"
#include "mca/iof/base/base.h"
#include "iof_base_endpoint.h"
#include "iof_base_fragment.h"
/**
* timer callback out of the event loop
*/
static void mca_iof_base_timer_cb(int fd, short flags, void *cbdata)
{
int *flushed = (int*)cbdata;
OMPI_THREAD_LOCK(&mca_iof_base.iof_lock);
*flushed = 1;
ompi_condition_signal(&mca_iof_base.iof_condition);
OMPI_THREAD_UNLOCK(&mca_iof_base.iof_lock);
}
/*
* flush output streams and block until there is no pending I/O
* on any of the current endpoints.
*/
int mca_iof_base_flush(void)
{
ompi_list_item_t* item;
ompi_event_t ev;
struct timeval tv = { 0, 0 };
int flushed = 0;
size_t pending;
/* flush any pending output */
fflush(NULL);
/* force all file descriptors to be progressed at least once,
* wait on a timer callback to be called out of the event loop
*/
OMPI_THREAD_LOCK(&mca_iof_base.iof_lock);
mca_iof_base.iof_waiting++;
ompi_evtimer_set(&ev, mca_iof_base_timer_cb, &flushed);
ompi_event_add(&ev, &tv);
while(flushed == 0)
ompi_condition_wait(&mca_iof_base.iof_condition, &mca_iof_base.iof_lock);
/* wait for all of the endpoints to reach an idle state */
pending = ompi_list_get_size(&mca_iof_base.iof_endpoints);
while(pending == ompi_list_get_size(&mca_iof_base.iof_endpoints)) {
pending = 0;
for(item = ompi_list_get_first(&mca_iof_base.iof_endpoints);
item != ompi_list_get_end(&mca_iof_base.iof_endpoints);
item = ompi_list_get_next(item)) {
mca_iof_base_endpoint_t* endpoint = (mca_iof_base_endpoint_t*)item;
if(mca_iof_base_endpoint_pending(endpoint)) {
pending++;
}
}
if(pending != 0) {
ompi_condition_wait(&mca_iof_base.iof_condition, &mca_iof_base.iof_lock);
}
}
mca_iof_base.iof_waiting--;
OMPI_THREAD_UNLOCK(&mca_iof_base.iof_lock);
return OMPI_SUCCESS;
}

Просмотреть файл

@ -56,6 +56,7 @@ int mca_iof_base_open(void)
OBJ_CONSTRUCT(&mca_iof_base.iof_lock, ompi_mutex_t);
OBJ_CONSTRUCT(&mca_iof_base.iof_condition, ompi_condition_t);
OBJ_CONSTRUCT(&mca_iof_base.iof_fragments, ompi_free_list_t);
mca_iof_base.iof_waiting = 0;
/* lookup common parameters */
id = mca_base_param_register_int("iof","base","window_size",NULL,MCA_IOF_BASE_MSG_MAX << 1);