Added a lot of things, tsmpool should now work in 1R1W mode as well
This commit is contained in:
parent
d442696cb8
commit
5064d3b72c
4 changed files with 58 additions and 6 deletions
43
ddcd.cpp
43
ddcd.cpp
|
@ -173,6 +173,7 @@ int main(int argc, char* argv[])
|
||||||
|
|
||||||
//Create tsmpool
|
//Create tsmpool
|
||||||
tsmpool* pool = new tsmpool(bufsize, bufcnt);
|
tsmpool* pool = new tsmpool(bufsize, bufcnt);
|
||||||
|
if(!pool->ok) print_exit(MSG_START "tsmpool failed to initialize\n");
|
||||||
|
|
||||||
unsigned char* current_write_buffer = pool->get_write_buffer();
|
unsigned char* current_write_buffer = pool->get_write_buffer();
|
||||||
int index_in_current_write_buffer = 0;
|
int index_in_current_write_buffer = 0;
|
||||||
|
@ -198,13 +199,13 @@ int main(int argc, char* argv[])
|
||||||
clients.push_back(new_client);
|
clients.push_back(new_client);
|
||||||
fprintf(stderr, MSG_START "pthread_create() done, clients now: %d\n", clients.size());
|
fprintf(stderr, MSG_START "pthread_create() done, clients now: %d\n", clients.size());
|
||||||
}
|
}
|
||||||
else fprintf(stderr, MSG_START "pthread_create() failed.");
|
else fprintf(stderr, MSG_START "pthread_create() failed.\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
if(index_in_current_write_buffer >= bufsize)
|
if(index_in_current_write_buffer >= bufsize)
|
||||||
{
|
{
|
||||||
current_write_buffer = pool->get_write_buffer();
|
current_write_buffer = pool->get_write_buffer();
|
||||||
index_in_current_write_buffer = 0;
|
index_in_current_write_buffer = 0;error_exiterror_exit
|
||||||
}
|
}
|
||||||
int retval = read(input_fd, current_write_buffer + index_in_current_write_buffer, bufsize - index_in_current_write_buffer);
|
int retval = read(input_fd, current_write_buffer + index_in_current_write_buffer, bufsize - index_in_current_write_buffer);
|
||||||
if(retval>0)
|
if(retval>0)
|
||||||
|
@ -263,18 +264,54 @@ void clients_close_all_finished()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void client_parser_push(char c)
|
||||||
|
{ //!TODO
|
||||||
|
command_t cmd;
|
||||||
|
char* commands_cstr = commands.c_str();
|
||||||
|
int newline_index = -1;
|
||||||
|
|
||||||
|
for(int i=0;commands_cstr[i];i++) if(commands_cstr[i]=='\n') newline_index = i;
|
||||||
|
if(newline_index == -1)
|
||||||
|
|
||||||
|
char param_name[101];
|
||||||
|
char param_value[101];
|
||||||
|
for(int i=0;i<100;commands_csdr
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
void* client_thread (void* param) //!TODO
|
void* client_thread (void* param) //!TODO
|
||||||
{
|
{
|
||||||
client_t* me_the_client = (client_t*)param;
|
client_t* me_the_client = (client_t*)param;
|
||||||
|
me_the_client->status = CS_THREAD_RUNNING;
|
||||||
|
char ctl_data_buffer;
|
||||||
|
int retval;
|
||||||
|
for(;;)
|
||||||
|
{
|
||||||
|
do
|
||||||
|
{
|
||||||
|
retval = recv(me_the_client->socket, &ctl_data_buffer, 1, 0);
|
||||||
|
if(client_parser_push(ctl_data_buffer)) break;
|
||||||
|
} while (retval);
|
||||||
|
|
||||||
|
|
||||||
|
//read control data from socket
|
||||||
|
//process control data
|
||||||
|
//run shift
|
||||||
|
//run decimation
|
||||||
|
//have an exit condition (??)
|
||||||
|
if(ddc_method == M_TD)
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
me_the_client->status = CS_THREAD_FINISHED;
|
||||||
pthread_exit(NULL);
|
pthread_exit(NULL);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
void error_exit(const char* why)
|
void error_exit(const char* why)
|
||||||
{
|
{
|
||||||
perror(why);
|
perror(why); //do we need a \n at the end of (why)?
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
13
ddcd.h
13
ddcd.h
|
@ -38,6 +38,19 @@ typedef struct client_s
|
||||||
|
|
||||||
} client_t;
|
} client_t;
|
||||||
|
|
||||||
|
typedef enum command_type_e
|
||||||
|
{
|
||||||
|
CT_SHIFT,
|
||||||
|
CT_BYPASS
|
||||||
|
} command_type_t;
|
||||||
|
|
||||||
|
|
||||||
|
typedef struct command_s
|
||||||
|
{
|
||||||
|
command_type_t type;
|
||||||
|
float float_param;
|
||||||
|
} command_t;
|
||||||
|
|
||||||
void print_exit(const char* why);
|
void print_exit(const char* why);
|
||||||
void error_exit(const char* why);
|
void error_exit(const char* why);
|
||||||
void maxfd(int* maxfd, int fd);
|
void maxfd(int* maxfd, int fd);
|
||||||
|
|
|
@ -46,9 +46,10 @@ int tsmpool::remove_thread(tsmthread_t* thread)
|
||||||
|
|
||||||
void* tsmpool::get_read_buffer(tsmthread_t* thread)
|
void* tsmpool::get_read_buffer(tsmthread_t* thread)
|
||||||
{
|
{
|
||||||
if(thread->read_index==index_before(write_index)) return NULL;
|
int* actual_read_index = (thread==NULL) ? &my_read_index : &thread->read_index;
|
||||||
void* to_return = buffers[thread->read_index];
|
if(*actual_read_index==index_before(write_index)) return NULL;
|
||||||
thread->read_index=index_next(thread->read_index);
|
void* to_return = buffers[*actual_read_index];
|
||||||
|
*actual_read_index=index_next(*actual_read_index);
|
||||||
}
|
}
|
||||||
|
|
||||||
void* tsmpool::set_read_index_distance(tsmthread_t* thread, int distance)
|
void* tsmpool::set_read_index_distance(tsmthread_t* thread, int distance)
|
||||||
|
|
|
@ -20,6 +20,7 @@ private:
|
||||||
int ok;
|
int ok;
|
||||||
int write_index; //it always points to the next buffer to be written
|
int write_index; //it always points to the next buffer to be written
|
||||||
int lowest_read_index; //unused
|
int lowest_read_index; //unused
|
||||||
|
int my_read_index; //it is used when tsmpool is used as a single writer - single reader circular buffer
|
||||||
|
|
||||||
public:
|
public:
|
||||||
size_t get_size();
|
size_t get_size();
|
||||||
|
|
Loading…
Reference in a new issue