Finished nmux, now working on making it compile
This commit is contained in:
parent
377faec68e
commit
3ad4d15945
5 changed files with 88 additions and 58 deletions
1
ddcd.h
1
ddcd.h
|
@ -5,6 +5,7 @@
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include <getopt.h>
|
#include <getopt.h>
|
||||||
#include <signal.h>
|
#include <signal.h>
|
||||||
|
#include <unistd.h>
|
||||||
#include <arpa/inet.h>
|
#include <arpa/inet.h>
|
||||||
#include <sys/socket.h>
|
#include <sys/socket.h>
|
||||||
#include <netinet/in.h>
|
#include <netinet/in.h>
|
||||||
|
|
105
nmux.cpp
105
nmux.cpp
|
@ -35,7 +35,7 @@ char host_address[100] = "127.0.0.1";
|
||||||
int thread_cntr = 0;
|
int thread_cntr = 0;
|
||||||
|
|
||||||
//CLI parameters
|
//CLI parameters
|
||||||
int bufsize = 1024; //! currently unused
|
int bufsize = 1024;
|
||||||
int bufcnt = 1024;
|
int bufcnt = 1024;
|
||||||
|
|
||||||
char** global_argv;
|
char** global_argv;
|
||||||
|
@ -133,6 +133,8 @@ int main(int argc, char* argv[])
|
||||||
int new_socket;
|
int new_socket;
|
||||||
|
|
||||||
int highfd = 0;
|
int highfd = 0;
|
||||||
|
int input_fd = STDIN_FILENO;
|
||||||
|
fd_set select_fds;
|
||||||
FD_ZERO(&select_fds);
|
FD_ZERO(&select_fds);
|
||||||
FD_SET(listen_socket, &select_fds);
|
FD_SET(listen_socket, &select_fds);
|
||||||
maxfd(&highfd, listen_socket);
|
maxfd(&highfd, listen_socket);
|
||||||
|
@ -150,6 +152,9 @@ int main(int argc, char* argv[])
|
||||||
unsigned char* current_write_buffer = pool->get_write_buffer();
|
unsigned char* current_write_buffer = pool->get_write_buffer();
|
||||||
int index_in_current_write_buffer = 0;
|
int index_in_current_write_buffer = 0;
|
||||||
|
|
||||||
|
//Create wait condition: client threads waiting for input data from the main thread will be
|
||||||
|
// waiting on this condition. They will be woken up with pthread_cond_broadcast() if new
|
||||||
|
// data arrives.
|
||||||
pthread_cond_t* wait_condition = new pthread_cond_t;
|
pthread_cond_t* wait_condition = new pthread_cond_t;
|
||||||
if(!pthread_cond_init(wait_condition, NULL))
|
if(!pthread_cond_init(wait_condition, NULL))
|
||||||
print_exit(MSG_START "pthread_cond_init failed"); //cond_attrs is ignored by Linux
|
print_exit(MSG_START "pthread_cond_init failed"); //cond_attrs is ignored by Linux
|
||||||
|
@ -190,6 +195,10 @@ int main(int argc, char* argv[])
|
||||||
{
|
{
|
||||||
current_write_buffer = pool->get_write_buffer();
|
current_write_buffer = pool->get_write_buffer();
|
||||||
pthread_cond_broadcast(wait_condition);
|
pthread_cond_broadcast(wait_condition);
|
||||||
|
//Shouldn't we do it after we put data in?
|
||||||
|
// No, on get_write_buffer() actually the previous buffer is getting available
|
||||||
|
// for read for threads that wait for new data (wait on pthead mutex
|
||||||
|
// client->wait_condition).
|
||||||
index_in_current_write_buffer = 0;
|
index_in_current_write_buffer = 0;
|
||||||
}
|
}
|
||||||
int retval = read(input_fd, current_write_buffer + index_in_current_write_buffer, bufsize - index_in_current_write_buffer);
|
int retval = read(input_fd, current_write_buffer + index_in_current_write_buffer, bufsize - index_in_current_write_buffer);
|
||||||
|
@ -199,53 +208,17 @@ int main(int argc, char* argv[])
|
||||||
}
|
}
|
||||||
else if(retval==0)
|
else if(retval==0)
|
||||||
{
|
{
|
||||||
//!end of input stream, close clients and exit
|
//End of input stream, close clients and exit
|
||||||
print_exit(MSG_START "end of input, exiting.\n")
|
print_exit(MSG_START "end of input stream, exiting.\n")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
for (int i=0; i<clients.size(); i++)
|
|
||||||
{
|
|
||||||
if(write(clients[i]->pipefd[1], buf, retval)==-1)
|
|
||||||
{
|
|
||||||
|
|
||||||
if(!clients[i]->error)
|
|
||||||
{
|
|
||||||
print_client(clients[i], "lost buffer, failed to write pipe.");
|
|
||||||
clients[i]->error=1;
|
|
||||||
}
|
|
||||||
//fprintf(stderr, MSG_START "errno is %d\n", errno); //usually 11
|
|
||||||
//int wpstatus;
|
|
||||||
//int wpresult = waitpid(clients[i]->pid, &wpstatus, WNOHANG);
|
|
||||||
//fprintf(stderr, MSG_START "pid is %d\n",clients[i]->pid);
|
|
||||||
//perror("somethings wrong");
|
|
||||||
//if(wpresult == -1) print_client(clients[i], "error while waitpid()!");
|
|
||||||
//else if(wpresult == 0)
|
|
||||||
waitpid(clients[i]->pid, NULL, WNOHANG);
|
|
||||||
if(!proc_exists(clients[i]->pid))
|
|
||||||
{
|
|
||||||
//Client exited!
|
|
||||||
print_client(clients[i], "closing client from main process.");
|
|
||||||
close(clients[i]->pipefd[1]);
|
|
||||||
close(clients[i]->socket);
|
|
||||||
delete clients[i];
|
|
||||||
clients.erase(clients.begin()+i);
|
|
||||||
fprintf(stderr, MSG_START "done closing client from main process.\n");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else { if(clients[i]->error) print_client(clients[i], "pipe okay again."); clients[i]->error=0; }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
//TODO: at the end, server closes pipefd[1] for client
|
|
||||||
#endif
|
|
||||||
|
|
||||||
void client_erase(client_t* client)
|
void client_erase(client_t* client)
|
||||||
{
|
{
|
||||||
pthread_mutex_destroy(client->wait_mutex);
|
pthread_mutex_destroy(client->wait_mutex);
|
||||||
pthread_cond_destroy(client->wait_condition);
|
pthread_cond_destroy(client->wait_condition);
|
||||||
pool->
|
pool->remove_thread(client->tsmthread);
|
||||||
}
|
}
|
||||||
|
|
||||||
void clients_close_all_finished()
|
void clients_close_all_finished()
|
||||||
|
@ -266,30 +239,66 @@ void* client_thread (void* param)
|
||||||
this_client->status = CS_THREAD_RUNNING;
|
this_client->status = CS_THREAD_RUNNING;
|
||||||
int retval;
|
int retval;
|
||||||
tsmpool* lpool = this_client->lpool;
|
tsmpool* lpool = this_client->lpool;
|
||||||
|
|
||||||
|
fd_set client_select_fds;
|
||||||
|
int client_highfd = 0;
|
||||||
|
FD_ZERO(&client_select_fds);
|
||||||
|
FD_SET(client->socket, &client_select_fds);
|
||||||
|
maxfd(&client_highfd, client->socket);
|
||||||
|
|
||||||
|
//Set client->socket to non-blocking
|
||||||
|
if(set_nonblocking(client->socket))
|
||||||
|
error_exit(MSG_START "cannot set_nonblocking() on client->socket");
|
||||||
|
|
||||||
|
int client_buffer_index = 0;
|
||||||
|
char* pool_read_buffer = NULL;
|
||||||
|
|
||||||
for(;;)
|
for(;;)
|
||||||
{
|
{
|
||||||
//wait until there is any data in the tsmpool for me (wait for the server process to wake me up)
|
//Wait until there is any data to send.
|
||||||
for(;;)
|
// If I haven't sent all the data from my last buffer, don't wait.
|
||||||
|
// (Wait for the server process to wake me up.)
|
||||||
|
while(!pool_read_buffer || client_buffer_index == lpool->size)
|
||||||
{
|
{
|
||||||
char* pool_read_buffer = (char*)lpool->get_read_buffer();
|
char* pool_read_buffer = (char*)lpool->get_read_buffer();
|
||||||
if(pool_read_buffer) break;
|
|
||||||
pthread_mutex_lock(&this_client->wait_mutex);
|
pthread_mutex_lock(&this_client->wait_mutex);
|
||||||
this_client->sleeping = 1;
|
this_client->sleeping = 1;
|
||||||
pthread_cond_wait(this_client->wait_condition, &this_client->wait_mutex);
|
pthread_cond_wait(this_client->wait_condition, &this_client->wait_mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
//wait for the socket to be available for write
|
//Wait for the socket to be available for write.
|
||||||
//read data from server global tsmpool
|
select(highfd, NULL, &client_select_fds, NULL, NULL);
|
||||||
//write data to client socket
|
|
||||||
//have an exit condition
|
|
||||||
|
|
||||||
|
|
||||||
|
//Read data from global tsmpool and write it to client socket
|
||||||
|
int ret = send(client->socket, pool_read_buffer + client_buffer_index, lpool->size - client_buffer_index, 0);
|
||||||
|
if(ret == -1)
|
||||||
|
{
|
||||||
|
switch(errno)
|
||||||
|
{
|
||||||
|
case EAGAIN: break;
|
||||||
|
default: goto client_thread_exit;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else client_buffer_index += ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
client_thread_exit:
|
||||||
this_client->status = CS_THREAD_FINISHED;
|
this_client->status = CS_THREAD_FINISHED;
|
||||||
|
fprintf(stderr, "CS_THREAD_FINISHED"); //Debug
|
||||||
pthread_exit(NULL);
|
pthread_exit(NULL);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int set_nonblocking(int fd)
|
||||||
|
{
|
||||||
|
int flagtmp;
|
||||||
|
if((flagtmp = fcntl(fd, F_GETFL))!=-1)
|
||||||
|
if((flagtmp = fcntl(fd, F_SETFL, flagtmp|O_NONBLOCK))!=-1)
|
||||||
|
return 0;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
void error_exit(const char* why)
|
void error_exit(const char* why)
|
||||||
{
|
{
|
||||||
perror(why); //do we need a \n at the end of (why)?
|
perror(why); //do we need a \n at the end of (why)?
|
||||||
|
|
13
nmux.h
13
nmux.h
|
@ -1,5 +1,9 @@
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
|
#include <getopt.h>
|
||||||
|
#include <signal.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include <unistd.h>
|
||||||
#include <arpa/inet.h>
|
#include <arpa/inet.h>
|
||||||
#include <sys/socket.h>
|
#include <sys/socket.h>
|
||||||
#include <netinet/in.h>
|
#include <netinet/in.h>
|
||||||
|
@ -29,3 +33,12 @@ typedef struct client_s
|
||||||
pthread_cond_t* wait_condition;
|
pthread_cond_t* wait_condition;
|
||||||
pthread_mutex_t wait_mutex;
|
pthread_mutex_t wait_mutex;
|
||||||
} client_t;
|
} client_t;
|
||||||
|
|
||||||
|
void print_exit(const char* why);
|
||||||
|
void sig_handler(int signo);
|
||||||
|
void client_erase(client_t* client);
|
||||||
|
void clients_close_all_finished();
|
||||||
|
void* client_thread (void* param);
|
||||||
|
void error_exit(const char* why);
|
||||||
|
void maxfd(int* maxfd, int fd);
|
||||||
|
int set_nonblocking(int fd);
|
||||||
|
|
14
tsmpool.cpp
14
tsmpool.cpp
|
@ -1,18 +1,24 @@
|
||||||
#include "tsmpool.h"
|
#include "tsmpool.h"
|
||||||
|
|
||||||
tsmpool::tsmpool(size_t size, int num)
|
tsmpool::tsmpool(size_t size, int num)
|
||||||
|
size(size),
|
||||||
|
num(num) //number of buffers of (size) to alloc
|
||||||
{
|
{
|
||||||
this->threads_cntr = 0;
|
this->threads_cntr = 0;
|
||||||
this->size = size;
|
|
||||||
this->num = num; //number of buffers of (size) to alloc
|
|
||||||
this->ok = 1;
|
this->ok = 1;
|
||||||
this->lowest_read_index = -1;
|
this->lowest_read_index = -1;
|
||||||
this->write_index = 0;
|
this->write_index = 0;
|
||||||
this->my_read_index = 0;
|
this->my_read_index = 0;
|
||||||
if (pthread_mutex_init(&this->mutex, NULL) != 0) this->ok=0;
|
if (pthread_mutex_init(&this->mutex, NULL) != 0) { this->ok = 0; return; }
|
||||||
|
for(int i=0; i<num; i++)
|
||||||
|
{
|
||||||
|
void* newptr = (void*)new char[size];
|
||||||
|
if(!newptr) { this->ok = 0; return; }
|
||||||
|
buffers.push_back(newptr);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t tsmpool::get_size() { return this->size; }
|
int tsmpool::is_ok() { return this->ok; }
|
||||||
|
|
||||||
void* tsmpool::get_write_buffer()
|
void* tsmpool::get_write_buffer()
|
||||||
{
|
{
|
||||||
|
|
|
@ -15,19 +15,20 @@ typedef struct tsmthread_s
|
||||||
class tsmpool
|
class tsmpool
|
||||||
{
|
{
|
||||||
private:
|
private:
|
||||||
size_t size;
|
|
||||||
int num;
|
|
||||||
vector<tsmthread_t*> threads;
|
vector<tsmthread_t*> threads;
|
||||||
vector<void*> buffers;
|
vector<void*> buffers;
|
||||||
int threads_cntr;
|
int threads_cntr;
|
||||||
pthread_mutex_t mutex;
|
pthread_mutex_t mutex;
|
||||||
int ok;
|
int ok; //tsmpool is expected to be included in C-style programs.
|
||||||
|
// If something fails in the constructor, it will be seen here instead of a try{}catch{}
|
||||||
int write_index; //it always points to the next buffer to be written
|
int write_index; //it always points to the next buffer to be written
|
||||||
int lowest_read_index; //unused
|
int lowest_read_index; //unused
|
||||||
int my_read_index; //it is used when tsmpool is used as a single writer - single reader circular buffer
|
int my_read_index; //it is used when tsmpool is used as a single writer - single reader circular buffer
|
||||||
|
|
||||||
public:
|
public:
|
||||||
size_t get_size();
|
const size_t size;
|
||||||
|
const int num;
|
||||||
|
int is_ok();
|
||||||
tsmpool(size_t size, int num);
|
tsmpool(size_t size, int num);
|
||||||
void* get_write_buffer();
|
void* get_write_buffer();
|
||||||
tsmthread_t* register_thread();
|
tsmthread_t* register_thread();
|
||||||
|
|
Loading…
Reference in a new issue