Bun.file read side
cirospaciari committed Dec 13, 2024
1 parent adf3238 commit 6166fe9
Showing 5 changed files with 475 additions and 7 deletions.
6 changes: 6 additions & 0 deletions src/bun.js/event_loop.zig
@@ -18,6 +18,7 @@ const ReadFileTask = WebCore.Blob.ReadFile.ReadFileTask;
 const WriteFileTask = WebCore.Blob.WriteFile.WriteFileTask;
 const napi_async_work = JSC.napi.napi_async_work;
 const FetchTasklet = Fetch.FetchTasklet;
+const S3HttpSimpleTask = @import("../s3.zig").AWSCredentials.S3HttpSimpleTask;
 const JSValue = JSC.JSValue;
 const js = JSC.C;
 const Waker = bun.Async.Waker;
@@ -407,6 +408,7 @@ const ServerAllConnectionsClosedTask = @import("./api/server.zig").ServerAllConnectionsClosedTask;
 // Task.get(ReadFileTask) -> ?ReadFileTask
 pub const Task = TaggedPointerUnion(.{
     FetchTasklet,
+    S3HttpSimpleTask,
     AsyncGlobWalkTask,
     AsyncTransformTask,
     ReadFileTask,
@@ -991,6 +993,10 @@ pub const EventLoop = struct {
                 var fetch_task: *Fetch.FetchTasklet = task.get(Fetch.FetchTasklet).?;
                 fetch_task.onProgressUpdate();
             },
+            .S3HttpSimpleTask => {
+                var s3_task: *S3HttpSimpleTask = task.get(S3HttpSimpleTask).?;
+                s3_task.onResponse();
+            },
             @field(Task.Tag, @typeName(AsyncGlobWalkTask)) => {
                 var globWalkTask: *AsyncGlobWalkTask = task.get(AsyncGlobWalkTask).?;
                 globWalkTask.*.runFromJS();
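The event_loop.zig change registers the new S3 task type in the event loop's tagged task union and dispatches its completion callback on the JavaScript thread. A loose TypeScript analogy of that dispatch pattern, illustrative only (Bun's real Task is a Zig TaggedPointerUnion, not a TS union):

// Illustrative sketch, not Bun internals: tasks carry a tag and the loop
// invokes the matching completion handler, which is what the new
// .S3HttpSimpleTask arm above does with onResponse().
type Task =
  | { tag: "FetchTasklet"; onProgressUpdate: () => void }
  | { tag: "S3HttpSimpleTask"; onResponse: () => void };

function drainTask(task: Task): void {
  switch (task.tag) {
    case "FetchTasklet":
      task.onProgressUpdate();
      break;
    case "S3HttpSimpleTask":
      task.onResponse();
      break;
  }
}
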
149 changes: 147 additions & 2 deletions src/bun.js/webcore/blob.zig
@@ -19,7 +19,6 @@ const default_allocator = bun.default_allocator;
 const FeatureFlags = bun.FeatureFlags;
 const ArrayBuffer = @import("../base.zig").ArrayBuffer;
 const Properties = @import("../base.zig").Properties;
-
 const getAllocator = @import("../base.zig").getAllocator;
 
 const Environment = @import("../../env.zig");
@@ -44,6 +43,7 @@ const Request = JSC.WebCore.Request;
 
 const libuv = bun.windows.libuv;
 
+const AWS = @import("../../s3.zig").AWSCredentials;
 const PathOrBlob = union(enum) {
     path: JSC.Node.PathOrFileDescriptor,
     blob: Blob,
@@ -147,6 +147,14 @@ pub const Blob = struct {
 
     pub fn doReadFile(this: *Blob, comptime Function: anytype, global: *JSGlobalObject) JSValue {
         bloblog("doReadFile", .{});
+        if (this.isS3()) {
+            const WrappedFn = struct {
+                pub fn wrapped(b: *Blob, g: *JSGlobalObject, by: []u8) JSC.JSValue {
+                    return JSC.toJSHostValue(g, Function(b, g, by, .clone));
+                }
+            };
+            return S3BlobDownloadTask.init(global, this, WrappedFn.wrapped);
+        }
 
         const Handler = NewReadFileHandler(Function);
 
@@ -3423,12 +3431,149 @@
         return JSValue.jsBoolean(bun.isRegularFile(store.data.file.mode) or bun.C.S.ISFIFO(store.data.file.mode));
     }
 
+    fn isS3(this: *Blob) bool {
+        if (this.store) |store| {
+            if (store.data == .file) {
+                if (store.data.file.pathlike == .path) {
+                    const slice = store.data.file.pathlike.path.slice();
+                    return strings.startsWith(slice, "s3://");
+                }
+            }
+        }
+        return false;
+    }
+
+    const S3BlobDownloadTask = struct {
+        blob: Blob,
+        globalThis: *JSC.JSGlobalObject,
+        promise: JSC.JSPromise.Strong,
+        poll_ref: bun.Async.KeepAlive = .{},
+
+        handler: S3ReadHandler,
+        usingnamespace bun.New(S3BlobDownloadTask);
+        pub const S3ReadHandler = *const fn (this: *Blob, globalthis: *JSGlobalObject, raw_bytes: []u8) JSValue;
+
+        pub fn callHandler(this: *S3BlobDownloadTask, raw_bytes: []u8) JSValue {
+            return this.handler(&this.blob, this.globalThis, raw_bytes);
+        }
+        pub fn onS3DownloadResolved(result: AWS.S3DownloadResult, this: *S3BlobDownloadTask) void {
+            defer this.deinit();
+            switch (result) {
+                .not_found => {
+                    const js_err = this.globalThis.createErrorInstance("File not found", .{});
+                    js_err.put(this.globalThis, ZigString.static("code"), ZigString.init("FileNotFound").toJS(this.globalThis));
+                    this.promise.reject(this.globalThis, js_err);
+                },
+                .success => |response| {
+                    const bytes = response.body.list.items;
+                    if (this.blob.size == Blob.max_size) {
+                        this.blob.size = @truncate(bytes.len);
+                    }
+                    JSC.AnyPromise.wrap(.{ .normal = this.promise.get() }, this.globalThis, S3BlobDownloadTask.callHandler, .{ this, bytes });
+                },
+                .failure => |err| {
+                    const js_err = this.globalThis.createErrorInstance("{s}", .{err.message});
+                    js_err.put(this.globalThis, ZigString.static("code"), ZigString.init(err.code).toJS(this.globalThis));
+                    this.promise.rejectOnNextTick(this.globalThis, js_err);
+                },
+            }
+        }
+
+        pub fn init(globalThis: *JSC.JSGlobalObject, blob: *Blob, handler: S3BlobDownloadTask.S3ReadHandler) JSValue {
+            blob.store.?.ref();
+
+            const this = S3BlobDownloadTask.new(.{
+                .globalThis = globalThis,
+                .blob = blob.*,
+                .promise = JSC.JSPromise.Strong.init(globalThis),
+                .handler = handler,
+            });
+            const promise = this.promise.value();
+            const env = this.globalThis.bunVM().bundler.env;
+            const credentials = env.getAWSCredentials();
+            const url = bun.URL.parse(this.blob.store.?.data.file.pathlike.path.slice());
+            this.poll_ref.ref(globalThis.bunVM());
+
+            if (blob.offset > 0) {
+                const len: ?usize = if (blob.size != Blob.max_size) @intCast(blob.size) else null;
+                const offset: usize = @intCast(blob.offset);
+                credentials.s3DownloadSlice(url.hostname, url.path, offset, len, @ptrCast(&S3BlobDownloadTask.onS3DownloadResolved), this, if (env.getHttpProxy(url)) |proxy| proxy.href else null);
+            } else {
+                credentials.s3Download(url.hostname, url.path, @ptrCast(&S3BlobDownloadTask.onS3DownloadResolved), this, if (env.getHttpProxy(url)) |proxy| proxy.href else null);
+            }
+            return promise;
+        }
+
+        pub fn deinit(this: *S3BlobDownloadTask) void {
+            this.blob.store.?.deref();
+            this.poll_ref.unrefOnNextTick(this.globalThis.bunVM());
+            this.promise.deinit();
+            this.destroy();
+        }
+    };
+
+    const S3BlobStatTask = struct {
+        blob: *Blob,
+        globalThis: *JSC.JSGlobalObject,
+        promise: JSC.JSPromise.Strong,
+        strong_ref: JSC.Strong,
+        poll_ref: bun.Async.KeepAlive = .{},
+        usingnamespace bun.New(S3BlobStatTask);
+
+        pub fn onS3StatResolved(result: AWS.S3StatResult, this: *S3BlobStatTask) void {
+            defer this.deinit();
+            switch (result) {
+                .not_found => {
+                    this.promise.resolve(this.globalThis, .false);
+                },
+                .success => |stat| {
+                    if (this.blob.size == Blob.max_size) {
+                        this.blob.size = @truncate(stat.size);
+                    }
+                    this.promise.resolve(this.globalThis, .true);
+                },
+                .failure => |err| {
+                    const js_err = this.globalThis.createErrorInstance("{s}", .{err.message});
+                    js_err.put(this.globalThis, ZigString.static("code"), ZigString.init(err.code).toJS(this.globalThis));
+                    this.promise.rejectOnNextTick(this.globalThis, js_err);
+                },
+            }
+        }
+
+        pub fn init(globalThis: *JSC.JSGlobalObject, blob: *Blob, js_blob: JSValue) JSValue {
+            const this = S3BlobStatTask.new(.{
+                .globalThis = globalThis,
+                .blob = blob,
+                .promise = JSC.JSPromise.Strong.init(globalThis),
+                .strong_ref = JSC.Strong.create(js_blob, globalThis),
+            });
+            const promise = this.promise.value();
+            const env = this.globalThis.bunVM().bundler.env;
+            const credentials = env.getAWSCredentials();
+            const url = bun.URL.parse(this.blob.store.?.data.file.pathlike.path.slice());
+            this.poll_ref.ref(globalThis.bunVM());
+            credentials.s3Stat(url.hostname, url.path, @ptrCast(&S3BlobStatTask.onS3StatResolved), this, if (env.getHttpProxy(url)) |proxy| proxy.href else null);
+            return promise;
+        }
+
+        pub fn deinit(this: *S3BlobStatTask) void {
+            this.poll_ref.unrefOnNextTick(this.globalThis.bunVM());
+            this.strong_ref.deinit();
+            this.promise.deinit();
+            this.destroy();
+        }
+    };
+
     // This mostly means 'can it be read?'
     pub fn getExists(
         this: *Blob,
         globalThis: *JSC.JSGlobalObject,
         _: *JSC.CallFrame,
+        this_value: JSC.JSValue,
     ) bun.JSError!JSValue {
+        if (this.isS3()) {
+            return S3BlobStatTask.init(globalThis, this, this_value);
+        }
         return JSC.JSPromise.resolvedPromiseValue(globalThis, this.getExistsSync());
     }
 
@@ -3783,7 +3928,7 @@
         if (this.store) |store| {
             if (store.data == .file) {
                 // last_modified can be already set during read.
-                if (store.data.file.last_modified == JSC.init_timestamp) {
+                if (store.data.file.last_modified == JSC.init_timestamp and !this.isS3()) {
                     resolveFileStat(store);
                 }
                 return JSValue.jsNumber(store.data.file.last_modified);
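Taken together, the blob.zig changes route s3:// Blobs through the new download and stat tasks instead of the filesystem. A hedged usage sketch of what this enables; the bucket and key are hypothetical, and credentials are assumed to come from the environment (getAWSCredentials):

// Hypothetical bucket/key; isS3() matches because the path starts with "s3://".
const file = Bun.file("s3://my-bucket/logs/app.txt");

// exists() now resolves via S3BlobStatTask (a stat request) rather than
// consulting the local filesystem.
if (await file.exists()) {
  // Reads resolve via S3BlobDownloadTask; a non-zero offset (e.g. from
  // file.slice()) would take the ranged s3DownloadSlice path instead.
  const text = await file.text();
  console.log(text.length);
}
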
2 changes: 1 addition & 1 deletion src/bun.js/webcore/response.classes.ts
@@ -140,7 +140,7 @@ export default [
     slice: { fn: "getSlice", length: 2 },
     stream: { fn: "getStream", length: 1 },
     formData: { fn: "getFormData" },
-    exists: { fn: "getExists", length: 0 },
+    exists: { fn: "getExists", length: 0, passThis: true },
 
     // Non-standard, but consistent!
     bytes: { fn: "getBytes" },
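A note on passThis, inferred from this diff: the flag makes the generated binding pass the JS Blob object as the new this_value argument of getExists, which S3BlobStatTask keeps alive in a Strong reference (strong_ref) so the Blob cannot be garbage-collected while the stat request is in flight.
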
31 changes: 28 additions & 3 deletions src/bun.js/webcore/response.zig
@@ -2274,6 +2274,7 @@ pub const Fetch = struct {
         var signal: ?*JSC.WebCore.AbortSignal = null;
         // Custom Hostname
         var hostname: ?[]u8 = null;
+        var range: ?[]u8 = null;
         var unix_socket_path: ZigString.Slice = ZigString.Slice.empty;
 
         var url_proxy_buffer: []const u8 = "";
@@ -2312,6 +2313,10 @@
                 bun.default_allocator.free(hn);
                 hostname = null;
             }
+            if (range) |range_| {
+                bun.default_allocator.free(range_);
+                range = null;
+            }
 
             if (ssl_config) |conf| {
                 ssl_config = null;
@@ -2929,6 +2934,15 @@
                     }
                     hostname = _hostname.toOwnedSliceZ(allocator) catch bun.outOfMemory();
                 }
+                if (url.isS3()) {
+                    if (headers_.fastGet(JSC.FetchHeaders.HTTPHeaderName.Range)) |_range| {
+                        if (range) |range_| {
+                            range = null;
+                            allocator.free(range_);
+                        }
+                        range = _range.toOwnedSliceZ(allocator) catch bun.outOfMemory();
+                    }
+                }
 
                 break :extract_headers Headers.from(headers_, allocator, .{ .body = body.getAnyBlob() }) catch bun.outOfMemory();
             }
@@ -3228,15 +3242,15 @@
             }
         }
         // TODO: should we generate the content hash? presigned never uses content-hash, maybe only if an extra option is passed to avoid the cost
-        var result = credentials.s3Request(url.hostname, url.path, method, null) catch |sign_err| {
+        var result = credentials.signRequest(url.hostname, url.path, method, null) catch |sign_err| {
             switch (sign_err) {
                 error.MissingCredentials => {
                     const err = JSC.toTypeError(.ERR_INVALID_ARG_VALUE, "missing s3 credentials", .{}, ctx);
                     is_error = true;
                     return JSPromise.rejectedPromiseValue(globalThis, err);
                 },
                 error.InvalidMethod => {
-                    const err = JSC.toTypeError(.ERR_INVALID_ARG_VALUE, "method must be GET, PUT, DELETE when using s3 protocol", .{}, ctx);
+                    const err = JSC.toTypeError(.ERR_INVALID_ARG_VALUE, "method must be GET, PUT, DELETE or HEAD when using s3 protocol", .{}, ctx);
                     is_error = true;
                     return JSPromise.rejectedPromiseValue(globalThis, err);
                 },
@@ -3274,7 +3288,18 @@
             if (headers) |*headers_| {
                 headers_.deinit();
             }
-            headers = Headers.fromPicoHttpHeaders(&result.headers, allocator) catch bun.outOfMemory();
+            if (range) |range_| {
+                var headersWithRange: [5]picohttp.Header = .{
+                    result.headers[0],
+                    result.headers[1],
+                    result.headers[2],
+                    result.headers[3],
+                    .{ .name = "range", .value = range_ },
+                };
+                headers = Headers.fromPicoHttpHeaders(&headersWithRange, allocator) catch bun.outOfMemory();
+            } else {
+                headers = Headers.fromPicoHttpHeaders(&result.headers, allocator) catch bun.outOfMemory();
+            }
         }
 
         // Only create this after we have validated all the input.
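The net effect of the range plumbing above: on s3:// URLs, a user-supplied Range header survives request signing and is appended after the four signed headers. A hedged sketch, assuming s3:// URLs are fetchable once credentials are configured; the bucket and object are hypothetical:

// The Range value is captured before signing and re-attached to the
// signed request, enabling partial reads of S3 objects.
const res = await fetch("s3://my-bucket/videos/clip.mp4", {
  headers: { Range: "bytes=0-1023" },
});
const firstKiB = await res.arrayBuffer();
console.log(firstKiB.byteLength);
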
(The diff for the fifth changed file did not load.)
