From 63af7fbafe3740b17b5495a8230298eff9d45581 Mon Sep 17 00:00:00 2001 From: Romain Vimont Date: Wed, 31 Jul 2019 01:55:32 +0200 Subject: [PATCH] Reduce latency by 1 frame To packetize the H.264 raw stream, av_parser_parse2() (called by av_read_frame()) knows that it has received a full frame only after it has received some data for the next frame. As a consequence, the client always waited until the next frame before sending the current frame to the decoder! On the device side, we know packets boundaries. To reduce latency, make the device always transmit the "frame meta" to packetize the stream manually (it was already implemented to send PTS, but only enabled on recording). On the client side, replace av_read_frame() by manual packetizing and parsing. --- app/src/recorder.c | 10 ++ app/src/scrcpy.c | 1 - app/src/server.c | 2 +- app/src/server.h | 1 - app/src/stream.c | 335 ++++++++++++++++++++++----------------------- app/src/stream.h | 18 +-- 6 files changed, 180 insertions(+), 187 deletions(-) diff --git a/app/src/recorder.c b/app/src/recorder.c index 3de8257a..c14394a3 100644 --- a/app/src/recorder.c +++ b/app/src/recorder.c @@ -166,11 +166,21 @@ recorder_rescale_packet(struct recorder *recorder, AVPacket *packet) { bool recorder_write(struct recorder *recorder, AVPacket *packet) { if (!recorder->header_written) { + if (packet->pts != AV_NOPTS_VALUE) { + LOGE("The first packet is not a config packet"); + return false; + } bool ok = recorder_write_header(recorder, packet); if (!ok) { return false; } recorder->header_written = true; + return true; + } + + if (packet->pts == AV_NOPTS_VALUE) { + // ignore config packets + return true; } recorder_rescale_packet(recorder, packet); diff --git a/app/src/scrcpy.c b/app/src/scrcpy.c index 01dc52e4..ed988778 100644 --- a/app/src/scrcpy.c +++ b/app/src/scrcpy.c @@ -277,7 +277,6 @@ scrcpy(const struct scrcpy_options *options) { .local_port = options->port, .max_size = options->max_size, .bit_rate = options->bit_rate, - .send_frame_meta = record, .control = options->control, }; if (!server_start(&server, options->serial, ¶ms)) { diff --git a/app/src/server.c b/app/src/server.c index 5b593c47..85b1b6b8 100644 --- a/app/src/server.c +++ b/app/src/server.c @@ -130,7 +130,7 @@ execute_server(struct server *server, const struct server_params *params) { bit_rate_string, server->tunnel_forward ? "true" : "false", params->crop ? params->crop : "-", - params->send_frame_meta ? "true" : "false", + "true", // always send frame meta (packet boundaries + timestamp) params->control ? "true" : "false", }; return adb_execute(server->serial, cmd, sizeof(cmd) / sizeof(cmd[0])); diff --git a/app/src/server.h b/app/src/server.h index 74a6cac8..4970d64e 100644 --- a/app/src/server.h +++ b/app/src/server.h @@ -34,7 +34,6 @@ struct server_params { uint16_t local_port; uint16_t max_size; uint32_t bit_rate; - bool send_frame_meta; bool control; }; diff --git a/app/src/stream.c b/app/src/stream.c index e85834c1..0396bf60 100644 --- a/app/src/stream.c +++ b/app/src/stream.c @@ -22,54 +22,8 @@ #define HEADER_SIZE 12 #define NO_PTS UINT64_C(-1) -static struct frame_meta * -frame_meta_new(uint64_t pts) { - struct frame_meta *meta = SDL_malloc(sizeof(*meta)); - if (!meta) { - return meta; - } - meta->pts = pts; - meta->next = NULL; - return meta; -} - -static void -frame_meta_delete(struct frame_meta *frame_meta) { - SDL_free(frame_meta); -} - static bool -receiver_state_push_meta(struct receiver_state *state, uint64_t pts) { - struct frame_meta *frame_meta = frame_meta_new(pts); - if (!frame_meta) { - return false; - } - - // append to the list - // (iterate to find the last item, in practice the list should be tiny) - struct frame_meta **p = &state->frame_meta_queue; - while (*p) { - p = &(*p)->next; - } - *p = frame_meta; - return true; -} - -static uint64_t -receiver_state_take_meta(struct receiver_state *state) { - struct frame_meta *frame_meta = state->frame_meta_queue; // first item - SDL_assert(frame_meta); // must not be empty - uint64_t pts = frame_meta->pts; - state->frame_meta_queue = frame_meta->next; // remove the item - frame_meta_delete(frame_meta); - return pts; -} - -static int -read_packet_with_meta(void *opaque, uint8_t *buf, int buf_size) { - struct stream *stream = opaque; - struct receiver_state *state = &stream->receiver_state; - +stream_recv_packet(struct stream *stream, AVPacket *packet) { // The video stream contains raw packets, without time information. When we // record, we retrieve the timestamps separately, from a "meta" header // added by the server before each raw packet. @@ -82,60 +36,30 @@ read_packet_with_meta(void *opaque, uint8_t *buf, int buf_size) { // // It is followed by bytes containing the packet/frame. - if (!state->remaining) { -#define HEADER_SIZE 12 - uint8_t header[HEADER_SIZE]; - ssize_t r = net_recv_all(stream->socket, header, HEADER_SIZE); - if (r == -1) { - return AVERROR(errno); - } - if (r == 0) { - return AVERROR_EOF; - } - // no partial read (net_recv_all()) - SDL_assert_release(r == HEADER_SIZE); - - uint64_t pts = buffer_read64be(header); - state->remaining = buffer_read32be(&header[8]); - - if (pts != NO_PTS && !receiver_state_push_meta(state, pts)) { - LOGE("Could not store PTS for recording"); - // we could not save the PTS, the recording would be broken - return AVERROR(ENOMEM); - } + uint8_t header[HEADER_SIZE]; + ssize_t r = net_recv_all(stream->socket, header, HEADER_SIZE); + if (r < HEADER_SIZE) { + return false; } - SDL_assert(state->remaining); + uint64_t pts = buffer_read64be(header); + uint32_t len = buffer_read32be(&header[8]); + SDL_assert(len); - if (buf_size > state->remaining) { - buf_size = state->remaining; + if (av_new_packet(packet, len)) { + LOGE("Could not allocate packet"); + return false; } - ssize_t r = net_recv(stream->socket, buf, buf_size); - if (r == -1) { - return errno ? AVERROR(errno) : AVERROR_EOF; - } - if (r == 0) { - return AVERROR_EOF; + r = net_recv_all(stream->socket, packet->data, len); + if (r < len) { + av_packet_unref(packet); + return false; } - SDL_assert(state->remaining >= r); - state->remaining -= r; + packet->pts = pts != NO_PTS ? pts : AV_NOPTS_VALUE; - return r; -} - -static int -read_raw_packet(void *opaque, uint8_t *buf, int buf_size) { - struct stream *stream = opaque; - ssize_t r = net_recv(stream->socket, buf, buf_size); - if (r == -1) { - return errno ? AVERROR(errno) : AVERROR_EOF; - } - if (r == 0) { - return AVERROR_EOF; - } - return r; + return true; } static void @@ -145,55 +69,136 @@ notify_stopped(void) { SDL_PushEvent(&stop_event); } +static bool +process_config_packet(struct stream *stream, AVPacket *packet) { + if (stream->recorder && !recorder_write(stream->recorder, packet)) { + LOGE("Could not send config packet to recorder"); + return false; + } + return true; +} + +static bool +process_frame(struct stream *stream, AVPacket *packet) { + if (stream->decoder && !decoder_push(stream->decoder, packet)) { + return false; + } + + if (stream->recorder) { + packet->dts = packet->pts; + + if (!recorder_write(stream->recorder, packet)) { + LOGE("Could not write frame to output file"); + return false; + } + } + + return true; +} + +static bool +stream_parse(struct stream *stream, AVPacket *packet) { + uint8_t *in_data = packet->data; + int in_len = packet->size; + uint8_t *out_data = NULL; + int out_len = 0; + int r = av_parser_parse2(stream->parser, stream->codec_ctx, + &out_data, &out_len, in_data, in_len, + AV_NOPTS_VALUE, AV_NOPTS_VALUE, -1); + + // PARSER_FLAG_COMPLETE_FRAMES is set + SDL_assert(r == in_len); + SDL_assert(out_len == in_len); + + if (stream->parser->key_frame == 1) { + packet->flags |= AV_PKT_FLAG_KEY; + } + + bool ok = process_frame(stream, packet); + if (!ok) { + LOGE("Could not process frame"); + return false; + } + + return true; +} + +static bool +stream_push_packet(struct stream *stream, AVPacket *packet) { + bool is_config = packet->pts == AV_NOPTS_VALUE; + + // A config packet must not be decoded immetiately (it contains no + // frame); instead, it must be concatenated with the future data packet. + if (stream->has_pending || is_config) { + size_t offset; + if (stream->has_pending) { + offset = stream->pending.size; + if (av_grow_packet(&stream->pending, packet->size)) { + LOGE("Could not grow packet"); + return false; + } + } else { + offset = 0; + if (av_new_packet(&stream->pending, packet->size)) { + LOGE("Could not create packet"); + return false; + } + stream->has_pending = true; + } + + memcpy(stream->pending.data + offset, packet->data, packet->size); + + if (!is_config) { + // prepare the concat packet to send to the decoder + stream->pending.pts = packet->pts; + stream->pending.dts = packet->dts; + stream->pending.flags = packet->flags; + packet = &stream->pending; + } + } + + if (is_config) { + // config packet + bool ok = process_config_packet(stream, packet); + if (!ok) { + return false; + } + } else { + // data packet + bool ok = stream_parse(stream, packet); + + if (stream->has_pending) { + // the pending packet must be discarded (consumed or error) + stream->has_pending = false; + av_packet_unref(&stream->pending); + } + + if (!ok) { + return false; + } + } + return true; +} + static int run_stream(void *data) { struct stream *stream = data; - AVFormatContext *format_ctx = avformat_alloc_context(); - if (!format_ctx) { - LOGC("Could not allocate format context"); - goto end; - } - - unsigned char *buffer = av_malloc(BUFSIZE); - if (!buffer) { - LOGC("Could not allocate buffer"); - goto finally_free_format_ctx; - } - - // initialize the receiver state - stream->receiver_state.frame_meta_queue = NULL; - stream->receiver_state.remaining = 0; - - // if recording is enabled, a "header" is sent between raw packets - int (*read_packet)(void *, uint8_t *, int) = - stream->recorder ? read_packet_with_meta : read_raw_packet; - AVIOContext *avio_ctx = avio_alloc_context(buffer, BUFSIZE, 0, stream, - read_packet, NULL, NULL); - if (!avio_ctx) { - LOGC("Could not allocate avio context"); - // avformat_open_input takes ownership of 'buffer' - // so only free the buffer before avformat_open_input() - av_free(buffer); - goto finally_free_format_ctx; - } - - format_ctx->pb = avio_ctx; - - if (avformat_open_input(&format_ctx, NULL, NULL, NULL) < 0) { - LOGE("Could not open video stream"); - goto finally_free_avio_ctx; - } - AVCodec *codec = avcodec_find_decoder(AV_CODEC_ID_H264); if (!codec) { LOGE("H.264 decoder not found"); goto end; } + stream->codec_ctx = avcodec_alloc_context3(codec); + if (!stream->codec_ctx) { + LOGC("Could not allocate codec context"); + goto end; + } + if (stream->decoder && !decoder_open(stream->decoder, codec)) { LOGE("Could not open decoder"); - goto finally_close_input; + goto finally_free_codec_ctx; } if (stream->recorder && !recorder_open(stream->recorder, codec)) { @@ -201,50 +206,40 @@ run_stream(void *data) { goto finally_close_decoder; } - AVPacket packet; - av_init_packet(&packet); - packet.data = NULL; - packet.size = 0; + stream->parser = av_parser_init(AV_CODEC_ID_H264); + if (!stream->parser) { + LOGE("Could not initialize parser"); + goto finally_close_recorder; + } - while (!av_read_frame(format_ctx, &packet)) { - if (SDL_AtomicGet(&stream->stopped)) { - // if the stream is stopped, the socket had been shutdown, so the - // last packet is probably corrupted (but not detected as such by - // FFmpeg) and will not be decoded correctly - av_packet_unref(&packet); - goto quit; - } - if (stream->decoder && !decoder_push(stream->decoder, &packet)) { - av_packet_unref(&packet); - goto quit; - } - - if (stream->recorder) { - // we retrieve the PTS in order they were received, so they will - // be assigned to the correct frame - uint64_t pts = receiver_state_take_meta(&stream->receiver_state); - packet.pts = pts; - packet.dts = pts; - - // no need to rescale with av_packet_rescale_ts(), the timestamps - // are in microseconds both in input and output - if (!recorder_write(stream->recorder, &packet)) { - LOGE("Could not write frame to output file"); - av_packet_unref(&packet); - goto quit; - } + // We must only pass complete frames to av_parser_parse2()! + // It's more complicated, but this allows to reduce the latency by 1 frame! + stream->parser->flags |= PARSER_FLAG_COMPLETE_FRAMES; + + for (;;) { + AVPacket packet; + bool ok = stream_recv_packet(stream, &packet); + if (!ok) { + // end of stream + break; } + ok = stream_push_packet(stream, &packet); av_packet_unref(&packet); - - if (avio_ctx->eof_reached) { + if (!ok) { + // cannot process packet (error already logged) break; } } LOGD("End of frames"); -quit: + if (stream->has_pending) { + av_packet_unref(&stream->pending); + } + + av_parser_close(stream->parser); +finally_close_recorder: if (stream->recorder) { recorder_close(stream->recorder); } @@ -252,13 +247,8 @@ finally_close_decoder: if (stream->decoder) { decoder_close(stream->decoder); } -finally_close_input: - avformat_close_input(&format_ctx); -finally_free_avio_ctx: - av_free(avio_ctx->buffer); - av_free(avio_ctx); -finally_free_format_ctx: - avformat_free_context(format_ctx); +finally_free_codec_ctx: + avcodec_free_context(&stream->codec_ctx); end: notify_stopped(); return 0; @@ -270,7 +260,7 @@ stream_init(struct stream *stream, socket_t socket, stream->socket = socket; stream->decoder = decoder, stream->recorder = recorder; - SDL_AtomicSet(&stream->stopped, 0); + stream->has_pending = false; } bool @@ -287,7 +277,6 @@ stream_start(struct stream *stream) { void stream_stop(struct stream *stream) { - SDL_AtomicSet(&stream->stopped, 1); if (stream->decoder) { decoder_interrupt(stream->decoder); } diff --git a/app/src/stream.h b/app/src/stream.h index 1ebff1a0..160ed7f5 100644 --- a/app/src/stream.h +++ b/app/src/stream.h @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -10,23 +11,18 @@ struct video_buffer; -struct frame_meta { - uint64_t pts; - struct frame_meta *next; -}; - struct stream { socket_t socket; struct video_buffer *video_buffer; SDL_Thread *thread; - SDL_atomic_t stopped; struct decoder *decoder; struct recorder *recorder; - struct receiver_state { - // meta (in order) for frames not consumed yet - struct frame_meta *frame_meta_queue; - size_t remaining; // remaining bytes to receive for the current frame - } receiver_state; + AVCodecContext *codec_ctx; + AVCodecParserContext *parser; + // successive packets may need to be concatenated, until a non-config + // packet is available + bool has_pending; + AVPacket pending; }; void