diff --git a/app/src/recorder.c b/app/src/recorder.c index 3de8257a..c14394a3 100644 --- a/app/src/recorder.c +++ b/app/src/recorder.c @@ -166,11 +166,21 @@ recorder_rescale_packet(struct recorder *recorder, AVPacket *packet) { bool recorder_write(struct recorder *recorder, AVPacket *packet) { if (!recorder->header_written) { + if (packet->pts != AV_NOPTS_VALUE) { + LOGE("The first packet is not a config packet"); + return false; + } bool ok = recorder_write_header(recorder, packet); if (!ok) { return false; } recorder->header_written = true; + return true; + } + + if (packet->pts == AV_NOPTS_VALUE) { + // ignore config packets + return true; } recorder_rescale_packet(recorder, packet); diff --git a/app/src/scrcpy.c b/app/src/scrcpy.c index 01dc52e4..ed988778 100644 --- a/app/src/scrcpy.c +++ b/app/src/scrcpy.c @@ -277,7 +277,6 @@ scrcpy(const struct scrcpy_options *options) { .local_port = options->port, .max_size = options->max_size, .bit_rate = options->bit_rate, - .send_frame_meta = record, .control = options->control, }; if (!server_start(&server, options->serial, ¶ms)) { diff --git a/app/src/server.c b/app/src/server.c index 5b593c47..85b1b6b8 100644 --- a/app/src/server.c +++ b/app/src/server.c @@ -130,7 +130,7 @@ execute_server(struct server *server, const struct server_params *params) { bit_rate_string, server->tunnel_forward ? "true" : "false", params->crop ? params->crop : "-", - params->send_frame_meta ? "true" : "false", + "true", // always send frame meta (packet boundaries + timestamp) params->control ? "true" : "false", }; return adb_execute(server->serial, cmd, sizeof(cmd) / sizeof(cmd[0])); diff --git a/app/src/server.h b/app/src/server.h index 74a6cac8..4970d64e 100644 --- a/app/src/server.h +++ b/app/src/server.h @@ -34,7 +34,6 @@ struct server_params { uint16_t local_port; uint16_t max_size; uint32_t bit_rate; - bool send_frame_meta; bool control; }; diff --git a/app/src/stream.c b/app/src/stream.c index e85834c1..0396bf60 100644 --- a/app/src/stream.c +++ b/app/src/stream.c @@ -22,54 +22,8 @@ #define HEADER_SIZE 12 #define NO_PTS UINT64_C(-1) -static struct frame_meta * -frame_meta_new(uint64_t pts) { - struct frame_meta *meta = SDL_malloc(sizeof(*meta)); - if (!meta) { - return meta; - } - meta->pts = pts; - meta->next = NULL; - return meta; -} - -static void -frame_meta_delete(struct frame_meta *frame_meta) { - SDL_free(frame_meta); -} - static bool -receiver_state_push_meta(struct receiver_state *state, uint64_t pts) { - struct frame_meta *frame_meta = frame_meta_new(pts); - if (!frame_meta) { - return false; - } - - // append to the list - // (iterate to find the last item, in practice the list should be tiny) - struct frame_meta **p = &state->frame_meta_queue; - while (*p) { - p = &(*p)->next; - } - *p = frame_meta; - return true; -} - -static uint64_t -receiver_state_take_meta(struct receiver_state *state) { - struct frame_meta *frame_meta = state->frame_meta_queue; // first item - SDL_assert(frame_meta); // must not be empty - uint64_t pts = frame_meta->pts; - state->frame_meta_queue = frame_meta->next; // remove the item - frame_meta_delete(frame_meta); - return pts; -} - -static int -read_packet_with_meta(void *opaque, uint8_t *buf, int buf_size) { - struct stream *stream = opaque; - struct receiver_state *state = &stream->receiver_state; - +stream_recv_packet(struct stream *stream, AVPacket *packet) { // The video stream contains raw packets, without time information. When we // record, we retrieve the timestamps separately, from a "meta" header // added by the server before each raw packet. @@ -82,60 +36,30 @@ read_packet_with_meta(void *opaque, uint8_t *buf, int buf_size) { // // It is followed by bytes containing the packet/frame. - if (!state->remaining) { -#define HEADER_SIZE 12 - uint8_t header[HEADER_SIZE]; - ssize_t r = net_recv_all(stream->socket, header, HEADER_SIZE); - if (r == -1) { - return AVERROR(errno); - } - if (r == 0) { - return AVERROR_EOF; - } - // no partial read (net_recv_all()) - SDL_assert_release(r == HEADER_SIZE); - - uint64_t pts = buffer_read64be(header); - state->remaining = buffer_read32be(&header[8]); - - if (pts != NO_PTS && !receiver_state_push_meta(state, pts)) { - LOGE("Could not store PTS for recording"); - // we could not save the PTS, the recording would be broken - return AVERROR(ENOMEM); - } + uint8_t header[HEADER_SIZE]; + ssize_t r = net_recv_all(stream->socket, header, HEADER_SIZE); + if (r < HEADER_SIZE) { + return false; } - SDL_assert(state->remaining); + uint64_t pts = buffer_read64be(header); + uint32_t len = buffer_read32be(&header[8]); + SDL_assert(len); - if (buf_size > state->remaining) { - buf_size = state->remaining; + if (av_new_packet(packet, len)) { + LOGE("Could not allocate packet"); + return false; } - ssize_t r = net_recv(stream->socket, buf, buf_size); - if (r == -1) { - return errno ? AVERROR(errno) : AVERROR_EOF; - } - if (r == 0) { - return AVERROR_EOF; + r = net_recv_all(stream->socket, packet->data, len); + if (r < len) { + av_packet_unref(packet); + return false; } - SDL_assert(state->remaining >= r); - state->remaining -= r; + packet->pts = pts != NO_PTS ? pts : AV_NOPTS_VALUE; - return r; -} - -static int -read_raw_packet(void *opaque, uint8_t *buf, int buf_size) { - struct stream *stream = opaque; - ssize_t r = net_recv(stream->socket, buf, buf_size); - if (r == -1) { - return errno ? AVERROR(errno) : AVERROR_EOF; - } - if (r == 0) { - return AVERROR_EOF; - } - return r; + return true; } static void @@ -145,55 +69,136 @@ notify_stopped(void) { SDL_PushEvent(&stop_event); } +static bool +process_config_packet(struct stream *stream, AVPacket *packet) { + if (stream->recorder && !recorder_write(stream->recorder, packet)) { + LOGE("Could not send config packet to recorder"); + return false; + } + return true; +} + +static bool +process_frame(struct stream *stream, AVPacket *packet) { + if (stream->decoder && !decoder_push(stream->decoder, packet)) { + return false; + } + + if (stream->recorder) { + packet->dts = packet->pts; + + if (!recorder_write(stream->recorder, packet)) { + LOGE("Could not write frame to output file"); + return false; + } + } + + return true; +} + +static bool +stream_parse(struct stream *stream, AVPacket *packet) { + uint8_t *in_data = packet->data; + int in_len = packet->size; + uint8_t *out_data = NULL; + int out_len = 0; + int r = av_parser_parse2(stream->parser, stream->codec_ctx, + &out_data, &out_len, in_data, in_len, + AV_NOPTS_VALUE, AV_NOPTS_VALUE, -1); + + // PARSER_FLAG_COMPLETE_FRAMES is set + SDL_assert(r == in_len); + SDL_assert(out_len == in_len); + + if (stream->parser->key_frame == 1) { + packet->flags |= AV_PKT_FLAG_KEY; + } + + bool ok = process_frame(stream, packet); + if (!ok) { + LOGE("Could not process frame"); + return false; + } + + return true; +} + +static bool +stream_push_packet(struct stream *stream, AVPacket *packet) { + bool is_config = packet->pts == AV_NOPTS_VALUE; + + // A config packet must not be decoded immetiately (it contains no + // frame); instead, it must be concatenated with the future data packet. + if (stream->has_pending || is_config) { + size_t offset; + if (stream->has_pending) { + offset = stream->pending.size; + if (av_grow_packet(&stream->pending, packet->size)) { + LOGE("Could not grow packet"); + return false; + } + } else { + offset = 0; + if (av_new_packet(&stream->pending, packet->size)) { + LOGE("Could not create packet"); + return false; + } + stream->has_pending = true; + } + + memcpy(stream->pending.data + offset, packet->data, packet->size); + + if (!is_config) { + // prepare the concat packet to send to the decoder + stream->pending.pts = packet->pts; + stream->pending.dts = packet->dts; + stream->pending.flags = packet->flags; + packet = &stream->pending; + } + } + + if (is_config) { + // config packet + bool ok = process_config_packet(stream, packet); + if (!ok) { + return false; + } + } else { + // data packet + bool ok = stream_parse(stream, packet); + + if (stream->has_pending) { + // the pending packet must be discarded (consumed or error) + stream->has_pending = false; + av_packet_unref(&stream->pending); + } + + if (!ok) { + return false; + } + } + return true; +} + static int run_stream(void *data) { struct stream *stream = data; - AVFormatContext *format_ctx = avformat_alloc_context(); - if (!format_ctx) { - LOGC("Could not allocate format context"); - goto end; - } - - unsigned char *buffer = av_malloc(BUFSIZE); - if (!buffer) { - LOGC("Could not allocate buffer"); - goto finally_free_format_ctx; - } - - // initialize the receiver state - stream->receiver_state.frame_meta_queue = NULL; - stream->receiver_state.remaining = 0; - - // if recording is enabled, a "header" is sent between raw packets - int (*read_packet)(void *, uint8_t *, int) = - stream->recorder ? read_packet_with_meta : read_raw_packet; - AVIOContext *avio_ctx = avio_alloc_context(buffer, BUFSIZE, 0, stream, - read_packet, NULL, NULL); - if (!avio_ctx) { - LOGC("Could not allocate avio context"); - // avformat_open_input takes ownership of 'buffer' - // so only free the buffer before avformat_open_input() - av_free(buffer); - goto finally_free_format_ctx; - } - - format_ctx->pb = avio_ctx; - - if (avformat_open_input(&format_ctx, NULL, NULL, NULL) < 0) { - LOGE("Could not open video stream"); - goto finally_free_avio_ctx; - } - AVCodec *codec = avcodec_find_decoder(AV_CODEC_ID_H264); if (!codec) { LOGE("H.264 decoder not found"); goto end; } + stream->codec_ctx = avcodec_alloc_context3(codec); + if (!stream->codec_ctx) { + LOGC("Could not allocate codec context"); + goto end; + } + if (stream->decoder && !decoder_open(stream->decoder, codec)) { LOGE("Could not open decoder"); - goto finally_close_input; + goto finally_free_codec_ctx; } if (stream->recorder && !recorder_open(stream->recorder, codec)) { @@ -201,50 +206,40 @@ run_stream(void *data) { goto finally_close_decoder; } - AVPacket packet; - av_init_packet(&packet); - packet.data = NULL; - packet.size = 0; + stream->parser = av_parser_init(AV_CODEC_ID_H264); + if (!stream->parser) { + LOGE("Could not initialize parser"); + goto finally_close_recorder; + } - while (!av_read_frame(format_ctx, &packet)) { - if (SDL_AtomicGet(&stream->stopped)) { - // if the stream is stopped, the socket had been shutdown, so the - // last packet is probably corrupted (but not detected as such by - // FFmpeg) and will not be decoded correctly - av_packet_unref(&packet); - goto quit; - } - if (stream->decoder && !decoder_push(stream->decoder, &packet)) { - av_packet_unref(&packet); - goto quit; - } - - if (stream->recorder) { - // we retrieve the PTS in order they were received, so they will - // be assigned to the correct frame - uint64_t pts = receiver_state_take_meta(&stream->receiver_state); - packet.pts = pts; - packet.dts = pts; - - // no need to rescale with av_packet_rescale_ts(), the timestamps - // are in microseconds both in input and output - if (!recorder_write(stream->recorder, &packet)) { - LOGE("Could not write frame to output file"); - av_packet_unref(&packet); - goto quit; - } + // We must only pass complete frames to av_parser_parse2()! + // It's more complicated, but this allows to reduce the latency by 1 frame! + stream->parser->flags |= PARSER_FLAG_COMPLETE_FRAMES; + + for (;;) { + AVPacket packet; + bool ok = stream_recv_packet(stream, &packet); + if (!ok) { + // end of stream + break; } + ok = stream_push_packet(stream, &packet); av_packet_unref(&packet); - - if (avio_ctx->eof_reached) { + if (!ok) { + // cannot process packet (error already logged) break; } } LOGD("End of frames"); -quit: + if (stream->has_pending) { + av_packet_unref(&stream->pending); + } + + av_parser_close(stream->parser); +finally_close_recorder: if (stream->recorder) { recorder_close(stream->recorder); } @@ -252,13 +247,8 @@ finally_close_decoder: if (stream->decoder) { decoder_close(stream->decoder); } -finally_close_input: - avformat_close_input(&format_ctx); -finally_free_avio_ctx: - av_free(avio_ctx->buffer); - av_free(avio_ctx); -finally_free_format_ctx: - avformat_free_context(format_ctx); +finally_free_codec_ctx: + avcodec_free_context(&stream->codec_ctx); end: notify_stopped(); return 0; @@ -270,7 +260,7 @@ stream_init(struct stream *stream, socket_t socket, stream->socket = socket; stream->decoder = decoder, stream->recorder = recorder; - SDL_AtomicSet(&stream->stopped, 0); + stream->has_pending = false; } bool @@ -287,7 +277,6 @@ stream_start(struct stream *stream) { void stream_stop(struct stream *stream) { - SDL_AtomicSet(&stream->stopped, 1); if (stream->decoder) { decoder_interrupt(stream->decoder); } diff --git a/app/src/stream.h b/app/src/stream.h index 1ebff1a0..160ed7f5 100644 --- a/app/src/stream.h +++ b/app/src/stream.h @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -10,23 +11,18 @@ struct video_buffer; -struct frame_meta { - uint64_t pts; - struct frame_meta *next; -}; - struct stream { socket_t socket; struct video_buffer *video_buffer; SDL_Thread *thread; - SDL_atomic_t stopped; struct decoder *decoder; struct recorder *recorder; - struct receiver_state { - // meta (in order) for frames not consumed yet - struct frame_meta *frame_meta_queue; - size_t remaining; // remaining bytes to receive for the current frame - } receiver_state; + AVCodecContext *codec_ctx; + AVCodecParserContext *parser; + // successive packets may need to be concatenated, until a non-config + // packet is available + bool has_pending; + AVPacket pending; }; void