diff --git a/ddcd.cpp b/ddcd.cpp index 5222022..f8fe9d6 100644 --- a/ddcd.cpp +++ b/ddcd.cpp @@ -35,9 +35,19 @@ char host_address[100] = "127.0.0.1"; int decimation = 0; float transition_bw = 0.05; int bufsize = 1024; +int bufcnt = 1024; +int maxbufcnt = 1000 +int thread_cntr = 0; char ddc_method_str[100] = "td"; ddc_method_t ddc_method; +void sig_handler(int signo) +{ + fprintf(stderr, MSG_START "signal %d caught, exiting ddcd...\n", signo); + fflush(stderr); + exit(0); +} + int main(int argc, char* argv[]) { int c; @@ -103,7 +113,135 @@ int main(int argc, char* argv[]) } else print_exit(MSG_START "invalid parameter given to --method.\n"); + //set signals + struct sigaction sa; + memset(&sa, 0, sizeof(sa)); + sa.sa_handler = sig_handler; + sigaction(SIGTERM, &sa, NULL); + sigaction(SIGKILL, &sa, NULL); + sigaction(SIGQUIT, &sa, NULL); + sigaction(SIGINT, &sa, NULL); + sigaction(SIGHUP, &sa, NULL); + + struct sockaddr_in addr_host; + int listen_socket; + std::vector clients; + clients.reserve(100); + listen_socket=socket(AF_INET,SOCK_STREAM,0); + int sockopt = 1; + if( setsockopt(listen_socket, SOL_SOCKET, SO_REUSEADDR, (char *)&sockopt, sizeof(sockopt)) == -1 ) + error_exit(MSG_START "cannot set SO_REUSEADDR"); //the best description on SO_REUSEADDR ever: http://stackoverflow.com/a/14388707/3182453 + + memset(&addr_host,'0',sizeof(addr_host)); + addr_host.sin_family = AF_INET; + addr_host.sin_port = htons(host_port); + addr_host.sin_addr.s_addr = INADDR_ANY; + + if( (addr_host.sin_addr.s_addr=inet_addr(host_address)) == INADDR_NONE ) + error_exit(MSG_START "invalid host address"); + + if( bind(listen_socket, (struct sockaddr*) &addr_host, sizeof(addr_host)) < 0 ) + error_exit(MSG_START "cannot bind() address to the socket"); + + if( listen(listen_socket, 10) == -1 ) + error_exit(MSG_START "cannot listen() on socket"); + + fprintf(stderr,MSG_START "listening on %s:%d\n", inet_ntoa(addr_host.sin_addr), host_port); + + struct sockaddr_in addr_cli; + socklen_t addr_cli_len = sizeof(addr_cli); + int new_socket; + + int highfd = 0; + FD_ZERO(&select_fds); + FD_SET(listen_socket, &select_fds); + maxfd(&highfd, listen_socket); + FD_SET(input_fd, &select_fds); + maxfd(&highfd, input_fd); + + //Set stdin and listen_socket to non-blocking + if(set_nonblocking(input_fd) || set_nonblocking(listen_socket)) //don't do it before subprocess fork! + error_exit(MSG_START "cannot set_nonblocking()"); + + for(;;) + { + //Let's wait until there is any new data to read, or any new connection! + select(highfd, &select_fds, NULL, NULL, NULL); + + //Is there a new client connection? + if( (new_socket = accept(listen_socket, (struct sockaddr*)&addr_cli, &addr_cli_len)) != -1) + { + this_client = new client_t; + this_client->error = 0; + memcpy(&this_client->addr, &addr_cli, sizeof(this_client->addr)); + this_client->socket = new_socket; + this_client->id = thread_cntr++; + + if(pthread_create(&this_client->thread, NULL, client_thread , (void*)&this_client)<0) + { + //We're the parent + clients.push_back(this_client); + fprintf(stderr, MSG_START "pthread_create() done, this_client->id: %d\n", this_client->id); + } + } + + float* pool_next = pool->get_write_buffer(); + + int retval = read(input_fd, pool_next, mainpool->size); + if(retval==0) + { + //end of input stream, close clients and exit + } + else if(retval != -1) + { + for (int i=0; ipipefd[1], buf, retval)==-1) + { + + if(!clients[i]->error) + { + print_client(clients[i], "lost buffer, failed to write pipe."); + clients[i]->error=1; + } + //fprintf(stderr, MSG_START "errno is %d\n", errno); //usually 11 + //int wpstatus; + //int wpresult = waitpid(clients[i]->pid, &wpstatus, WNOHANG); + //fprintf(stderr, MSG_START "pid is %d\n",clients[i]->pid); + //perror("somethings wrong"); + //if(wpresult == -1) print_client(clients[i], "error while waitpid()!"); + //else if(wpresult == 0) + waitpid(clients[i]->pid, NULL, WNOHANG); + if(!proc_exists(clients[i]->pid)) + { + //Client exited! + print_client(clients[i], "closing client from main process."); + close(clients[i]->pipefd[1]); + close(clients[i]->socket); + delete clients[i]; + clients.erase(clients.begin()+i); + fprintf(stderr, MSG_START "done closing client from main process.\n"); + } + } + else { if(clients[i]->error) print_client(clients[i], "pipe okay again."); clients[i]->error=0; } + } + } + //TODO: at the end, server closes pipefd[1] for client + + } + + +} + +void* client_thread (void* param) +{ +} + +void error_exit(const char* why) +{ + perror(why); + exit(1); } void print_exit(const char* why) @@ -111,3 +249,9 @@ void print_exit(const char* why) fprintf(stderr, "%s", why); exit(1); } + +void maxfd(int* maxfd, int fd) +{ + if(fd>=*maxfd) *maxfd=fd+1; +} + diff --git a/ddcd.h b/ddcd.h index 845cc33..5436476 100644 --- a/ddcd.h +++ b/ddcd.h @@ -4,6 +4,11 @@ #include #include #include +#include +#include +#include +#include +#include #define SOFTWARE_NAME "ddcd" #define MSG_START SOFTWARE_NAME ": " @@ -14,13 +19,17 @@ typedef enum ddc_method_e M_FASTDDC } ddc_method_t; -void print_exit(const char* why); - typedef struct client_s { struct sockaddr_in addr; + int id; int socket; int error; pthread_t thread; } client_t; +void print_exit(const char* why); +void error_exit(const char* why); +void maxfd(int* maxfd, int fd); + + diff --git a/tsmpool.cpp b/tsmpool.cpp new file mode 100644 index 0000000..3af0149 --- /dev/null +++ b/tsmpool.cpp @@ -0,0 +1,53 @@ +tsmpool::tsmpool(size_t size, int num) +{ + this->threads_cntr = 0; + this->num = num; + this->size = size; + this->ok = 1; + this->lowest_read_index = -1; + if (pthread_mutex_init(&this->mutex, NULL) != 0) this->ok=0; +} + +size_t tsmpool::get_size() { return this->size; } + +void* tsmpool::get_write_buffer() +{ + if(write_index==index_before(lowest_read_index)) return NULL; + void* to_return = buffers[write_index]; + write_index=index_next(write_index); +} + +tsmthread_t* tsmpool::register_thread() +{ + if(!ok) return -1; + pthread_mutex_lock(&this->mutex); + tsmthread_t* thread = new tsmthread_t; + thread->read_index = write_index; + threads.push_back(thread); + pthread_mutex_unlock(&this->mutex); + return thread; +} + +int tsmpool::remove_thread(tsmthread_t* thread) +{ + pthread_mutex_lock(&this->mutex); + for(int i=0;imutex); +} + +void* tsmpool::get_read_buffer(tsmthread_t* thread) +{ + if(thread->read_index==write_index) return NULL; + void* to_return = buffers[thread->read_index]; + thread->read_index=index_next(thread->read_index); +} + +void* tsmpool::set_read_index_distance(tsmthread_t* thread, int distance) +{ +} diff --git a/tsmpool.h b/tsmpool.h new file mode 100644 index 0000000..aa8385c --- /dev/null +++ b/tsmpool.h @@ -0,0 +1,29 @@ +typedef struct tsmthread_s +{ + int read_index; //it always points to the next buffer to be read +} tsmthread_t; + +class tsmpool +{ +private: + size_t size; + int num; + vector threads; + vector buffers; + int threads_cntr; + pthread_mutex_t mutex; + int ok; + int write_index; //it always points to the next buffer to be written + int lowest_read_index; + +public: + size_t get_size(); + tsmpool(size_t size, int num); + void* get_write_buffer(); + int register_thread(); + void* get_read_buffer(int thread_id); + int index_next(int index) { return (index+1==size)?0:index; } + int index_before(int index) { return (index-1<0)?size-1:index; } +} + +