Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] experiment - Use SemaphoreSlim to prevent concurrent basestream access #619

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 64 additions & 28 deletions src/ICSharpCode.SharpZipLib/Zip/ZipFile.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
using System.IO;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ICSharpCode.SharpZipLib.Zip
{
Expand Down Expand Up @@ -1079,8 +1081,10 @@ private enum HeaderTest
/// <returns>The offset of the entries data in the file</returns>
private long TestLocalHeader(ZipEntry entry, HeaderTest tests)
{
lock (baseStream_)
try
{
this.baseStreamSemaphore.Wait();

bool testHeader = (tests & HeaderTest.Header) != 0;
bool testData = (tests & HeaderTest.Extract) != 0;

Expand Down Expand Up @@ -1350,6 +1354,10 @@ private long TestLocalHeader(ZipEntry entry, HeaderTest tests)
int extraLength = storedNameLength + extraDataLength;
return offsetOfFirstEntry + entry.Offset + ZipConstants.LocalHeaderBaseSize + extraLength;
}
finally
{
this.baseStreamSemaphore.Release();
}
}

#endregion Archive Testing
Expand Down Expand Up @@ -3349,11 +3357,21 @@ private void DisposeInternal(bool disposing)
isDisposed_ = true;
entries_ = Empty.Array<ZipEntry>();

if (IsStreamOwner && (baseStream_ != null))
if (disposing)
{
lock (baseStream_)
try
{
baseStream_.Dispose();
this.baseStreamSemaphore.Wait();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did this the other day and wasn't sure about it, and it still feels off - if the idea of the existing lock is to prevent the stream being disposed while being read (rather than 'just' being disposed in between reads) then that needs to be maintained, but then you might also need to avoid disposing the semaphore while another thread is using it, which just makes things more complicated.

Still not sure if the lock is doing anything other than trying to save users from their own mistakes though (preventing an explicit attempt to dispose the archive while still using it) :-(

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I'm not even sure that that is something we should try to manage. Tried looking up when the locking was added, but it led me to this wonderful commit with 124,411 additions and 10,927 deletions... Great.


if (IsStreamOwner)
{
baseStream_?.Dispose();
}
}
finally
{
this.baseStreamSemaphore.Release();
this.baseStreamSemaphore.Dispose();
}
}

Expand Down Expand Up @@ -3787,6 +3805,7 @@ private static void WriteEncryptionHeader(Stream stream, long crcValue)
private ZipEntry[] entries_;
private byte[] key;
private bool isNewArchive_;
private readonly SemaphoreSlim baseStreamSemaphore = new SemaphoreSlim(1, 1);

// Default is dynamic which is not backwards compatible and can cause problems
// with XP's built-in compression, which can't read Zip64 archives.
Expand Down Expand Up @@ -4156,16 +4175,14 @@ public PartialInputStream(ZipFile zipFile, long start, long length)
// uses reader here....
zipFile_ = zipFile;
baseStream_ = zipFile_.baseStream_;
this.semaphore = zipFile.baseStreamSemaphore;
readPos_ = start;
end_ = start + length;
}

#endregion Constructors

/// <summary>
/// Read a byte from this stream.
/// </summary>
/// <returns>Returns the byte read or -1 on end of stream.</returns>
/// <inheritdoc/>
public override int ReadByte()
{
if (readPos_ >= end_)
Expand All @@ -4174,32 +4191,40 @@ public override int ReadByte()
return -1;
}

lock (baseStream_)
try
{
this.semaphore.Wait();

baseStream_.Seek(readPos_++, SeekOrigin.Begin);
return baseStream_.ReadByte();
}
finally
{
this.semaphore.Release();
}
}

/// <summary>
/// Reads a sequence of bytes from the current stream and advances the position within the stream by the number of bytes read.
/// </summary>
/// <param name="buffer">An array of bytes. When this method returns, the buffer contains the specified byte array with the values between offset and (offset + count - 1) replaced by the bytes read from the current source.</param>
/// <param name="offset">The zero-based byte offset in buffer at which to begin storing the data read from the current stream.</param>
/// <param name="count">The maximum number of bytes to be read from the current stream.</param>
/// <returns>
/// The total number of bytes read into the buffer. This can be less than the number of bytes requested if that many bytes are not currently available, or zero (0) if the end of the stream has been reached.
/// </returns>
/// <exception cref="System.ArgumentException">The sum of offset and count is larger than the buffer length. </exception>
/// <exception cref="System.ObjectDisposedException">Methods were called after the stream was closed. </exception>
/// <exception cref="System.NotSupportedException">The stream does not support reading. </exception>
/// <exception cref="System.ArgumentNullException">buffer is null. </exception>
/// <exception cref="System.IO.IOException">An I/O error occurs. </exception>
/// <exception cref="System.ArgumentOutOfRangeException">offset or count is negative. </exception>
/// <inheritdoc/>
public override int Read(byte[] buffer, int offset, int count)
{
lock (baseStream_)
return ReadAsyncCore(buffer, offset, count, false, default).GetAwaiter().GetResult();
}

/// <inheritdoc/>
/// <remarks>
/// Forwards to <c>ReadAsyncCore</c> with <c>useAsync</c> set to <c>true</c>,
/// passing the caller's cancellation token through unchanged.
/// </remarks>
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
	=> ReadAsyncCore(buffer, offset, count, useAsync: true, cancellationToken: cancellationToken);

private async Task<int> ReadAsyncCore(byte[] buffer, int offset, int count, bool useAsync, CancellationToken cancellationToken)
{
try
{
if (useAsync)
await this.semaphore.WaitAsync(cancellationToken);
else
this.semaphore.Wait();

if (count > end_ - readPos_)
{
count = (int)(end_ - readPos_);
Expand All @@ -4208,19 +4233,29 @@ public override int Read(byte[] buffer, int offset, int count)
return 0;
}
}

// Protect against Stream implementations that throw away their buffer on every Seek
// (for example, Mono FileStream)
if (baseStream_.Position != readPos_)
{
baseStream_.Seek(readPos_, SeekOrigin.Begin);
}
int readCount = baseStream_.Read(buffer, offset, count);

var readCount = useAsync ?
await baseStream_.ReadAsync(buffer, offset, count, cancellationToken) :
baseStream_.Read(buffer, offset, count);

if (readCount > 0)
{
readPos_ += readCount;
}

return readCount;
}
finally
{
this.semaphore.Release();
}
}

/// <summary>
Expand Down Expand Up @@ -4386,12 +4421,13 @@ public override bool CanTimeout

#region Instance Fields

private ZipFile zipFile_;
private Stream baseStream_;
private readonly ZipFile zipFile_;
private readonly Stream baseStream_;
private readonly long start_;
private readonly long length_;
private long readPos_;
private readonly long end_;
private readonly SemaphoreSlim semaphore;

#endregion Instance Fields
}
Expand Down