From f410f2bdc468cd32dd099679471af08ca6a76a54 Mon Sep 17 00:00:00 2001 From: Romain Vimont Date: Thu, 2 Mar 2023 00:31:43 +0100 Subject: [PATCH] Extract sc_delay_buffer A video buffer had 2 responsibilities: - handle the frame delaying mechanism (queuing packets and pushing them after the expected delay); - keep only the most recent frame (using a sc_frame_buffer). In order to be able to reuse only the frame delaying mechanism, extract it to a separate component, sc_delay_buffer. --- app/meson.build | 1 + app/src/delay_buffer.c | 246 +++++++++++++++++++++++++++++++++++++++++ app/src/delay_buffer.h | 69 ++++++++++++ app/src/video_buffer.c | 216 +++--------------------------------- app/src/video_buffer.h | 34 +----- 5 files changed, 337 insertions(+), 229 deletions(-) create mode 100644 app/src/delay_buffer.c create mode 100644 app/src/delay_buffer.h diff --git a/app/meson.build b/app/meson.build index e34f7cc1..7749d664 100644 --- a/app/meson.build +++ b/app/meson.build @@ -10,6 +10,7 @@ src = [ 'src/control_msg.c', 'src/controller.c', 'src/decoder.c', + 'src/delay_buffer.c', 'src/demuxer.c', 'src/device_msg.c', 'src/icon.c', diff --git a/app/src/delay_buffer.c b/app/src/delay_buffer.c new file mode 100644 index 00000000..95d47c9c --- /dev/null +++ b/app/src/delay_buffer.c @@ -0,0 +1,246 @@ +#include "delay_buffer.h" + +#include +#include + +#include +#include + +#include "util/log.h" + +#define SC_BUFFERING_NDEBUG // comment to debug + +static bool +sc_delayed_frame_init(struct sc_delayed_frame *dframe, const AVFrame *frame) { + dframe->frame = av_frame_alloc(); + if (!dframe->frame) { + LOG_OOM(); + return false; + } + + if (av_frame_ref(dframe->frame, frame)) { + LOG_OOM(); + av_frame_free(&dframe->frame); + return false; + } + + return true; +} + +static void +sc_delayed_frame_destroy(struct sc_delayed_frame *dframe) { + av_frame_unref(dframe->frame); + av_frame_free(&dframe->frame); +} + +static bool +sc_delay_buffer_offer(struct sc_delay_buffer *db, const AVFrame *frame) { + return db->cbs->on_new_frame(db, frame, db->cbs_userdata); +} + +static int +run_buffering(void *data) { + struct sc_delay_buffer *db = data; + + assert(db->delay > 0); + + for (;;) { + sc_mutex_lock(&db->b.mutex); + + while (!db->b.stopped && sc_vecdeque_is_empty(&db->b.queue)) { + sc_cond_wait(&db->b.queue_cond, &db->b.mutex); + } + + if (db->b.stopped) { + sc_mutex_unlock(&db->b.mutex); + goto stopped; + } + + struct sc_delayed_frame dframe = sc_vecdeque_pop(&db->b.queue); + + sc_tick max_deadline = sc_tick_now() + db->delay; + // PTS (written by the server) are expressed in microseconds + sc_tick pts = SC_TICK_FROM_US(dframe.frame->pts); + + bool timed_out = false; + while (!db->b.stopped && !timed_out) { + sc_tick deadline = sc_clock_to_system_time(&db->b.clock, pts) + + db->delay; + if (deadline > max_deadline) { + deadline = max_deadline; + } + + timed_out = + !sc_cond_timedwait(&db->b.wait_cond, &db->b.mutex, deadline); + } + + bool stopped = db->b.stopped; + sc_mutex_unlock(&db->b.mutex); + + if (stopped) { + sc_delayed_frame_destroy(&dframe); + goto stopped; + } + +#ifndef SC_BUFFERING_NDEBUG + LOGD("Buffering: %" PRItick ";%" PRItick ";%" PRItick, + pts, dframe.push_date, sc_tick_now()); +#endif + + bool ok = sc_delay_buffer_offer(db, dframe.frame); + sc_delayed_frame_destroy(&dframe); + if (!ok) { + LOGE("Delayed frame could not be pushed, stopping"); + sc_mutex_lock(&db->b.mutex); + // Prevent to push any new packet + db->b.stopped = true; + sc_mutex_unlock(&db->b.mutex); + goto stopped; + } + } + +stopped: + assert(db->b.stopped); + + // Flush queue + while (!sc_vecdeque_is_empty(&db->b.queue)) { + struct sc_delayed_frame *dframe = sc_vecdeque_popref(&db->b.queue); + sc_delayed_frame_destroy(dframe); + } + + LOGD("Buffering thread ended"); + + return 0; +} + +bool +sc_delay_buffer_init(struct sc_delay_buffer *db, sc_tick delay, + const struct sc_delay_buffer_callbacks *cbs, + void *cbs_userdata) { + assert(delay >= 0); + + if (delay) { + bool ok = sc_mutex_init(&db->b.mutex); + if (!ok) { + return false; + } + + ok = sc_cond_init(&db->b.queue_cond); + if (!ok) { + sc_mutex_destroy(&db->b.mutex); + return false; + } + + ok = sc_cond_init(&db->b.wait_cond); + if (!ok) { + sc_cond_destroy(&db->b.queue_cond); + sc_mutex_destroy(&db->b.mutex); + return false; + } + + sc_clock_init(&db->b.clock); + sc_vecdeque_init(&db->b.queue); + } + + assert(cbs); + assert(cbs->on_new_frame); + + db->delay = delay; + db->cbs = cbs; + db->cbs_userdata = cbs_userdata; + + return true; +} + +bool +sc_delay_buffer_start(struct sc_delay_buffer *db) { + if (db->delay) { + bool ok = + sc_thread_create(&db->b.thread, run_buffering, "scrcpy-dbuf", db); + if (!ok) { + LOGE("Could not start buffering thread"); + return false; + } + } + + return true; +} + +void +sc_delay_buffer_stop(struct sc_delay_buffer *db) { + if (db->delay) { + sc_mutex_lock(&db->b.mutex); + db->b.stopped = true; + sc_cond_signal(&db->b.queue_cond); + sc_cond_signal(&db->b.wait_cond); + sc_mutex_unlock(&db->b.mutex); + } +} + +void +sc_delay_buffer_join(struct sc_delay_buffer *db) { + if (db->delay) { + sc_thread_join(&db->b.thread, NULL); + } +} + +void +sc_delay_buffer_destroy(struct sc_delay_buffer *db) { + if (db->delay) { + sc_cond_destroy(&db->b.wait_cond); + sc_cond_destroy(&db->b.queue_cond); + sc_mutex_destroy(&db->b.mutex); + } +} + +bool +sc_delay_buffer_push(struct sc_delay_buffer *db, const AVFrame *frame) { + if (!db->delay) { + // No buffering + return sc_delay_buffer_offer(db, frame); + } + + sc_mutex_lock(&db->b.mutex); + + if (db->b.stopped) { + sc_mutex_unlock(&db->b.mutex); + return false; + } + + sc_tick pts = SC_TICK_FROM_US(frame->pts); + sc_clock_update(&db->b.clock, sc_tick_now(), pts); + sc_cond_signal(&db->b.wait_cond); + + if (db->b.clock.count == 1) { + sc_mutex_unlock(&db->b.mutex); + // First frame, offer it immediately, for two reasons: + // - not to delay the opening of the scrcpy window + // - the buffering estimation needs at least two clock points, so it + // could not handle the first frame + return sc_delay_buffer_offer(db, frame); + } + + struct sc_delayed_frame dframe; + bool ok = sc_delayed_frame_init(&dframe, frame); + if (!ok) { + sc_mutex_unlock(&db->b.mutex); + return false; + } + +#ifndef SC_BUFFERING_NDEBUG + dframe.push_date = sc_tick_now(); +#endif + + ok = sc_vecdeque_push(&db->b.queue, dframe); + if (!ok) { + sc_mutex_unlock(&db->b.mutex); + LOG_OOM(); + return false; + } + + sc_cond_signal(&db->b.queue_cond); + + sc_mutex_unlock(&db->b.mutex); + + return true; +} diff --git a/app/src/delay_buffer.h b/app/src/delay_buffer.h new file mode 100644 index 00000000..9e5347c7 --- /dev/null +++ b/app/src/delay_buffer.h @@ -0,0 +1,69 @@ +#ifndef SC_DELAY_BUFFER_H +#define SC_DELAY_BUFFER_H + +#include "common.h" + +#include + +#include "clock.h" +#include "util/thread.h" +#include "util/tick.h" +#include "util/vecdeque.h" + +// forward declarations +typedef struct AVFrame AVFrame; + +struct sc_delayed_frame { + AVFrame *frame; +#ifndef NDEBUG + sc_tick push_date; +#endif +}; + +struct sc_delayed_frame_queue SC_VECDEQUE(struct sc_delayed_frame); + +struct sc_delay_buffer { + sc_tick delay; + + // only if delay > 0 + struct { + sc_thread thread; + sc_mutex mutex; + sc_cond queue_cond; + sc_cond wait_cond; + + struct sc_clock clock; + struct sc_delayed_frame_queue queue; + bool stopped; + } b; // buffering + + const struct sc_delay_buffer_callbacks *cbs; + void *cbs_userdata; +}; + +struct sc_delay_buffer_callbacks { + bool (*on_new_frame)(struct sc_delay_buffer *db, const AVFrame *frame, + void *userdata); +}; + +bool +sc_delay_buffer_init(struct sc_delay_buffer *db, sc_tick delay, + const struct sc_delay_buffer_callbacks *cbs, + void *cbs_userdata); + +bool +sc_delay_buffer_start(struct sc_delay_buffer *db); + +void +sc_delay_buffer_stop(struct sc_delay_buffer *db); + +void +sc_delay_buffer_join(struct sc_delay_buffer *db); + +void +sc_delay_buffer_destroy(struct sc_delay_buffer *db); + +bool +sc_delay_buffer_push(struct sc_delay_buffer *db, const AVFrame *frame); + +#endif diff --git a/app/src/video_buffer.c b/app/src/video_buffer.c index 74a4b042..da47a0b5 100644 --- a/app/src/video_buffer.c +++ b/app/src/video_buffer.c @@ -8,32 +8,13 @@ #include "util/log.h" -#define SC_BUFFERING_NDEBUG // comment to debug - static bool -sc_video_buffer_frame_init(struct sc_video_buffer_frame *vb_frame, - const AVFrame *frame) { - vb_frame->frame = av_frame_alloc(); - if (!vb_frame->frame) { - return false; - } +sc_delay_buffer_on_new_frame(struct sc_delay_buffer *db, const AVFrame *frame, + void *userdata) { + (void) db; - if (av_frame_ref(vb_frame->frame, frame)) { - av_frame_free(&vb_frame->frame); - return false; - } + struct sc_video_buffer *vb = userdata; - return true; -} - -static void -sc_video_buffer_frame_destroy(struct sc_video_buffer_frame *vb_frame) { - av_frame_unref(vb_frame->frame); - av_frame_free(&vb_frame->frame); -} - -static bool -sc_video_buffer_offer(struct sc_video_buffer *vb, const AVFrame *frame) { bool previous_skipped; bool ok = sc_frame_buffer_push(&vb->fb, frame, &previous_skipped); if (!ok) { @@ -43,83 +24,8 @@ sc_video_buffer_offer(struct sc_video_buffer *vb, const AVFrame *frame) { return vb->cbs->on_new_frame(vb, previous_skipped, vb->cbs_userdata); } -static int -run_buffering(void *data) { - struct sc_video_buffer *vb = data; - - assert(vb->buffering_time > 0); - - for (;;) { - sc_mutex_lock(&vb->b.mutex); - - while (!vb->b.stopped && sc_vecdeque_is_empty(&vb->b.queue)) { - sc_cond_wait(&vb->b.queue_cond, &vb->b.mutex); - } - - if (vb->b.stopped) { - sc_mutex_unlock(&vb->b.mutex); - goto stopped; - } - - struct sc_video_buffer_frame vb_frame = sc_vecdeque_pop(&vb->b.queue); - - sc_tick max_deadline = sc_tick_now() + vb->buffering_time; - // PTS (written by the server) are expressed in microseconds - sc_tick pts = SC_TICK_FROM_US(vb_frame.frame->pts); - - bool timed_out = false; - while (!vb->b.stopped && !timed_out) { - sc_tick deadline = sc_clock_to_system_time(&vb->b.clock, pts) - + vb->buffering_time; - if (deadline > max_deadline) { - deadline = max_deadline; - } - - timed_out = - !sc_cond_timedwait(&vb->b.wait_cond, &vb->b.mutex, deadline); - } - - if (vb->b.stopped) { - sc_video_buffer_frame_destroy(&vb_frame); - sc_mutex_unlock(&vb->b.mutex); - goto stopped; - } - - sc_mutex_unlock(&vb->b.mutex); - -#ifndef SC_BUFFERING_NDEBUG - LOGD("Buffering: %" PRItick ";%" PRItick ";%" PRItick, - pts, vb_frame.push_date, sc_tick_now()); -#endif - - bool ok = sc_video_buffer_offer(vb, vb_frame.frame); - sc_video_buffer_frame_destroy(&vb_frame); - if (!ok) { - LOGE("Delayed frame could not be pushed, stopping"); - sc_mutex_lock(&vb->b.mutex); - // Prevent to push any new packet - vb->b.stopped = true; - sc_mutex_unlock(&vb->b.mutex); - goto stopped; - } - } - -stopped: - assert(vb->b.stopped); - - // Flush queue - while (!sc_vecdeque_is_empty(&vb->b.queue)) { - struct sc_video_buffer_frame *p = sc_vecdeque_popref(&vb->b.queue); - sc_video_buffer_frame_destroy(p); - } - - LOGD("Buffering thread ended"); - - return 0; -} - bool -sc_video_buffer_init(struct sc_video_buffer *vb, sc_tick buffering_time, +sc_video_buffer_init(struct sc_video_buffer *vb, sc_tick delay, const struct sc_video_buffer_callbacks *cbs, void *cbs_userdata) { bool ok = sc_frame_buffer_init(&vb->fb); @@ -127,135 +33,49 @@ sc_video_buffer_init(struct sc_video_buffer *vb, sc_tick buffering_time, return false; } - assert(buffering_time >= 0); - if (buffering_time) { - ok = sc_mutex_init(&vb->b.mutex); - if (!ok) { - sc_frame_buffer_destroy(&vb->fb); - return false; - } + static const struct sc_delay_buffer_callbacks db_cbs = { + .on_new_frame = sc_delay_buffer_on_new_frame, + }; - ok = sc_cond_init(&vb->b.queue_cond); - if (!ok) { - sc_mutex_destroy(&vb->b.mutex); - sc_frame_buffer_destroy(&vb->fb); - return false; - } - - ok = sc_cond_init(&vb->b.wait_cond); - if (!ok) { - sc_cond_destroy(&vb->b.queue_cond); - sc_mutex_destroy(&vb->b.mutex); - sc_frame_buffer_destroy(&vb->fb); - return false; - } - - sc_clock_init(&vb->b.clock); - sc_vecdeque_init(&vb->b.queue); + ok = sc_delay_buffer_init(&vb->db, delay, &db_cbs, vb); + if (!ok) { + sc_frame_buffer_destroy(&vb->fb); + return false; } assert(cbs); assert(cbs->on_new_frame); - vb->buffering_time = buffering_time; vb->cbs = cbs; vb->cbs_userdata = cbs_userdata; + return true; } bool sc_video_buffer_start(struct sc_video_buffer *vb) { - if (vb->buffering_time) { - bool ok = - sc_thread_create(&vb->b.thread, run_buffering, "scrcpy-vbuf", vb); - if (!ok) { - LOGE("Could not start buffering thread"); - return false; - } - } - - return true; + return sc_delay_buffer_start(&vb->db); } void sc_video_buffer_stop(struct sc_video_buffer *vb) { - if (vb->buffering_time) { - sc_mutex_lock(&vb->b.mutex); - vb->b.stopped = true; - sc_cond_signal(&vb->b.queue_cond); - sc_cond_signal(&vb->b.wait_cond); - sc_mutex_unlock(&vb->b.mutex); - } + return sc_delay_buffer_stop(&vb->db); } void sc_video_buffer_join(struct sc_video_buffer *vb) { - if (vb->buffering_time) { - sc_thread_join(&vb->b.thread, NULL); - } + return sc_delay_buffer_join(&vb->db); } void sc_video_buffer_destroy(struct sc_video_buffer *vb) { sc_frame_buffer_destroy(&vb->fb); - if (vb->buffering_time) { - sc_cond_destroy(&vb->b.wait_cond); - sc_cond_destroy(&vb->b.queue_cond); - sc_mutex_destroy(&vb->b.mutex); - } + sc_delay_buffer_destroy(&vb->db); } bool sc_video_buffer_push(struct sc_video_buffer *vb, const AVFrame *frame) { - if (!vb->buffering_time) { - // No buffering - return sc_video_buffer_offer(vb, frame); - } - - sc_mutex_lock(&vb->b.mutex); - - if (vb->b.stopped) { - sc_mutex_unlock(&vb->b.mutex); - return false; - } - - sc_tick pts = SC_TICK_FROM_US(frame->pts); - sc_clock_update(&vb->b.clock, sc_tick_now(), pts); - sc_cond_signal(&vb->b.wait_cond); - - if (vb->b.clock.count == 1) { - sc_mutex_unlock(&vb->b.mutex); - // First frame, offer it immediately, for two reasons: - // - not to delay the opening of the scrcpy window - // - the buffering estimation needs at least two clock points, so it - // could not handle the first frame - return sc_video_buffer_offer(vb, frame); - } - - struct sc_video_buffer_frame vb_frame; - bool ok = sc_video_buffer_frame_init(&vb_frame, frame); - if (!ok) { - sc_mutex_unlock(&vb->b.mutex); - LOG_OOM(); - return false; - } - -#ifndef SC_BUFFERING_NDEBUG - vb_frame.push_date = sc_tick_now(); -#endif - - ok = sc_vecdeque_push(&vb->b.queue, vb_frame); - if (!ok) { - sc_mutex_unlock(&vb->b.mutex); - LOG_OOM(); - return false; - } - - sc_cond_signal(&vb->b.queue_cond); - - sc_mutex_unlock(&vb->b.mutex); - - return true; + return sc_delay_buffer_push(&vb->db, frame); } void diff --git a/app/src/video_buffer.h b/app/src/video_buffer.h index d183a484..ebca3915 100644 --- a/app/src/video_buffer.h +++ b/app/src/video_buffer.h @@ -5,41 +5,13 @@ #include -#include "clock.h" +#include "delay_buffer.h" #include "frame_buffer.h" -#include "util/thread.h" -#include "util/tick.h" -#include "util/vecdeque.h" - -// forward declarations -typedef struct AVFrame AVFrame; - -struct sc_video_buffer_frame { - AVFrame *frame; -#ifndef NDEBUG - sc_tick push_date; -#endif -}; - -struct sc_video_buffer_frame_queue SC_VECDEQUE(struct sc_video_buffer_frame); struct sc_video_buffer { + struct sc_delay_buffer db; struct sc_frame_buffer fb; - sc_tick buffering_time; - - // only if buffering_time > 0 - struct { - sc_thread thread; - sc_mutex mutex; - sc_cond queue_cond; - sc_cond wait_cond; - - struct sc_clock clock; - struct sc_video_buffer_frame_queue queue; - bool stopped; - } b; // buffering - const struct sc_video_buffer_callbacks *cbs; void *cbs_userdata; }; @@ -50,7 +22,7 @@ struct sc_video_buffer_callbacks { }; bool -sc_video_buffer_init(struct sc_video_buffer *vb, sc_tick buffering_time, +sc_video_buffer_init(struct sc_video_buffer *vb, sc_tick delay, const struct sc_video_buffer_callbacks *cbs, void *cbs_userdata);