Skip to content

Commit

Permalink
Introduce fast path for buffered ReadableStream (#13704)
Browse files Browse the repository at this point in the history
Co-authored-by: Jarred-Sumner <[email protected]>
  • Loading branch information
Jarred-Sumner and Jarred-Sumner authored Sep 4, 2024
1 parent f3da37e commit a9cf463
Show file tree
Hide file tree
Showing 21 changed files with 1,350 additions and 534 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/format.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,7 @@ jobs:
name: Run format
uses: ./.github/workflows/run-format.yml
secrets: inherit
permissions:
contents: write
with:
zig-version: 0.13.0
80 changes: 47 additions & 33 deletions src/bun.js/api/server.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1869,6 +1869,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
blob: JSC.WebCore.AnyBlob = JSC.WebCore.AnyBlob{ .Blob = .{} },

sendfile: SendfileContext = undefined,

request_body_readable_stream_ref: JSC.WebCore.ReadableStream.Strong = .{},
request_body: ?*JSC.BodyValueRef = null,
request_body_buf: std.ArrayListUnmanaged(u8) = .{},
request_body_content_len: usize = 0,
Expand Down Expand Up @@ -2411,6 +2413,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
this.response_jsvalue = JSC.JSValue.zero;
}

this.request_body_readable_stream_ref.deinit();

if (this.request_weakref.get()) |request| {
request.request_context = AnyRequestContext.Null;
this.request_weakref.deinit();
Expand Down Expand Up @@ -3852,43 +3856,46 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
const vm = this.server.?.vm;
const globalThis = this.server.?.globalThis;

// After the user does request.body,
// if they then do .text(), .arrayBuffer(), etc
// we can no longer hold the strong reference from the body value ref.
if (this.request_body_readable_stream_ref.get()) |readable| {
assert(this.request_body_buf.items.len == 0);
vm.eventLoop().enter();
defer vm.eventLoop().exit();

if (!last) {
readable.ptr.Bytes.onData(
.{
.temporary = bun.ByteList.initConst(chunk),
},
bun.default_allocator,
);
} else {
var strong = this.request_body_readable_stream_ref;
this.request_body_readable_stream_ref = .{};
defer strong.deinit();
if (this.request_body) |request_body| {
_ = request_body.unref();
this.request_body = null;
}

readable.value.ensureStillAlive();
readable.ptr.Bytes.onData(
.{
.temporary_and_done = bun.ByteList.initConst(chunk),
},
bun.default_allocator,
);
}

return;
}

// This is the start of a task, so it's a good time to drain
if (this.request_body != null) {
var body = this.request_body.?;

if (body.value == .Locked) {
if (body.value.Locked.readable.get()) |readable| {
if (readable.ptr == .Bytes) {
assert(this.request_body_buf.items.len == 0);
vm.eventLoop().enter();
defer vm.eventLoop().exit();

if (!last) {
readable.ptr.Bytes.onData(
.{
.temporary = bun.ByteList.initConst(chunk),
},
bun.default_allocator,
);
} else {
var prev = body.value.Locked.readable;
body.value.Locked.readable = .{};
readable.value.ensureStillAlive();
defer prev.deinit();
readable.value.ensureStillAlive();
readable.ptr.Bytes.onData(
.{
.temporary_and_done = bun.ByteList.initConst(chunk),
},
bun.default_allocator,
);
}

return;
}
}
}

if (last) {
var bytes = &this.request_body_buf;

Expand Down Expand Up @@ -3989,6 +3996,12 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
}

pub fn onRequestBodyReadableStreamAvailable(ptr: *anyopaque, globalThis: *JSC.JSGlobalObject, readable: JSC.WebCore.ReadableStream) void {
var this = bun.cast(*RequestContext, ptr);
bun.debugAssert(this.request_body_readable_stream_ref.held.ref == null);
this.request_body_readable_stream_ref = JSC.WebCore.ReadableStream.Strong.init(readable, globalThis);
}

pub fn onStartBufferingCallback(this: *anyopaque) void {
onStartBuffering(bun.cast(*RequestContext, this));
}
Expand Down Expand Up @@ -6792,6 +6805,7 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
.global = this.globalThis,
.onStartBuffering = RequestContext.onStartBufferingCallback,
.onStartStreaming = RequestContext.onStartStreamingRequestBodyCallback,
.onReadableStreamAvailable = RequestContext.onRequestBodyReadableStreamAvailable,
},
};
ctx.flags.is_waiting_for_request_body = true;
Expand Down
26 changes: 26 additions & 0 deletions src/bun.js/api/streams.classes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,32 @@ function source(name) {
isClosed: {
getter: "getIsClosedFromJS",
},
...(name !== "File"
? // Buffered versions
// not implemented in File, yet.
{
text: {
fn: "textFromJS",
length: 0,
},
json: {
fn: "jsonFromJS",
length: 0,
},
arrayBuffer: {
fn: "arrayBufferFromJS",
length: 0,
},
blob: {
fn: "blobFromJS",
length: 0,
},
bytes: {
fn: "bytesFromJS",
length: 0,
},
}
: {}),
...(name === "File"
? {
setRawMode: {
Expand Down
13 changes: 13 additions & 0 deletions src/bun.js/base.zig
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,19 @@ pub const ArrayBuffer = extern struct {
return out;
}

extern "C" fn JSArrayBuffer__fromDefaultAllocator(*JSC.JSGlobalObject, ptr: [*]u8, len: usize) JSC.JSValue;
pub fn toJSFromDefaultAllocator(globalThis: *JSC.JSGlobalObject, bytes: []u8) JSC.JSValue {
return JSArrayBuffer__fromDefaultAllocator(globalThis, bytes.ptr, bytes.len);
}

pub fn fromDefaultAllocator(globalThis: *JSC.JSGlobalObject, bytes: []u8, comptime typed_array_type: JSC.JSValue.JSType) JSC.JSValue {
return switch (typed_array_type) {
.ArrayBuffer => JSArrayBuffer__fromDefaultAllocator(globalThis, bytes.ptr, bytes.len),
.Uint8Array => JSC.JSUint8Array.fromBytes(globalThis, bytes),
else => @compileError("Not implemented yet"),
};
}

pub fn fromBytes(bytes: []u8, typed_array_type: JSC.JSValue.JSType) ArrayBuffer {
return ArrayBuffer{ .offset = 0, .len = @as(u32, @intCast(bytes.len)), .byte_len = @as(u32, @intCast(bytes.len)), .typed_array_type = typed_array_type, .ptr = bytes.ptr };
}
Expand Down
8 changes: 8 additions & 0 deletions src/bun.js/bindings/BunObject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ static inline JSC::EncodedJSValue flattenArrayOfBuffersIntoArrayBufferOrUint8Arr

size_t arrayLength = array->length();
if (arrayLength < 1) {
if (asUint8Array) {
return JSValue::encode(
JSC::JSUint8Array::create(
lexicalGlobalObject,
lexicalGlobalObject->m_typedArrayUint8.get(lexicalGlobalObject),
0));
}

RELEASE_AND_RETURN(throwScope, JSValue::encode(JSC::JSArrayBuffer::create(vm, lexicalGlobalObject->arrayBufferStructure(), JSC::ArrayBuffer::create(static_cast<size_t>(0), 1))));
}

Expand Down
26 changes: 21 additions & 5 deletions src/bun.js/bindings/Uint8Array.cpp
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
#include "root.h"

#include "JavaScriptCore/JSArrayBuffer.h"
#include "JavaScriptCore/TypedArrayType.h"
#include "JavaScriptCore/JSArrayBufferViewInlines.h"
#include "JavaScriptCore/JSArrayBufferView.h"
#include "JavaScriptCore/JSTypedArrayViewPrototype.h"
#include "mimalloc.h"

namespace Bun {

extern "C" JSC::EncodedJSValue JSUint8Array__fromDefaultAllocator(JSC::JSGlobalObject* lexicalGlobalObject, uint8_t* ptr, size_t length)
{

JSC::JSUint8Array* uint8Array = nullptr;
JSC::JSUint8Array* uint8Array;

if (LIKELY(length > 0)) {
auto buffer = ArrayBuffer::createFromBytes({ ptr, length }, createSharedTask<void(void*)>([](void* p) {
Expand All @@ -25,4 +22,23 @@ extern "C" JSC::EncodedJSValue JSUint8Array__fromDefaultAllocator(JSC::JSGlobalO

return JSC::JSValue::encode(uint8Array);
}

extern "C" JSC::EncodedJSValue JSArrayBuffer__fromDefaultAllocator(JSC::JSGlobalObject* lexicalGlobalObject, uint8_t* ptr, size_t length)
{

JSC::JSArrayBuffer* arrayBuffer;

if (LIKELY(length > 0)) {
RefPtr<ArrayBuffer> buffer = ArrayBuffer::createFromBytes({ ptr, length }, createSharedTask<void(void*)>([](void* p) {
mi_free(p);
}));

arrayBuffer = JSC::JSArrayBuffer::create(lexicalGlobalObject->vm(), lexicalGlobalObject->arrayBufferStructure(), WTFMove(buffer));
} else {
arrayBuffer = JSC::JSArrayBuffer::create(lexicalGlobalObject->vm(), lexicalGlobalObject->arrayBufferStructure(), nullptr);
}

return JSC::JSValue::encode(arrayBuffer);
}

}
Loading

0 comments on commit a9cf463

Please sign in to comment.