From a7a3d5ab98ee0ad33fdef50bf081afeb8295ebe4 Mon Sep 17 00:00:00 2001
From: Sven Gothel <sgothel@jausoft.com>
Date: Fri, 3 Oct 2014 03:12:42 +0200
Subject: MappedByteBuffer*Stream:

- Validate active and GC'ed mapped-buffer count
  in cleanAllSlices() via close() ..

- Fix missing unmapping last buffer in notifyLengthChangeImpl(),
  branch criteria was off by one.

- cleanSlice(..) now also issues cleanBuffer(..) on the GC'ed entry,
  hence if WeakReference is still alive, enforce it's release.

- cleanBuffer(..) reverts FLUSH_PRE_HARD -> FLUSH_PRE_SOFT
  in case of an error.

- flush() -> flush(boolean metaData) to expose FileChannel.force(metaData).

- Add synchronous mode, flushing/syncing the mapped buffers when
  in READ_WRITE mapping mode and issue FileChannel.force() if not READ_ONLY.

  Above is implemented via flush()/flushImpl(..) for buffers and FileChannel,
  as well as in syncSlice(..) for buffers only.

  flush*()/syncSlice() is covered by:
    - setLength()
    - notifyLengthChange*(..)
    - nextSlice()

  Always issue flushImpl() in close().

- Windows: Clean all buffers in setLength(),
  otherwise Windows will report:

- Windows: Catch MappedByteBuffer.force() IOException

- Optimization of position(..)
  position(..) is now standalone to allow issuing flushSlice(..)
  before gathering the new mapped buffer.
  This shall avoid one extra cache miss.

  Hence rename positionImpl(..) -> position2(..).

- All MappedByteBufferOutputStream.write(..) methods
  issue syncSlice(..) on the last written current slice
  to ensure new 'synchronous' mode is honored.

+++

Unit tests:

- Ensure test files are being deleted

- TestByteBufferCopyStream: Reduced test file size to more sensible values.

-
---
 .../common/nio/MappedByteBufferInputStream.java    | 281 ++++++++++++++++-----
 .../common/nio/MappedByteBufferOutputStream.java   |  49 +++-
 2 files changed, 266 insertions(+), 64 deletions(-)

(limited to 'src/java/com')

diff --git a/src/java/com/jogamp/common/nio/MappedByteBufferInputStream.java b/src/java/com/jogamp/common/nio/MappedByteBufferInputStream.java
index 5f91f64..1d4d78a 100644
--- a/src/java/com/jogamp/common/nio/MappedByteBufferInputStream.java
+++ b/src/java/com/jogamp/common/nio/MappedByteBufferInputStream.java
@@ -29,11 +29,13 @@ package com.jogamp.common.nio;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.PrintStream;
 import java.io.RandomAccessFile;
 import java.lang.ref.WeakReference;
 import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileChannel.MapMode;
 import java.security.AccessController;
@@ -156,6 +158,8 @@ public class MappedByteBufferInputStream extends InputStream {
     private ByteBuffer[] slices;
     private WeakReference<ByteBuffer>[] slices2GC;
     private long totalSize;
+    private int slicesEntries, slices2GCEntries;
+    private boolean synchronous;
 
     private int refCount;
 
@@ -169,6 +173,23 @@ public class MappedByteBufferInputStream extends InputStream {
     private long mark;
 
     final void dbgDump(final String prefix, final PrintStream out) {
+        int _slicesEntries = 0;
+        for(int i=0; i<sliceCount; i++) {
+            if( null != slices[i] ) {
+                _slicesEntries++;
+            }
+        }
+        int _slices2GCEntries = 0;
+        int _slices2GCAliveEntries = 0;
+        for(int i=0; i<sliceCount; i++) {
+            final WeakReference<ByteBuffer> ref = slices2GC[i];
+            if( null != ref ) {
+                _slices2GCEntries++;
+                if( null != ref.get() ) {
+                    _slices2GCAliveEntries++;
+                }
+            }
+        }
         long fcSz = 0, pos = 0, rem = 0;
         try {
             fcSz = fc.size();
@@ -187,7 +208,9 @@ public class MappedByteBufferInputStream extends InputStream {
         out.println(prefix+" refCount "+refCount+", fcSize "+fcSz+", totalSize "+totalSize);
         out.println(prefix+" position "+pos+", remaining "+rem);
         out.println(prefix+" mmode "+mmode+", cmode "+cmode+", fileResizeOp "+fileResizeOp);
-        out.println(prefix+" slice "+sliceIdx+" / "+sliceCount+" ("+sliceCount2+")");
+        out.println(prefix+" slice "+sliceIdx+" / "+sliceCount+" ("+sliceCount2+"), synchronous "+synchronous);
+        out.println(prefix+"   mapped   "+slicesEntries+" / "+_slicesEntries);
+        out.println(prefix+"   GC-queue "+slices2GCEntries+" / "+_slices2GCEntries+" (alive "+_slices2GCAliveEntries+")");
         out.println(prefix+" sliceShift "+sliceShift+" -> "+(1L << sliceShift));
     }
 
@@ -203,7 +226,7 @@ public class MappedByteBufferInputStream extends InputStream {
         // trigger notifyLengthChange
         this.totalSize = -1;
         this.sliceCount = 0;
-        notifyLengthChange(totalSize);
+        notifyLengthChange( totalSize );
 
         this.refCount = 1;
         this.cleanerInit = false;
@@ -263,6 +286,28 @@ public class MappedByteBufferInputStream extends InputStream {
         this(fileChannel, FileChannel.MapMode.READ_ONLY, CacheMode.FLUSH_PRE_SOFT, DEFAULT_SLICE_SHIFT);
     }
 
+    /**
+     * Enable or disable synchronous mode.
+     * <p>
+     * If synchronous mode is enabled, mapped buffers will be {@link #flush(boolean) flushed}
+     * if {@link #notifyLengthChange(long) resized}, <i>written to</i> or {@link #close() closing}  in {@link FileChannel.MapMode#READ_WRITE read-write} mapping mode.
+     * </p>
+     * <p>
+     * If synchronous mode is enabled, {@link FileChannel#force(boolean)} is issued
+     * if {@link #setLength(long) resizing} or {@link #close() closing} and not in {@link FileChannel.MapMode#READ_ONLY read-only} mapping mode.
+     * </p>
+     * @param s {@code true} to enable synchronous mode
+     */
+    public final synchronized void setSynchronous(final boolean s) {
+        synchronous = s;
+    }
+    /**
+     * Return {@link #setSynchronous(boolean) synchronous mode}.
+     */
+    public final synchronized boolean getSynchronous() {
+        return synchronous ;
+    }
+
     final synchronized void checkOpen() throws IOException {
         if( 0 == refCount ) {
             throw new IOException("stream closed");
@@ -274,16 +319,15 @@ public class MappedByteBufferInputStream extends InputStream {
         if( 0 < refCount ) {
             refCount--;
             if( 0 == refCount ) {
-                for(int i=0; i<sliceCount; i++) {
-                    cleanSlice(i);
-                }
-                if( mmode != FileChannel.MapMode.READ_ONLY ) {
-                    fc.force(true);
+                try {
+                    cleanAllSlices( true /* syncBuffer */ );
+                } finally {
+                    flushImpl(true /* metaData */, false /* syncBuffer */);
+                    fc.close();
+                    mark = -1;
+                    sliceIdx = -1;
+                    super.close();
                 }
-                fc.close();
-                mark = -1;
-                sliceIdx = -1;
-                super.close();
             }
         }
     }
@@ -307,15 +351,35 @@ public class MappedByteBufferInputStream extends InputStream {
      * <p>
      * User must have a {@link FileResizeOp} {@link #setFileResizeOp(FileResizeOp) registered} before.
      * </p>
+     * <p>
+     * Implementation calls {@link #notifyLengthChange(long)} after {@link FileResizeOp#setLength(long)}.
+     * </p>
      * @param newTotalSize the new total size
      * @throws IOException if no {@link FileResizeOp} has been {@link #setFileResizeOp(FileResizeOp) registered}
      *                     or if a buffer slice operation failed
      */
     public final synchronized void setLength(final long newTotalSize) throws IOException {
+        final long currentPosition;
+        if( 0 != newTotalSize &&  totalSize != newTotalSize ) {
+            currentPosition = position();
+        } else {
+            currentPosition = -1L;
+        }
         if( fc.size() != newTotalSize ) {
+            if( Platform.OSType.WINDOWS == Platform.getOSType() ) {
+                // On Windows, we have to close all mapped slices.
+                // Otherwise we will receive:
+                // java.io.IOException: The requested operation cannot be performed on a file with a user-mapped section open
+                //      at java.io.RandomAccessFile.setLength(Native Method)
+                cleanAllSlices( synchronous );
+            }
             fileResizeOp.setLength(newTotalSize);
+            if( synchronous ) {
+                // buffers will be synchronized in notifyLengthChangeImpl(..)
+                flushImpl( true /* metaData */, false /* syncBuffer */);
+            }
         }
-        notifyLengthChange(newTotalSize);
+        notifyLengthChangeImpl(newTotalSize, currentPosition);
     }
 
     /**
@@ -328,6 +392,9 @@ public class MappedByteBufferInputStream extends InputStream {
      * @throws IOException if a buffer slice operation failed
      */
     public final synchronized void notifyLengthChange(final long newTotalSize) throws IOException {
+        notifyLengthChangeImpl(newTotalSize, -1L);
+    }
+    private final synchronized void notifyLengthChangeImpl(final long newTotalSize, final long currentPosition) throws IOException {
         /* if( DEBUG ) {
             System.err.println("notifyLengthChange.0: "+totalSize+" -> "+newTotalSize);
             dbgDump("notifyLengthChange.0:", System.err);
@@ -337,11 +404,7 @@ public class MappedByteBufferInputStream extends InputStream {
             return;
         } else if( 0 == newTotalSize ) {
             // ZERO - ensure one entry avoiding NULL checks
-            if( null != slices ) {
-                for(int i=0; i<sliceCount; i++) {
-                    cleanSlice(i);
-                }
-            }
+            cleanAllSlices( synchronous );
             @SuppressWarnings("unchecked")
             final WeakReference<ByteBuffer>[] newSlices2GC = new WeakReference[ 1 ];
             slices2GC = newSlices2GC;
@@ -352,7 +415,7 @@ public class MappedByteBufferInputStream extends InputStream {
             mark = -1;
             sliceIdx = 0;
         } else {
-            final long prePosition = position();
+            final long prePosition = 0 <= currentPosition ? currentPosition : position();
 
             final long sliceSize = 1L << sliceShift;
             final int newSliceCount = (int)( ( newTotalSize + ( sliceSize - 1 ) ) / sliceSize );
@@ -360,11 +423,13 @@ public class MappedByteBufferInputStream extends InputStream {
             final WeakReference<ByteBuffer>[] newSlices2GC = new WeakReference[ newSliceCount ];
             final ByteBuffer[] newSlices = new ByteBuffer[ newSliceCount ];
             final int copySliceCount = Math.min(newSliceCount, sliceCount-1); // drop last (resize)
-            if( 0 < copySliceCount ) {
-                System.arraycopy(slices2GC, 0, newSlices2GC, 0, copySliceCount);
-                System.arraycopy(slices,    0, newSlices,    0, copySliceCount);
+            if( 0 <= copySliceCount ) {
+                if( 0 < copySliceCount ) {
+                    System.arraycopy(slices2GC, 0, newSlices2GC, 0, copySliceCount);
+                    System.arraycopy(slices,    0, newSlices,    0, copySliceCount);
+                }
                 for(int i=copySliceCount; i<sliceCount; i++) { // clip shrunken slices + 1 (last), incl. slices2GC!
-                    cleanSlice(i);
+                    cleanSlice(i, synchronous);
                 }
             }
             slices2GC = newSlices2GC;
@@ -374,7 +439,7 @@ public class MappedByteBufferInputStream extends InputStream {
             if( newTotalSize < mark ) {
                 mark = -1;
             }
-            positionImpl( Math.min(prePosition, newTotalSize) ); // -> clipped position (set currSlice and re-map/-pos buffer)
+            position2( Math.min(prePosition, newTotalSize) ); // -> clipped position (set currSlice and re-map/-pos buffer)
         }
         /* if( DEBUG ) {
             System.err.println("notifyLengthChange.X: "+slices[currSlice]);
@@ -383,16 +448,34 @@ public class MappedByteBufferInputStream extends InputStream {
     }
 
     /**
-     *
+     * Similar to {@link OutputStream#flush()}, synchronizes all mapped buffers
+     * from local storage via {@link MappedByteBuffer#force()}
+     * as well as the {@link FileChannel#force(boolean)} w/o {@code metaData}.
+     * @param metaData TODO
      * @throws IOException if this stream has been {@link #close() closed}.
      */
-    public final synchronized void flush() throws IOException {
+    public final synchronized void flush(final boolean metaData) throws IOException {
         checkOpen();
-        if( mmode != FileChannel.MapMode.READ_ONLY ) {
-            fc.force(true);
+        flushImpl(metaData, true);
+    }
+    private final synchronized void flushImpl(final boolean metaData, final boolean syncBuffer) throws IOException {
+        if( FileChannel.MapMode.READ_ONLY != mmode ) {
+            if( syncBuffer && FileChannel.MapMode.READ_WRITE == mmode ) {
+                for(int i=0; i<sliceCount; i++) {
+                    syncSlice(slices[i], true);
+                }
+                for(int i=0; i<sliceCount; i++) {
+                    final WeakReference<ByteBuffer> ref = slices2GC[i];
+                    if( null != ref ) {
+                        syncSlice(ref.get(), true);
+                    }
+                }
+            }
+            fc.force(metaData);
         }
     }
 
+
     /**
      * Returns a new MappedByteBufferOutputStream instance sharing
      * all resources of this input stream, including all buffer slices.
@@ -422,23 +505,28 @@ public class MappedByteBufferInputStream extends InputStream {
      * @throws IOException if a buffer slice operation failed.
      */
     public final synchronized ByteBuffer currentSlice() throws IOException {
-        if ( null != slices[sliceIdx] ) {
-            return slices[sliceIdx];
+        final ByteBuffer s0 = slices[sliceIdx];
+        if ( null != s0 ) {
+            return s0;
         } else {
             if( CacheMode.FLUSH_PRE_SOFT == cmode ) {
                 final WeakReference<ByteBuffer> ref = slices2GC[sliceIdx];
                 if( null != ref ) {
                     final ByteBuffer mbb = ref.get();
                     slices2GC[sliceIdx] = null;
+                    slices2GCEntries--;
                     if( null != mbb ) {
                         slices[sliceIdx] = mbb;
+                        slicesEntries++;
                         return mbb;
                     }
                 }
             }
             final long pos = (long)sliceIdx << sliceShift;
-            slices[sliceIdx] = fc.map(mmode, pos, Math.min(1L << sliceShift, totalSize - pos));
-            return slices[sliceIdx];
+            final MappedByteBuffer s1 = fc.map(mmode, pos, Math.min(1L << sliceShift, totalSize - pos));
+            slices[sliceIdx] = s1;
+            slicesEntries++;
+            return s1;
         }
     }
 
@@ -453,9 +541,7 @@ public class MappedByteBufferInputStream extends InputStream {
      */
     public final synchronized ByteBuffer nextSlice() throws IOException {
         if ( sliceIdx < sliceCount - 1 ) {
-            if( CacheMode.FLUSH_NONE != cmode ) {
-                flushSlice(sliceIdx);
-            }
+            flushSlice(sliceIdx, synchronous);
             sliceIdx++;
             final ByteBuffer slice = currentSlice();
             slice.position( 0 );
@@ -465,46 +551,109 @@ public class MappedByteBufferInputStream extends InputStream {
         }
     }
 
-    private synchronized void flushSlice(final int i) throws IOException {
+    synchronized void syncSlice(final ByteBuffer s) throws IOException {
+        syncSlice(s, synchronous);
+    }
+    synchronized void syncSlice(final ByteBuffer s, final boolean syncBuffer) throws IOException {
+        if( syncBuffer && null != s && FileChannel.MapMode.READ_WRITE == mmode ) {
+            try {
+                ((MappedByteBuffer)s).force();
+            } catch( final Throwable t ) {
+                // On Windows .. this may happen, like:
+                // java.io.IOException: The process cannot access the file because another process has locked a portion of the file
+                //   at java.nio.MappedByteBuffer.force0(Native Method)
+                //   at java.nio.MappedByteBuffer.force(MappedByteBuffer.java:203)
+                if( DEBUG ) {
+                    System.err.println("Caught "+t.getMessage());
+                    t.printStackTrace();
+                }
+            }
+        }
+    }
+    private synchronized void flushSlice(final int i, final boolean syncBuffer) throws IOException {
         final ByteBuffer s = slices[i];
         if ( null != s ) {
-            slices[i] = null; // GC a slice is enough
-            if( CacheMode.FLUSH_PRE_HARD == cmode ) {
-                if( !cleanBuffer(s) ) {
-                    cmode = CacheMode.FLUSH_PRE_SOFT;
+            if( CacheMode.FLUSH_NONE != cmode ) {
+                slices[i] = null; // trigger slice GC
+                slicesEntries--;
+                if( CacheMode.FLUSH_PRE_HARD == cmode ) {
+                    if( !cleanBuffer(s, syncBuffer) ) {
+                        // buffer already synced in cleanBuffer(..) if requested
+                        slices2GC[i] = new WeakReference<ByteBuffer>(s);
+                        slices2GCEntries++;
+                    }
+                } else {
+                    syncSlice(s, syncBuffer);
                     slices2GC[i] = new WeakReference<ByteBuffer>(s);
+                    slices2GCEntries++;
                 }
             } else {
-                slices2GC[i] = new WeakReference<ByteBuffer>(s);
+                syncSlice(s, syncBuffer);
             }
         }
     }
-    private synchronized void cleanSlice(final int i) {
-        final ByteBuffer s = slices[i];
-        if( null != s ) {
+    private synchronized void cleanAllSlices(final boolean syncBuffers) throws IOException {
+        if( null != slices ) {
+            for(int i=0; i<sliceCount; i++) {
+                cleanSlice(i, syncBuffers);
+            }
+            if( 0 != slicesEntries || 0 != slices2GCEntries ) { // FIXME
+                final String err = "mappedSliceCount "+slicesEntries+", slices2GCEntries "+slices2GCEntries;
+                dbgDump(err+": ", System.err);
+                throw new InternalError(err);
+            }
+        }
+    }
+
+    private synchronized void cleanSlice(final int i, final boolean syncBuffer) throws IOException {
+        final ByteBuffer s1 = slices[i];
+        final ByteBuffer s2;
+        {
+            final WeakReference<ByteBuffer> ref = slices2GC[i];
+            slices2GC[i] = null;
+            if( null != ref ) {
+                slices2GCEntries--;
+                s2 = ref.get();
+            } else {
+                s2 = null;
+            }
+        }
+        if( null != s1 ) {
             slices[i] = null;
-            cleanBuffer(s);
+            slicesEntries--;
+            cleanBuffer(s1, syncBuffer);
+            if( null != s2 ) {
+                throw new InternalError("XXX");
+            }
+        } else if( null != s2 ) {
+            cleanBuffer(s2, syncBuffer);
         }
-        slices2GC[i] = null;
     }
-    private synchronized boolean cleanBuffer(final ByteBuffer mbb) {
+    private synchronized boolean cleanBuffer(final ByteBuffer mbb, final boolean syncBuffer) throws IOException {
         if( !cleanerInit ) {
             initCleaner(mbb);
         }
-        if ( !hasCleaner || !mbb.isDirect() ) {
+        syncSlice(mbb, syncBuffer);
+        if( !mbb.isDirect() ) {
             return false;
         }
-        try {
-            cClean.invoke(mbbCleaner.invoke(mbb));
-            return true;
-        } catch(final Throwable t) {
-            hasCleaner = false;
-            if( DEBUG ) {
-                System.err.println("Caught "+t.getMessage());
-                t.printStackTrace();
+        boolean res = false;
+        if ( hasCleaner ) {
+            try {
+                cClean.invoke(mbbCleaner.invoke(mbb));
+                res = true;
+            } catch(final Throwable t) {
+                hasCleaner = false;
+                if( DEBUG ) {
+                    System.err.println("Caught "+t.getMessage());
+                    t.printStackTrace();
+                }
             }
-            return false;
         }
+        if( !res && CacheMode.FLUSH_PRE_HARD == cmode ) {
+            cmode = CacheMode.FLUSH_PRE_SOFT;
+        }
+        return res;
     }
     private synchronized void initCleaner(final ByteBuffer bb) {
         final Method[] _mbbCleaner = { null };
@@ -618,13 +767,25 @@ public class MappedByteBufferInputStream extends InputStream {
             throw new IllegalArgumentException("new position "+newPosition+" not within [0.."+totalSize+"]");
         }
         final int preSlice = sliceIdx;
-        positionImpl( newPosition );
-        if( CacheMode.FLUSH_NONE != cmode && preSlice != sliceIdx) {
-            flushSlice(preSlice);
+
+        if ( totalSize == newPosition ) {
+            // EOF, pos == maxPos + 1
+            sliceIdx = Math.max(0, sliceCount - 1); // handle zero size
+            if( preSlice != sliceIdx ) {
+                flushSlice(preSlice, synchronous);
+            }
+            final ByteBuffer s = currentSlice();
+            s.position( s.capacity() );
+        } else {
+            sliceIdx = (int)( newPosition >>> sliceShift );
+            if( preSlice != sliceIdx ) {
+                flushSlice(preSlice, synchronous);
+            }
+            currentSlice().position( (int)( newPosition - ( (long)sliceIdx << sliceShift ) ) );
         }
         return this;
     }
-    private final synchronized void positionImpl( final long newPosition ) throws IOException {
+    private final synchronized void position2( final long newPosition ) throws IOException {
         if ( totalSize == newPosition ) {
             // EOF, pos == maxPos + 1
             sliceIdx = Math.max(0, sliceCount - 1); // handle zero size
diff --git a/src/java/com/jogamp/common/nio/MappedByteBufferOutputStream.java b/src/java/com/jogamp/common/nio/MappedByteBufferOutputStream.java
index f84e6c2..8adf0e4 100644
--- a/src/java/com/jogamp/common/nio/MappedByteBufferOutputStream.java
+++ b/src/java/com/jogamp/common/nio/MappedByteBufferOutputStream.java
@@ -78,6 +78,19 @@ public class MappedByteBufferOutputStream extends OutputStream {
         this(new MappedByteBufferInputStream(fileChannel, mmode, cmode, sliceShift, fileChannel.size(), 0), fileResizeOp);
     }
 
+    /**
+     * See {@link MappedByteBufferInputStream#setSynchronous(boolean)}.
+     */
+    public final synchronized void setSynchronous(final boolean s) {
+        parent.setSynchronous(s);
+    }
+    /**
+     * See {@link MappedByteBufferInputStream#getSynchronous()}.
+     */
+    public final synchronized boolean getSynchronous() {
+        return parent.getSynchronous();
+    }
+
     /**
      * See {@link MappedByteBufferInputStream#setLength(long)}.
      */
@@ -129,7 +142,15 @@ public class MappedByteBufferOutputStream extends OutputStream {
 
     @Override
     public final synchronized void flush() throws IOException {
-        parent.flush();
+        parent.flush( true /* metaData */);
+    }
+
+    /**
+     * See {@link MappedByteBufferInputStream#flush(boolean)}.
+     */
+    // @Override
+    public final synchronized void flush(final boolean metaData) throws IOException {
+        parent.flush(metaData);
     }
 
     @Override
@@ -156,6 +177,11 @@ public class MappedByteBufferOutputStream extends OutputStream {
             }
         }
         slice.put( (byte)(b & 0xFF) );
+
+        // sync last buffer (happens only in synchronous mode)
+        if( null != slice ) {
+            parent.syncSlice(slice);
+        }
     }
 
     @Override
@@ -178,8 +204,9 @@ public class MappedByteBufferOutputStream extends OutputStream {
             parent.setLength( parent.length() + len - totalRem );
         }
         int written = 0;
+        ByteBuffer slice = null;
         while( written < len ) {
-            ByteBuffer slice = parent.currentSlice();
+            slice = parent.currentSlice();
             int currRem = slice.remaining();
             if ( 0 == currRem ) {
                 if ( null == ( slice = parent.nextSlice() ) ) {
@@ -197,6 +224,10 @@ public class MappedByteBufferOutputStream extends OutputStream {
             slice.put( b, off + written, currLen );
             written += currLen;
         }
+        // sync last buffer (happens only in synchronous mode)
+        if( null != slice ) {
+            parent.syncSlice(slice);
+        }
     }
 
     /**
@@ -221,8 +252,9 @@ public class MappedByteBufferOutputStream extends OutputStream {
             parent.setLength( parent.length() + len - totalRem );
         }
         int written = 0;
+        ByteBuffer slice = null;
         while( written < len ) {
-            ByteBuffer slice = parent.currentSlice();
+            slice = parent.currentSlice();
             int currRem = slice.remaining();
             if ( 0 == currRem ) {
                 if ( null == ( slice = parent.nextSlice() ) ) {
@@ -257,6 +289,10 @@ public class MappedByteBufferOutputStream extends OutputStream {
             }
             written += currLen;
         }
+        // sync last buffer (happens only in synchronous mode)
+        if( null != slice ) {
+            parent.syncSlice(slice);
+        }
     }
 
     /**
@@ -285,8 +321,9 @@ public class MappedByteBufferOutputStream extends OutputStream {
             parent.setLength( parent.length() + len - totalRem );
         }
         long written = 0;
+        ByteBuffer slice = null;
         while( written < len ) {
-            ByteBuffer slice = parent.currentSlice();
+            slice = parent.currentSlice();
             int currRem = slice.remaining();
             if ( 0 == currRem ) {
                 if ( null == ( slice = parent.nextSlice() ) ) {
@@ -306,5 +343,9 @@ public class MappedByteBufferOutputStream extends OutputStream {
             }
             written += currLen;
         }
+        // sync last buffer (happens only in synchronous mode)
+        if( null != slice ) {
+            parent.syncSlice(slice);
+        }
     }
 }
-- 
cgit v1.2.3