From 506d316f5560803e4b24541b18d17ec39e59061b Mon Sep 17 00:00:00 2001
From: Chris Robinson <chris.kcat@gmail.com>
Date: Sun, 31 Dec 2017 16:11:00 -0800
Subject: Wake up the packet send loop when more frames are needed

---
 examples/alffplay.cpp | 88 +++++++++++++++++++++++++++++----------------------
 1 file changed, 50 insertions(+), 38 deletions(-)

(limited to 'examples/alffplay.cpp')

diff --git a/examples/alffplay.cpp b/examples/alffplay.cpp
index 285ec7f1..60b60a65 100644
--- a/examples/alffplay.cpp
+++ b/examples/alffplay.cpp
@@ -45,8 +45,8 @@ LPALGETSOURCEI64VSOFT alGetSourcei64vSOFT;
 
 const std::chrono::duration<double> AVSyncThreshold(0.01);
 const std::chrono::seconds AVNoSyncThreshold(10);
-const std::chrono::milliseconds AudioBufferTime(100); /* Per-buffer */
-#define AUDIO_BUFFER_QUEUE_SIZE 8 /* Number of buffers to queue */
+const std::chrono::milliseconds AudioBufferTime(20); /* Per-buffer */
+#define AUDIO_BUFFER_QUEUE_SIZE 25 /* Number of buffers to queue (~500ms) */
 #define MAX_QUEUE_SIZE (15 * 1024 * 1024) /* Bytes of compressed data to keep queued */
 #define SAMPLE_CORRECTION_MAX_DIFF 0.05
 #define AUDIO_DIFF_AVG_NB 20
@@ -127,8 +127,8 @@ struct AudioState {
         int AvgCount;
     } mDiff;
 
-    /* Time (in seconds) of the next sample to be buffered */
-    std::chrono::duration<double> mCurrentPts;
+    /* Time (in nanoseconds) of the next sample to be buffered */
+    std::chrono::nanoseconds mCurrentPts;
 
     /* Decompressed sample frame, and swresample context for conversion */
     AVFrame           *mDecodedFrame;
@@ -155,7 +155,7 @@ struct AudioState {
 
     AudioState(MovieState *movie)
       : mMovie(movie), mStream(nullptr), mCodecCtx(nullptr)
-      , mDiff{0.0, 0.0, 0.0, 0}, mCurrentPts(0.0), mDecodedFrame(nullptr)
+      , mDiff{0.0, 0.0, 0.0, 0}, mCurrentPts(0), mDecodedFrame(nullptr)
       , mSwresCtx(nullptr), mDstChanLayout(0), mDstSampleFmt(AV_SAMPLE_FMT_NONE)
       , mSamples(nullptr), mSamplesLen(0), mSamplesPos(0), mSamplesMax(0)
       , mFormat(AL_NONE), mFrameSize(0), mSource(0), mBufferIdx(0)
@@ -265,6 +265,11 @@ struct MovieState {
 
     std::chrono::microseconds mClockBase;
 
+    std::mutex mSendMtx;
+    std::condition_variable mSendCond;
+    /* NOTE: false/clear = need data, true/set = no data needed */
+    std::atomic_flag mSendDataGood;
+
     std::atomic<bool> mQuit;
 
     AudioState mAudio;
@@ -324,7 +329,7 @@ std::chrono::nanoseconds AudioState::getClock()
      * OpenAL's current position, and subtracting the source latency from that
      * gives the timestamp of the sample currently at the DAC.
      */
-    nanoseconds pts = std::chrono::duration_cast<nanoseconds>(mCurrentPts);
+    nanoseconds pts = mCurrentPts;
     if(mSource)
     {
         ALint64SOFT offset[2];
@@ -405,6 +410,9 @@ int AudioState::decodeFrame()
         int ret = avcodec_receive_frame(mCodecCtx, mDecodedFrame);
         if(ret == AVERROR(EAGAIN))
         {
+            mMovie->mSendDataGood.clear(std::memory_order_relaxed);
+            std::unique_lock<std::mutex>(mMovie->mSendMtx).unlock();
+            mMovie->mSendCond.notify_one();
             do {
                 mQueueCond.wait(lock);
                 ret = avcodec_receive_frame(mCodecCtx, mDecodedFrame);
@@ -412,6 +420,8 @@ int AudioState::decodeFrame()
         }
         lock.unlock();
         if(ret == AVERROR_EOF) break;
+        mMovie->mSendDataGood.clear(std::memory_order_relaxed);
+        mMovie->mSendCond.notify_one();
         if(ret < 0)
         {
             std::cerr<< "Failed to decode frame: "<<ret <<std::endl;
@@ -1072,18 +1082,22 @@ int VideoState::handler()
         int ret = avcodec_receive_frame(mCodecCtx, mDecodedFrame);
         if(ret == AVERROR(EAGAIN))
         {
+            mMovie->mSendDataGood.clear(std::memory_order_relaxed);
+            std::unique_lock<std::mutex>(mMovie->mSendMtx).unlock();
+            mMovie->mSendCond.notify_one();
             do {
                 mQueueCond.wait(lock);
                 ret = avcodec_receive_frame(mCodecCtx, mDecodedFrame);
             } while(ret == AVERROR(EAGAIN));
         }
         lock.unlock();
-        if(ret == AVERROR_EOF)
-            break;
+        if(ret == AVERROR_EOF) break;
+        mMovie->mSendDataGood.clear(std::memory_order_relaxed);
+        mMovie->mSendCond.notify_one();
         if(ret < 0)
         {
             std::cerr<< "Failed to decode frame: "<<ret <<std::endl;
-            break;
+            continue;
         }
 
         std::chrono::nanoseconds pts = synchronize(
@@ -1286,42 +1300,40 @@ int MovieState::parse_handler()
 
         do {
             /* Send whatever queued packets we have. */
-            bool sent;
-            do {
-                sent = false;
-                if(!audio_queue.empty())
-                {
-                    std::unique_lock<std::mutex> lock(mAudio.mQueueMtx);
-                    int ret = avcodec_send_packet(mAudio.mCodecCtx, audio_queue.front());
-                    if(ret != AVERROR(EAGAIN))
-                    {
-                        lock.unlock();
-                        mAudio.mQueueCond.notify_one();
-                        audio_queue.pop();
-                        sent = true;
-                    }
-                }
-                if(!video_queue.empty())
-                {
-                    std::unique_lock<std::mutex> lock(mVideo.mQueueMtx);
-                    int ret = avcodec_send_packet(mVideo.mCodecCtx, video_queue.front());
-                    if(ret != AVERROR(EAGAIN))
-                    {
-                        lock.unlock();
-                        mVideo.mQueueCond.notify_one();
-                        video_queue.pop();
-                        sent = true;
-                    }
-                }
-            } while(sent);
+            if(!audio_queue.empty())
+            {
+                std::unique_lock<std::mutex> lock(mAudio.mQueueMtx);
+                int ret;
+                do {
+                    ret = avcodec_send_packet(mAudio.mCodecCtx, audio_queue.front());
+                    if(ret != AVERROR(EAGAIN)) audio_queue.pop();
+                } while(ret != AVERROR(EAGAIN) && !audio_queue.empty());
+                lock.unlock();
+                mAudio.mQueueCond.notify_one();
+            }
+            if(!video_queue.empty())
+            {
+                std::unique_lock<std::mutex> lock(mVideo.mQueueMtx);
+                int ret;
+                do {
+                    ret = avcodec_send_packet(mVideo.mCodecCtx, video_queue.front());
+                    if(ret != AVERROR(EAGAIN)) video_queue.pop();
+                } while(ret != AVERROR(EAGAIN) && !video_queue.empty());
+                lock.unlock();
+                mVideo.mQueueCond.notify_one();
+            }
             /* If the queues are completely empty, or it's not full and there's
              * more input to read, go get more.
              */
             size_t queue_size = audio_queue.totalSize() + video_queue.totalSize();
             if(queue_size == 0 || (queue_size < MAX_QUEUE_SIZE && !input_finished))
                 break;
+
             /* Nothing to send or get for now, wait a bit and try again. */
-            std::this_thread::sleep_for(std::chrono::milliseconds(10));
+            { std::unique_lock<std::mutex> lock(mSendMtx);
+                if(mSendDataGood.test_and_set(std::memory_order_relaxed))
+                    mSendCond.wait_for(lock, std::chrono::milliseconds(10));
+            }
         } while(!mQuit.load(std::memory_order_relaxed));
     }
     /* Pass a null packet to finish the send buffers (the receive functions
-- 
cgit v1.2.3