From 37555f2a777344c9890345a829d13d70af3a64e7 Mon Sep 17 00:00:00 2001 From: ha7ilm Date: Thu, 5 Nov 2015 23:57:03 +0100 Subject: [PATCH] This is a kind of stream multiplexer now (stdin input, TCP output to multiple clients), but closing clients is not handled yet. --- ddcd.cpp | 158 ++++++++++++++++++++++++++++++++++++++----------------- ddcd.h | 27 ++++++++++ 2 files changed, 138 insertions(+), 47 deletions(-) diff --git a/ddcd.cpp b/ddcd.cpp index d40878b..3e6202a 100644 --- a/ddcd.cpp +++ b/ddcd.cpp @@ -29,36 +29,33 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #include "ddcd.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include + #define SOFTWARE_NAME "ddcd" #define MSG_START SOFTWARE_NAME ": " -typedef struct client_s -{ - struct sockaddr_in addr; - int socket; - pid_t pid; - int pipefd[2]; -} client_t; - int host_port = 0; char host_address[100] = "127.0.0.1"; int decimation = 0; +int bufsize = 1024; +int bufsizeall; +char* buf; + +int set_nonblocking(int fd) +{ + int flagtmp; + if((flagtmp = fcntl(fd, F_GETFL))!=-1) + if((flagtmp = fcntl(fd, F_SETFL, flagtmp|O_NONBLOCK))!=-1) + return 0; + return 1; +} + +client_t* this_client; int main(int argc, char* argv[]) { int c; + fd_set select_fds; for(;;) { @@ -66,9 +63,10 @@ int main(int argc, char* argv[]) static struct option long_options[] = { {"port", required_argument, 0, 'p' }, {"address", required_argument, 0, 'a' }, - {"decimation", required_argument, 0, 'd' } + {"decimation", required_argument, 0, 'd' }, + {"bufsize", required_argument, 0, 'b' } }; - c = getopt_long(argc, argv, "p:a:d:", long_options, &option_index); + c = getopt_long(argc, argv, "p:a:d:b:", long_options, &option_index); if(c==-1) break; switch (c) { @@ -82,6 +80,9 @@ int main(int argc, char* argv[]) case 'd': decimation=atoi(optarg); break; + case 'b': + bufsize=atoi(optarg); + break; case 0: case '?': case ':': @@ -90,62 +91,125 @@ int main(int argc, char* argv[]) } } - if(!decimation) { fprintf(stderr, MSG_START "missing required command line argument, --decimation.\n"); exit(1); } - if(!host_port) { fprintf(stderr, MSG_START "missing required command line argument, --port.\n"); exit(1); } + if(!decimation) error_exit(MSG_START "missing required command line argument, --decimation.\n"); + if(!host_port) error_exit(MSG_START "missing required command line argument, --port.\n"); struct sockaddr_in addr_host; int listen_socket; - std::vector clients(10); + std::vector clients; + clients.reserve(100); listen_socket=socket(AF_INET,SOCK_STREAM,0); + + int sockopt = 1; + if( setsockopt(listen_socket, SOL_SOCKET, SO_REUSEADDR, (char *)&sockopt, sizeof(sockopt)) == -1 ) + error_exit(MSG_START "cannot set SO_REUSEADDR.\n"); //the best description on SO_REUSEADDR ever: http://stackoverflow.com/a/14388707/3182453 + memset(&addr_host,'0',sizeof(addr_host)); addr_host.sin_family=AF_INET; addr_host.sin_port=htons(host_port); addr_host.sin_addr.s_addr = INADDR_ANY; if( (addr_host.sin_addr.s_addr=inet_addr(host_address)) == INADDR_NONE ) - { fprintf(stderr, MSG_START "invalid host address.\n"); exit(1); } + error_exit(MSG_START "invalid host address.\n"); if( bind(listen_socket, (struct sockaddr*) &addr_host, sizeof(addr_host)) < 0 ) - { fprintf(stderr, MSG_START "cannot bind() address to the socket.\n"); exit(1); } + error_exit(MSG_START "cannot bind() address to the socket.\n"); if( listen(listen_socket, 10) == -1 ) - { fprintf(stderr, MSG_START "cannot listen() on socket.\n"); exit(1); } + error_exit(MSG_START "cannot listen() on socket.\n"); + + struct sockaddr_in addr_cli; + socklen_t addr_cli_len = sizeof(addr_cli); + int new_socket; + + //The server will wait on these sockets later... + + + //Set stdin and listen_socket to non-blocking + if(set_nonblocking(STDIN_FILENO) || set_nonblocking(listen_socket)) + error_exit(MSG_START "cannot set_nonblocking().\n"); + + bufsizeall = bufsize*sizeof(char); + buf = (char*)malloc(bufsizeall); + + FD_ZERO(&select_fds); + FD_SET(listen_socket, &select_fds); + FD_SET(STDIN_FILENO, &select_fds); + int highfd = ((listen_socket>STDIN_FILENO)?listen_socket:STDIN_FILENO) + 1; for(;;) { - struct sockaddr_in addr_cli; - socklen_t addr_cli_len = sizeof(addr_cli); - int new_socket; - - if( (new_socket = accept(listen_socket, (struct sockaddr*)&addr_cli, &addr_cli_len)) == -1) + //Let's wait until there is any new data to read, or any new connection! + select(highfd, &select_fds, NULL, NULL, NULL); + + //Is there a new client connection? + if( (new_socket = accept(listen_socket, (struct sockaddr*)&addr_cli, &addr_cli_len)) != -1) { - fprintf(stderr, MSG_START "cannot accept() a connection.\n"); - continue; + this_client = new client_t; + memcpy(&this_client->addr, &addr_cli, sizeof(this_client->addr)); + this_client->socket = new_socket; + if(pipe(this_client->pipefd) == -1) + { + perror(MSG_START "cannot open new pipe() for the client.\n"); + continue; + } + if(this_client->pid = fork()) + { + //We're the parent + set_nonblocking(this_client->pipefd[1]); + clients.push_back(this_client); + printf("client pid: %d\n", this_client->pid); + } + else + { + //We're the client + client(); + return 1; + } } - client_t* new_client = new client_t; - memcpy(&new_client->addr, &addr_cli, sizeof(new_client->addr)); - new_client->socket = new_socket; - - if(new_client->pid = fork()) + int retval = read(STDIN_FILENO, buf, bufsizeall); + if(retval==0) { - //We're the parent - clients.push_back(new_client); - printf("client pid: %d\n", new_client->pid); + //end of input stream, close clients and exit } - else + else if(retval != -1) { - //We're the client - client(); - break; + for (int i=0;ipipefd[1], buf, retval)==-1) + print_client(clients[i], "lost buffer, failed to write pipe"); + } } + //TODO: at the end, server closes pipefd[1] for client + //close(this_client->pipefd[1]); } return 0; } +void print_client(client_t* client, const char* what) +{ + fprintf(stderr,MSG_START " (client %s:%d) %s\n", inet_ntoa(client->addr.sin_addr), client->addr.sin_port, what); +} + +void client_cleanup() +{ + close(this_client->pipefd[0]); +} + void client() { printf("I'm the client\n"); - for(;;) sleep(1); + for(;;) + { + read(this_client->pipefd[0],buf,bufsizeall); + send(this_client->socket,buf,bufsizeall,0); + } +} + +void error_exit(const char* why) +{ + perror(why); + exit(1); } diff --git a/ddcd.h b/ddcd.h index 02ed196..1106a00 100644 --- a/ddcd.h +++ b/ddcd.h @@ -1 +1,28 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +typedef struct client_s +{ + struct sockaddr_in addr; + int socket; + pid_t pid; + int pipefd[2]; +} client_t; + + void client(); +void error_exit(const char* why); +void print_client(client_t* client, const char* what);