Record asynchronously

The record file was written from the stream thread. As a consequence,
any blocking I/O to write the file delayed the decoder.

For maximum performance even when recording is enabled, send
(refcounted) packets to a separate recording thread.
This commit is contained in:
Romain Vimont 2019-07-31 01:55:40 +02:00
parent 63af7fbafe
commit 35d9185f6c
3 changed files with 233 additions and 8 deletions

View file

@ -5,6 +5,7 @@
#include "compat.h" #include "compat.h"
#include "config.h" #include "config.h"
#include "lock_util.h"
#include "log.h" #include "log.h"
static const AVRational SCRCPY_TIME_BASE = {1, 1000000}; // timestamps in us static const AVRational SCRCPY_TIME_BASE = {1, 1000000}; // timestamps in us
@ -26,6 +27,82 @@ find_muxer(const char *name) {
return oformat; return oformat;
} }
static struct record_packet *
record_packet_new(const AVPacket *packet) {
struct record_packet *rec = SDL_malloc(sizeof(*rec));
if (!rec) {
return NULL;
}
if (av_packet_ref(&rec->packet, packet)) {
SDL_free(rec);
return NULL;
}
rec->next = NULL;
return rec;
}
static void
record_packet_delete(struct record_packet *rec) {
av_packet_unref(&rec->packet);
SDL_free(rec);
}
static void
recorder_queue_init(struct recorder_queue *queue) {
queue->first = NULL;
// queue->last is undefined if queue->first == NULL
}
static inline bool
recorder_queue_is_empty(struct recorder_queue *queue) {
return !queue->first;
}
static bool
recorder_queue_push(struct recorder_queue *queue, const AVPacket *packet) {
struct record_packet *rec = record_packet_new(packet);
if (!rec) {
LOGC("Could not allocate record packet");
return false;
}
rec->next = NULL;
if (recorder_queue_is_empty(queue)) {
queue->first = queue->last = rec;
} else {
// chain rec after the (current) last packet
queue->last->next = rec;
// the last packet is now rec
queue->last = rec;
}
return true;
}
static inline struct record_packet *
recorder_queue_take(struct recorder_queue *queue) {
SDL_assert(!recorder_queue_is_empty(queue));
struct record_packet *rec = queue->first;
SDL_assert(rec);
queue->first = rec->next;
// no need to update queue->last if the queue is left empty:
// queue->last is undefined if queue->first == NULL
return rec;
}
static void
recorder_queue_clear(struct recorder_queue *queue) {
struct record_packet *rec = queue->first;
while (rec) {
struct record_packet *current = rec;
rec = rec->next;
record_packet_delete(current);
}
queue->first = NULL;
}
bool bool
recorder_init(struct recorder *recorder, recorder_init(struct recorder *recorder,
const char *filename, const char *filename,
@ -37,6 +114,24 @@ recorder_init(struct recorder *recorder,
return false; return false;
} }
recorder->mutex = SDL_CreateMutex();
if (!recorder->mutex) {
LOGC("Could not create mutex");
SDL_free(recorder->filename);
return false;
}
recorder->queue_cond = SDL_CreateCond();
if (!recorder->queue_cond) {
LOGC("Could not create cond");
SDL_DestroyMutex(recorder->mutex);
SDL_free(recorder->filename);
return false;
}
recorder_queue_init(&recorder->queue);
recorder->stopped = false;
recorder->failed = false;
recorder->format = format; recorder->format = format;
recorder->declared_frame_size = declared_frame_size; recorder->declared_frame_size = declared_frame_size;
recorder->header_written = false; recorder->header_written = false;
@ -46,6 +141,8 @@ recorder_init(struct recorder *recorder,
void void
recorder_destroy(struct recorder *recorder) { recorder_destroy(struct recorder *recorder) {
SDL_DestroyCond(recorder->queue_cond);
SDL_DestroyMutex(recorder->mutex);
SDL_free(recorder->filename); SDL_free(recorder->filename);
} }
@ -186,3 +283,90 @@ recorder_write(struct recorder *recorder, AVPacket *packet) {
recorder_rescale_packet(recorder, packet); recorder_rescale_packet(recorder, packet);
return av_write_frame(recorder->ctx, packet) >= 0; return av_write_frame(recorder->ctx, packet) >= 0;
} }
static int
run_recorder(void *data) {
struct recorder *recorder = data;
for (;;) {
mutex_lock(recorder->mutex);
while (!recorder->stopped &&
recorder_queue_is_empty(&recorder->queue)) {
cond_wait(recorder->queue_cond, recorder->mutex);
}
// if stopped is set, continue to process the remaining events (to
// finish the recording) before actually stopping
if (recorder->stopped && recorder_queue_is_empty(&recorder->queue)) {
mutex_unlock(recorder->mutex);
break;
}
struct record_packet *rec = recorder_queue_take(&recorder->queue);
mutex_unlock(recorder->mutex);
bool ok = recorder_write(recorder, &rec->packet);
record_packet_delete(rec);
if (!ok) {
LOGE("Could not record packet");
mutex_lock(recorder->mutex);
recorder->failed = true;
// discard pending packets
recorder_queue_clear(&recorder->queue);
mutex_unlock(recorder->mutex);
break;
}
}
LOGD("Recorder thread ended");
return 0;
}
bool
recorder_start(struct recorder *recorder) {
LOGD("Starting recorder thread");
recorder->thread = SDL_CreateThread(run_recorder, "recorder", recorder);
if (!recorder->thread) {
LOGC("Could not start recorder thread");
return false;
}
return true;
}
void
recorder_stop(struct recorder *recorder) {
mutex_lock(recorder->mutex);
recorder->stopped = true;
cond_signal(recorder->queue_cond);
mutex_unlock(recorder->mutex);
}
void
recorder_join(struct recorder *recorder) {
SDL_WaitThread(recorder->thread, NULL);
}
bool
recorder_push(struct recorder *recorder, const AVPacket *packet) {
mutex_lock(recorder->mutex);
SDL_assert(!recorder->stopped);
if (recorder->failed) {
// reject any new packet (this will stop the stream)
return false;
}
bool ok = recorder_queue_push(&recorder->queue, packet);
cond_signal(recorder->queue_cond);
mutex_unlock(recorder->mutex);
return ok;
}

View file

@ -3,6 +3,8 @@
#include <stdbool.h> #include <stdbool.h>
#include <libavformat/avformat.h> #include <libavformat/avformat.h>
#include <SDL2/SDL_mutex.h>
#include <SDL2/SDL_thread.h>
#include "common.h" #include "common.h"
@ -11,12 +13,29 @@ enum recorder_format {
RECORDER_FORMAT_MKV, RECORDER_FORMAT_MKV,
}; };
struct record_packet {
AVPacket packet;
struct record_packet *next;
};
struct recorder_queue {
struct record_packet *first;
struct record_packet *last; // undefined if first is NULL
};
struct recorder { struct recorder {
char *filename; char *filename;
enum recorder_format format; enum recorder_format format;
AVFormatContext *ctx; AVFormatContext *ctx;
struct size declared_frame_size; struct size declared_frame_size;
bool header_written; bool header_written;
SDL_Thread *thread;
SDL_mutex *mutex;
SDL_cond *queue_cond;
bool stopped; // set on recorder_stop() by the stream reader
bool failed; // set on packet write failure
struct recorder_queue queue;
}; };
bool bool
@ -33,6 +52,15 @@ void
recorder_close(struct recorder *recorder); recorder_close(struct recorder *recorder);
bool bool
recorder_write(struct recorder *recorder, AVPacket *packet); recorder_start(struct recorder *recorder);
void
recorder_stop(struct recorder *recorder);
void
recorder_join(struct recorder *recorder);
bool
recorder_push(struct recorder *recorder, const AVPacket *packet);
#endif #endif

View file

@ -71,7 +71,7 @@ notify_stopped(void) {
static bool static bool
process_config_packet(struct stream *stream, AVPacket *packet) { process_config_packet(struct stream *stream, AVPacket *packet) {
if (stream->recorder && !recorder_write(stream->recorder, packet)) { if (stream->recorder && !recorder_push(stream->recorder, packet)) {
LOGE("Could not send config packet to recorder"); LOGE("Could not send config packet to recorder");
return false; return false;
} }
@ -87,8 +87,8 @@ process_frame(struct stream *stream, AVPacket *packet) {
if (stream->recorder) { if (stream->recorder) {
packet->dts = packet->pts; packet->dts = packet->pts;
if (!recorder_write(stream->recorder, packet)) { if (!recorder_push(stream->recorder, packet)) {
LOGE("Could not write frame to output file"); LOGE("Could not send packet to recorder");
return false; return false;
} }
} }
@ -201,15 +201,22 @@ run_stream(void *data) {
goto finally_free_codec_ctx; goto finally_free_codec_ctx;
} }
if (stream->recorder && !recorder_open(stream->recorder, codec)) { if (stream->recorder) {
LOGE("Could not open recorder"); if (!recorder_open(stream->recorder, codec)) {
goto finally_close_decoder; LOGE("Could not open recorder");
goto finally_close_decoder;
}
if (!recorder_start(stream->recorder)) {
LOGE("Could not start recorder");
goto finally_close_recorder;
}
} }
stream->parser = av_parser_init(AV_CODEC_ID_H264); stream->parser = av_parser_init(AV_CODEC_ID_H264);
if (!stream->parser) { if (!stream->parser) {
LOGE("Could not initialize parser"); LOGE("Could not initialize parser");
goto finally_close_recorder; goto finally_stop_and_join_recorder;
} }
// We must only pass complete frames to av_parser_parse2()! // We must only pass complete frames to av_parser_parse2()!
@ -239,6 +246,12 @@ run_stream(void *data) {
} }
av_parser_close(stream->parser); av_parser_close(stream->parser);
finally_stop_and_join_recorder:
if (stream->recorder) {
recorder_stop(stream->recorder);
LOGI("Finishing recording...");
recorder_join(stream->recorder);
}
finally_close_recorder: finally_close_recorder:
if (stream->recorder) { if (stream->recorder) {
recorder_close(stream->recorder); recorder_close(stream->recorder);