diff --git a/ddcd.h b/ddcd.h index 766be47..f1cf384 100644 --- a/ddcd.h +++ b/ddcd.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include diff --git a/nmux.cpp b/nmux.cpp index 0b093b4..4225678 100644 --- a/nmux.cpp +++ b/nmux.cpp @@ -35,7 +35,7 @@ char host_address[100] = "127.0.0.1"; int thread_cntr = 0; //CLI parameters -int bufsize = 1024; //! currently unused +int bufsize = 1024; int bufcnt = 1024; char** global_argv; @@ -133,6 +133,8 @@ int main(int argc, char* argv[]) int new_socket; int highfd = 0; + int input_fd = STDIN_FILENO; + fd_set select_fds; FD_ZERO(&select_fds); FD_SET(listen_socket, &select_fds); maxfd(&highfd, listen_socket); @@ -150,7 +152,10 @@ int main(int argc, char* argv[]) unsigned char* current_write_buffer = pool->get_write_buffer(); int index_in_current_write_buffer = 0; - pthread_cond_t* wait_condition = new pthread_cond_t; + //Create wait condition: client threads waiting for input data from the main thread will be + // waiting on this condition. They will be woken up with pthread_cond_broadcast() if new + // data arrives. + pthread_cond_t* wait_condition = new pthread_cond_t; if(!pthread_cond_init(wait_condition, NULL)) print_exit(MSG_START "pthread_cond_init failed"); //cond_attrs is ignored by Linux @@ -189,7 +194,11 @@ int main(int argc, char* argv[]) if(index_in_current_write_buffer >= bufsize) { current_write_buffer = pool->get_write_buffer(); - pthread_cond_broadcast(wait_condition); + pthread_cond_broadcast(wait_condition); + //Shouldn't we do it after we put data in? + // No, on get_write_buffer() actually the previous buffer is getting available + // for read for threads that wait for new data (wait on pthead mutex + // client->wait_condition). index_in_current_write_buffer = 0; } int retval = read(input_fd, current_write_buffer + index_in_current_write_buffer, bufsize - index_in_current_write_buffer); @@ -199,53 +208,17 @@ int main(int argc, char* argv[]) } else if(retval==0) { - //!end of input stream, close clients and exit - print_exit(MSG_START "end of input, exiting.\n") + //End of input stream, close clients and exit + print_exit(MSG_START "end of input stream, exiting.\n") } } } -#if 0 -for (int i=0; ipipefd[1], buf, retval)==-1) - { - - if(!clients[i]->error) - { - print_client(clients[i], "lost buffer, failed to write pipe."); - clients[i]->error=1; - } - //fprintf(stderr, MSG_START "errno is %d\n", errno); //usually 11 - //int wpstatus; - //int wpresult = waitpid(clients[i]->pid, &wpstatus, WNOHANG); - //fprintf(stderr, MSG_START "pid is %d\n",clients[i]->pid); - //perror("somethings wrong"); - //if(wpresult == -1) print_client(clients[i], "error while waitpid()!"); - //else if(wpresult == 0) - waitpid(clients[i]->pid, NULL, WNOHANG); - if(!proc_exists(clients[i]->pid)) - { - //Client exited! - print_client(clients[i], "closing client from main process."); - close(clients[i]->pipefd[1]); - close(clients[i]->socket); - delete clients[i]; - clients.erase(clients.begin()+i); - fprintf(stderr, MSG_START "done closing client from main process.\n"); - } - } - else { if(clients[i]->error) print_client(clients[i], "pipe okay again."); clients[i]->error=0; } -} -} -//TODO: at the end, server closes pipefd[1] for client -#endif - void client_erase(client_t* client) { pthread_mutex_destroy(client->wait_mutex); pthread_cond_destroy(client->wait_condition); - pool-> + pool->remove_thread(client->tsmthread); } void clients_close_all_finished() @@ -266,30 +239,66 @@ void* client_thread (void* param) this_client->status = CS_THREAD_RUNNING; int retval; tsmpool* lpool = this_client->lpool; + + fd_set client_select_fds; + int client_highfd = 0; + FD_ZERO(&client_select_fds); + FD_SET(client->socket, &client_select_fds); + maxfd(&client_highfd, client->socket); + + //Set client->socket to non-blocking + if(set_nonblocking(client->socket)) + error_exit(MSG_START "cannot set_nonblocking() on client->socket"); + + int client_buffer_index = 0; + char* pool_read_buffer = NULL; + for(;;) { - //wait until there is any data in the tsmpool for me (wait for the server process to wake me up) - for(;;) + //Wait until there is any data to send. + // If I haven't sent all the data from my last buffer, don't wait. + // (Wait for the server process to wake me up.) + while(!pool_read_buffer || client_buffer_index == lpool->size) { char* pool_read_buffer = (char*)lpool->get_read_buffer(); - if(pool_read_buffer) break; pthread_mutex_lock(&this_client->wait_mutex); this_client->sleeping = 1; pthread_cond_wait(this_client->wait_condition, &this_client->wait_mutex); } - //wait for the socket to be available for write - //read data from server global tsmpool - //write data to client socket - //have an exit condition - + //Wait for the socket to be available for write. + select(highfd, NULL, &client_select_fds, NULL, NULL); + //Read data from global tsmpool and write it to client socket + int ret = send(client->socket, pool_read_buffer + client_buffer_index, lpool->size - client_buffer_index, 0); + if(ret == -1) + { + switch(errno) + { + case EAGAIN: break; + default: goto client_thread_exit; + } + } + else client_buffer_index += ret; } + +client_thread_exit: this_client->status = CS_THREAD_FINISHED; + fprintf(stderr, "CS_THREAD_FINISHED"); //Debug pthread_exit(NULL); return NULL; } + +int set_nonblocking(int fd) +{ + int flagtmp; + if((flagtmp = fcntl(fd, F_GETFL))!=-1) + if((flagtmp = fcntl(fd, F_SETFL, flagtmp|O_NONBLOCK))!=-1) + return 0; + return 1; +} + void error_exit(const char* why) { perror(why); //do we need a \n at the end of (why)? diff --git a/nmux.h b/nmux.h index cb8df09..40003ef 100644 --- a/nmux.h +++ b/nmux.h @@ -1,5 +1,9 @@ #include #include +#include +#include +#include +#include #include #include #include @@ -29,3 +33,12 @@ typedef struct client_s pthread_cond_t* wait_condition; pthread_mutex_t wait_mutex; } client_t; + +void print_exit(const char* why); +void sig_handler(int signo); +void client_erase(client_t* client); +void clients_close_all_finished(); +void* client_thread (void* param); +void error_exit(const char* why); +void maxfd(int* maxfd, int fd); +int set_nonblocking(int fd); diff --git a/tsmpool.cpp b/tsmpool.cpp index d081eb9..ff3b773 100644 --- a/tsmpool.cpp +++ b/tsmpool.cpp @@ -1,18 +1,24 @@ #include "tsmpool.h" tsmpool::tsmpool(size_t size, int num) + size(size), + num(num) //number of buffers of (size) to alloc { this->threads_cntr = 0; - this->size = size; - this->num = num; //number of buffers of (size) to alloc this->ok = 1; this->lowest_read_index = -1; this->write_index = 0; this->my_read_index = 0; - if (pthread_mutex_init(&this->mutex, NULL) != 0) this->ok=0; + if (pthread_mutex_init(&this->mutex, NULL) != 0) { this->ok = 0; return; } + for(int i=0; iok = 0; return; } + buffers.push_back(newptr); + } } -size_t tsmpool::get_size() { return this->size; } +int tsmpool::is_ok() { return this->ok; } void* tsmpool::get_write_buffer() { diff --git a/tsmpool.h b/tsmpool.h index f207689..c1ff956 100644 --- a/tsmpool.h +++ b/tsmpool.h @@ -15,19 +15,20 @@ typedef struct tsmthread_s class tsmpool { private: - size_t size; - int num; vector threads; vector buffers; int threads_cntr; pthread_mutex_t mutex; - int ok; + int ok; //tsmpool is expected to be included in C-style programs. + // If something fails in the constructor, it will be seen here instead of a try{}catch{} int write_index; //it always points to the next buffer to be written int lowest_read_index; //unused int my_read_index; //it is used when tsmpool is used as a single writer - single reader circular buffer public: - size_t get_size(); + const size_t size; + const int num; + int is_ok(); tsmpool(size_t size, int num); void* get_write_buffer(); tsmthread_t* register_thread();