26bb6e8f79
This commit was SVN r22001.
174 строки
4.9 KiB
C
174 строки
4.9 KiB
C
/* -*- C -*-
|
|
*
|
|
* $HEADER$
|
|
*
|
|
*/
|
|
|
|
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
#include <sys/types.h>
|
|
#include <sys/mman.h>
|
|
#include <sys/stat.h>
|
|
#include <netinet/in.h>
|
|
#include <signal.h>
|
|
#include <pthread.h>
|
|
#include <unistd.h>
|
|
#include <time.h>
|
|
#include <fcntl.h>
|
|
#include <string.h>
|
|
#include <errno.h>
|
|
|
|
int main(int argc, char* argv[])
|
|
{
|
|
int rc, i;
|
|
char hostname[512];
|
|
pid_t pid;
|
|
struct timeval tv;
|
|
int xmit, recv;
|
|
struct sockaddr_in rx, inaddr;
|
|
struct ip_mreq req;
|
|
fd_set fdset;
|
|
fd_set errset;
|
|
int flags;
|
|
int addrlen;
|
|
struct timespec ts, tsrem;
|
|
uint8_t ttl = 1;
|
|
uint8_t bytes[256];
|
|
char *nprocstr;
|
|
int nprocs;
|
|
|
|
gethostname(hostname, 512);
|
|
pid = getpid();
|
|
|
|
printf("orte_mcast_recv: Node %s Pid %ld\n",
|
|
hostname, (long)pid);
|
|
|
|
nprocstr = getenv("OMPI_COMM_WORLD_SIZE");
|
|
nprocs = strtol(nprocstr, NULL, 10);
|
|
|
|
/* create a xmit socket */
|
|
xmit = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
|
|
if(xmit < 0) {
|
|
fprintf(stderr,"%d: rmcast:init: socket() failed\n", (int)pid);
|
|
exit(1);
|
|
}
|
|
|
|
/* set the multicast flags */
|
|
if ((setsockopt(xmit, IPPROTO_IP, IP_MULTICAST_TTL,
|
|
(void *)&ttl, sizeof(ttl))) < 0) {
|
|
fprintf(stderr,"%d: rmcast:init: socketopt() failed\n", (int)pid);
|
|
exit(1);
|
|
}
|
|
|
|
flags = 1;
|
|
if (setsockopt (xmit, SOL_SOCKET, SO_REUSEADDR, (const char *)&flags, sizeof(flags)) < 0) {
|
|
fprintf(stderr, "rmcast:basic: unable to set the SO_REUSEADDR option\n");
|
|
exit(1);
|
|
}
|
|
memset(&inaddr, 0, sizeof(inaddr));
|
|
inaddr.sin_family = AF_INET;
|
|
inaddr.sin_addr.s_addr = htonl(0xEFFF0001);
|
|
inaddr.sin_port = htons(5002);
|
|
addrlen = sizeof(struct sockaddr_in);
|
|
|
|
/* set membership to "any" */
|
|
memset(&req, 0, sizeof (req));
|
|
req.imr_multiaddr.s_addr = htonl(0xEFFF0001);
|
|
req.imr_interface.s_addr = htonl(INADDR_ANY);
|
|
|
|
if ((setsockopt(xmit, IPPROTO_IP, IP_ADD_MEMBERSHIP,
|
|
(void *)&req, sizeof (req))) < 0) {
|
|
fprintf(stderr, "setsockopt() failed\n");
|
|
exit(1);
|
|
}
|
|
|
|
/* create a recv socket */
|
|
recv = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
|
|
if(recv < 0) {
|
|
fprintf(stderr,"%d: rmcast:init: socket() failed\n", (int)pid);
|
|
exit(1);
|
|
}
|
|
|
|
flags = 1;
|
|
if (setsockopt (recv, SOL_SOCKET, SO_REUSEADDR, (const char *)&flags, sizeof(flags)) < 0) {
|
|
fprintf(stderr, "rmcast:basic: unable to set the SO_REUSEADDR option\n");
|
|
exit(1);
|
|
}
|
|
memset(&rx, 0, sizeof(rx));
|
|
rx.sin_family = AF_INET;
|
|
rx.sin_addr.s_addr = htonl(0xEFFF0001);
|
|
rx.sin_port = htons(5002);
|
|
|
|
/* bind the socket */
|
|
if (bind(recv, (struct sockaddr*)&rx, sizeof(rx)) < 0) {
|
|
fprintf(stderr, "%d: rmcast:init: bind()\n", (int)pid);
|
|
exit(1);
|
|
}
|
|
|
|
/* set membership to "any" */
|
|
memset(&req, 0, sizeof (req));
|
|
req.imr_multiaddr.s_addr = htonl(0xEFFF0001);
|
|
req.imr_interface.s_addr = htonl(INADDR_ANY);
|
|
|
|
if ((setsockopt(recv, IPPROTO_IP, IP_ADD_MEMBERSHIP,
|
|
(void *)&req, sizeof (req))) < 0) {
|
|
fprintf(stderr, "%d: rmcast:init: sockopt()\n", (int)pid);
|
|
exit(1);
|
|
}
|
|
|
|
tv.tv_sec = 1;
|
|
tv.tv_usec = 100*1000;
|
|
FD_ZERO(&fdset);
|
|
FD_ZERO(&errset);
|
|
FD_SET(recv, &fdset);
|
|
|
|
while ((rc = select(recv+1,
|
|
&fdset /* read-fds */, 0 /* write-fds */,
|
|
&errset /* error-fds */, NULL)) < 0) {
|
|
fprintf(stderr, "select\n");
|
|
}
|
|
if (0 == rc) {
|
|
fprintf(stderr, "select failed to find anything - errno %d\n", errno);
|
|
exit(0);
|
|
}
|
|
fprintf(stderr, "%d: MESSAGE ARRIVED...READING DATA\n", (int)pid);
|
|
|
|
addrlen = sizeof(rx);
|
|
rc = recvfrom(recv, bytes, 256, 0, (struct sockaddr *)&rx, &addrlen);
|
|
|
|
fprintf(stderr, "%d: RECVD %d bytes...SENDING RESPONSE\n", (int)pid, rc);
|
|
/* send a reply */
|
|
if ((rc = sendto(xmit, (char*)bytes, 256, 0, (struct sockaddr *)&inaddr, sizeof(struct sockaddr_in))) != 256) {
|
|
fprintf(stderr, "%d: send error %d\n", (int)pid, errno);
|
|
exit(1);
|
|
}
|
|
fprintf(stderr, "%d: RESPONSE SENT\n", (int)pid);
|
|
|
|
ts.tv_sec = 0;
|
|
ts.tv_nsec = 450*1000;
|
|
|
|
while (nanosleep(&ts, &tsrem) < 0) {
|
|
ts = tsrem;
|
|
}
|
|
|
|
for (i=1; i < nprocs; i++) {
|
|
while ((rc = select(recv+1,
|
|
&fdset /* read-fds */, 0 /* write-fds */,
|
|
&errset /* error-fds */, NULL)) < 0) {
|
|
fprintf(stderr, "select\n");
|
|
}
|
|
if (0 == rc) {
|
|
fprintf(stderr, "select failed to find anything - errno %d\n", errno);
|
|
exit(0);
|
|
}
|
|
fprintf(stderr, "%d: MESSAGE ARRIVED...READING DATA\n", (int)pid);
|
|
|
|
addrlen = sizeof(rx);
|
|
rc = recvfrom(recv, bytes, 256, 0, (struct sockaddr *)&rx, &addrlen);
|
|
|
|
fprintf(stderr, "%d: RECVD %d bytes\n", (int)pid, rc);
|
|
}
|
|
|
|
exit(0);
|
|
}
|