From c6f24137f1799fdd95dade7c64f1ef1261711d65 Mon Sep 17 00:00:00 2001 From: kaustubhprabhu Date: Thu, 11 Jun 2009 01:18:18 +0000 Subject: [PATCH] changed newlty created APIs for Server side --- src/iperf_api.c | 523 ++++++++++++++++++++++++++++++------------------ src/iperf_api.h | 18 +- 2 files changed, 344 insertions(+), 197 deletions(-) diff --git a/src/iperf_api.c b/src/iperf_api.c index 662fc4a..e74f584 100644 --- a/src/iperf_api.c +++ b/src/iperf_api.c @@ -57,10 +57,106 @@ static struct option longopts[] = {"interval", required_argument, NULL, 'i'} }; +/* TCP/UDP server functions */ + +/*-------------------------------------------------------- + * UDP Server new connection + -------------------------------------------------------*/ + +int udp_server_accept(struct iperf_test *test, int maxfd, fd_set *read_set) +{ + struct iperf_stream *sp; + struct sockaddr_in sa_peer; + struct iperf_sock_opts *sockopt; + char buf[((struct iperf_udp_settings *)(sp->settings))->packet_size]; + socklen_t len; + int sz,size, rate; + + sp = sp = iperf_create_udp_stream(rate, size, sockopt); + + len = sizeof sa_peer; + + // getting a new UDP packet + printf("before rcvfrm \n"); + + sz = recvfrom(test->listener_sock, buf,((struct iperf_udp_settings *)(sp->settings))->packet_size, 0, (struct sockaddr *) &sa_peer, &len); + + printf(" after rcvfrm \n"); + + if(!sz) + return -1; + + if(connect(test->listener_sock, (struct sockaddr *) &sa_peer, len) < 0) + { + perror("connect"); + return -1; + } + + sp->socket = test->listener_sock; + + iperf_init_stream(sp); + + sp->result->bytes_received += sz; + iperf_add_stream(test, sp); + + sp->socket = netannounce(test->proto, NULL,sp->local_port); + if(sp->socket < 0) + return -1; + + FD_SET(sp->socket, read_set); + maxfd = (maxfd < sp->socket)?sp->socket:maxfd; + + printf(" socket created for new UDP client \n"); + fflush(stdout); + return maxfd; +} + +/*-------------------------------------------------------- + * TCP new connection + * -------------------------------------------------------*/ +int +tcp_server_accept(struct iperf_test *test, int maxfd, fd_set *read_set) +{ + socklen_t len; + struct sockaddr_in addr; + struct iperf_sock_opts *sockopt; + int peersock; + int window = 1024*1024; + + struct iperf_stream *sp; + + sp = iperf_create_tcp_stream(window, sockopt); + + len = sizeof(addr); + peersock = accept(test->listener_sock,(struct sockaddr *) &addr, &len); + if (peersock < 0) + { + printf("Error in accept(): %s\n", strerror(errno)); + return 0; + } + else + { + //setnonblocking(peersock); + + FD_SET(peersock, read_set); + maxfd = (maxfd < peersock)?peersock:maxfd; + + sp->socket = peersock; + iperf_init_stream(sp); + iperf_add_stream(test, sp); + //connect_msg(sp); + + printf(" socket created for new TCP client \n"); + + return maxfd; + } + + return -1; +} struct iperf_test *iperf_create_test() { - struct iperf_test * test; + struct iperf_test *test; test = (struct iperf_test *) malloc(sizeof(struct iperf_test)); if(!test) @@ -72,13 +168,15 @@ struct iperf_test *iperf_create_test() // initialise everything to zero memset(test, 0, sizeof(struct iperf_test)); - test->role = 's'; test->proto = Ptcp; memset(&test->remote_ip_addr, 0, sizeof(struct sockaddr_storage)); memset(&test->local_ip_addr, 0, sizeof(struct sockaddr_storage)); + + test->role = 's'; + test->duration = 10; test->stats_interval = 1; @@ -99,13 +197,12 @@ void iperf_init_test(struct iperf_test *test) if(test->role == 's') { - test->streams->socket = netannounce(test->proto, NULL, test->streams->local_port); - if( test->streams->socket < 0) + printf("Into init\n"); + + test->listener_sock = netannounce(test->proto, NULL, ((struct sockaddr_in *)(test->local_ip_addr))->sin_port); + if( test->listener_sock < 0) exit(0); - - //initiate for Server - iperf_init_stream(test->streams); - + /* if(set_tcp_windowsize( test->streams->socket, (test->streams->settings->window_size, SO_RCVBUF) < 0) { @@ -114,9 +211,9 @@ void iperf_init_test(struct iperf_test *test) }*/ printf("-----------------------------------------------------------\n"); - printf("Server listening on %d\n", test->streams->local_port); + printf("Server listening on %d\n",ntohs(((struct sockaddr_in *)(test->local_ip_addr))->sin_port)); // need to change this port assignment int x; - if((x = getsock_tcp_windowsize( test->streams->socket, SO_RCVBUF)) < 0) + if((x = getsock_tcp_windowsize( test->listener_sock, SO_RCVBUF)) < 0) perror("SO_RCVBUF"); @@ -134,23 +231,21 @@ void iperf_init_test(struct iperf_test *test) // initiate for each client stream for(i = 0; i < test->num_streams; i++) { - // need to pass the client ip address currently HARDCODED - kprabhu - sp->socket = netdial(test->proto, "127.0.0.1", sp->local_port); + + sp->socket = netdial(test->proto,"127.0.0.1", ntohs(((struct sockaddr_in *)(test->local_ip_addr))->sin_port)); if(sp->socket < 0) { fprintf(stderr, "netdial failed\n"); exit(0); } printf("The socket created for client at %d\n", sp->socket); - - iperf_init_stream(sp); - + if(sp->next == NULL) break; sp=sp->next; } } - + } void iperf_destroy_test(struct iperf_test *test) @@ -200,6 +295,7 @@ struct iperf_stream * iperf_create_tcp_stream(int window, struct iperf_sock_opts { struct iperf_stream *sp; struct iperf_tcp_settings * tcp_settings; + struct iperf_stream_result *result; sp = (struct iperf_stream *) malloc(sizeof(struct iperf_stream)); if(!sp) { @@ -208,15 +304,19 @@ struct iperf_stream * iperf_create_tcp_stream(int window, struct iperf_sock_opts } sp->settings = (struct iperf_tcp_settings *)malloc(sizeof(struct iperf_tcp_settings)); + tcp_settings = (struct iperf_tcp_settings *)malloc(sizeof(struct iperf_tcp_settings)); - sp->result = (struct iperf_stream_result *)malloc(sizeof(struct iperf_stream_result)); + result = (struct iperf_stream_result *)malloc(sizeof(struct iperf_stream_result)); + + if(!result) + perror("malloc"); + + //sp->result->interval_results = (struct iperf_interval_results *)malloc(sizeof(struct iperf_interval_results)); //initialise sp with 0 memset(sp, 0, sizeof(struct iperf_stream)); - - memset(&sp->result, 0, sizeof(struct iperf_stream_result)); - + sp->local_port = 5001; sp->remote_port = 5001; @@ -227,9 +327,13 @@ struct iperf_stream * iperf_create_tcp_stream(int window, struct iperf_sock_opts tcp_settings->window_size =window; sp->settings = tcp_settings; + result->duration = 10; + result->bytes_received = 0; + result->bytes_sent = 0; + sp->result = result; + sp->socket = -1; - return sp; } @@ -323,159 +427,203 @@ int iperf_add_stream(struct iperf_test *test, struct iperf_stream *sp) return 0; } - -int *recv_stream(struct iperf_stream *sp) +struct iperf_stream * +update_stream(struct iperf_test *test, int j, int add) { -/* - struct timeval tv; - char ubuf[UNIT_LEN]; - fd_set read_set, temp_set; - int maxfd,result; + struct iperf_stream *n; + n=test->streams; - FD_ZERO(&read_set); - FD_SET(sp->socket, &read_set); - maxfd = sp->socket; + + //find the correct stream for update while(1) - { - memcpy(&temp_set, &read_set,sizeof(temp_set)); - tv.tv_sec = 50; // timeout interval in seconds - tv.tv_usec = 0; - - // using select to check on multiple descriptors. - result = select(maxfd + 1, &temp_set, NULL, NULL, &tv); - - if (result == 0) - printf("select() timed out!\n"); - - else if (result < 0 && errno != EINTR) - printf("Error in select(): %s\n", strerror(errno)); - - else if (result > 0) - { - if (FD_ISSET(sp->socket, &temp_set)) - { - if(sp->protocol == Ptcp) - maxfd = tcp_server_accept(&sp, maxfd, &read_set); // need to change the arguements - - else if(sp->protocol == Pudp) - maxfd = udp_server_accept(&sp, maxfd, &read_set); // need to change the arguements - - FD_CLR(sp->socket, &temp_set); - - Display(); - } - - if(sp->protocol == Ptcp) - tcp_server_thread(maxfd, &temp_set, &read_set, sp); // need to change the arguements - - else if( sp->protocol == Pudp) - udp_server_thread(maxfd, &temp_set, &read_set, sp); // need to change the arguements - - } // end else if (result > 0) - } - - // return ? -*/ -} - - -int *send_stream( struct iperf_stream *sp) -{ - -/* - int s, i; - struct iperf_stream *sp; - char *buf; - int64_t delayns, adjustns, dtargns; - struct timeval before, after; - fd_set write_set; - struct timeval tv; - int maxfd,ret=0; - - if (test->proto == Pudp) { - dtargns = (int64_t)settings->bufsize * SEC_TO_NS * 8; - dtargns /= settings->bw; - - assert(dtargns != 0); - - if(gettimeofday(&before, 0) < 0) { - perror("gettimeofday"); - } - - delayns = dtargns; - adjustns = 0; - printf("%lld adj %lld delay\n", adjustns, delayns); - } - - - - // most of the current Client function goes here. - ret = select(maxfd+1, NULL, &write_set, NULL, &tv); - - if(ret<0) - continue; - - sp = test->streams; - for(i=0;ithreads;i++) - { - if(FD_ISSET(sp->sock, &write_set)) + if(n->socket == j) { - send(sp->sock, buf, sp->settings->bufsize, 0); - sp->result->bytes_out += sp->settings->bufsize; - - if (test->proto == Pudp) - { - if(delayns > 0) - delay(delayns); - - if(gettimeofday(&after, 0) < 0) { - perror("gettimeofday"); - } - - - adjustns = dtargns; - adjustns += (before.tv_sec - after.tv_sec) * SEC_TO_NS; - adjustns += (before.tv_usec - after.tv_usec) * uS_TO_NS; - - if( adjustns > 0 || delayns > 0) { - printf("%lld adj %lld delay\n", adjustns, delayns); - delayns += adjustns; - } - - memcpy(&before, &after, sizeof before); - } - - if(sp->next==NULL) + printf("In update 6\n"); + n->result->bytes_received+= add; break; - sp=sp->next; } - } - - return 0; -*/ + + if(n->next==NULL) + break; + + n = n->next; + } + + return n; } - + +int +free_stream(struct iperf_test *test, struct iperf_stream *sp) +{ + + struct iperf_stream *prev,*start; + prev = test->streams; + start = test->streams; + + if(test->streams->socket == sp->socket){ + + test->streams = test->streams->next; + return 0; + } + else + { + start= test->streams->next; + + while(1) + { + if(start->socket == sp->socket){ + + prev->next = sp->next; + free(sp); + return 0; + } + + if(start->next!=NULL){ + + start=start->next; + prev=prev->next; + } + } + + return -1; + } + +} + + + +int rcv(struct iperf_stream *sp) +{ + int result; + if(sp->protocol == Ptcp) + { + char buffer[DEFAULT_TCP_BUFSIZE]; + + do{ + result = recv(sp->socket, buffer,DEFAULT_TCP_BUFSIZE, 0); + } while (result == -1 && errno == EINTR); + + sp->result->bytes_received+= result; + + return result; + } + + else if (sp->protocol == Pudp) + { + char buffer[DEFAULT_UDP_BUFSIZE]; + + do{ + result = recv(sp->socket, buffer,DEFAULT_UDP_BUFSIZE, 0); + } while (result == -1 && errno == EINTR); + + sp->result->bytes_received+= result; + + return result; + } +} + + + // This function would be big. int iperf_run(struct iperf_test *test) { -/* + int rc; - struct *timer timer; - + struct timer *timer; + // don't see any other way of assigning this anywhere - test->streams->protocol = test->proto; - + if(test->role == 's') { - init(test->streams, test); - - rc = recv_stream(test->streams); + printf("Server running now \n"); + struct timeval tv; + struct iperf_stream *n; + char ubuf[UNIT_LEN]; + fd_set read_set, temp_set; + int maxfd,j; + int result; + + FD_ZERO(&read_set); + FD_SET(test->listener_sock, &read_set); + maxfd = test->listener_sock; + + while(1) + { + memcpy(&temp_set, &read_set,sizeof(temp_set)); + tv.tv_sec = 50; // timeout interval in seconds + tv.tv_usec = 0; - } - else (if test->role == 'c') + // using select to check on multiple descriptors. + result = select(maxfd + 1, &temp_set, NULL, NULL, &tv); + + if (result == 0) + printf("select() timed out!\n"); + else if (result < 0 && errno != EINTR) + printf("Error in select(): %s\n", strerror(errno)); + + else if(result >0) + { + /*accept new connections + / if we create a new function for this part, we need to return/ pass select related + / parameters like maxfd, read_set etc */ + + if (FD_ISSET(test->listener_sock, &temp_set)) + { + if(test->proto == Ptcp) + { + printf(" called TCP accept \n"); + maxfd = tcp_server_accept(test, maxfd, &read_set); + printf(" called TCP accept \n"); + fflush(stdout); + } + else if(test->proto == Pudp) + { + printf(" calling udp accept \n"); + maxfd = udp_server_accept(test, maxfd, &read_set); + } + + FD_CLR(test->listener_sock, &temp_set); + + //Display(); + } + } + + for (j=0; jstreams; + + if (FD_ISSET(j, &temp_set)) + { + // find the correct stream + n = update_stream(test,j,0); + result = rcv(n); + + if(result == 0) + { + unit_snprintf(ubuf, UNIT_LEN, (double) (n->result->bytes_received / n->result->duration), 'a'); + printf("%llu bytes received %s/sec for stream %d\n\n",n->result->bytes_received, ubuf,(int)n); + close(j); + free_stream(test, n); + FD_CLR(j, &read_set); + } + else + { + printf("Error in recv(): %s\n", strerror(errno)); + } + } // end if (FD_ISSET(j, &temp_set)) + + }// end for (j=0;...) + + }// end while + + }// end if(test->role == 's') + + + else if ( test->role == 'c') { + /* timer = new_timer(test->duration, 0); while(!timer->expired(timer)) @@ -487,28 +635,26 @@ int iperf_run(struct iperf_test *test) rc = send_stream(test->streams,test); } - - + */ + ; } - */ + } - - - - - int main(int argc, char **argv) { char ch; struct iperf_test *test; struct iperf_stream *sp, temp; + struct sockaddr_in *addr_local, *addr_remote; struct iperf_sock_opts *sockopt=NULL; int window= 1024*1024, size; int i, bw = 100000, buffer_size; - + + addr_local = (struct sockaddr_in *)malloc(sizeof (struct sockaddr_in)); + addr_remote = (struct sockaddr_in *)malloc(sizeof (struct sockaddr_in)); sockopt = (struct iperf_sock_opts *)malloc(sizeof(struct iperf_sock_opts)); @@ -520,14 +666,17 @@ main(int argc, char **argv) case 'c': test->role = 'c'; // remote_ip_addr - + inet_pton(AF_INET, optarg, &addr_remote->sin_addr); + break; case 'p': - temp.remote_port = atoi(optarg); + addr_remote->sin_port = htons(atoi(optarg)); + break; case 's': - test->role = 's'; + test->role = 's'; + addr_local->sin_port = htons(5001); break; case 't': test->duration = atoi(optarg); @@ -551,7 +700,19 @@ main(int argc, char **argv) break; } + printf("role = %s\n", (test->role == 's')?"Server":"Client"); + printf("duration = %d\n", test->duration); + printf("protocol = %s\n", (test->proto == Ptcp)?"TCP":"UDP"); + printf("interval = %d\n", test->stats_interval); + printf("Parallel streams = %d\n", test->num_streams); + test->local_ip_addr= (struct sockaddr_storage *) addr_local; + test->remote_ip_addr= (struct sockaddr_storage *) addr_remote; + + + if(test->role == 'c') + { + switch(test->proto) { case Ptcp: for(i=0;inum_streams;i++) @@ -579,34 +740,16 @@ main(int argc, char **argv) iperf_add_stream(test,sp); } break; + } } - + printf("streams created \n"); - for(i=0;inum_streams;i++) - { - if(test->proto == Ptcp) - printf(" %d is the windowsize for tcp\n",((struct iperf_tcp_settings *)(sp->settings))->window_size ); - else - printf(" %d is the rate for udp\n",((struct iperf_udp_settings *)(sp->settings))->rate ); - if(sp->next!= NULL) - sp = sp->next; - } - - - printf("role = %s\n", (test->role == 's')?"Server":"Client"); - printf("duration = %d\n", test->duration); - printf("protocol = %s\n", (test->proto == Ptcp)?"TCP":"UDP"); - printf("interval = %d\n", test->stats_interval); - printf("Local port = %d\n", test->streams->local_port); - printf("Remote port = %d\n", test->streams->remote_port); - printf("Parallel streams = %d\n", test->num_streams); - // depending whether client or server, we would call function ? iperf_init_test(test); - // init_test and init_stream functions - //run function + + iperf_run(test); return 0; } diff --git a/src/iperf_api.h b/src/iperf_api.h index c50bd39..99e6473 100644 --- a/src/iperf_api.h +++ b/src/iperf_api.h @@ -63,9 +63,9 @@ struct iperf_stream struct sockaddr_storage local_addr; struct sockaddr_storage remote_addr; - int *(*init)(struct iperf_stream *stream); - int *(*recv_stream)(struct iperf_stream *stream); - int *(*send_stream)(struct iperf_stream *stream); + int *(*init)(struct iperf_stream *stream); + int (*rcv)(struct iperf_stream *stream); + int *(*snd)(struct iperf_stream *stream); int *(*update_stats)(struct iperf_stream *stream); struct iperf_stream *next; @@ -79,17 +79,21 @@ struct iperf_test struct sockaddr_storage *local_ip_addr; int duration; // total duration of test -t + int listener_sock; + int stats_interval; // time interval to gather stats -i void *(*stats_callback)(struct iperf_test *); // callback function pointer for stats int reporter_interval; // time interval for reporter void *(*reporter_callback)(struct iperf_test *); // callback function pointer for reporter int reporter_fd; // file descriptor for reporter - - /* internal state */ - - int num_streams; // total streams in the test -P ? + + int num_streams; // total streams in the test -P struct iperf_stream *streams; // pointer to list of struct stream + + + /*function pointers : moved here because of memeber passing problem */ + }; /**