From 0136df1e6261d7da7b5a46366431250ba2ad771a Mon Sep 17 00:00:00 2001
From: Chris Robinson <chris.kcat@gmail.com>
Date: Sat, 30 Dec 2017 20:34:33 -0800
Subject: Improve the alffplay queue for FFmpeg's send/receive API

The packet handling thread now calls avcodec_send_packet to give compressed
data to libavcodec, while the audio/video threads call avcodec_receive_frame to
handle decoded frames. The packet thread still maintains local queues for each
stream to avoid starving an A/V thread when the other doesn't want another
frame yet.
---
 examples/alffplay.cpp | 291 ++++++++++++++++++++++++--------------------------
 1 file changed, 139 insertions(+), 152 deletions(-)

diff --git a/examples/alffplay.cpp b/examples/alffplay.cpp
index 10c00553..7c6b9f6e 100644
--- a/examples/alffplay.cpp
+++ b/examples/alffplay.cpp
@@ -69,24 +69,44 @@ enum {
 };
 
 
-struct PacketQueue {
+class PacketQueue {
     std::deque<AVPacket> mPackets;
-    std::atomic<int> mTotalSize;
-    std::atomic<bool> mFinished;
-    std::mutex mMutex;
-    std::condition_variable mCond;
+    size_t mTotalSize;
 
-    PacketQueue() : mTotalSize(0), mFinished(false)
-    { }
-    ~PacketQueue()
-    { clear(); }
+public:
+    PacketQueue() : mTotalSize(0) { }
+    ~PacketQueue() { clear(); }
 
-    int put(const AVPacket *pkt);
-    int peek(AVPacket *pkt, std::atomic<bool> &quit_var);
-    void pop();
+    bool empty() const noexcept { return mPackets.empty(); }
+    size_t totalSize() const noexcept { return mTotalSize; }
 
-    void clear();
-    void finish();
+    void put(const AVPacket *pkt)
+    {
+        mPackets.push_back(AVPacket{});
+        if(av_packet_ref(&mPackets.back(), pkt) != 0)
+            mPackets.pop_back();
+        else
+            mTotalSize += mPackets.back().size;
+    }
+
+    AVPacket *front() noexcept
+    { return &mPackets.front(); }
+
+    void pop()
+    {
+        AVPacket *pkt = &mPackets.front();
+        mTotalSize -= pkt->size;
+        av_packet_unref(pkt);
+        mPackets.pop_front();
+    }
+
+    void clear()
+    {
+        for(AVPacket &pkt : mPackets)
+            av_packet_unref(&pkt);
+        mPackets.clear();
+        mTotalSize = 0;
+    }
 };
 
 
@@ -98,7 +118,8 @@ struct AudioState {
     AVStream *mStream;
     AVCodecContext *mCodecCtx;
 
-    PacketQueue mQueue;
+    std::mutex mQueueMtx;
+    std::condition_variable mQueueCond;
 
     /* Used for clock difference average computation */
     struct {
@@ -174,7 +195,8 @@ struct VideoState {
     AVStream *mStream;
     AVCodecContext *mCodecCtx;
 
-    PacketQueue mQueue;
+    std::mutex mQueueMtx;
+    std::condition_variable mQueueCond;
 
     double  mClock;
     double  mFrameTimer;
@@ -241,7 +263,6 @@ struct VideoState {
 
 struct MovieState {
     AVFormatContext *mFormatCtx;
-    int mVideoStream, mAudioStream;
 
     int mAVSyncType;
 
@@ -259,9 +280,9 @@ struct MovieState {
     std::string mFilename;
 
     MovieState(std::string fname)
-      : mFormatCtx(nullptr), mVideoStream(0), mAudioStream(0)
-      , mAVSyncType(DEFAULT_AV_SYNC_TYPE), mExternalClockBase(0), mQuit(false)
-      , mAudio(this), mVideo(this), mFilename(std::move(fname))
+      : mFormatCtx(nullptr), mAVSyncType(DEFAULT_AV_SYNC_TYPE)
+      , mExternalClockBase(0), mQuit(false), mAudio(this), mVideo(this)
+      , mFilename(std::move(fname))
     { }
     ~MovieState()
     {
@@ -284,68 +305,6 @@ struct MovieState {
 };
 
 
-int PacketQueue::put(const AVPacket *pkt)
-{
-    std::unique_lock<std::mutex> lock(mMutex);
-    mPackets.push_back(AVPacket{});
-    if(av_packet_ref(&mPackets.back(), pkt) != 0)
-    {
-        mPackets.pop_back();
-        return -1;
-    }
-    mTotalSize += mPackets.back().size;
-    lock.unlock();
-
-    mCond.notify_one();
-    return 0;
-}
-
-int PacketQueue::peek(AVPacket *pkt, std::atomic<bool> &quit_var)
-{
-    std::unique_lock<std::mutex> lock(mMutex);
-    while(!quit_var.load())
-    {
-        if(!mPackets.empty())
-        {
-            if(av_packet_ref(pkt, &mPackets.front()) != 0)
-                return -1;
-            return 1;
-        }
-
-        if(mFinished.load())
-            return 0;
-        mCond.wait(lock);
-    }
-    return -1;
-}
-
-void PacketQueue::pop()
-{
-    std::unique_lock<std::mutex> lock(mMutex);
-    AVPacket *pkt = &mPackets.front();
-    mTotalSize -= pkt->size;
-    av_packet_unref(pkt);
-    mPackets.pop_front();
-}
-
-void PacketQueue::clear()
-{
-    std::unique_lock<std::mutex> lock(mMutex);
-    std::for_each(mPackets.begin(), mPackets.end(),
-        [](AVPacket &pkt) { av_packet_unref(&pkt); }
-    );
-    mPackets.clear();
-    mTotalSize = 0;
-}
-void PacketQueue::finish()
-{
-    std::unique_lock<std::mutex> lock(mMutex);
-    mFinished = true;
-    lock.unlock();
-    mCond.notify_all();
-}
-
-
 double AudioState::getClock()
 {
     double pts;
@@ -440,29 +399,18 @@ int AudioState::decodeFrame()
 {
     while(!mMovie->mQuit.load())
     {
-        while(!mMovie->mQuit.load())
-        {
-            /* Get the next packet */
-            AVPacket pkt{};
-            if(mQueue.peek(&pkt, mMovie->mQuit) <= 0)
-                return -1;
-
-            int ret = avcodec_send_packet(mCodecCtx, &pkt);
-            if(ret != AVERROR(EAGAIN))
-            {
-                if(ret < 0)
-                    std::cerr<< "Failed to send encoded packet: 0x"<<std::hex<<ret<<std::dec <<std::endl;
-                mQueue.pop();
-            }
-            av_packet_unref(&pkt);
-            if(ret == 0 || ret == AVERROR(EAGAIN))
-                break;
-        }
-
+        std::unique_lock<std::mutex> lock(mQueueMtx);
         int ret = avcodec_receive_frame(mCodecCtx, mDecodedFrame);
         if(ret == AVERROR(EAGAIN))
-            continue;
-        if(ret == AVERROR_EOF || ret < 0)
+        {
+            do {
+                mQueueCond.wait(lock);
+                ret = avcodec_receive_frame(mCodecCtx, mDecodedFrame);
+            } while(ret == AVERROR(EAGAIN));
+        }
+        lock.unlock();
+        if(ret == AVERROR_EOF) break;
+        if(ret < 0)
         {
             std::cerr<< "Failed to decode frame: "<<ret <<std::endl;
             return 0;
@@ -1110,28 +1058,19 @@ int VideoState::handler()
     mDecodedFrame = av_frame_alloc();
     while(!mMovie->mQuit)
     {
-        while(!mMovie->mQuit)
-        {
-            AVPacket packet{};
-            if(mQueue.peek(&packet, mMovie->mQuit) <= 0)
-                goto finish;
-
-            int ret = avcodec_send_packet(mCodecCtx, &packet);
-            if(ret != AVERROR(EAGAIN))
-            {
-                if(ret < 0)
-                    std::cerr<< "Failed to send encoded packet: 0x"<<std::hex<<ret<<std::dec <<std::endl;
-                mQueue.pop();
-            }
-            av_packet_unref(&packet);
-            if(ret == 0 || ret == AVERROR(EAGAIN))
-                break;
-        }
-
+        std::unique_lock<std::mutex> lock(mQueueMtx);
         /* Decode video frame */
         int ret = avcodec_receive_frame(mCodecCtx, mDecodedFrame);
         if(ret == AVERROR(EAGAIN))
-            continue;
+        {
+            do {
+                mQueueCond.wait(lock);
+                ret = avcodec_receive_frame(mCodecCtx, mDecodedFrame);
+            } while(ret == AVERROR(EAGAIN));
+        }
+        lock.unlock();
+        if(ret == AVERROR_EOF)
+            break;
         if(ret < 0)
         {
             std::cerr<< "Failed to decode frame: "<<ret <<std::endl;
@@ -1145,7 +1084,6 @@ int VideoState::handler()
             break;
         av_frame_unref(mDecodedFrame);
     }
-finish:
     mEOS = true;
     av_frame_free(&mDecodedFrame);
 
@@ -1254,7 +1192,6 @@ int MovieState::streamComponentOpen(int stream_index)
     switch(avctx->codec_type)
     {
         case AVMEDIA_TYPE_AUDIO:
-            mAudioStream = stream_index;
             mAudio.mStream = mFormatCtx->streams[stream_index];
             mAudio.mCodecCtx = avctx;
 
@@ -1267,7 +1204,6 @@ int MovieState::streamComponentOpen(int stream_index)
             break;
 
         case AVMEDIA_TYPE_VIDEO:
-            mVideoStream = stream_index;
             mVideo.mStream = mFormatCtx->streams[stream_index];
             mVideo.mCodecCtx = avctx;
 
@@ -1280,10 +1216,10 @@ int MovieState::streamComponentOpen(int stream_index)
 
         default:
             avcodec_free_context(&avctx);
-            break;
+            return -1;
     }
 
-    return 0;
+    return stream_index;
 }
 
 int MovieState::parse_handler()
@@ -1291,9 +1227,6 @@ int MovieState::parse_handler()
     int video_index = -1;
     int audio_index = -1;
 
-    mVideoStream = -1;
-    mAudioStream = -1;
-
     /* Dump information about file onto standard error */
     av_dump_format(mFormatCtx, 0, mFilename.c_str(), 0);
 
@@ -1309,39 +1242,93 @@ int MovieState::parse_handler()
      * components time to start without needing to skip ahead.
      */
     mExternalClockBase = av_gettime() + 50000;
-    if(audio_index >= 0)
-        streamComponentOpen(audio_index);
-    if(video_index >= 0)
-        streamComponentOpen(video_index);
+    if(audio_index >= 0) audio_index = streamComponentOpen(audio_index);
+    if(video_index >= 0) video_index = streamComponentOpen(video_index);
 
-    if(mVideoStream < 0 && mAudioStream < 0)
+    if(video_index < 0 && audio_index < 0)
     {
         std::cerr<< mFilename<<": could not open codecs" <<std::endl;
         mQuit = true;
     }
 
-    /* Main packet handling loop */
-    while(!mQuit.load())
-    {
-        if(mAudio.mQueue.mTotalSize + mVideo.mQueue.mTotalSize >= MAX_QUEUE_SIZE)
-        {
-            std::this_thread::sleep_for(std::chrono::milliseconds(10));
-            continue;
-        }
+    PacketQueue audio_queue, video_queue;
+    bool input_finished = false;
 
+    /* Main packet reading/dispatching loop */
+    while(!mQuit.load(std::memory_order_relaxed) && !input_finished)
+    {
         AVPacket packet;
         if(av_read_frame(mFormatCtx, &packet) < 0)
-            break;
+            input_finished = true;
+        else
+        {
+            /* Copy the packet into the queue it's meant for. */
+            if(packet.stream_index == video_index)
+                video_queue.put(&packet);
+            else if(packet.stream_index == audio_index)
+                audio_queue.put(&packet);
+            av_packet_unref(&packet);
+        }
 
-        /* Copy the packet in the queue it's meant for. */
-        if(packet.stream_index == mVideoStream)
-            mVideo.mQueue.put(&packet);
-        else if(packet.stream_index == mAudioStream)
-            mAudio.mQueue.put(&packet);
-        av_packet_unref(&packet);
+        do {
+            /* Send whatever queued packets we have. */
+            bool sent;
+            do {
+                sent = false;
+                if(!audio_queue.empty())
+                {
+                    std::unique_lock<std::mutex> lock(mAudio.mQueueMtx);
+                    int ret = avcodec_send_packet(mAudio.mCodecCtx, audio_queue.front());
+                    if(ret != AVERROR(EAGAIN))
+                    {
+                        lock.unlock();
+                        mAudio.mQueueCond.notify_one();
+                        audio_queue.pop();
+                        sent = true;
+                    }
+                }
+                if(!video_queue.empty())
+                {
+                    std::unique_lock<std::mutex> lock(mVideo.mQueueMtx);
+                    int ret = avcodec_send_packet(mVideo.mCodecCtx, video_queue.front());
+                    if(ret != AVERROR(EAGAIN))
+                    {
+                        lock.unlock();
+                        mVideo.mQueueCond.notify_one();
+                        video_queue.pop();
+                        sent = true;
+                    }
+                }
+            } while(sent);
+            /* If the queues are completely empty, or it's not full and there's
+             * more input to read, go get more.
+             */
+            size_t queue_size = audio_queue.totalSize() + video_queue.totalSize();
+            if(queue_size == 0 || (queue_size < MAX_QUEUE_SIZE && !input_finished))
+                break;
+            /* Nothing to send or get for now, wait a bit and try again. */
+            std::this_thread::sleep_for(std::chrono::milliseconds(10));
+        } while(!mQuit.load(std::memory_order_relaxed));
+    }
+    /* Pass a null packet to finish the send buffers (the receive functions
+     * will get AVERROR_EOF when emptied).
+     */
+    if(mVideo.mCodecCtx != nullptr)
+    {
+        { std::lock_guard<std::mutex> lock(mVideo.mQueueMtx);
+            avcodec_send_packet(mVideo.mCodecCtx, nullptr);
+        }
+        mVideo.mQueueCond.notify_one();
+    }
+    if(mAudio.mCodecCtx != nullptr)
+    {
+        { std::lock_guard<std::mutex> lock(mAudio.mQueueMtx);
+            avcodec_send_packet(mAudio.mCodecCtx, nullptr);
+        }
+        mAudio.mQueueCond.notify_one();
     }
-    mVideo.mQueue.finish();
-    mAudio.mQueue.finish();
+    video_queue.clear();
+    audio_queue.clear();
 
     /* all done - wait for it */
     if(mVideoThread.joinable())
-- 
cgit v1.2.3