/* * Copyright (c) 2002-2004 Niels Provos * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * 3. The name of the author may not be used to endorse or promote products * derived from this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
*/ #include "opal_config.h" #include #ifdef HAVE_SYS_TIME_H #include #endif #include #include #include #include #ifdef HAVE_STDARG_H #include #endif #include "event.h" /* prototypes */ void bufferevent_setwatermark(struct bufferevent *, short, size_t, size_t); static void bufferevent_read_pressure_cb(struct evbuffer *, size_t, size_t, void *); static int bufferevent_add(struct opal_event *ev, int timeout) { struct timeval tv, *ptv = NULL; if (timeout) { timerclear(&tv); tv.tv_sec = timeout; ptv = &tv; } return (opal_event_add(ev, ptv)); } /* * This callback is executed when the size of the input buffer changes. * We use it to apply back pressure on the reading side. */ static void bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now, void *arg) { struct bufferevent *bufev = arg; /* * If we are below the watermak then reschedule reading if it's * still enabled. */ if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) { evbuffer_setcb(buf, NULL, NULL); if (bufev->enabled & OPAL_EV_READ) bufferevent_add(&bufev->ev_read, bufev->timeout_read); } } static void bufferevent_readcb(int fd, short event, void *arg) { struct bufferevent *bufev = arg; int res = 0; short what = OPAL_EVBUFFER_READ; size_t len; if (event == OPAL_EV_TIMEOUT) { what |= OPAL_EVBUFFER_TIMEOUT; goto error; } res = evbuffer_read(bufev->input, fd, -1); if (res == -1) { if (errno == EAGAIN || errno == EINTR) goto reschedule; /* error case */ what |= OPAL_EVBUFFER_ERROR; } else if (res == 0) { /* eof case */ what |= OPAL_EVBUFFER_EOF; } if (res <= 0) goto error; bufferevent_add(&bufev->ev_read, bufev->timeout_read); /* See if this callbacks meets the water marks */ len = OPAL_EVBUFFER_LENGTH(bufev->input); if (bufev->wm_read.low != 0 && len < bufev->wm_read.low) return; if (bufev->wm_read.high != 0 && len > bufev->wm_read.high) { struct evbuffer *buf = bufev->input; opal_event_del(&bufev->ev_read); /* Now schedule a callback for us */ evbuffer_setcb(buf, 
bufferevent_read_pressure_cb, bufev); return; } /* Invoke the user callback - must always be called last */ (*bufev->readcb)(bufev, bufev->cbarg); return; reschedule: bufferevent_add(&bufev->ev_read, bufev->timeout_read); return; error: (*bufev->errorcb)(bufev, what, bufev->cbarg); } static void bufferevent_writecb(int fd, short event, void *arg) { struct bufferevent *bufev = arg; int res = 0; short what = OPAL_EVBUFFER_WRITE; if (event == OPAL_EV_TIMEOUT) { what |= OPAL_EVBUFFER_TIMEOUT; goto error; } if (OPAL_EVBUFFER_LENGTH(bufev->output)) { res = evbuffer_write(bufev->output, fd); if (res == -1) { if (errno == EAGAIN || errno == EINTR || errno == EINPROGRESS) goto reschedule; /* error case */ what |= OPAL_EVBUFFER_ERROR; } else if (res == 0) { /* eof case */ what |= OPAL_EVBUFFER_EOF; } if (res <= 0) goto error; } if (OPAL_EVBUFFER_LENGTH(bufev->output) != 0) bufferevent_add(&bufev->ev_write, bufev->timeout_write); /* * Invoke the user callback if our buffer is drained or below the * low watermark. */ if (OPAL_EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low) (*bufev->writecb)(bufev, bufev->cbarg); return; reschedule: if (OPAL_EVBUFFER_LENGTH(bufev->output) != 0) bufferevent_add(&bufev->ev_write, bufev->timeout_write); return; error: (*bufev->errorcb)(bufev, what, bufev->cbarg); } /* * Create a new buffered event object. * * The read callback is invoked whenever we read new data. * The write callback is invoked whenever the output buffer is drained. * The error callback is invoked on a write/read error or on EOF. 
*/ struct bufferevent * bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg) { struct bufferevent *bufev; if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL) return (NULL); if ((bufev->input = evbuffer_new()) == NULL) { free(bufev); return (NULL); } if ((bufev->output = evbuffer_new()) == NULL) { evbuffer_free(bufev->input); free(bufev); return (NULL); } opal_event_set(&bufev->ev_read, fd, OPAL_EV_READ, bufferevent_readcb, bufev); opal_event_set(&bufev->ev_write, fd, OPAL_EV_WRITE, bufferevent_writecb, bufev); bufev->readcb = readcb; bufev->writecb = writecb; bufev->errorcb = errorcb; bufev->cbarg = cbarg; bufev->enabled = OPAL_EV_READ | OPAL_EV_WRITE; return (bufev); } int bufferevent_priority_set(struct bufferevent *bufev, int priority) { if (opal_event_priority_set(&bufev->ev_read, priority) == -1) return (-1); if (opal_event_priority_set(&bufev->ev_write, priority) == -1) return (-1); return (0); } /* Closing the file descriptor is the responsibility of the caller */ void bufferevent_free(struct bufferevent *bufev) { opal_event_del(&bufev->ev_read); opal_event_del(&bufev->ev_write); evbuffer_free(bufev->input); evbuffer_free(bufev->output); free(bufev); } /* * Returns 0 on success; * -1 on failure. 
*/ int bufferevent_write(struct bufferevent *bufev, void *data, size_t size) { int res; res = evbuffer_add(bufev->output, data, size); if (res == -1) return (res); /* If everything is okay, we need to schedule a write */ if (size > 0 && (bufev->enabled & OPAL_EV_WRITE)) bufferevent_add(&bufev->ev_write, bufev->timeout_write); return (res); } int bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf) { int res; res = bufferevent_write(bufev, buf->buffer, buf->off); if (res != -1) evbuffer_drain(buf, buf->off); return (res); } size_t bufferevent_read(struct bufferevent *bufev, void *data, size_t size) { struct evbuffer *buf = bufev->input; if (buf->off < size) size = buf->off; /* Copy the available data to the user buffer */ memcpy(data, buf->buffer, size); if (size) evbuffer_drain(buf, size); return (size); } int bufferevent_enable(struct bufferevent *bufev, short event) { if (event & OPAL_EV_READ) { if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1) return (-1); } if (event & OPAL_EV_WRITE) { if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1) return (-1); } bufev->enabled |= event; return (0); } int bufferevent_disable(struct bufferevent *bufev, short event) { if (event & OPAL_EV_READ) { if (opal_event_del(&bufev->ev_read) == -1) return (-1); } if (event & OPAL_EV_WRITE) { if (opal_event_del(&bufev->ev_write) == -1) return (-1); } bufev->enabled &= ~event; return (0); } /* * Sets the read and write timeout for a buffered event. 
*/
void
bufferevent_settimeout(struct bufferevent *bufev, int timeout_read,
    int timeout_write)
{
	/* Only stored here; the values take effect the next time an event is added. */
	bufev->timeout_write = timeout_write;
	bufev->timeout_read = timeout_read;
}

/*
 * Install new low/high watermarks for the directions selected in `events`
 * (OPAL_EV_READ and/or OPAL_EV_WRITE).  The same pair of marks is applied
 * to every selected direction.
 */
void
bufferevent_setwatermark(struct bufferevent *bufev, short events,
    size_t lowmark, size_t highmark)
{
	struct evbuffer *input = bufev->input;

	if ((events & OPAL_EV_WRITE) != 0) {
		bufev->wm_write.high = highmark;
		bufev->wm_write.low = lowmark;
	}

	if ((events & OPAL_EV_READ) != 0) {
		bufev->wm_read.high = highmark;
		bufev->wm_read.low = lowmark;
	}

	/*
	 * The marks may have moved relative to what is already buffered, so
	 * re-run the back-pressure check against the current input length to
	 * see whether reading should be resumed.
	 */
	bufferevent_read_pressure_cb(input, 0, OPAL_EVBUFFER_LENGTH(input),
	    bufev);
}