1
1

ssh_select converted to ssh_event, and bugfix to ssh_channel_select

Этот коммит содержится в:
Aris Adamantiadis 2011-08-31 16:17:15 +03:00
родитель b5351f2809
Коммит 92c8a71abb
2 изменённых файлов: 72 добавлений и 121 удалений

Просмотреть файл

@ -2938,6 +2938,7 @@ int ssh_channel_select(ssh_channel *readchans, ssh_channel *writechans,
int rc; int rc;
int i; int i;
int tm, tm_base; int tm, tm_base;
int firstround=1;
struct ssh_timestamp ts; struct ssh_timestamp ts;
if (timeout != NULL) if (timeout != NULL)
@ -2992,15 +2993,7 @@ int ssh_channel_select(ssh_channel *readchans, ssh_channel *writechans,
rchans, wchans, echans); rchans, wchans, echans);
if (rchans[0] != NULL || wchans[0] != NULL || echans[0] != NULL) { if (rchans[0] != NULL || wchans[0] != NULL || echans[0] != NULL) {
/* At least one channel has an event */ /* At least one channel has an event */
memcpy(readchans, rchans, (count_ptrs(rchans) + 1) * sizeof(ssh_channel )); break;
memcpy(writechans, wchans, (count_ptrs(wchans) + 1) * sizeof(ssh_channel ));
memcpy(exceptchans, echans, (count_ptrs(echans) + 1) * sizeof(ssh_channel ));
SAFE_FREE(rchans);
SAFE_FREE(wchans);
SAFE_FREE(echans);
if(event)
ssh_event_free(event);
return 0;
} }
/* Add all channels' sessions right into an event object */ /* Add all channels' sessions right into an event object */
if (!event){ if (!event){
@ -3021,6 +3014,10 @@ int ssh_channel_select(ssh_channel *readchans, ssh_channel *writechans,
ssh_event_add_session(event, exceptchans[i]->session); ssh_event_add_session(event, exceptchans[i]->session);
} }
} }
/* Get out if the timeout has elapsed */
if (!firstround && ssh_timeout_elapsed(&ts, tm_base)){
break;
}
/* Here we go */ /* Here we go */
rc = ssh_event_dopoll(event,tm); rc = ssh_event_dopoll(event,tm);
if (rc != SSH_OK){ if (rc != SSH_OK){
@ -3031,10 +3028,17 @@ int ssh_channel_select(ssh_channel *readchans, ssh_channel *writechans,
return rc; return rc;
} }
tm = ssh_timeout_update(&ts, tm_base); tm = ssh_timeout_update(&ts, tm_base);
firstround=0;
} while(1);
} while(1); /* Return to do loop */ memcpy(readchans, rchans, (count_ptrs(rchans) + 1) * sizeof(ssh_channel ));
memcpy(writechans, wchans, (count_ptrs(wchans) + 1) * sizeof(ssh_channel ));
/* not reached */ memcpy(exceptchans, echans, (count_ptrs(echans) + 1) * sizeof(ssh_channel ));
SAFE_FREE(rchans);
SAFE_FREE(wchans);
SAFE_FREE(echans);
if(event)
ssh_event_free(event);
return 0; return 0;
} }

Просмотреть файл

@ -403,6 +403,13 @@ socket_t ssh_connect_host_nonblocking(ssh_session session, const char *host,
* @{ * @{
*/ */
static int ssh_select_cb (socket_t fd, int revents, void *userdata){
fd_set *set = (fd_set *)userdata;
if(revents & POLLIN)
FD_SET(fd, set);
return 0;
}
/** /**
* @brief A wrapper for the select syscall * @brief A wrapper for the select syscall
* *
@ -422,132 +429,72 @@ socket_t ssh_connect_host_nonblocking(ssh_session session, const char *host,
* *
* @param[in] timeout A timeout for the select. * @param[in] timeout A timeout for the select.
* *
* @return -1 if an error occured. SSH_EINTR if it was interrupted, in * @return SSH_OK on success,
* that case, just restart it. * SSH_ERROR on error,
* SSH_EINTR if it was interrupted. In that case,
* just restart it.
* *
* @warning libssh is not threadsafe here. That means that if a signal is caught * @warning libssh is not reentrant here. That means that if a signal is caught
* during the processing of this function, you cannot call ssh * during the processing of this function, you cannot call libssh
* functions on sessions that are busy with ssh_select(). * functions on sessions that are busy with ssh_select().
* *
* @see select(2) * @see select(2)
*/ */
int ssh_select(ssh_channel *channels, ssh_channel *outchannels, socket_t maxfd, int ssh_select(ssh_channel *channels, ssh_channel *outchannels, socket_t maxfd,
fd_set *readfds, struct timeval *timeout) { fd_set *readfds, struct timeval *timeout) {
struct timeval zerotime; int i,j;
fd_set localset, localset2; int rc;
socket_t f; int base_tm, tm;
int rep; struct ssh_timestamp ts;
int set; ssh_event event = ssh_event_new();
int i; int firstround=1;
int j;
zerotime.tv_sec = 0; base_tm = tm=timeout->tv_sec * 1000 + timeout->tv_usec/1000;
zerotime.tv_usec = 0; for (i=0 ; channels[i] != NULL; ++i){
ssh_event_add_session(event, channels[i]->session);
/* }
* First, poll the maxfd file descriptors from the user with a zero-second for (i=0; i<maxfd ; ++i){
* timeout. They have the bigger priority. if(FD_ISSET(i, readfds)){
*/ ssh_event_add_fd(event, i, POLLIN, ssh_select_cb, readfds);
if (maxfd > 0) {
memcpy(&localset, readfds, sizeof(fd_set));
rep = select(maxfd, &localset, NULL, NULL, &zerotime);
/* catch the eventual errors */
if (rep==-1) {
return -1;
} }
} }
outchannels[0] = NULL;
/* Poll every channel */ FD_ZERO(readfds);
j = 0; ssh_timestamp_init(&ts);
for (i = 0; channels[i]; i++) { do {
if (channels[i]->session->alive) { /* Poll every channel */
if(ssh_channel_poll(channels[i], 0) > 0) { j = 0;
for (i = 0; channels[i]; i++) {
if(ssh_channel_poll(channels[i], 0) != 0) {
outchannels[j] = channels[i]; outchannels[j] = channels[i];
j++; j++;
} else { } else if(ssh_channel_poll(channels[i], 1) != 0) {
if(ssh_channel_poll(channels[i], 1) > 0) {
outchannels[j] = channels[i];
j++;
}
}
}
}
outchannels[j] = NULL;
/* Look into the localset for active fd */
set = 0;
for (f = 0; (f < maxfd) && !set; f++) {
if (FD_ISSET(f, &localset)) {
set = 1;
}
}
/* j != 0 means a channel has data */
if( (j != 0) || (set != 0)) {
if(maxfd > 0) {
memcpy(readfds, &localset, sizeof(fd_set));
}
return 0;
}
/*
* At this point, not any channel had any data ready for reading, nor any fd
* had data for reading.
*/
memcpy(&localset, readfds, sizeof(fd_set));
for (i = 0; channels[i]; i++) {
if (channels[i]->session->alive) {
ssh_socket_fd_set(channels[i]->session->socket, &localset, &maxfd);
}
}
rep = select(maxfd, &localset, NULL, NULL, timeout);
if (rep == -1 && errno == EINTR) {
/* Interrupted by a signal */
return SSH_EINTR;
}
if (rep == -1) {
/*
* Was the error due to a libssh's channel or from a closed descriptor from
* the user? User closed descriptors have been caught in the first select
* and not closed since that moment. That case shouldn't occur at all
*/
return -1;
}
/* Set the data_to_read flag on each session */
for (i = 0; channels[i]; i++) {
if (channels[i]->session->alive &&
ssh_socket_fd_isset(channels[i]->session->socket,&localset)) {
ssh_socket_set_read_wontblock(channels[i]->session->socket);
}
}
/* Now, test each channel */
j = 0;
for (i = 0; channels[i]; i++) {
if (channels[i]->session->alive &&
ssh_socket_fd_isset(channels[i]->session->socket,&localset)) {
if ((ssh_channel_poll(channels[i],0) > 0) ||
(ssh_channel_poll(channels[i], 1) > 0)) {
outchannels[j] = channels[i]; outchannels[j] = channels[i];
j++; j++;
} }
} }
} outchannels[j] = NULL;
outchannels[j] = NULL; if(j != 0)
break;
FD_ZERO(&localset2); /* watch if a user socket was triggered */
for (f = 0; f < maxfd; f++) { for(i = 0;i<maxfd;++i)
if (FD_ISSET(f, readfds) && FD_ISSET(f, &localset)) { if(FD_ISSET(i, readfds))
FD_SET(f, &localset2); goto out;
/* If the timeout is elapsed, we should go out */
if(!firstround && ssh_timeout_elapsed(&ts, base_tm))
goto out;
/* since there's nothing, let's fire the polling */
rc = ssh_event_dopoll(event,tm);
if (rc == SSH_ERROR){
ssh_event_free(event);
return SSH_ERROR;
} }
} tm = ssh_timeout_update(&ts, base_tm);
firstround=0;
memcpy(readfds, &localset2, sizeof(fd_set)); } while (1);
out:
return 0; ssh_event_free(event);
return SSH_OK;
} }
/** @} */ /** @} */