-
Notifications
You must be signed in to change notification settings - Fork 863
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update unmarshallers to use system text json. #3553
base: petesong/stj
Are you sure you want to change the base?
Conversation
c01a7d9
to
ebc898a
Compare
this looks fantastic, very excited for this change |
{ | ||
bytesRead = stream.Read(buffer, 0, buffer.Length); | ||
} | ||
if (bytesRead == 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should the method return here, I worry that the follow line sets the reader using the buffer
that is being returned to the pool
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah, yeah good suggestion the reader would be using a buffer returned to the pool in that scenario. If I return there though, it will return to the Read()
method and the _reader
won't have a buffer to work with, and I don't want to set buffer to null. I think instead I'll just refactor a bit and add something like the following in the Read()
method
if (!hasMoreData)
{
GetMoreBytesFromStream(_stream, ref _buffer, ref _reader);
hasMoreData = _reader.Read();
if (_reader.IsFinalBlock)
ArrayPool<byte>.Shared.Return(_buffer);
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
while fixing a bug in the streamingutf8 reader I decided to remove the pooling due to multiple resizings of the array. I put a justification in a comment, but basically the byte array returned by Array.Resize
isn't owned by the pool so keeping track of that didn't seem worth the Renting + copying I would have to do in order to do that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
flipped back to pooling
@@ -63,54 +63,58 @@ public IRequest Marshall(AddPermissionRequest publicRequest) | |||
request.HttpMethod = "POST"; | |||
|
|||
request.ResourcePath = "/"; | |||
using (MemoryStream memoryStream = new MemoryStream()) | |||
#if NETCOREAPP3_1_OR_GREATER |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This might be stretching the scope of this change too much, but would it be possible to make the source of MemoryStream
/IBufferWriter<T>
pluggable.
I'd really love to add my own pipeline middleware that allows for injecting a RecyclableMemoryStream (which implements both MemoryStream
and IBufferWriter<T>
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My interpretation of this is we would have a property on AWSConfigs
called StreamFactory
of type IStreamFactory
defined as:
public interface IStreamFactory
{
Stream GetStream();
}
For .NET Core 3.1 and above we would default this to an instance of ArrayBufferWriterFactory:
internal class ArrayBufferWriterFactory : IStreamFactory
{
public Stream GetStream()
{
// TODO: Do something like this class to wrap the ArrayBufferWriter as a Stream. https://github.com/CommunityToolkit/dotnet/blob/c23e1cc6e918f86c589facf98f70ca122495c385/src/CommunityToolkit.HighPerformance/Streams/IBufferWriterStream%7BTWriter%7D.cs#L18
return new ArrayBufferWriter<byte>();
}
}
For older targets we would use a MemoryStream based factory:
internal class MemoryStreamFactory : IStreamFactory
{
public Stream GetStream()
{
return new MemoryStream();
}
}
In our unmarshallers where we need a stream we could get rid of the conditional check and just call AWSConfigs.StreamFactory.GetManager()
.
Then @slang25 can take a dependency on Microsoft.IO.RecyclableMemoryStream
and right his own factory and assign it to AWSConfigs.StreamFactory
.
internal class RecyclableMemoryStreamFactory : IStreamFactory
{
private static readonly RecyclableMemoryStreamManager manager = new RecyclableMemoryStreamManager();
public Stream GetStream()
{
return manager.GetStream();
}
}
Does that capture your thoughts @slang25 ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting, I just took a look at the class linked that does something similar, which allows the wrapper over IBufferWriter and at first glance it doesn't seem too bad. I'm a bit worried about overriding all those methods defined in the base Stream
class though... i wonder how the bufferwriter interacts with those methods and how that then interacts with the JsonWriter
if at all, maybe I'm overthinking it. Is it as performant / behave the same? Will hold off until I hear back though on if this is what you were thinking
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What I'm looking for is a way to get the best perf out of the AWS SDK, so having the ability to pool these buffers that are going to copy all the bytes into memory is going to reduce memory allocations significantly.
My understanding is that IBufferWriter<T>
is the fast modern to do synchronous buffered writing, vs Stream
which is more ubiquitous in the .NET world today.
That said, I think a Stream GetStream()
signature would work, but you could additional do a type check to see if it implements a IBufferWriter<T>
, and then use the appropriate overloads. That makes it convenient for use with Streams, and has a fast path for things like RecyclableMemoryStreamManager
. To be honest, this isn't a deal breaker, I haven't ran benchmarks but I'm guessing the gains are not tremendous.
The problem with a static IStreamFactory
is that I want to be able to pool the buffers and therefore want a way to return them. So the AWSConfigs
static property would need an interface to support that for it to achieve what I'm looking for.
I was thinking about having a PipelineHandler
that could be added into the pipeline, and setting the stream on some sort of context object that passes through. That way it could own the creation/pooling of buffers, and returning them. However a static property would be more ergonomic.
231beda
to
fe0da63
Compare
…mo DB Utils refactor to enable building test project
ae62725
to
12fb049
Compare
|
||
/// <summary> | ||
/// Key for the UseSdkCache property. | ||
/// <seealso cref="Amazon.AWSConfigs.UseSdkCache"/> | ||
/// Configures the default buffer size for the <see cref="Amazon.Runtime.Internal.Util.StreamingUtf8JsonReader"/> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this cref is pointing to an internal undocumented class I would remove "for the <see cref="Amazon.Runtime.Internal.Util.StreamingUtf8JsonReader"/>".
|
||
namespace Amazon.Runtime.Documents.Internal.Transform | ||
{ | ||
/// <summary> | ||
/// Dedicated <see cref="IUnmarshaller{T,R}"/> for <see cref="Document"/>. | ||
/// Dedicated <see cref="IXmlUnmarshaller{T,R}"/> for <see cref="Document"/>. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be IJsonUnmarshaller
instead of IXmlUnmarshaller
?
@@ -46,7 +47,7 @@ public static XmlUnmarshallerContext ConvertMessageToXmlContext(IEventStreamMess | |||
/// Converts an <seealso cref="EventStreamMessage"/> to a <seealso cref="JsonUnmarshallerContext"/> | |||
/// </summary> | |||
/// <param name="message">The event stream message</param> | |||
/// <returns>The JsonUnmarshallerContext</returns> | |||
/// <returns>The JsonUnmarshallerContext and Utf8JsonReader</returns> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is only one type returned, why was the "and Utf8JsonReader" added?
@@ -21,6 +21,7 @@ | |||
using System.IO; | |||
using System.Xml; | |||
using ThirdParty.Ionic.Zlib; | |||
using System.Text.Json; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is weird that JsonUnmarshallerContext
is in a separate file and all of the other UnmarshallerContext subclasses are in this file. Can we make it consistent and put all of the other types besides the base class UnmarshallerContext
in a separate file?
/// <param name="targetDepth">Tokens are read at depth greater than or equal to target depth.</param> | ||
/// <param name="reader">The Utf8JsonReader used to read the document</param> | ||
/// <returns>True if a token was read and current depth is greater than or equal to target depth.</returns> | ||
public bool ReadAtDepth(int targetDepth, ref StreamingUtf8JsonReader reader) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this need to be on the base class or can it just be on JsonUnmarshallerContext
?
|
||
if (bytesRead == 0) | ||
{ | ||
ArrayPool<byte>.Shared.Return(buffer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Side question but if we have an Exception while unmarshalling do we need a mechanism to make sure the buffer is returned?
I can't remember what happens with ArrayPool
if the shared array objects are never returned. My guess is the non returned buffer is garbage collector and ArrayPool
would eventually create a new buffer to match the demand. Does that sound right to you?
If that assumption is true then maybe it is okay to let the buffers get lost in exceptions unless it is fairly easy to make sure the buffer is returned. You could I guess have a finalizer for the class and return the buffer if it hasn't already been returned.
[TestMethod] | ||
public void HandlesUtf8BOM() | ||
{ | ||
// we can't use reflection to access the private fields of StreamingUtf8JsonReader since it is a ref struct so we have to test it this way. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could make the methods internal
in StreamingUtf7JsonReaderand then since the unit test project has access via the
InternalsVisibleToAttributetest the handle method directly. Following this pattern you could also have tests directly on
FillBufferand
GetMoreBytesFromStream`,
// here we're creating a json string that is greater than the default buffer size to test the GetMoreBytesFromStream logic | ||
var sb = new StringBuilder(); | ||
sb.Append("{ \"key\": \""); | ||
sb.Append(new string('x', 7500)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In case we increase the default size but don't change the value here you might want to reference the AWSConfigs
value plus some size bigger.
/// This class just tests the wrapper class StreamingUtf8JsonReader. | ||
/// </summary> | ||
[TestClass] | ||
public class StreamingUtf8JsonReaderTests |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should have a test for the scenario where the PassReaderByRef
is used for converting the JsonDocument and the the document doesn't fit in the buffer.
string largeJson = sb.ToString(); | ||
|
||
byte[] payload = Encoding.UTF8.GetBytes(largeJson); | ||
using (var stream = new MemoryStream(payload)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To help verification you might want to add an internal property on the StreamingUtf8JsonReader
for the current length of the buffer. Then you can check before and after that the buffer size increased.
Description
Updates the unmarshallers to use System Text Json for deserialization.
918fa40 - Core and generator changes along with refactoring of DynamoDB Utils class to get test project to compile
3f84a2c - manual S3 changes for unmarshallers
12fb049 - generated changes for SQS as an example
53984c3 - fixing unnecessary padding
Some things I want to point out.
Utf8JsonReader
cannot accept a stream in its constructor, I created a wrapper aroundUtf8JsonReader
that accepts a stream and fills the buffer to pass into the reader. Since the wrapper is also a ref struct it can holdUtf8JsonReader
as a field.IUnmarshaller
=>IXmlUnmarshaller
andIJsonUnmarshaller
. This gives the added benefit that each structure unmarshaller no longer throwsNotImplementedException
for the other protocol.This is necessary for when we add new protocols down the line. We don't want each structure unmarshaller to implement all protocol interfaces just to throw
NotImplemented
for the others.Motivation and Context
Testing
I tested a couple real API calls. I added unit tests for the streamingUtf8JsonReader, and all the protocol tests pass.
Full dry run will be done in the feature branch.
Screenshots (if appropriate)
Overall, I saw around a 77% performance gain in unmarshalling (note this is on my machine not an EC2 instance). I will attach specific numbers from an ec2 instance as a follow up.
Types of changes
Checklist
License