1
1

Split Server function in TCP/UDP related functions

Этот коммит содержится в:
kaustubhprabhu 2009-06-05 22:00:10 +00:00
родитель 5633640e37
Коммит 9f62884d32

Просмотреть файл

@ -4,7 +4,7 @@
* kprabhu - 2nd june 2009 - server side code with select * kprabhu - 2nd june 2009 - server side code with select
* with updated linked list functions. * with updated linked list functions.
*kprabhu - 3rd June 2009 - client side code with select *kprabhu - 3rd June 2009 - client side code with select
* *kprabhu - 5th June 2009 - created functions for Server TCP/UDP connections
*/ */
#include <stdio.h> #include <stdio.h>
@ -45,7 +45,7 @@ enum {
uS_TO_NS = 1000, uS_TO_NS = 1000,
MAX_BUFFER_SIZE =10, MAX_BUFFER_SIZE =10,
DEFAULT_UDP_BUFSIZE = 1470, DEFAULT_UDP_BUFSIZE = 1,
DEFAULT_TCP_BUFSIZE = 8192 DEFAULT_TCP_BUFSIZE = 8192
}; };
#define SEC_TO_NS 1000000000 /* too big for enum on some platforms */ #define SEC_TO_NS 1000000000 /* too big for enum on some platforms */
@ -75,9 +75,11 @@ struct iperf_stream
// Run routines for TCP and UDP- will be called by pthread_create() indirectly // Run routines for TCP and UDP- will be called by pthread_create() indirectly
void *udp_client_thread(struct iperf_stream *sp); void *udp_client_thread(struct iperf_stream *sp);
void *udp_server_thread(struct iperf_stream *sp); void *udp_server_thread(int maxfd, fd_set *temp_set, fd_set *read_set);
int udp_server_accept(int *s, int maxfd, fd_set *read_set, struct iperf_settings *settings);
void *tcp_client_thread(struct iperf_stream *sp); void *tcp_client_thread(struct iperf_stream *sp);
void *tcp_server_thread(struct iperf_stream *sp); void *tcp_server_thread(int maxfd, fd_set *temp_set, fd_set *read_set);
int tcp_server_accept(int s, int maxfd, fd_set *read_set, struct iperf_settings *settings);
static struct option longopts[] = static struct option longopts[] =
{ {
@ -140,18 +142,18 @@ void Display()
while(1) while(1)
{ {
if(n->next==NULL)
{
printf("position-%d\tsp=%d\tsocket=%d\n",count++,(int)n,n->sock);
break;
}
else if(n)
{ {
printf("position-%d\tsp=%d\tsocket=%d\n",count++,(int)n,n->sock); if(n->settings->mode ==Mclient)
printf("position-%d\tsp=%d\tsocket=%d\tbytes sent=%llu\n",count++,(int)n,n->sock,n->bytes_out);
else
printf("position-%d\tsp=%d\tsocket=%d\tbytes received=%llu\n",count++,(int)n,n->sock,n->bytes_in);
if(n->next==NULL) if(n->next==NULL)
{ {
printf("=================END====================\n"); printf("=================END====================\n");
fflush(stdout);
break; break;
} }
n=n->next; n=n->next;
@ -159,8 +161,6 @@ void Display()
} }
} }
/*-------------------------------------------------------- /*--------------------------------------------------------
* sets the parameters for the new stream created * sets the parameters for the new stream created
-------------------------------------------------------*/ -------------------------------------------------------*/
@ -245,8 +245,6 @@ new_stream(int s, struct iperf_settings *settings)
return(sp); return(sp);
} }
/*-------------------------------------------------------- /*--------------------------------------------------------
* add a stream into stream_list linked list * add a stream into stream_list linked list
-------------------------------------------------------*/ -------------------------------------------------------*/
@ -265,7 +263,6 @@ add_stream(struct iperf_stream *sp)
} }
} }
/*-------------------------------------------------------- /*--------------------------------------------------------
* delete the stream * delete the stream
-------------------------------------------------------*/ -------------------------------------------------------*/
@ -307,7 +304,6 @@ free_stream(struct iperf_stream *sp)
} }
/*-------------------------------------------------------- /*--------------------------------------------------------
* update the stream * update the stream
-------------------------------------------------------*/ -------------------------------------------------------*/
@ -321,6 +317,7 @@ update_stream(int j, int result)
{ {
if(n->sock == j) if(n->sock == j)
{ {
n->bytes_in+= result; //update the byte count n->bytes_in+= result; //update the byte count
break; break;
} }
@ -417,30 +414,91 @@ udp_client_thread(struct iperf_stream *sp)
pthread_exit(NULL); pthread_exit(NULL);
} }
/*-------------------------------------------------------- /*--------------------------------------------------------
* UDP Server functionality. - NOT USED * UDP Server functionality
-------------------------------------------------------*/ -------------------------------------------------------*/
void * void *
udp_server_thread(struct iperf_stream *sp) udp_server_thread(int maxfd, fd_set *temp_set, fd_set *read_set)
{ {
char *buf, ubuf[UNIT_LEN]; char buffer[DEFAULT_UDP_BUFSIZE], ubuf[UNIT_LEN];
ssize_t sz; int j,result;
struct iperf_stream *n;
buf = (char *) malloc(sp->settings->bufsize); for (j=0; j<maxfd+1; j++){
if(!buf) {
perror("malloc: unable to allocate receive buffer"); if (FD_ISSET(j, temp_set)){
pthread_exit(NULL);
do{
result = recv(j, buffer,DEFAULT_UDP_BUFSIZE, 0);
} while (result == -1 && errno == EINTR);
if (result > 0){
update_stream(j,result);
} }
while((sz = recv(sp->sock, buf, sp->settings->bufsize, 0)) > 0) { else if (result == 0){
//just find the stream with zero update
n = update_stream(j,0);
unit_snprintf(ubuf, UNIT_LEN, (double) n->bytes_in / n->settings->duration, 'a');
printf("%llu bytes received %s/sec for stream %d\n\n", n->bytes_in, ubuf,(int)n);
close(j);
free_stream(n);
FD_CLR(j, read_set);
}
else
{
printf("Error in recv(): %s\n", strerror(errno));
}
} // end if (FD_ISSET(j, &temp_set))
}// end for (j=0;...)
return 0;
}
/*--------------------------------------------------------
* UDP Server new connection
-------------------------------------------------------*/
int udp_server_accept(int *s, int maxfd, fd_set *read_set, struct iperf_settings *settings)
{
struct iperf_stream *sp;
struct sockaddr_in sa_peer;
char buf[settings->bufsize];
socklen_t len;
int sz;
len = sizeof sa_peer;
// getting a new UDP packet
sz = recvfrom(*s, buf, settings->bufsize, 0, (struct sockaddr *) &sa_peer, &len);
if(!sz)
return -1;
if(connect(*s, (struct sockaddr *) &sa_peer, len) < 0)
{
perror("connect");
return -1;
}
// get a new socket to connect to client
sp = new_stream(*s, settings);
sp->bytes_in += sz; sp->bytes_in += sz;
} add_stream(sp);
close(sp->sock); printf("calling netannounce within function \n");
unit_snprintf(ubuf, UNIT_LEN, (double) sp->bytes_in / sp->settings->duration, 'a'); *s = netannounce(settings->proto, NULL, settings->port);
printf("%llu bytes received %s/sec\n", sp->bytes_in, ubuf); if(*s < 0)
pthread_exit(NULL); return -1;
FD_SET(*s, read_set);
maxfd = (maxfd < *s)?*s:maxfd;
return maxfd;
} }
@ -452,8 +510,6 @@ udp_report(int final)
{ {
} }
/*-------------------------------------------------------- /*--------------------------------------------------------
* TCP Reporting routine - NOT USED * TCP Reporting routine - NOT USED
-------------------------------------------------------*/ -------------------------------------------------------*/
@ -468,14 +524,17 @@ tcp_report(int final)
void setnonblocking(int sock) void setnonblocking(int sock)
{ {
int opts; int opts;
/*
opts = fcntl(sock,F_GETFL); opts = fcntl(sock,F_GETFL);
if (opts < 0) { if (opts < 0) {
perror("fcntl(F_GETFL)"); perror("fcntl(F_GETFL)");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
*/
opts = (opts | O_NONBLOCK); opts = (opts | O_NONBLOCK);
if (fcntl(sock,F_SETFL,opts) < 0) { if (fcntl(sock,F_SETFL,opts) < 0)
{
perror("fcntl(F_SETFL)"); perror("fcntl(F_SETFL)");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
@ -520,32 +579,87 @@ tcp_client_thread(struct iperf_stream *sp)
} }
/*-------------------------------------------------------- /*--------------------------------------------------------
* TCP Server functionality - NOT USED * TCP Server functionality
* -------------------------------------------------------*/ * -------------------------------------------------------*/
void * void *
tcp_server_thread(struct iperf_stream *sp) tcp_server_thread(int maxfd, fd_set *temp_set, fd_set *read_set)
{ {
char *buf, ubuf[UNIT_LEN];
ssize_t sz;
buf = (char *) malloc(sp->settings->bufsize); int j,result;
if(!buf) { char buffer[DEFAULT_TCP_BUFSIZE], ubuf[UNIT_LEN];
perror("malloc: unable to allocate receive buffer"); struct iperf_stream *n;
pthread_exit(NULL);
// scanning all socket descriptors for read
for (j=0; j<maxfd+1; j++)
{
if (FD_ISSET(j, temp_set)){
do{
result = recv(j, buffer,DEFAULT_TCP_BUFSIZE, 0);
} while (result == -1 && errno == EINTR);
if (result > 0){
update_stream(j,result);
} }
printf("window: %d\n", getsock_tcp_windowsize(sp->sock, SO_RCVBUF)); else if (result == 0){
while( (sz = recv(sp->sock, buf, sp->settings->bufsize, 0)) > 0) { n = update_stream(j, 0);
sp->bytes_in += sz; printf("window: %d\n", getsock_tcp_windowsize(n->sock, SO_RCVBUF));
unit_snprintf(ubuf, UNIT_LEN, (double) n->bytes_in / n->settings->duration, 'a');
printf("%llu bytes received %s/sec for stream %d\n\n", n->bytes_in, ubuf,(int)n);
close(j);
free_stream(n);
FD_CLR(j, read_set);
}
else
{
printf("Error in recv(): %s\n", strerror(errno));
}
} // end if (FD_ISSET(j, &temp_set))
} // end for (j=0;...)
return 0;
} }
close(sp->sock); /*--------------------------------------------------------
unit_snprintf(ubuf, UNIT_LEN, (double) sp->bytes_in / sp->settings->duration, 'a'); * TCP new connection
printf("%llu bytes received %s/sec\n", sp->bytes_in, ubuf); * -------------------------------------------------------*/
pthread_exit(NULL); int
tcp_server_accept(int s, int maxfd, fd_set *read_set, struct iperf_settings *settings)
{
socklen_t len;
struct sockaddr_in addr;
int peersock;
struct iperf_stream *sp;
len = sizeof(addr);
peersock = accept(s,(struct sockaddr *) &addr, &len);
if (peersock < 0)
{
printf("Error in accept(): %s\n", strerror(errno));
return 0;
}
else
{
//make socket non blocking
setnonblocking(peersock);
FD_SET(peersock, read_set);
maxfd = (maxfd < peersock)?peersock:maxfd;
// creating a new stream
sp = new_stream(peersock, settings);
add_stream(sp);
connect_msg(sp);
return maxfd;
} }
return -1;
}
/*-------------------------------------------------------- /*--------------------------------------------------------
* This is code for Client * This is code for Client
@ -561,7 +675,7 @@ client(struct iperf_settings *settings)
struct timeval before, after; struct timeval before, after;
fd_set write_set; fd_set write_set;
struct timeval tv; struct timeval tv;
int maxfd; int maxfd,ret=0;
FD_ZERO(&write_set); FD_ZERO(&write_set);
FD_SET(s, &write_set); FD_SET(s, &write_set);
@ -578,21 +692,19 @@ client(struct iperf_settings *settings)
return -1; return -1;
} }
//setnonblocking(s);
FD_SET(s, &write_set); FD_SET(s, &write_set);
maxfd = (maxfd < s)?s:maxfd; maxfd = (maxfd < s)?s:maxfd;
set_tcp_windowsize(s, settings->window, SO_SNDBUF); set_tcp_windowsize(s, settings->window, SO_SNDBUF);
if(s < 0) if(s < 0)
return -1; return -1;
//setting noblock causes error in byte count -kprabhu
//setnonblocking(s);
sp = new_stream(s, settings); sp = new_stream(s, settings);
add_stream(sp); add_stream(sp);
connect_msg(sp); connect_msg(sp);
} }
// sety necessary parameters for TCP/UDP // sety necessary parameters for TCP/UDP
@ -626,9 +738,20 @@ client(struct iperf_settings *settings)
timer = new_timer(settings->duration, 0); timer = new_timer(settings->duration, 0);
printf("calling select\n");
Display();
// send data till the timer expires // send data till the timer expires
while(!timer->expired(timer)) while(!timer->expired(timer))
{ {
ret = select(maxfd+1, NULL, &write_set, NULL, &tv);
if(ret<0)
continue;
sp=streams; sp=streams;
for(i=0;i<settings->threads;i++) for(i=0;i<settings->threads;i++)
{ {
@ -637,7 +760,6 @@ client(struct iperf_settings *settings)
send(sp->sock, buf, sp->settings->bufsize, 0); send(sp->sock, buf, sp->settings->bufsize, 0);
sp->bytes_out += sp->settings->bufsize; sp->bytes_out += sp->settings->bufsize;
if (settings->proto==Pudp) if (settings->proto==Pudp)
{ {
if(delayns > 0) if(delayns > 0)
@ -666,13 +788,16 @@ client(struct iperf_settings *settings)
} }
} }
Display();
/* XXX: report */ /* XXX: report */
sp = streams; sp = streams;
do { do {
send(sp->sock, buf, 0, 0); send(sp->sock, buf, 0, 0);
printf("%llu bytes sent\n", sp->bytes_out); printf("%llu bytes sent\n", sp->bytes_out);
//close(sp->sock);
//free(sp);
sp = sp->next; sp = sp->next;
} while (sp); } while (sp);
return 0; return 0;
@ -683,19 +808,10 @@ client(struct iperf_settings *settings)
int int
server(struct iperf_settings *settings) server(struct iperf_settings *settings)
{ {
int s,sz;
struct iperf_stream *sp;
struct sockaddr_in sa_peer;
socklen_t len;
char buf[settings->bufsize], ubuf[UNIT_LEN];
fd_set read_set, temp_set;
int maxfd;
int peersock, j, result;
struct timeval tv; struct timeval tv;
char ubuf[UNIT_LEN];
char buffer[DEFAULT_TCP_BUFSIZE]; fd_set read_set, temp_set;
int maxfd,result,s;
struct sockaddr_in addr;
s = netannounce(settings->proto, NULL, settings->port); s = netannounce(settings->proto, NULL, settings->port);
if(s < 0) if(s < 0)
@ -712,15 +828,12 @@ server(struct iperf_settings *settings)
if((x = getsock_tcp_windowsize(s, SO_RCVBUF)) < 0) if((x = getsock_tcp_windowsize(s, SO_RCVBUF)) < 0)
perror("SO_RCVBUF"); perror("SO_RCVBUF");
unit_snprintf(ubuf, UNIT_LEN, (double) x, 'A'); unit_snprintf(ubuf, UNIT_LEN, (double) x, 'A');
printf("%s: %s\n", printf("%s: %s\n",
settings->proto == Ptcp ? "TCP window size" : "UDP buffer size", ubuf); settings->proto == Ptcp ? "TCP window size" : "UDP buffer size", ubuf);
printf("-----------------------------------------------------------\n"); printf("-----------------------------------------------------------\n");
len = sizeof sa_peer;
FD_ZERO(&read_set); FD_ZERO(&read_set);
FD_SET(s, &read_set); FD_SET(s, &read_set);
maxfd = s; maxfd = s;
@ -735,128 +848,37 @@ server(struct iperf_settings *settings)
result = select(maxfd + 1, &temp_set, NULL, NULL, &tv); result = select(maxfd + 1, &temp_set, NULL, NULL, &tv);
if (result == 0) if (result == 0)
{
printf("select() timed out!\n"); printf("select() timed out!\n");
}
else if (result < 0 && errno != EINTR) else if (result < 0 && errno != EINTR)
{
printf("Error in select(): %s\n", strerror(errno)); printf("Error in select(): %s\n", strerror(errno));
}
else if (result > 0) else if (result > 0)
{ {
if (FD_ISSET(s, &temp_set)) if (FD_ISSET(s, &temp_set))
{ {
if(settings->proto== Ptcp) // New TCP Connection if(settings->proto== Ptcp) // New TCP Connection
{ maxfd = tcp_server_accept(s, maxfd, &read_set, settings);
len = sizeof(addr);
peersock = accept(s,(struct sockaddr *) &addr, &len);
if (peersock < 0)
{
printf("Error in accept(): %s\n", strerror(errno));
}
else
{
//make socket non blocking
setnonblocking(peersock);
FD_SET(peersock, &read_set);
maxfd = (maxfd < peersock)?peersock:maxfd;
// creating a new stream
sp = new_stream(peersock, settings);
add_stream(sp);
connect_msg(sp);
}
}
else if( settings->proto == Pudp) //New UDP Connection else if( settings->proto == Pudp) //New UDP Connection
{ maxfd = udp_server_accept(&s, maxfd, &read_set, settings);
// getting a new UDP packet
sz = recvfrom(s, buf, settings->bufsize, 0, (struct sockaddr *) &sa_peer, &len);
if(!sz)
break;
if(connect(s, (struct sockaddr *) &sa_peer, len) < 0)
{
perror("connect");
return -1;
}
// get a new socket to connect to client
sp = new_stream(s, settings);
sp->bytes_in += sz;
add_stream(sp);
s = netannounce(settings->proto, NULL, settings->port);
if(s < 0)
return -1;
// same as TCP -repetation
FD_SET(s, &read_set);
maxfd = (maxfd < s)?s:maxfd;
}
FD_CLR(s, &temp_set); FD_CLR(s, &temp_set);
Display(); Display();
} }
// Monitor the sockets for TCP
// scanning all socket descriptors for read
for (j=0; j<maxfd+1; j++)
{
if (FD_ISSET(j, &temp_set))
{
do
{
if(settings->proto== Ptcp) if(settings->proto== Ptcp)
result = recv(j, buffer,DEFAULT_TCP_BUFSIZE, 0); tcp_server_thread(maxfd, &temp_set, &read_set);
else
result = recv(j, buffer,DEFAULT_UDP_BUFSIZE, 0);
} while (result == -1 && errno == EINTR); // Monitor the sockets for TCP
else if(settings->proto== Pudp)
udp_server_thread(maxfd, &temp_set, &read_set);
if (result > 0)
{
sp= update_stream(j,result);
}
else if (result == 0)
{
//just find the stream with zero update
sp = update_stream(j,0);
if(settings->proto == Ptcp)
{
printf("window: %d\n", getsock_tcp_windowsize(sp->sock, SO_RCVBUF));
}
unit_snprintf(ubuf, UNIT_LEN, (double) sp->bytes_in / sp->settings->duration, 'a');
printf("%llu bytes received %s/sec for stream %d\n\n", sp->bytes_in, ubuf,(int)sp);
close(j);
FD_CLR(j, &read_set);
free_stream(sp); // this needs to be a linked list delete
}
else
{
printf("Error in recv(): %s\n", strerror(errno));
}
} // end if (FD_ISSET(j, &temp_set))
} // end for (j=0;...)
} // end else if (result > 0) } // end else if (result > 0)
} while (1); } while (1);
return 0; return 0;
} }