From 6166fe93b1bcfccb1cdf6f17ecbf9901a9be7bc9 Mon Sep 17 00:00:00 2001 From: Ciro Spaciari Date: Thu, 12 Dec 2024 18:35:16 -0800 Subject: [PATCH] Bun.file read side --- src/bun.js/event_loop.zig | 6 + src/bun.js/webcore/blob.zig | 149 ++++++++++++- src/bun.js/webcore/response.classes.ts | 2 +- src/bun.js/webcore/response.zig | 31 ++- src/s3.zig | 294 ++++++++++++++++++++++++- 5 files changed, 475 insertions(+), 7 deletions(-) diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig index 514e8489d09d2d..998da9d243f44d 100644 --- a/src/bun.js/event_loop.zig +++ b/src/bun.js/event_loop.zig @@ -18,6 +18,7 @@ const ReadFileTask = WebCore.Blob.ReadFile.ReadFileTask; const WriteFileTask = WebCore.Blob.WriteFile.WriteFileTask; const napi_async_work = JSC.napi.napi_async_work; const FetchTasklet = Fetch.FetchTasklet; +const S3HttpSimpleTask = @import("../s3.zig").AWSCredentials.S3HttpSimpleTask; const JSValue = JSC.JSValue; const js = JSC.C; const Waker = bun.Async.Waker; @@ -407,6 +408,7 @@ const ServerAllConnectionsClosedTask = @import("./api/server.zig").ServerAllConn // Task.get(ReadFileTask) -> ?ReadFileTask pub const Task = TaggedPointerUnion(.{ FetchTasklet, + S3HttpSimpleTask, AsyncGlobWalkTask, AsyncTransformTask, ReadFileTask, @@ -991,6 +993,10 @@ pub const EventLoop = struct { var fetch_task: *Fetch.FetchTasklet = task.get(Fetch.FetchTasklet).?; fetch_task.onProgressUpdate(); }, + .S3HttpSimpleTask => { + var s3_task: *S3HttpSimpleTask = task.get(S3HttpSimpleTask).?; + s3_task.onResponse(); + }, @field(Task.Tag, @typeName(AsyncGlobWalkTask)) => { var globWalkTask: *AsyncGlobWalkTask = task.get(AsyncGlobWalkTask).?; globWalkTask.*.runFromJS(); diff --git a/src/bun.js/webcore/blob.zig b/src/bun.js/webcore/blob.zig index 9c1467bf5d8c0d..59781eba20a23b 100644 --- a/src/bun.js/webcore/blob.zig +++ b/src/bun.js/webcore/blob.zig @@ -19,7 +19,6 @@ const default_allocator = bun.default_allocator; const FeatureFlags = bun.FeatureFlags; const ArrayBuffer = 
@import("../base.zig").ArrayBuffer; const Properties = @import("../base.zig").Properties; - const getAllocator = @import("../base.zig").getAllocator; const Environment = @import("../../env.zig"); @@ -44,6 +43,7 @@ const Request = JSC.WebCore.Request; const libuv = bun.windows.libuv; +const AWS = @import("../../s3.zig").AWSCredentials; const PathOrBlob = union(enum) { path: JSC.Node.PathOrFileDescriptor, blob: Blob, @@ -147,6 +147,14 @@ pub const Blob = struct { pub fn doReadFile(this: *Blob, comptime Function: anytype, global: *JSGlobalObject) JSValue { bloblog("doReadFile", .{}); + if (this.isS3()) { + const WrappedFn = struct { + pub fn wrapped(b: *Blob, g: *JSGlobalObject, by: []u8) JSC.JSValue { + return JSC.toJSHostValue(g, Function(b, g, by, .clone)); + } + }; + return S3BlobDownloadTask.init(global, this, WrappedFn.wrapped); + } const Handler = NewReadFileHandler(Function); @@ -3423,12 +3431,149 @@ pub const Blob = struct { return JSValue.jsBoolean(bun.isRegularFile(store.data.file.mode) or bun.C.S.ISFIFO(store.data.file.mode)); } + fn isS3(this: *Blob) bool { + if (this.store) |store| { + if (store.data == .file) { + if (store.data.file.pathlike == .path) { + const slice = store.data.file.pathlike.path.slice(); + return strings.startsWith(slice, "s3://"); + } + } + } + return false; + } + + const S3BlobDownloadTask = struct { + blob: Blob, + globalThis: *JSC.JSGlobalObject, + promise: JSC.JSPromise.Strong, + poll_ref: bun.Async.KeepAlive = .{}, + + handler: S3ReadHandler, + usingnamespace bun.New(S3BlobDownloadTask); + pub const S3ReadHandler = *const fn (this: *Blob, globalthis: *JSGlobalObject, raw_bytes: []u8) JSValue; + + pub fn callHandler(this: *S3BlobDownloadTask, raw_bytes: []u8) JSValue { + return this.handler(&this.blob, this.globalThis, raw_bytes); + } + pub fn onS3DownloadResolved(result: AWS.S3DownloadResult, this: *S3BlobDownloadTask) void { + defer this.deinit(); + switch (result) { + .not_found => { + const js_err = 
this.globalThis.createErrorInstance("File not found", .{}); + js_err.put(this.globalThis, ZigString.static("code"), ZigString.init("FileNotFound").toJS(this.globalThis)); + this.promise.reject(this.globalThis, js_err); + }, + .success => |response| { + const bytes = response.body.list.items; + if (this.blob.size == Blob.max_size) { + this.blob.size = @truncate(bytes.len); + } + JSC.AnyPromise.wrap(.{ .normal = this.promise.get() }, this.globalThis, S3BlobDownloadTask.callHandler, .{ this, bytes }); + }, + .failure => |err| { + const js_err = this.globalThis.createErrorInstance("{s}", .{err.message}); + js_err.put(this.globalThis, ZigString.static("code"), ZigString.init(err.code).toJS(this.globalThis)); + this.promise.rejectOnNextTick(this.globalThis, js_err); + }, + } + } + + pub fn init(globalThis: *JSC.JSGlobalObject, blob: *Blob, handler: S3BlobDownloadTask.S3ReadHandler) JSValue { + blob.store.?.ref(); + + const this = S3BlobDownloadTask.new(.{ + .globalThis = globalThis, + .blob = blob.*, + .promise = JSC.JSPromise.Strong.init(globalThis), + .handler = handler, + }); + const promise = this.promise.value(); + const env = this.globalThis.bunVM().bundler.env; + const credentials = env.getAWSCredentials(); + const url = bun.URL.parse(this.blob.store.?.data.file.pathlike.path.slice()); + this.poll_ref.ref(globalThis.bunVM()); + + if (blob.offset > 0) { + const len: ?usize = if (blob.size != Blob.max_size) @intCast(blob.size) else null; + const offset: usize = @intCast(blob.offset); + credentials.s3DownloadSlice(url.hostname, url.path, offset, len, @ptrCast(&S3BlobDownloadTask.onS3DownloadResolved), this, if (env.getHttpProxy(url)) |proxy| proxy.href else null); + } else { + credentials.s3Download(url.hostname, url.path, @ptrCast(&S3BlobDownloadTask.onS3DownloadResolved), this, if (env.getHttpProxy(url)) |proxy| proxy.href else null); + } + return promise; + } + + pub fn deinit(this: *S3BlobDownloadTask) void { + this.blob.store.?.deref(); + 
this.poll_ref.unrefOnNextTick(this.globalThis.bunVM()); + this.promise.deinit(); + this.destroy(); + } + }; + + const S3BlobStatTask = struct { + blob: *Blob, + globalThis: *JSC.JSGlobalObject, + promise: JSC.JSPromise.Strong, + strong_ref: JSC.Strong, + poll_ref: bun.Async.KeepAlive = .{}, + usingnamespace bun.New(S3BlobStatTask); + + pub fn onS3StatResolved(result: AWS.S3StatResult, this: *S3BlobStatTask) void { + defer this.deinit(); + switch (result) { + .not_found => { + this.promise.resolve(this.globalThis, .false); + }, + .success => |stat| { + if (this.blob.size == Blob.max_size) { + this.blob.size = @truncate(stat.size); + } + this.promise.resolve(this.globalThis, .true); + }, + .failure => |err| { + const js_err = this.globalThis.createErrorInstance("{s}", .{err.message}); + js_err.put(this.globalThis, ZigString.static("code"), ZigString.init(err.code).toJS(this.globalThis)); + this.promise.rejectOnNextTick(this.globalThis, js_err); + }, + } + } + + pub fn init(globalThis: *JSC.JSGlobalObject, blob: *Blob, js_blob: JSValue) JSValue { + const this = S3BlobStatTask.new(.{ + .globalThis = globalThis, + .blob = blob, + .promise = JSC.JSPromise.Strong.init(globalThis), + .strong_ref = JSC.Strong.create(js_blob, globalThis), + }); + const promise = this.promise.value(); + const env = this.globalThis.bunVM().bundler.env; + const credentials = env.getAWSCredentials(); + const url = bun.URL.parse(this.blob.store.?.data.file.pathlike.path.slice()); + this.poll_ref.ref(globalThis.bunVM()); + credentials.s3Stat(url.hostname, url.path, @ptrCast(&S3BlobStatTask.onS3StatResolved), this, if (env.getHttpProxy(url)) |proxy| proxy.href else null); + return promise; + } + + pub fn deinit(this: *S3BlobStatTask) void { + this.poll_ref.unrefOnNextTick(this.globalThis.bunVM()); + this.strong_ref.deinit(); + this.promise.deinit(); + this.destroy(); + } + }; + // This mostly means 'can it be read?' 
pub fn getExists( this: *Blob, globalThis: *JSC.JSGlobalObject, _: *JSC.CallFrame, + this_value: JSC.JSValue, ) bun.JSError!JSValue { + if (this.isS3()) { + return S3BlobStatTask.init(globalThis, this, this_value); + } return JSC.JSPromise.resolvedPromiseValue(globalThis, this.getExistsSync()); } @@ -3783,7 +3928,7 @@ pub const Blob = struct { if (this.store) |store| { if (store.data == .file) { // last_modified can be already set during read. - if (store.data.file.last_modified == JSC.init_timestamp) { + if (store.data.file.last_modified == JSC.init_timestamp and !this.isS3()) { resolveFileStat(store); } return JSValue.jsNumber(store.data.file.last_modified); diff --git a/src/bun.js/webcore/response.classes.ts b/src/bun.js/webcore/response.classes.ts index 157f0abc387b8c..26c89991d7c243 100644 --- a/src/bun.js/webcore/response.classes.ts +++ b/src/bun.js/webcore/response.classes.ts @@ -140,7 +140,7 @@ export default [ slice: { fn: "getSlice", length: 2 }, stream: { fn: "getStream", length: 1 }, formData: { fn: "getFormData" }, - exists: { fn: "getExists", length: 0 }, + exists: { fn: "getExists", length: 0, passThis: true }, // Non-standard, but consistent! 
bytes: { fn: "getBytes" }, diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig index fcb70c3cf0027a..b809bb4fa9a237 100644 --- a/src/bun.js/webcore/response.zig +++ b/src/bun.js/webcore/response.zig @@ -2274,6 +2274,7 @@ pub const Fetch = struct { var signal: ?*JSC.WebCore.AbortSignal = null; // Custom Hostname var hostname: ?[]u8 = null; + var range: ?[]u8 = null; var unix_socket_path: ZigString.Slice = ZigString.Slice.empty; var url_proxy_buffer: []const u8 = ""; @@ -2312,6 +2313,10 @@ pub const Fetch = struct { bun.default_allocator.free(hn); hostname = null; } + if (range) |range_| { + bun.default_allocator.free(range_); + range = null; + } if (ssl_config) |conf| { ssl_config = null; @@ -2929,6 +2934,15 @@ pub const Fetch = struct { } hostname = _hostname.toOwnedSliceZ(allocator) catch bun.outOfMemory(); } + if (url.isS3()) { + if (headers_.fastGet(JSC.FetchHeaders.HTTPHeaderName.Range)) |_range| { + if (range) |range_| { + range = null; + allocator.free(range_); + } + range = _range.toOwnedSliceZ(allocator) catch bun.outOfMemory(); + } + } break :extract_headers Headers.from(headers_, allocator, .{ .body = body.getAnyBlob() }) catch bun.outOfMemory(); } @@ -3228,7 +3242,7 @@ pub const Fetch = struct { } } // TODO: should we generate the content hash? 
presigned never uses content-hash, maybe only if a extra option is passed to avoid the cost - var result = credentials.s3Request(url.hostname, url.path, method, null) catch |sign_err| { + var result = credentials.signRequest(url.hostname, url.path, method, null) catch |sign_err| { switch (sign_err) { error.MissingCredentials => { const err = JSC.toTypeError(.ERR_INVALID_ARG_VALUE, "missing s3 credentials", .{}, ctx); @@ -3236,7 +3250,7 @@ pub const Fetch = struct { return JSPromise.rejectedPromiseValue(globalThis, err); }, error.InvalidMethod => { - const err = JSC.toTypeError(.ERR_INVALID_ARG_VALUE, "method must be GET, PUT, DELETE when using s3 protocol", .{}, ctx); + const err = JSC.toTypeError(.ERR_INVALID_ARG_VALUE, "method must be GET, PUT, DELETE or HEAD when using s3 protocol", .{}, ctx); is_error = true; return JSPromise.rejectedPromiseValue(globalThis, err); }, @@ -3274,7 +3288,18 @@ pub const Fetch = struct { if (headers) |*headers_| { headers_.deinit(); } - headers = Headers.fromPicoHttpHeaders(&result.headers, allocator) catch bun.outOfMemory(); + if (range) |range_| { + var headersWithRange: [5]picohttp.Header = .{ + result.headers[0], + result.headers[1], + result.headers[2], + result.headers[3], + .{ .name = "range", .value = range_ }, + }; + headers = Headers.fromPicoHttpHeaders(&headersWithRange, allocator) catch bun.outOfMemory(); + } else { + headers = Headers.fromPicoHttpHeaders(&result.headers, allocator) catch bun.outOfMemory(); + } } // Only create this after we have validated all the input. 
diff --git a/src/s3.zig b/src/s3.zig index 72f575373fa683..5de1cac34f4cc3 100644 --- a/src/s3.zig +++ b/src/s3.zig @@ -86,7 +86,7 @@ pub const AWSCredentials = struct { } }; - pub fn s3Request(this: *const @This(), bucket: []const u8, path: []const u8, method: bun.http.Method, content_hash: ?[]const u8) !SignResult { + pub fn signRequest(this: *const @This(), bucket: []const u8, path: []const u8, method: bun.http.Method, content_hash: ?[]const u8) !SignResult { if (this.accessKeyId.len == 0 or this.secretAccessKey.len == 0) return error.MissingCredentials; const method_name = switch (method) { @@ -189,4 +189,296 @@ pub const AWSCredentials = struct { }, }; } + + pub const S3StatResult = union(enum) { + success: struct { + size: usize = 0, + /// etag is not owned and needs to be copied if used after this callback + etag: []const u8 = "", + }, + not_found: void, + + /// failure error is not owned and needs to be copied if used after this callback + failure: struct { + code: []const u8, + message: []const u8, + }, + }; + pub const S3DownloadResult = union(enum) { + success: struct { + /// etag is not owned and needs to be copied if used after this callback + etag: []const u8 = "", + /// body is owned and doesn't need to be copied, but don't forget to free it + body: bun.MutableString, + }, + not_found: void, + /// failure error is not owned and needs to be copied if used after this callback + failure: struct { + code: []const u8, + message: []const u8, + }, + }; + pub const S3UploadResult = union(enum) { + success: void, + /// failure error is not owned and needs to be copied if used after this callback + failure: struct { + code: []const u8, + message: []const u8, + }, + }; + pub const S3DeleteResult = union(enum) { + success: void, + not_found: void, + + /// failure error is not owned and needs to be copied if used after this callback + failure: struct { + code: []const u8, + message: []const u8, + }, + }; + pub const S3HttpSimpleTask = struct { + http: bun.http.AsyncHTTP, + 
vm: *JSC.VirtualMachine, + sign_result: SignResult, + headers: JSC.WebCore.Headers, + callback_context: *anyopaque, + callback: Callback, + response_buffer: bun.MutableString = .{ + .allocator = bun.default_allocator, + .list = .{ + .items = &.{}, + .capacity = 0, + }, + }, + result: bun.http.HTTPClientResult = .{}, + concurrent_task: JSC.ConcurrentTask = .{}, + range: ?[]const u8, + + usingnamespace bun.New(@This()); + pub const Callback = union(enum) { + stat: *const fn (S3StatResult, *anyopaque) void, + download: *const fn (S3DownloadResult, *anyopaque) void, + upload: *const fn (S3UploadResult, *anyopaque) void, + delete: *const fn (S3DeleteResult, *anyopaque) void, + + pub fn fail(this: @This(), code: []const u8, message: []const u8, context: *anyopaque) void { + switch (this) { + inline .upload, .download, .stat, .delete => |callback| callback(.{ + .failure = .{ + .code = code, + .message = message, + }, + }, context), + } + } + }; + pub fn deinit(this: *@This()) void { + if (this.result.certificate_info) |*certificate| { + certificate.deinit(bun.default_allocator); + } + + this.response_buffer.deinit(); + this.headers.deinit(); + this.sign_result.deinit(); + this.http.clearData(); + if (this.range) |range| { + bun.default_allocator.free(range); + } + if (this.result.metadata) |*metadata| { + metadata.deinit(bun.default_allocator); + } + this.destroy(); + } + + fn fail(this: @This()) void { + var code: []const u8 = "UnknownError"; + var message: []const u8 = "an unexpected error has occurred"; + if (this.result.body) |body| { + const bytes = body.list.items; + if (bytes.len > 0) { + message = bytes[0..]; + if (strings.indexOf(bytes, "<Code>")) |start| { + if (strings.indexOf(bytes, "</Code>")) |end| { + code = bytes[start + "<Code>".len .. end]; + } + } + if (strings.indexOf(bytes, "<Message>")) |start| { + if (strings.indexOf(bytes, "</Message>")) |end| { + message = bytes[start + "<Message>".len .. 
end]; + } + } + } + } + this.callback.fail(code, message, this.callback_context); + } + + pub fn onResponse(this: *@This()) void { + defer this.deinit(); + bun.assert(this.result.metadata != null); + const response = this.result.metadata.?.response; + switch (this.callback) { + .stat => |callback| { + switch (response.status_code) { + 404 => { + callback(.{ .not_found = {} }, this.callback_context); + }, + 200 => { + callback(.{ + .success = .{ + .etag = response.headers.get("etag") orelse "", + .size = if (response.headers.get("content-length")) |content_len| (std.fmt.parseInt(usize, content_len, 10) catch 0) else 0, + }, + }, this.callback_context); + }, + else => { + this.fail(); + }, + } + }, + .delete => |callback| { + switch (response.status_code) { + 404 => { + callback(.{ .not_found = {} }, this.callback_context); + }, + 200 => { + callback(.{ .success = {} }, this.callback_context); + }, + else => { + this.fail(); + }, + } + }, + .upload => |callback| { + switch (response.status_code) { + 200 => { + callback(.{ .success = {} }, this.callback_context); + }, + else => { + this.fail(); + }, + } + }, + .download => |callback| { + switch (response.status_code) { + 404 => { + callback(.{ .not_found = {} }, this.callback_context); + }, + 200, 206 => { + const body = this.response_buffer; + this.response_buffer = .{ + .allocator = bun.default_allocator, + .list = .{ + .items = &.{}, + .capacity = 0, + }, + }; + callback(.{ + .success = .{ + .etag = response.headers.get("etag") orelse "", + .body = body, + }, + }, this.callback_context); + }, + else => { + //error + this.fail(); + }, + } + }, + } + } + + pub fn http_callback(this: *@This(), async_http: *bun.http.AsyncHTTP, result: bun.http.HTTPClientResult) void { + const is_done = !result.has_more; + this.result = result; + this.http = async_http.*; + this.http.response_buffer = async_http.response_buffer; + if (is_done) { + this.vm.eventLoop().enqueueTaskConcurrent(this.concurrent_task.from(this, 
.manual_deinit)); + } + } + }; + + pub fn executeSimpleS3Request(this: *const @This(), bucket: []const u8, path: []const u8, method: bun.http.Method, callback: S3HttpSimpleTask.Callback, callback_context: *anyopaque, proxy_url: ?[]const u8, body: []const u8, range: ?[]const u8) void { + var result = this.signRequest(bucket, path, method, null) catch |sign_err| { + if (range) |range_| bun.default_allocator.free(range_); + + return switch (sign_err) { + error.MissingCredentials => callback.fail("MissingCredentials", "missing s3 credentials", callback_context), + error.InvalidMethod => callback.fail("InvalidMethod", "method must be GET, PUT, DELETE or HEAD when using s3 protocol", callback_context), + error.InvalidPath => callback.fail("InvalidPath", "invalid s3 bucket, key combination", callback_context), + else => callback.fail("SignError", "failed to retrieve s3 content check your credentials", callback_context), + }; + }; + + const headers = brk: { + if (range) |range_| { + var headersWithRange: [5]picohttp.Header = .{ + result.headers[0], + result.headers[1], + result.headers[2], + result.headers[3], + .{ .name = "range", .value = range_ }, + }; + break :brk JSC.WebCore.Headers.fromPicoHttpHeaders(&headersWithRange, bun.default_allocator) catch bun.outOfMemory(); + } else { + break :brk JSC.WebCore.Headers.fromPicoHttpHeaders(&result.headers, bun.default_allocator) catch bun.outOfMemory(); + } + }; + const task = S3HttpSimpleTask.new(.{ + .http = undefined, + .sign_result = result, + .callback_context = callback_context, + .callback = callback, + .range = range, + .headers = headers, + .vm = JSC.VirtualMachine.get(), + }); + + const url = bun.URL.parse(result.url); + + task.http = bun.http.AsyncHTTP.init( + bun.default_allocator, + method, + url, + task.headers.entries, + task.headers.buf.items, + &task.response_buffer, + body, + bun.http.HTTPClientResult.Callback.New( + *S3HttpSimpleTask, + S3HttpSimpleTask.http_callback, + ).init(task), + .follow, + .{ + 
.http_proxy = if (proxy_url) |proxy| bun.URL.parse(proxy) else null, + }, + ); + // queue http request + bun.http.HTTPThread.init(&.{}); + var batch = bun.ThreadPool.Batch{}; + task.http.schedule(bun.default_allocator, &batch); + bun.http.http_thread.schedule(batch); + } + + pub fn s3Stat(this: *const @This(), bucket: []const u8, path: []const u8, callback: *const fn (S3StatResult, *anyopaque) void, callback_context: *anyopaque, proxy_url: ?[]const u8) void { + this.executeSimpleS3Request(bucket, path, .HEAD, .{ .stat = callback }, callback_context, proxy_url, "", null); + } + + pub fn s3Download(this: *const @This(), bucket: []const u8, path: []const u8, callback: *const fn (S3DownloadResult, *anyopaque) void, callback_context: *anyopaque, proxy_url: ?[]const u8) void { + this.executeSimpleS3Request(bucket, path, .GET, .{ .download = callback }, callback_context, proxy_url, "", null); + } + + pub fn s3DownloadSlice(this: *const @This(), bucket: []const u8, path: []const u8, offset: usize, len: ?usize, callback: *const fn (S3DownloadResult, *anyopaque) void, callback_context: *anyopaque, proxy_url: ?[]const u8) void { + const range = if (len != null) std.fmt.allocPrint(bun.default_allocator, "bytes={}-{}", .{ offset, offset + len.? - 1 
}) catch bun.outOfMemory() else std.fmt.allocPrint(bun.default_allocator, "bytes={}-", .{offset}) catch bun.outOfMemory(); + this.executeSimpleS3Request(bucket, path, .GET, .{ .download = callback }, callback_context, proxy_url, "", range); + } + + pub fn s3Delete(this: *const @This(), bucket: []const u8, path: []const u8, callback: *const fn (S3DeleteResult, *anyopaque) void, callback_context: *anyopaque, proxy_url: ?[]const u8) void { + this.executeSimpleS3Request(bucket, path, .DELETE, .{ .delete = callback }, callback_context, proxy_url, "", null); + } + + pub fn s3Upload(this: *const @This(), bucket: []const u8, path: []const u8, content: []const u8, callback: *const fn (S3UploadResult, *anyopaque) void, callback_context: *anyopaque, proxy_url: ?[]const u8) void { + this.executeSimpleS3Request(bucket, path, .PUT, .{ .upload = callback }, callback_context, proxy_url, content, null); + } };