diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b18b20db1..b27dcfff2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,6 +13,7 @@ env: CARGO_TERM_COLOR: always RUSTUP_MAX_RETRIES: 10 ORT_DYLIB_PATH: /tmp/onnxruntime/lib/libonnxruntime.so + RUST_LOG: event_worker=trace jobs: cargo-fmt: @@ -50,9 +51,31 @@ jobs: - uses: actions/checkout@v4 - run: rustup show - uses: Swatinem/rust-cache@v2 + - uses: cardinalby/export-env-action@v2 with: envFile: ".env" + - name: Install ONNX Runtime Library run: ./scripts/install_onnx.sh ${{ env.ONNXRUNTIME_VERSION }} x64 /tmp/onnxruntime - - run: ./scripts/test.sh + + - name: Install S3 credentials for testing + run: | + cd crates/sb_fs/tests + echo "S3FS_TEST_SUPABASE_STORAGE=true" >> .env + echo "S3FS_TEST_APP_NAME=edge_runtime_github_ci" >> .env + echo "S3FS_TEST_BUCKET_NAME=$BUCKET_NAME" >> .env + echo "S3FS_TEST_ENDPOINT_URL=$ENDPOINT_URL" >> .env + echo "S3FS_TEST_REGION=$REGION" >> .env + echo "S3FS_TEST_ACCESS_KEY_ID=$ACCESS_KEY_ID" >> .env + echo "S3FS_TEST_SECRET_ACCESS_KEY=$SECRET_ACCESS_KEY" >> .env + shell: bash + env: + BUCKET_NAME: ${{ secrets.SUPABASE_S3_BUCKET_NAME }} + ENDPOINT_URL: ${{ secrets.SUPABASE_S3_ENTRYPOINT }} + REGION: ${{ secrets.SUPABASE_S3_REGION }} + ACCESS_KEY_ID: ${{ secrets.SUPABASE_S3_ACCESS_KEY }} + SECRET_ACCESS_KEY: ${{ secrets.SUPABASE_S3_SECRET }} + + - run: ./scripts/test.sh --features base/tracing + diff --git a/Cargo.lock b/Cargo.lock index f72ea89a2..75ce88860 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -268,6 +268,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "assert-json-diff" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4259cbe96513d2f1073027a259fc2ca917feb3026a5a8d984e3628e490255cc0" +dependencies = [ + "extend", + "serde", + "serde_json", +] + [[package]] name = "ast_node" version = "0.9.8" @@ -357,7 +368,7 @@ dependencies = [ "fastrand", "hex", "http 0.2.11", - "ring", + "ring 0.17.7", "time", "tokio", "tracing", @@ -520,7 +531,7 @@ dependencies = [ "once_cell", "p256 0.11.1", "percent-encoding", - "ring", + "ring 0.17.7", "sha2", "subtle", "time", @@ -601,6 +612,25 @@ dependencies = [ "aws-smithy-types", ] +[[package]] +name = "aws-smithy-protocol-test" +version = "0.63.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b92b62199921f10685c6b588fdbeb81168ae4e7950ae3e5f50145a01bb5f1ad" +dependencies = [ + "assert-json-diff", + "aws-smithy-runtime-api", + "base64-simd 0.8.0", + "cbor-diag", + "ciborium", + "http 0.2.11", + "pretty_assertions", + "regex-lite", + "roxmltree", + "serde_json", + "thiserror", +] + [[package]] name = "aws-smithy-query" version = "0.60.7" @@ -619,6 +649,7 @@ checksum = "a065c0fe6fdbdf9f11817eb68582b2ab4aff9e9c39e986ae48f7ec576c6322db" dependencies = [ "aws-smithy-async", "aws-smithy-http", + "aws-smithy-protocol-test", "aws-smithy-runtime-api", "aws-smithy-types", "bytes", @@ -630,12 +661,16 @@ dependencies = [ "httparse", "hyper 0.14.28", "hyper-rustls 0.24.2", + "indexmap", "once_cell", "pin-project-lite", "pin-utils", "rustls 0.21.12", + "serde", + "serde_json", "tokio", "tracing", + "tracing-subscriber", ] [[package]] @@ -1039,6 +1074,15 @@ dependencies = [ "alloc-stdlib", ] +[[package]] +name = "bs58" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf88ba1141d185c399bee5288d850d63b8369520c1eafc32a0430b5b6c287bf4" +dependencies = [ + "tinyvec", +] + [[package]] name = "bstr" version = 
"1.9.1" @@ -1144,6 +1188,25 @@ dependencies = [ "libc", ] +[[package]] +name = "cbor-diag" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc245b6ecd09b23901a4fbad1ad975701fd5061ceaef6afa93a2d70605a64429" +dependencies = [ + "bs58", + "chrono", + "data-encoding", + "half", + "nom", + "num-bigint", + "num-rational", + "num-traits", + "separator", + "url", + "uuid", +] + [[package]] name = "cc" version = "1.0.83" @@ -1186,6 +1249,33 @@ dependencies = [ "winapi", ] +[[package]] +name = "ciborium" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757" + +[[package]] +name = "ciborium-ll" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9" +dependencies = [ + "ciborium-io", + "half", +] + [[package]] name = "cipher" version = "0.4.4" @@ -1474,6 +1564,15 @@ dependencies = [ "typenum", ] +[[package]] +name = "ct-logs" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1a816186fa68d9e426e3cb4ae4dff1fcd8e4a2c34b781bf7a822574a0d0aac8" +dependencies = [ + "sct 0.6.1", +] + [[package]] name = "ctor" version = "0.2.6" @@ -1775,7 +1874,7 @@ dependencies = [ "p384", "p521", "rand", - "ring", + "ring 0.17.7", "rsa", "serde", "serde_bytes", @@ -1887,7 +1986,7 @@ dependencies = [ "percent-encoding", "phf", "pin-project", - "ring", + "ring 0.17.7", "scopeguard", "serde", "smallvec", @@ -2256,6 +2355,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "diff" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56254986775e3233ffa9c4d7d3faaf6d36a2c09d30b20687e9f88bc8bafc16c8" + [[package]] name = "digest" version = "0.10.7" @@ -2311,6 +2416,12 @@ dependencies = [ "litrs", ] +[[package]] +name = "dotenvy" +version = "0.15.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" + [[package]] name = "dprint-swc-ext" version = "0.17.0" @@ -2544,6 +2655,18 @@ dependencies = [ "uuid", ] +[[package]] +name = "extend" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f47da3a72ec598d9c8937a7ebca8962a5c7a1f28444e38c2b33c771ba3f55f05" +dependencies = [ + "proc-macro-error", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "fallible-iterator" version = "0.2.0" @@ -3138,6 +3261,30 @@ dependencies = [ "hashbrown 0.14.3", ] +[[package]] +name = "headers" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06683b93020a07e3dbcf5f8c0f6d40080d725bea7936fc01ad345c01b97dc270" +dependencies = [ + "base64 0.21.7", + "bytes", + "headers-core", + "http 0.2.11", + "httpdate", + "mime", + "sha1", +] + +[[package]] +name = "headers-core" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" +dependencies = [ + "http 0.2.11", +] + [[package]] name = "heck" version = "0.4.1" @@ -3345,6 +3492,42 @@ 
dependencies = [ "want", ] +[[package]] +name = "hyper-proxy" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca815a891b24fdfb243fa3239c86154392b0953ee584aa1a2a1f66d20cbe75cc" +dependencies = [ + "bytes", + "futures", + "headers", + "http 0.2.11", + "hyper 0.14.28", + "hyper-rustls 0.22.1", + "rustls-native-certs 0.5.0", + "tokio", + "tokio-rustls 0.22.0", + "tower-service", + "webpki", +] + +[[package]] +name = "hyper-rustls" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f9f7a97316d44c0af9b0301e65010573a853a9fc97046d7331d7f6bc0fd5a64" +dependencies = [ + "ct-logs", + "futures-util", + "hyper 0.14.28", + "log", + "rustls 0.19.1", + "rustls-native-certs 0.5.0", + "tokio", + "tokio-rustls 0.22.0", + "webpki", +] + [[package]] name = "hyper-rustls" version = "0.24.2" @@ -4388,6 +4571,17 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-rational" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f83d14da390562dca69fc84082e73e548e1ad308d24accdedd2720017cb37824" +dependencies = [ + "num-bigint", + "num-integer", + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.18" @@ -4859,6 +5053,16 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "pretty_assertions" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ae130e2f271fbc2ac3a40fb1d07180839cdbbe443c7a27e1e3c13c5cac0116d" +dependencies = [ + "diff", + "yansi", +] + [[package]] name = "prettyplease" version = "0.2.17" @@ -4878,6 +5082,30 @@ dependencies = [ "elliptic-curve 0.13.8", ] +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn 1.0.109", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", +] + [[package]] name = "proc-macro-rules" version = "0.4.0" @@ -5265,6 +5493,21 @@ dependencies = [ "subtle", ] +[[package]] +name = "ring" +version = "0.16.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" +dependencies = [ + "cc", + "libc", + "once_cell", + "spin 0.5.2", + "untrusted 0.7.1", + "web-sys", + "winapi", +] + [[package]] name = "ring" version = "0.17.7" @@ -5275,7 +5518,7 @@ dependencies = [ "getrandom", "libc", "spin 0.9.8", - "untrusted", + "untrusted 0.9.0", "windows-sys 0.48.0", ] @@ -5329,6 +5572,15 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "roxmltree" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "921904a62e410e37e215c40381b7117f830d9d89ba60ab5236170541dd25646b" +dependencies = [ + "xmlparser", +] + [[package]] name = "rsa" version = "0.9.6" @@ -5415,6 +5667,19 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rustls" +version = "0.19.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7" +dependencies = [ + "base64 0.13.1", + "log", + "ring 0.16.20", + "sct 0.6.1", + "webpki", +] + [[package]] name = "rustls" version = "0.21.12" @@ -5422,9 +5687,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" dependencies = [ "log", - "ring", + "ring 0.17.7", "rustls-webpki 0.101.7", - "sct", + "sct 0.7.1", ] [[package]] @@ -5434,13 +5699,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432" dependencies = [ "log", - "ring", + "ring 0.17.7", "rustls-pki-types", "rustls-webpki 0.102.2", "subtle", "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a07b7c1885bd8ed3831c289b7870b13ef46fe0e856d288c30d9cc17d75a2092" +dependencies = [ + "openssl-probe", + "rustls 0.19.1", + "schannel", + "security-framework", +] + [[package]] name = "rustls-native-certs" version = "0.6.3" @@ -5509,8 +5786,8 @@ version = "0.101.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" dependencies = [ - "ring", - "untrusted", + "ring 0.17.7", + "untrusted 0.9.0", ] [[package]] @@ -5519,9 +5796,9 @@ version = "0.102.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "faaa0a62740bedb9b2ef5afa303da42764c012f743917351dc9a237ea1663610" dependencies = [ - "ring", + "ring 0.17.7", "rustls-pki-types", - "untrusted", + "untrusted 0.9.0", ] [[package]] @@ -5639,7 +5916,7 @@ dependencies = [ "once_cell", "percent-encoding", "rand", - "ring", + "ring 0.17.7", "sb_node", "scopeguard", "serde", @@ -5677,16 +5954,23 @@ dependencies = [ "aws-sdk-s3", "aws-smithy-runtime", "aws-smithy-runtime-api", + "base", + "ctor", "deno_ast", "deno_core", "deno_fs", "deno_io", "deno_npm", "deno_semver", + "dotenvy", "enum-as-inner", "eszip", + "event_worker", "futures", + "headers", "hyper 0.14.28", + "hyper-proxy", + "hyper-rustls 0.24.2", "import_map", "indexmap", "log", @@ -5695,11 +5979,13 @@ dependencies = [ "once_cell", "rand", "rkyv", + "rustls 0.21.12", "sb_core", "sb_eszip_shared", "sb_node", "sb_npm", "serde", + "serial_test", "tempfile", "thiserror", "tokio", @@ -5833,7 +6119,7 @@ dependencies = [ "rand", "regex", "reqwest 0.12.4", - "ring", + "ring 0.17.7", "ripemd", "rsa", "scrypt", @@ -5879,7 +6165,7 @@ dependencies = [ "once_cell", "percent-encoding", "reqwest 0.12.4", - "ring", + "ring 0.17.7", "sb_core", "sb_node", "serde", @@ -5957,14 +6243,24 @@ dependencies = [ "sha2", ] +[[package]] +name = "sct" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b362b83898e0e69f38515b82ee15aa80636befe47c3b6d3d89a911e78fc228ce" +dependencies = [ + "ring 0.16.20", + "untrusted 0.7.1", +] + [[package]] name = "sct" version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" dependencies = [ - "ring", - "untrusted", + "ring 0.17.7", + "untrusted 0.9.0", ] [[package]] @@ -6045,6 +6341,12 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" +[[package]] +name = "separator" +version = "0.4.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "f97841a747eef040fcd2e7b3b9a220a7205926e60488e673d9e4926d27772ce5" + [[package]] name = "serde" version = "1.0.201" @@ -7180,6 +7482,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-rustls" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6" +dependencies = [ + "rustls 0.19.1", + "tokio", + "webpki", +] + [[package]] name = "tokio-rustls" version = "0.24.1" @@ -7300,6 +7613,16 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-serde" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1" +dependencies = [ + "serde", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.18" @@ -7310,12 +7633,15 @@ dependencies = [ "nu-ansi-term", "once_cell", "regex", + "serde", + "serde_json", "sharded-slab", "smallvec", "thread_local", "tracing", "tracing-core", "tracing-log", + "tracing-serde", ] [[package]] @@ -7550,6 +7876,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + [[package]] name = "untrusted" version = "0.9.0" @@ -7782,6 +8114,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki" +version = "0.21.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8e38c0608262c46d4a56202ebabdeb094cef7e560ca7a226c6bf055188aa4ea" +dependencies = [ + "ring 0.16.20", + "untrusted 0.7.1", +] + [[package]] name = "webpki-roots" version = "0.26.3" @@ -8180,6 +8522,12 @@ version = "0.8.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a5cbf750400958819fb6178eaa83bee5cd9c29a26a40cc241df8c70fdd46984" +[[package]] +name = "yansi" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049" + [[package]] name = "zerocopy" version = "0.7.32" diff --git a/Cargo.toml b/Cargo.toml index 4a71c2605..e3ff61832 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -90,6 +90,7 @@ fs3 = "0.5.0" uuid = { version = "1.3.0", features = ["v4"] } monch = "=0.5.0" reqwest = { version = "0.12.4", default-features = false, features = ["rustls-tls", "stream", "gzip", "brotli", "socks", "json", "http2"] } # pinned because of https://github.com/seanmonstar/reqwest/pull/1955 +reqwest_v011 = { package = "reqwest", version = "0.11", features = ["stream", "json", "multipart"] } ring = "^0.17.0" import_map = { version = "=0.20.0", features = ["ext"] } base32 = "=0.4.0" @@ -137,6 +138,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter", "tracing-log"] rkyv = "0.7" tempfile = "3" xxhash-rust = "0.8" +serial_test = "3.0.0" [patch.crates-io] # If the PR is merged upstream, remove the line below. 
diff --git a/crates/base/Cargo.toml b/crates/base/Cargo.toml index 33641f3b3..cb094ed4e 100644 --- a/crates/base/Cargo.toml +++ b/crates/base/Cargo.toml @@ -74,9 +74,9 @@ notify.workspace = true pin-project.workspace = true rustls-pemfile.workspace = true tracing.workspace = true +reqwest_v011.workspace = true tracing-subscriber = { workspace = true, optional = true, features = ["env-filter", "tracing-log"] } -reqwest_v011 = { package = "reqwest", version = "0.11", features = ["stream", "json", "multipart"] } tls-listener = { version = "0.10", features = ["rustls"] } flume = "0.11.0" cooked-waker = "5" @@ -85,8 +85,8 @@ tokio-rustls = "0.25.0" [dev-dependencies] tokio-util = { workspace = true, features = ["rt", "compat"] } tracing-subscriber = { workspace = true, features = ["env-filter", "tracing-log"] } +serial_test.workspace = true -serial_test = "3.0.0" async-tungstenite = { version = "0.25.0", default-features = false } tungstenite = { version = "0.21.0", default-features = false, features = ["handshake"] } diff --git a/crates/base/src/deno_runtime.rs b/crates/base/src/deno_runtime.rs index 29887e2a6..5f9282919 100644 --- a/crates/base/src/deno_runtime.rs +++ b/crates/base/src/deno_runtime.rs @@ -465,7 +465,8 @@ where let build_file_system_fn = |base_fs: Arc| -> Result, AnyError> { let tmp_fs = TmpFs::try_from(maybe_tmp_fs_config.unwrap_or_default())?; - let fs = PrefixFs::new("/tmp", tmp_fs, Some(base_fs)); + let fs = PrefixFs::new("/tmp", tmp_fs, Some(base_fs)) + .tmp_dir("/tmp"); Ok(if let Some(s3_fs) = maybe_s3_fs_config.map(S3Fs::new).transpose()? { maybe_s3_fs = Some(s3_fs.clone()); diff --git a/crates/base/src/macros/test_macros.rs b/crates/base/src/macros/test_macros.rs index df56a1d9d..7919b4c47 100644 --- a/crates/base/src/macros/test_macros.rs +++ b/crates/base/src/macros/test_macros.rs @@ -1,11 +1,14 @@ #[macro_export] macro_rules! integration_test_listen_fut { ($port:expr, $tls:expr, $main_file:expr, $policy:expr, $import_map:expr, $flag:expr, $tx:expr, $token:expr) => {{ - use futures_util::FutureExt; + use $crate::macros::test_macros::__private; + + use __private::futures_util::FutureExt; + use __private::Tls; - let tls: Option = $tls.clone(); + let tls: Option = $tls.clone(); - base::commands::start_server( + __private::start_server( "0.0.0.0", $port, tls, @@ -33,16 +36,20 @@ macro_rules! integration_test_listen_fut { #[macro_export] macro_rules! integration_test_with_server_flag { ($flag:expr, $main_file:expr, $port:expr, $url:expr, $policy:expr, $import_map:expr, $req_builder:expr, $tls:expr, ($($function:tt)+) $(, $($token:tt)+)?) => { - use futures_util::FutureExt; use $crate::macros::test_macros::__private; - let (tx, mut rx) = tokio::sync::mpsc::channel::(1); + use __private::futures_util::FutureExt; + use __private::ServerHealth; + use __private::Tls; + use __private::reqwest_v011; + + let (tx, mut rx) = tokio::sync::mpsc::channel::(1); let req_builder: Option = $req_builder; - let tls: Option = $tls; + let tls: Option = $tls; let schema = if tls.is_some() { "https" } else { "http" }; let signal = tokio::spawn(async move { - while let Some(base::server::ServerHealth::Listening(event_rx, metric_src)) = rx.recv().await { + while let Some(ServerHealth::Listening(event_rx, metric_src)) = rx.recv().await { $crate::integration_test_with_server_flag!(@req event_rx, metric_src, schema, $port, $url, req_builder, ($($function)+)); } None @@ -166,7 +173,7 @@ macro_rules! integration_test_with_server_flag { macro_rules! 
integration_test { ($main_file:expr, $port:expr, $url:expr, $policy:expr, $import_map:expr, $req_builder:expr, $tls:expr, ($($function:tt)+) $(, $($token:tt)+)?) => { $crate::integration_test_with_server_flag!( - $crate::server::ServerFlags::default(), + ServerFlags::default(), $main_file, $port, $url, @@ -190,6 +197,14 @@ pub mod __private { use crate::server::ServerEvent; + pub use crate::commands::start_server; + pub use crate::rt_worker::worker_ctx::TerminationToken; + pub use crate::server::ServerFlags; + pub use crate::server::ServerHealth; + pub use crate::server::Tls; + pub use futures_util; + pub use reqwest_v011; + /// NOTE(Nyannyacha): This was defined to enable pattern matching in closure /// argument positions. type ReqTuple = ( diff --git a/crates/base/src/rt_worker/worker.rs b/crates/base/src/rt_worker/worker.rs index ffd8901de..056fa84bd 100644 --- a/crates/base/src/rt_worker/worker.rs +++ b/crates/base/src/rt_worker/worker.rs @@ -328,15 +328,8 @@ impl Worker { token.outbound.cancel(); } } - - let s3_fs = runtime.s3_fs.take(); - - drop(runtime); - - if let Some(fs) = s3_fs { - if !fs.try_flush_background_tasks().await { - error!("failed to flush background s3 api tasks"); - } + if let Some(fs) = runtime.s3_fs.as_ref() { + fs.flush_background_tasks().await; } result diff --git a/crates/base/src/rt_worker/worker_ctx.rs b/crates/base/src/rt_worker/worker_ctx.rs index 5ad6b66f6..bc06b66fd 100644 --- a/crates/base/src/rt_worker/worker_ctx.rs +++ b/crates/base/src/rt_worker/worker_ctx.rs @@ -10,6 +10,7 @@ use anyhow::{anyhow, bail, Error}; use base_mem_check::MemCheckState; use cpu_timer::CPUTimer; use deno_config::JsxImportSourceConfig; +use deno_core::unsync::AtomicFlag; use deno_core::{InspectorSessionProxy, LocalInspectorSession}; use event_worker::events::{ BootEvent, ShutdownEvent, WorkerEventWithMetadata, WorkerEvents, WorkerMemoryUsed, @@ -22,6 +23,7 @@ use hyper_v014::client::conn::http1; use hyper_v014::upgrade::OnUpgrade; use hyper_v014::{Body, Request, Response}; use log::{debug, error}; +use once_cell::sync::Lazy; use sb_core::{MetricSource, SharedMetricSource}; use sb_graph::{DecoratorType, EszipPayloadKind}; use sb_workers::context::{ @@ -41,6 +43,7 @@ use tokio::sync::{mpsc, oneshot, Mutex}; use tokio::time::sleep; use tokio_rustls::server::TlsStream; use tokio_util::sync::CancellationToken; +use tracing::warn; use uuid::Uuid; use super::supervisor::{self, CPUTimerParam, CPUUsageMetrics}; @@ -95,12 +98,25 @@ impl TerminationToken { } async fn handle_request( + flags: Arc, worker_kind: WorkerKind, duplex_stream_tx: mpsc::UnboundedSender, msg: WorkerRequestMsg, - maybe_request_idle_timeout: Option, ) -> Result<(), Error> { - let (ours, theirs) = io::duplex(1024); + let request_idle_timeout_ms = flags.request_idle_timeout_ms; + let request_buf_size = flags.request_buffer_size.unwrap_or_else(|| { + const KIB: usize = 1024; + static CHECK: Lazy = Lazy::new(AtomicFlag::default); + + if !CHECK.is_raised() { + CHECK.raise(); + warn!("request buffer size is not specified, so it will be set to 1 KiB"); + } + + KIB as u64 + }); + + let (ours, theirs) = io::duplex(request_buf_size as usize); let WorkerRequestMsg { mut req, res_tx, @@ -138,7 +154,7 @@ async fn handle_request( tokio::spawn(relay_upgraded_request_and_response( req_upgrade, parts, - maybe_request_idle_timeout, + request_idle_timeout_ms, )); return; @@ -157,7 +173,7 @@ async fn handle_request( tokio::task::yield_now().await; let maybe_cancel_fut = async move { - if let Some(timeout_ms) = maybe_request_idle_timeout 
{ + if let Some(timeout_ms) = request_idle_timeout_ms { sleep(Duration::from_millis(timeout_ms)).await; } else { pending::<()>().await; @@ -190,7 +206,7 @@ async fn handle_request( } } - if let Some(timeout_ms) = maybe_request_idle_timeout { + if let Some(timeout_ms) = flags.request_idle_timeout_ms { let headers = res.headers(); let is_streamed_response = !headers.contains_key(http_v02::header::CONTENT_LENGTH); @@ -576,9 +592,9 @@ pub struct WorkerCtx { } pub async fn create_worker>( + flags: Arc, init_opts: Opt, inspector: Option, - maybe_request_idle_timeout: Option, ) -> Result { let (duplex_stream_tx, duplex_stream_rx) = mpsc::unbounded_channel::(); let (worker_boot_result_tx, worker_boot_result_rx) = @@ -620,15 +636,11 @@ pub async fn create_worker>( async move { while let Some(msg) = worker_req_rx.recv().await { tokio::task::spawn({ + let flags = flags.clone(); let stream_tx_inner = stream_tx.clone(); async move { - if let Err(err) = handle_request( - worker_kind, - stream_tx_inner, - msg, - maybe_request_idle_timeout, - ) - .await + if let Err(err) = + handle_request(flags, worker_kind, stream_tx_inner, msg).await { error!("worker failed to handle request: {:?}", err); } @@ -725,6 +737,7 @@ pub async fn send_user_worker_request( // Todo: Fix #[allow(clippy::too_many_arguments)] pub async fn create_main_worker( + flags: Arc, main_worker_path: PathBuf, import_map_path: Option, no_module_cache: bool, @@ -745,6 +758,7 @@ pub async fn create_main_worker( } let ctx = create_worker( + flags, ( WorkerContextInitOpts { service_path, @@ -766,7 +780,6 @@ pub async fn create_main_worker( termination_token, ), inspector, - None, ) .await .map_err(|err| anyhow!("main worker boot error: {}", err))?; @@ -775,7 +788,7 @@ pub async fn create_main_worker( } pub async fn create_events_worker( - flags: &ServerFlags, + flags: Arc, events_worker_path: PathBuf, import_map_path: Option, maybe_entrypoint: Option, @@ -784,8 +797,12 @@ pub async fn create_events_worker( ) -> Result<(WorkerCtx, mpsc::UnboundedSender), Error> { let (events_tx, events_rx) = mpsc::unbounded_channel::(); + let no_module_cache = flags.no_module_cache; + let event_worker_exit_deadline_sec = flags.event_worker_exit_deadline_sec; + let mut service_path = events_worker_path.clone(); let mut maybe_eszip = None; + if let Some(ext) = events_worker_path.extension() { if ext == "eszip" { service_path = events_worker_path.parent().unwrap().to_path_buf(); @@ -796,10 +813,11 @@ pub async fn create_events_worker( } let ctx = create_worker( + flags, ( WorkerContextInitOpts { service_path, - no_module_cache: flags.no_module_cache, + no_module_cache, import_map_path, env_vars: std::env::vars().collect(), timing: None, @@ -809,7 +827,7 @@ pub async fn create_events_worker( maybe_module_code: None, conf: WorkerRuntimeOpts::EventsWorker(EventWorkerRuntimeOpts { events_msg_rx: Some(events_rx), - event_worker_exit_deadline_sec: Some(flags.event_worker_exit_deadline_sec), + event_worker_exit_deadline_sec: Some(event_worker_exit_deadline_sec), }), static_patterns: vec![], @@ -820,7 +838,6 @@ pub async fn create_events_worker( termination_token, ), None, - None, ) .await .map_err(|err| anyhow!("events worker boot error: {}", err))?; @@ -829,13 +846,13 @@ pub async fn create_events_worker( } pub async fn create_user_worker_pool( + flags: Arc, policy: WorkerPoolPolicy, worker_event_sender: Option>, termination_token: Option, static_patterns: Vec, inspector: Option, jsx: Option, - request_idle_timeout: Option, ) -> Result<(SharedMetricSource, 
mpsc::UnboundedSender), Error> { let metric_src = SharedMetricSource::default(); let (user_worker_msgs_tx, mut user_worker_msgs_rx) = @@ -849,12 +866,12 @@ pub async fn create_user_worker_pool( let token = termination_token.as_ref(); let mut termination_requested = false; let mut worker_pool = WorkerPool::new( + flags, policy, metric_src_inner, worker_event_sender, user_worker_msgs_tx_clone, inspector, - request_idle_timeout, ); // Note: Keep this loop non-blocking. Spawn a task to run blocking calls. diff --git a/crates/base/src/rt_worker/worker_pool.rs b/crates/base/src/rt_worker/worker_pool.rs index 1160541f9..39c877e5d 100644 --- a/crates/base/src/rt_worker/worker_pool.rs +++ b/crates/base/src/rt_worker/worker_pool.rs @@ -206,13 +206,13 @@ impl ActiveWorkerRegistry { // retires current one adds new one) // send_request is called with UUID pub struct WorkerPool { + pub flags: Arc, pub policy: WorkerPoolPolicy, pub metric_src: SharedMetricSource, pub user_workers: HashMap, pub active_workers: HashMap, pub worker_pool_msgs_tx: mpsc::UnboundedSender, pub maybe_inspector: Option, - pub maybe_request_idle_timeout: Option, // TODO: refactor this out of worker pool pub worker_event_sender: Option>, @@ -220,21 +220,21 @@ pub struct WorkerPool { impl WorkerPool { pub(crate) fn new( + flags: Arc, policy: WorkerPoolPolicy, metric_src: SharedMetricSource, worker_event_sender: Option>, worker_pool_msgs_tx: mpsc::UnboundedSender, inspector: Option, - request_idle_timeout: Option, ) -> Self { Self { + flags, policy, metric_src, worker_event_sender, user_workers: HashMap::new(), active_workers: HashMap::new(), maybe_inspector: inspector, - maybe_request_idle_timeout: request_idle_timeout, worker_pool_msgs_tx, } } @@ -253,8 +253,6 @@ impl WorkerPool { let is_oneshot_policy = self.policy.supervisor_policy.is_oneshot(); let inspector = self.maybe_inspector.clone(); - let request_idle_timeout = self.maybe_request_idle_timeout; - let force_create = worker_options .conf .as_user_worker() @@ -340,6 +338,7 @@ impl WorkerPool { } }; + let flags = self.flags.clone(); let worker_pool_msgs_tx = self.worker_pool_msgs_tx.clone(); let events_msg_tx = self.worker_event_sender.clone(); let supervisor_policy = self.policy.supervisor_policy; @@ -427,9 +426,9 @@ impl WorkerPool { worker_options.conf = WorkerRuntimeOpts::UserWorker(user_worker_rt_opts); match create_worker( + flags, (worker_options, supervisor_policy, termination_token.clone()), inspector, - request_idle_timeout, ) .await { diff --git a/crates/base/src/server.rs b/crates/base/src/server.rs index d420575a8..09c2fd893 100644 --- a/crates/base/src/server.rs +++ b/crates/base/src/server.rs @@ -252,6 +252,7 @@ pub struct ServerFlags { pub request_wait_timeout_ms: Option, pub request_idle_timeout_ms: Option, pub request_read_timeout_ms: Option, + pub request_buffer_size: Option, } #[derive(Debug)] @@ -330,7 +331,7 @@ pub struct Server { main_worker_req_tx: mpsc::UnboundedSender, callback_tx: Option>, termination_tokens: TerminationTokens, - flags: ServerFlags, + flags: Arc, metric_src: SharedMetricSource, } @@ -356,6 +357,7 @@ impl Server { ) -> Result { let mut worker_events_tx = None; + let flags = Arc::new(flags); let maybe_events_entrypoint = entrypoints.events; let maybe_main_entrypoint = entrypoints.main; let termination_tokens = @@ -367,7 +369,7 @@ impl Server { let events_path_buf = events_path.to_path_buf(); let (ctx, sender) = create_events_worker( - &flags, + flags.clone(), events_path_buf, import_map_path.clone(), maybe_events_entrypoint, @@ -391,19 
+393,20 @@ impl Server { // Create a user worker pool let (shared_metric_src, worker_pool_tx) = create_user_worker_pool( + flags.clone(), maybe_user_worker_policy.unwrap_or_default(), worker_events_tx, Some(termination_tokens.pool.clone()), static_patterns, inspector.clone(), jsx_config.clone(), - flags.request_idle_timeout_ms, ) .await?; // create main worker let main_worker_path = Path::new(&main_service_path).to_path_buf(); let main_worker_req_tx = create_main_worker( + flags.clone(), main_worker_path, import_map_path.clone(), flags.no_module_cache, @@ -492,7 +495,7 @@ impl Server { mut graceful_exit_deadline_sec, mut graceful_exit_keepalive_deadline_ms, .. - } = self.flags; + } = *self.flags; let request_read_timeout_dur = request_read_timeout_ms.map(Duration::from_millis); let mut terminate_signal_fut = get_termination_signal(); diff --git a/crates/base/src/utils/mod.rs b/crates/base/src/utils/mod.rs index f8c55a6f6..1df45b31c 100644 --- a/crates/base/src/utils/mod.rs +++ b/crates/base/src/utils/mod.rs @@ -1,3 +1,4 @@ pub mod json; pub mod path; +pub mod test_utils; pub mod units; diff --git a/crates/base/src/utils/integration_test_helper.rs b/crates/base/src/utils/test_utils.rs similarity index 86% rename from crates/base/src/utils/integration_test_helper.rs rename to crates/base/src/utils/test_utils.rs index bf44a7dac..b3e5bae00 100644 --- a/crates/base/src/utils/integration_test_helper.rs +++ b/crates/base/src/utils/test_utils.rs @@ -1,7 +1,6 @@ #![allow(dead_code)] use std::{ - collections::HashMap, marker::PhantomPinned, path::PathBuf, sync::Arc, @@ -9,14 +8,16 @@ use std::{ time::Duration, }; -use anyhow::{bail, Context, Error}; -use base::{ +use crate::{ rt_worker::{ worker_ctx::{create_user_worker_pool, create_worker, CreateWorkerArgs, TerminationToken}, worker_pool::{SupervisorPolicy, WorkerPoolPolicy}, }, server::ServerFlags, }; + +use anyhow::{bail, Context, Error}; +use event_worker::events::WorkerEventWithMetadata; use futures_util::{future::BoxFuture, Future, FutureExt}; use http_v02::{Request, Response}; use hyper_v014::Body; @@ -131,8 +132,9 @@ impl WorkerContextInitOptsForTesting for WorkerContextInitOpts { pub struct TestBedBuilder { main_service_path: PathBuf, worker_pool_policy: Option, + worker_event_sender: Option>, main_worker_init_opts: Option, - request_idle_timeout: Option, + flags: ServerFlags, } impl TestBedBuilder { @@ -143,22 +145,31 @@ impl TestBedBuilder { Self { main_service_path: main_service_path.into(), worker_pool_policy: None, + worker_event_sender: None, main_worker_init_opts: None, - request_idle_timeout: None, + flags: ServerFlags::default(), } } - pub fn with_worker_pool_policy(mut self, worker_pool_policy: WorkerPoolPolicy) -> Self { - self.worker_pool_policy = Some(worker_pool_policy); + pub fn with_worker_pool_policy(mut self, value: WorkerPoolPolicy) -> Self { + self.worker_pool_policy = Some(value); self } - pub fn with_oneshot_policy(mut self, request_wait_timeout_ms: u64) -> Self { + pub fn with_worker_event_sender( + mut self, + value: Option>, + ) -> Self { + self.worker_event_sender = value; + self + } + + pub fn with_oneshot_policy(mut self, value: Option) -> Self { self.worker_pool_policy = Some(WorkerPoolPolicy::new( SupervisorPolicy::oneshot(), 1, ServerFlags { - request_wait_timeout_ms: Some(request_wait_timeout_ms), + request_wait_timeout_ms: value, ..Default::default() }, )); @@ -166,12 +177,12 @@ impl TestBedBuilder { self } - pub fn with_per_worker_policy(mut self, request_wait_timeout_ms: u64) -> Self { + pub fn 
with_per_worker_policy(mut self, value: Option) -> Self { self.worker_pool_policy = Some(WorkerPoolPolicy::new( SupervisorPolicy::PerWorker, 1, ServerFlags { - request_wait_timeout_ms: Some(request_wait_timeout_ms), + request_wait_timeout_ms: value, ..Default::default() }, )); @@ -179,12 +190,12 @@ impl TestBedBuilder { self } - pub fn with_per_request_policy(mut self, request_wait_timeout_ms: u64) -> Self { + pub fn with_per_request_policy(mut self, value: Option) -> Self { self.worker_pool_policy = Some(WorkerPoolPolicy::new( SupervisorPolicy::PerRequest { oneshot: false }, 1, ServerFlags { - request_wait_timeout_ms: Some(request_wait_timeout_ms), + request_wait_timeout_ms: value, ..Default::default() }, )); @@ -192,16 +203,13 @@ impl TestBedBuilder { self } - pub fn with_main_worker_init_opts( - mut self, - main_worker_init_opts: WorkerContextInitOpts, - ) -> Self { - self.main_worker_init_opts = Some(main_worker_init_opts); + pub fn with_main_worker_init_opts(mut self, value: WorkerContextInitOpts) -> Self { + self.main_worker_init_opts = Some(value); self } - pub fn with_request_idle_timeout(mut self, request_idle_timeout: u64) -> Self { - self.request_idle_timeout = Some(request_idle_timeout); + pub fn with_server_flags(mut self, value: ServerFlags) -> Self { + self.flags = value; self } @@ -210,14 +218,14 @@ impl TestBedBuilder { let token = TerminationToken::new(); ( create_user_worker_pool( + Arc::new(self.flags), self.worker_pool_policy .unwrap_or_else(test_user_worker_pool_policy), - None, + self.worker_event_sender, Some(token.clone()), vec![], None, None, - self.request_idle_timeout, ) .await .unwrap(), @@ -229,7 +237,7 @@ impl TestBedBuilder { service_path: self.main_service_path, no_module_cache: false, import_map_path: None, - env_vars: HashMap::new(), + env_vars: std::env::vars().collect(), timing: None, maybe_eszip: None, maybe_entrypoint: None, @@ -249,9 +257,9 @@ impl TestBedBuilder { let main_termination_token = TerminationToken::new(); let ctx = create_worker( + Arc::new(self.flags), (main_worker_init_opts, main_termination_token.clone()), None, - None, ) .await .unwrap(); @@ -276,12 +284,12 @@ impl TestBed { request_factory_fn: F, ) -> Result, impl FnOnce(Response)>, Error> where - F: FnOnce() -> Result, Error>, + F: FnOnce(http_v02::request::Builder) -> Result, Error>, { let conn_token = CancellationToken::new(); let (res_tx, res_rx) = oneshot::channel(); - let req: Request = request_factory_fn()?; + let req: Request = request_factory_fn(http_v02::request::Builder::new())?; let _ = self.main_worker_msg_tx.send(WorkerRequestMsg { req, @@ -330,10 +338,10 @@ pub async fn create_test_user_worker>( Ok({ let ctx = create_worker( + Arc::default(), opts.with_policy(policy) .with_termination_token(termination_token.clone()), None, - None, ) .await?; diff --git a/crates/base/test_cases/user-worker-san-check/.blocklisted b/crates/base/test_cases/user-worker-san-check/.blocklisted index 2f8c4d7c5..86d75ff57 100644 --- a/crates/base/test_cases/user-worker-san-check/.blocklisted +++ b/crates/base/test_cases/user-worker-san-check/.blocklisted @@ -23,14 +23,11 @@ link linkSync lstat lstatSync -makeTempDir makeTempDirSync makeTempFile makeTempFileSync -mkdir mkdirSync openSync -readDir readDirSync readLink readLinkSync diff --git a/crates/base/tests/integration_tests.rs b/crates/base/tests/integration_tests.rs index 5bbe56d43..58d209b68 100644 --- a/crates/base/tests/integration_tests.rs +++ b/crates/base/tests/integration_tests.rs @@ -1,9 +1,6 @@ 
#![allow(clippy::arc_with_non_send_sync)] #![allow(clippy::async_yields_async)] -#[path = "../src/utils/integration_test_helper.rs"] -mod integration_test_helper; - use deno_config::JsxImportSourceConfig; use http_v02 as http; use hyper_v014 as hyper; @@ -23,6 +20,10 @@ use std::{ use anyhow::Context; use async_tungstenite::WebSocketStream; +use base::utils::test_utils::{ + self, create_test_user_worker, test_user_runtime_opts, test_user_worker_pool_policy, + TestBedBuilder, +}; use base::{ integration_test, integration_test_listen_fut, integration_test_with_server_flag, rt_worker::worker_ctx::{create_user_worker_pool, create_worker, TerminationToken}, @@ -62,10 +63,6 @@ use tokio_util::{compat::TokioAsyncReadCompatExt, sync::CancellationToken}; use tungstenite::Message; use urlencoding::encode; -use crate::integration_test_helper::{ - create_test_user_worker, test_user_runtime_opts, test_user_worker_pool_policy, TestBedBuilder, -}; - const MB: usize = 1024 * 1024; const NON_SECURE_PORT: u16 = 8498; const SECURE_PORT: u16 = 4433; @@ -186,13 +183,13 @@ async fn test_not_trigger_pku_sigsegv_due_to_jit_compilation_non_cli() { // create a user worker pool let (_, worker_pool_tx) = create_user_worker_pool( - integration_test_helper::test_user_worker_pool_policy(), + Arc::default(), + test_utils::test_user_worker_pool_policy(), None, Some(pool_termination_token.clone()), vec![], None, None, - None, ) .await .unwrap(); @@ -219,7 +216,7 @@ async fn test_not_trigger_pku_sigsegv_due_to_jit_compilation_non_cli() { maybe_tmp_fs_config: None, }; - let ctx = create_worker((opts, main_termination_token.clone()), None, None) + let ctx = create_worker(Arc::default(), (opts, main_termination_token.clone()), None) .await .unwrap(); @@ -347,13 +344,13 @@ async fn test_main_worker_boot_error() { // create a user worker pool let (_, worker_pool_tx) = create_user_worker_pool( + Arc::default(), test_user_worker_pool_policy(), None, Some(pool_termination_token.clone()), vec![], None, None, - None, ) .await .unwrap(); @@ -380,7 +377,7 @@ async fn test_main_worker_boot_error() { maybe_tmp_fs_config: None, }; - let result = create_worker((opts, main_termination_token.clone()), None, None).await; + let result = create_worker(Arc::default(), (opts, main_termination_token.clone()), None).await; assert!(result.is_err()); assert!(result @@ -475,13 +472,13 @@ async fn test_main_worker_user_worker_mod_evaluate_exception() { // create a user worker pool let (_, worker_pool_tx) = create_user_worker_pool( + Arc::default(), test_user_worker_pool_policy(), None, Some(pool_termination_token.clone()), vec![], None, None, - None, ) .await .unwrap(); @@ -508,7 +505,7 @@ async fn test_main_worker_user_worker_mod_evaluate_exception() { maybe_tmp_fs_config: None, }; - let ctx = create_worker((opts, main_termination_token.clone()), None, None) + let ctx = create_worker(Arc::default(), (opts, main_termination_token.clone()), None) .await .unwrap(); @@ -961,13 +958,12 @@ async fn req_failure_case_timeout() { let tb = TestBedBuilder::new("./test_cases/main") // NOTE: It should be small enough that the worker pool rejects the // request. 
- .with_oneshot_policy(10) + .with_oneshot_policy(Some(10)) .build() .await; - let req_body_fn = || { - Request::builder() - .uri("/slow_resp") + let req_body_fn = |b: http::request::Builder| { + b.uri("/slow_resp") .method("GET") .body(Body::empty()) .context("can't make request") @@ -999,14 +995,13 @@ async fn req_failure_case_timeout() { #[serial] async fn req_failure_case_cpu_time_exhausted() { let tb = TestBedBuilder::new("./test_cases/main_small_cpu_time") - .with_oneshot_policy(100000) + .with_oneshot_policy(None) .build() .await; let mut res = tb - .request(|| { - Request::builder() - .uri("/slow_resp") + .request(|b| { + b.uri("/slow_resp") .method("GET") .body(Body::empty()) .context("can't make request") @@ -1028,14 +1023,13 @@ async fn req_failure_case_cpu_time_exhausted() { #[serial] async fn req_failure_case_cpu_time_exhausted_2() { let tb = TestBedBuilder::new("./test_cases/main_small_cpu_time") - .with_oneshot_policy(100000) + .with_oneshot_policy(None) .build() .await; let mut res = tb - .request(|| { - Request::builder() - .uri("/cpu-sync") + .request(|b| { + b.uri("/cpu-sync") .method("GET") .body(Body::empty()) .context("can't make request") @@ -1057,14 +1051,13 @@ async fn req_failure_case_cpu_time_exhausted_2() { #[serial] async fn req_failure_case_wall_clock_reached() { let tb = TestBedBuilder::new("./test_cases/main_small_wall_clock") - .with_oneshot_policy(100000) + .with_oneshot_policy(None) .build() .await; let mut res = tb - .request(|| { - Request::builder() - .uri("/slow_resp") + .request(|b| { + b.uri("/slow_resp") .method("GET") .body(Body::empty()) .context("can't make request") @@ -1087,14 +1080,13 @@ async fn req_failure_case_wall_clock_reached() { #[serial] async fn req_failture_case_memory_limit_1() { let tb = TestBedBuilder::new("./test_cases/main") - .with_oneshot_policy(100000) + .with_oneshot_policy(None) .build() .await; let mut res = tb - .request(|| { - Request::builder() - .uri("/array-alloc-sync") + .request(|b| { + b.uri("/array-alloc-sync") .method("GET") .body(Body::empty()) .context("can't make request") @@ -1116,14 +1108,13 @@ async fn req_failture_case_memory_limit_1() { #[serial] async fn req_failture_case_memory_limit_2() { let tb = TestBedBuilder::new("./test_cases/main") - .with_oneshot_policy(100000) + .with_oneshot_policy(None) .build() .await; let mut res = tb - .request(|| { - Request::builder() - .uri("/array-alloc") + .request(|b| { + b.uri("/array-alloc") .method("GET") .body(Body::empty()) .context("can't make request") @@ -1148,14 +1139,13 @@ async fn req_failure_case_wall_clock_reached_less_than_100ms() { // dozens of times on the local machine, it will fail with a timeout. 
let tb = TestBedBuilder::new("./test_cases/main_small_wall_clock_less_than_100ms") - .with_oneshot_policy(100000) + .with_oneshot_policy(None) .build() .await; let mut res = tb - .request(|| { - Request::builder() - .uri("/slow_resp") + .request(|b| { + b.uri("/slow_resp") .method("GET") .body(Body::empty()) .context("can't make request") @@ -1324,7 +1314,11 @@ async fn test_oak_file_upload( let original = RequestBuilder::from_parts(client, req); let request_builder = Some(original); - integration_test!( + integration_test_with_server_flag!( + ServerFlags { + request_buffer_size: Some(1024), + ..Default::default() + }, main_service, NON_SECURE_PORT, "", @@ -1589,14 +1583,13 @@ async fn test_decorator_parse_typescript_experimental_with_metadata() { #[serial] async fn send_partial_payload_into_closed_pipe_should_not_be_affected_worker_stability() { let tb = TestBedBuilder::new("./test_cases/main") - .with_oneshot_policy(100000) + .with_oneshot_policy(None) .build() .await; let mut resp1 = tb - .request(|| { - Request::builder() - .uri("/chunked-char-1000ms") + .request(|b| { + b.uri("/chunked-char-1000ms") .method("GET") .body(Body::empty()) .context("can't make request") @@ -1619,9 +1612,8 @@ async fn send_partial_payload_into_closed_pipe_should_not_be_affected_worker_sta // of `Deno.serve` failing to properly handle an exception from a previous // request. let resp2 = tb - .request(|| { - Request::builder() - .uri("/empty-response") + .request(|b| { + b.uri("/empty-response") .method("GET") .body(Body::empty()) .context("can't make request") @@ -2831,6 +2823,7 @@ async fn test_tmp_fs_should_not_be_available_in_import_stmt() { } // -- sb_ai: ORT @huggingface/transformers +#[cfg(target_arch = "x86_64")] #[tokio::test] #[serial] async fn test_ort_nlp_feature_extraction() { @@ -2850,6 +2843,7 @@ async fn test_ort_nlp_feature_extraction() { ); } +#[cfg(target_arch = "x86_64")] #[tokio::test] #[serial] async fn test_ort_nlp_fill_mask() { @@ -2869,6 +2863,7 @@ async fn test_ort_nlp_fill_mask() { ); } +#[cfg(target_arch = "x86_64")] #[tokio::test] #[serial] async fn test_ort_nlp_question_answering() { @@ -2888,6 +2883,7 @@ async fn test_ort_nlp_question_answering() { ); } +#[cfg(target_arch = "x86_64")] #[tokio::test] #[serial] async fn test_ort_nlp_summarization() { @@ -2907,6 +2903,7 @@ async fn test_ort_nlp_summarization() { ); } +#[cfg(target_arch = "x86_64")] #[tokio::test] #[serial] async fn test_ort_nlp_text_classification() { @@ -2926,6 +2923,7 @@ async fn test_ort_nlp_text_classification() { ); } +#[cfg(target_arch = "x86_64")] #[tokio::test] #[serial] async fn test_ort_nlp_text_generation() { @@ -2945,6 +2943,7 @@ async fn test_ort_nlp_text_generation() { ); } +#[cfg(target_arch = "x86_64")] #[tokio::test] #[serial] async fn test_ort_nlp_text2text_generation() { @@ -2964,6 +2963,7 @@ async fn test_ort_nlp_text2text_generation() { ); } +#[cfg(target_arch = "x86_64")] #[tokio::test] #[serial] async fn test_ort_nlp_token_classification() { @@ -2983,6 +2983,7 @@ async fn test_ort_nlp_token_classification() { ); } +#[cfg(target_arch = "x86_64")] #[tokio::test] #[serial] async fn test_ort_nlp_translation() { @@ -3002,6 +3003,7 @@ async fn test_ort_nlp_translation() { ); } +#[cfg(target_arch = "x86_64")] #[tokio::test] #[serial] async fn test_ort_nlp_zero_shot_classification() { @@ -3021,6 +3023,7 @@ async fn test_ort_nlp_zero_shot_classification() { ); } +#[cfg(target_arch = "x86_64")] #[tokio::test] #[serial] async fn test_ort_vision_image_feature_extraction() { @@ -3040,6 +3043,7 @@ async fn 
test_ort_vision_image_feature_extraction() { ); } +#[cfg(target_arch = "x86_64")] #[tokio::test] #[serial] async fn test_ort_vision_image_classification() { @@ -3059,6 +3063,7 @@ async fn test_ort_vision_image_classification() { ); } +#[cfg(target_arch = "x86_64")] #[tokio::test] #[serial] async fn test_ort_vision_zero_shot_image_classification() { diff --git a/crates/cli/src/flags.rs b/crates/cli/src/flags.rs index 2673a7c64..34a5d9699 100644 --- a/crates/cli/src/flags.rs +++ b/crates/cli/src/flags.rs @@ -226,6 +226,12 @@ fn get_start_command() -> Command { .default_value("true") .default_missing_value("true"), ) + .arg( + arg!(--"request-buffer-size" ) + .help("The buffer size of the stream that is used to forward a request to the worker") + .value_parser(value_parser!(u64)) + .default_value("16384"), + ) } fn get_bundle_command() -> Command { diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index eefb96c5c..fec893054 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -181,6 +181,11 @@ fn main() -> Result { }; let tcp_nodelay = sub_matches.get_one::("tcp-nodelay").copied().unwrap(); + let request_buffer_size = sub_matches + .get_one::("request-buffer-size") + .copied() + .unwrap(); + let flags = ServerFlags { no_module_cache, allow_main_inspector, @@ -191,6 +196,7 @@ fn main() -> Result { request_wait_timeout_ms: maybe_request_wait_timeout, request_idle_timeout_ms: maybe_request_idle_timeout, request_read_timeout_ms: maybe_request_read_timeout, + request_buffer_size: Some(request_buffer_size), }; let maybe_received_signum = start_server( diff --git a/crates/event_worker/events.rs b/crates/event_worker/events.rs index 270150bc6..adb0c0728 100644 --- a/crates/event_worker/events.rs +++ b/crates/event_worker/events.rs @@ -52,7 +52,7 @@ pub struct LogEvent { pub level: LogLevel, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] pub enum LogLevel { Debug, Info, diff --git a/crates/sb_core/js/bootstrap.js b/crates/sb_core/js/bootstrap.js index 1cab777bb..dc0148d14 100644 --- a/crates/sb_core/js/bootstrap.js +++ b/crates/sb_core/js/bootstrap.js @@ -471,6 +471,13 @@ globalThis.bootstrapSBEdge = (opts, extraCtx) => { }, }); + // Find declarative fetch handler + core.addMainModuleHandler(main => { + if (ObjectHasOwn(main, 'default')) { + registerDeclarativeServer(main.default); + } + }); + /// DISABLE SHARED MEMORY AND INSTALL MEM CHECK TIMING // NOTE: We should not allow user workers to use shared memory. 
This is @@ -534,6 +541,9 @@ globalThis.bootstrapSBEdge = (opts, extraCtx) => { 'writeTextFile': true, 'readFile': true, 'readTextFile': true, + 'mkdir': true, + 'makeTempDir': true, + 'readDir': true, 'kill': MOCK_FN, 'exit': MOCK_FN, @@ -561,13 +571,6 @@ globalThis.bootstrapSBEdge = (opts, extraCtx) => { Deno[name] = value; } } - - // find declarative fetch handler - core.addMainModuleHandler(main => { - if (ObjectHasOwn(main, 'default')) { - registerDeclarativeServer(main.default); - } - }); } if (isEventsWorker) { diff --git a/crates/sb_fs/Cargo.toml b/crates/sb_fs/Cargo.toml index abcd03ade..d9295ed2f 100644 --- a/crates/sb_fs/Cargo.toml +++ b/crates/sb_fs/Cargo.toml @@ -40,6 +40,10 @@ enum-as-inner.workspace = true tracing.workspace = true rkyv = { workspace = true, features = ["validation"] } +rustls = "0.21" +hyper-proxy = { version = "0.9", default-features = false, features = ["rustls"] } +hyper-rustls = "0.24" +headers = "0.3" normalize-path = "0.2" memmap2 = "0.9" aws-sdk-s3 = "1.2" @@ -49,4 +53,13 @@ aws-smithy-runtime = "1.7" aws-smithy-runtime-api = "1.7" [dev-dependencies] -rand.workspace = true \ No newline at end of file +base = { version = "0.1.0", path = "../base" } +event_worker = { version = "0.1.0", path = "../event_worker" } + +ctor.workspace = true +rand.workspace = true +serial_test.workspace = true + +dotenvy = "0.15" +aws-smithy-runtime = { version = "1.7", features = ["test-util"] } +aws-smithy-runtime-api = { version = "1.7", features = ["test-util"] } \ No newline at end of file diff --git a/crates/sb_fs/build.rs b/crates/sb_fs/build.rs new file mode 100644 index 000000000..093cad15b --- /dev/null +++ b/crates/sb_fs/build.rs @@ -0,0 +1,8 @@ +use std::path::Path; + +fn main() { + dbg!(Path::new("./tests/.env").exists()); + if Path::new("./tests/.env").exists() { + println!("cargo:rustc-cfg=dotenv") + } +} diff --git a/crates/sb_fs/fs/prefix_fs.rs b/crates/sb_fs/fs/prefix_fs.rs index f7e331898..bf8c2a85c 100644 --- a/crates/sb_fs/fs/prefix_fs.rs +++ b/crates/sb_fs/fs/prefix_fs.rs @@ -11,6 +11,8 @@ use deno_io::fs::{File, FsError, FsResult, FsStat}; #[derive(Debug, Clone)] pub struct PrefixFs { prefix: PathBuf, + cwd: Option, + tmp_dir: Option, fs: Arc, base_fs: Option>, } @@ -25,10 +27,44 @@ where { Self { prefix: prefix.as_ref().to_path_buf(), + cwd: None, + tmp_dir: None, fs: Arc::new(fs), base_fs, } } + + pub fn cwd
<P>(mut self, v: P) -> Self + where + P: AsRef<Path>, + { + self.cwd = Some(v.as_ref().to_path_buf()); + self + } + + pub fn tmp_dir
<P>(mut self, v: P) -> Self + where + P: AsRef<Path>, + { + self.tmp_dir = Some(v.as_ref().to_path_buf()); + self + } + + pub fn set_cwd
<P>(&mut self, v: P) -> &mut Self + where + P: AsRef<Path>, + { + self.cwd = Some(v.as_ref().to_path_buf()); + self + } + + pub fn set_tmp_dir
<P>
(&mut self, v: P) -> &mut Self + where + P: AsRef, + { + self.tmp_dir = Some(v.as_ref().to_path_buf()); + self + } } impl PrefixFs @@ -36,7 +72,7 @@ where FileSystem: deno_fs::FileSystem + 'static, { pub fn add_fs( - self, + mut self, prefix: P, fs: FileSystemInner, ) -> PrefixFs @@ -47,6 +83,8 @@ where PrefixFs { prefix: prefix.as_ref().to_path_buf(), fs: Arc::new(fs), + cwd: self.cwd.take(), + tmp_dir: self.tmp_dir.take(), base_fs: Some(Arc::new(self)), } } @@ -58,16 +96,18 @@ where FileSystem: deno_fs::FileSystem, { fn cwd(&self) -> FsResult { - self.base_fs - .as_ref() - .map(|it| it.cwd()) + self.cwd + .clone() + .map(Ok) + .or_else(|| self.base_fs.as_ref().map(|it| it.cwd())) .unwrap_or_else(|| Ok(PathBuf::new())) } fn tmp_dir(&self) -> FsResult { - self.base_fs - .as_ref() - .map(|it| it.tmp_dir()) + self.tmp_dir + .clone() + .map(Ok) + .or_else(|| self.base_fs.as_ref().map(|it| it.tmp_dir())) .unwrap_or_else(|| Err(FsError::NotSupported)) } diff --git a/crates/sb_fs/fs/s3_fs.rs b/crates/sb_fs/fs/s3_fs.rs index 1e120fc04..0cf1294ff 100644 --- a/crates/sb_fs/fs/s3_fs.rs +++ b/crates/sb_fs/fs/s3_fs.rs @@ -1,7 +1,12 @@ +// TODO: Remove the line below after updating the rust toolchain to v1.81. +#![allow(clippy::blocks_in_conditions)] + use core::slice; use std::{ borrow::Cow, + cell::RefCell, ffi::OsStr, + fmt::Debug, io::{self, Cursor}, mem, os::fd::AsRawFd, @@ -38,15 +43,21 @@ use futures::{ stream::FuturesUnordered, AsyncWriteExt, FutureExt, StreamExt, TryFutureExt, }; +use headers::Authorization; +use hyper_proxy::{Intercept, Proxy, ProxyConnector}; +use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder}; +use hyper_v014::{client::HttpConnector, Uri}; use memmap2::{MmapOptions, MmapRaw}; -use once_cell::sync::OnceCell; +use once_cell::sync::{Lazy, OnceCell}; use serde::{Deserialize, Serialize}; use tempfile::tempfile; use tokio::{ io::{AsyncBufRead, AsyncReadExt}, + sync::RwLock, task::JoinError, }; -use tracing::{debug, debug_span, error, info_span, instrument, warn, Instrument}; +use tracing::{debug, error, info_span, instrument, trace, trace_span, warn, Instrument}; +use url::Url; use super::TryNormalizePath; @@ -57,26 +68,29 @@ type BackgroundTask = Shared>>>; #[derive(Debug, Clone)] pub struct S3Fs { client: aws_sdk_s3::Client, - background_tasks: Arc>, + background_tasks: Arc>>, + + #[allow(unused)] + config: Arc, } impl S3Fs { - pub async fn try_flush_background_tasks(&self) -> bool { - let Ok(mut background_tasks) = Arc::try_unwrap(self.background_tasks.clone()) else { - return false; - }; + pub async fn flush_background_tasks(&self) { + if self.background_tasks.read().await.is_empty() { + return; + } + + let mut background_tasks = self.background_tasks.write().await; loop { if background_tasks.next().await.is_none() { break; } } - - true } } -#[derive(Deserialize, Serialize, Debug)] +#[derive(Deserialize, Serialize, Debug, Clone, Default)] #[serde(rename_all = "camelCase")] pub struct S3CredentialsObject { access_key_id: Cow<'static, str>, @@ -138,7 +152,7 @@ impl ReconnectMode { } } -#[derive(Deserialize, Serialize, Debug)] +#[derive(Deserialize, Serialize, Debug, Clone, Default)] #[serde(rename_all = "camelCase")] pub struct S3ClientRetryConfig { mode: RetryMode, @@ -170,7 +184,7 @@ impl S3ClientRetryConfig { } } -#[derive(Deserialize, Serialize, Debug)] +#[derive(Deserialize, Serialize, Debug, Clone, Default)] #[serde(rename_all = "camelCase")] pub struct S3FsConfig { app_name: Option>, @@ -207,7 +221,7 @@ impl S3FsConfig { move || 
std::future::ready(Ok(cred.clone())), ))) }) - .set_http_client(Some(Self::get_shared_http_client())) + .set_http_client(Some(Self::get_thread_local_shared_http_client())) .set_behavior_version(Some(BehaviorVersion::latest())) .set_retry_config( self.retry_config @@ -217,20 +231,73 @@ impl S3FsConfig { Ok(builder.build()) } - fn get_shared_http_client() -> SharedHttpClient { - static CLIENT: OnceCell = OnceCell::new(); + fn get_thread_local_shared_http_client() -> SharedHttpClient { + thread_local! { + static CLIENT: RefCell> = const { RefCell::new(OnceCell::new()) }; + } + + CLIENT.with(|it| { + it.borrow_mut() + .get_or_init(|| { + if let Some(proxy_connector) = resolve_proxy_connector() { + HyperClientBuilder::new().build(proxy_connector) + } else { + HyperClientBuilder::new().build_https() + } + }) + .clone() + }) + } +} - CLIENT - .get_or_init(|| HyperClientBuilder::new().build_https()) - .clone() +fn resolve_proxy_connector() -> Option>> { + let proxy_url: Url = std::env::var("HTTPS_PROXY").ok()?.parse().ok()?; + let proxy_uri: Uri = std::env::var("HTTPS_PROXY").ok()?.parse().ok()?; + let mut proxy = Proxy::new(Intercept::All, proxy_uri); + + if let Some(password) = proxy_url.password() { + proxy.set_authorization(Authorization::basic(proxy_url.username(), password)); + } + + static HTTPS_NATIVE_ROOTS: Lazy> = Lazy::new(default_tls); + + fn default_tls() -> HttpsConnector { + use hyper_rustls::ConfigBuilderExt; + HttpsConnectorBuilder::new() + .with_tls_config( + rustls::ClientConfig::builder() + .with_cipher_suites(&[ + // TLS1.3 suites + rustls::cipher_suite::TLS13_AES_256_GCM_SHA384, + rustls::cipher_suite::TLS13_AES_128_GCM_SHA256, + // TLS1.2 suites + rustls::cipher_suite::TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, + rustls::cipher_suite::TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, + rustls::cipher_suite::TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, + rustls::cipher_suite::TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, + rustls::cipher_suite::TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256, + ]) + .with_safe_default_kx_groups() + .with_safe_default_protocol_versions() + .unwrap() + .with_native_roots() + .with_no_client_auth(), + ) + .https_or_http() + .enable_http1() + .enable_http2() + .build() } + + Some(ProxyConnector::from_proxy(HTTPS_NATIVE_ROOTS.clone(), proxy).unwrap()) } impl S3Fs { pub fn new(config: S3FsConfig) -> Result { Ok(Self { - client: aws_sdk_s3::Client::from_conf(config.try_into_s3_config()?), + client: aws_sdk_s3::Client::from_conf(config.clone().try_into_s3_config()?), background_tasks: Arc::default(), + config: Arc::new(config), }) } } @@ -262,29 +329,79 @@ impl deno_fs::FileSystem for S3Fs { Err(FsError::NotSupported) } + #[instrument( + level = "trace", + skip(self, options, _access_check), + fields(?options, has_access_check = _access_check.is_some()), + err(Debug) + )] async fn open_async<'a>( &'a self, path: PathBuf, options: OpenOptions, _access_check: Option>, ) -> FsResult> { + self.flush_background_tasks().await; + let (bucket_name, key) = try_get_bucket_name_and_key(path.try_normalize()?)?; - Ok(Rc::new(S3Object { + if key.is_empty() { + return Err(FsError::Io(io::Error::from(io::ErrorKind::InvalidInput))); + } + + let resp = self + .client + .head_object() + .bucket(&bucket_name) + .key(&key) + .send() + .await; + + let mut not_found = false; + + if let Some(err) = resp.err() { + not_found = err + .as_service_error() + .map(|it| it.is_not_found()) + .unwrap_or_default(); + + if not_found { + if !(options.create || options.create_new) { + return 
@@ -262,29 +329,79 @@
 impl deno_fs::FileSystem for S3Fs {
         Err(FsError::NotSupported)
     }
 
+    #[instrument(
+        level = "trace",
+        skip(self, options, _access_check),
+        fields(?options, has_access_check = _access_check.is_some()),
+        err(Debug)
+    )]
     async fn open_async<'a>(
         &'a self,
         path: PathBuf,
         options: OpenOptions,
         _access_check: Option<AccessCheckCb<'a>>,
     ) -> FsResult<Rc<dyn deno_io::fs::File>> {
+        self.flush_background_tasks().await;
+
         let (bucket_name, key) = try_get_bucket_name_and_key(path.try_normalize()?)?;
 
-        Ok(Rc::new(S3Object {
+        if key.is_empty() {
+            return Err(FsError::Io(io::Error::from(io::ErrorKind::InvalidInput)));
+        }
+
+        let resp = self
+            .client
+            .head_object()
+            .bucket(&bucket_name)
+            .key(&key)
+            .send()
+            .await;
+
+        let mut not_found = false;
+
+        if let Some(err) = resp.err() {
+            not_found = err
+                .as_service_error()
+                .map(|it| it.is_not_found())
+                .unwrap_or_default();
+
+            if not_found {
+                if !(options.create || options.create_new) {
+                    return Err(FsError::Io(io::Error::from(io::ErrorKind::NotFound)));
+                }
+            } else {
+                return Err(FsError::Io(io::Error::other(err)));
+            }
+        }
+
+        let file = Rc::new(S3Object {
             bucket_name,
             key,
             fs: self.clone(),
-            open_options: options,
             op_slot: AsyncRefCell::default(),
-        }))
+        });
+
+        if not_found || options.truncate {
+            file.clone().write(BufView::empty()).await?;
+        } else if options.create_new {
+            return Err(FsError::Io(io::Error::from(io::ErrorKind::AlreadyExists)));
+        }
+
+        Ok(file)
     }
 
     fn mkdir_sync(&self, _path: &Path, _recursive: bool, _mode: u32) -> FsResult<()> {
         Err(FsError::NotSupported)
     }
 
+    #[instrument(level = "trace", skip(self, _mode), fields(mode = _mode), ret, err(Debug))]
     async fn mkdir_async(&self, path: PathBuf, recursive: bool, _mode: u32) -> FsResult<()> {
         let (bucket_name, key) = try_get_bucket_name_and_key(path.try_normalize()?)?;
+
+        if key.is_empty() {
+            return Err(FsError::Io(io::Error::from(io::ErrorKind::InvalidInput)));
+        }
+
         let keys = if recursive {
             PathBuf::from(key)
                 .iter()
@@ -302,14 +419,47 @@
                 })
                 .collect::<Vec<_>>()
         } else {
+            'scope: {
+                if let Some(parent) = PathBuf::from(&key).parent() {
+                    if parent == Path::new("") {
+                        break 'scope;
+                    }
+
+                    let resp = self
+                        .client
+                        .head_object()
+                        .bucket(&bucket_name)
+                        .key(parent.to_string_lossy())
+                        .send()
+                        .await;
+
+                    if let Some(err) = resp.err() {
+                        if err
+                            .as_service_error()
+                            .map(|it| it.is_not_found())
+                            .unwrap_or_default()
+                        {
+                            return Err(FsError::Io(io::Error::other(format!(
+                                "No such file or directory: {}",
+                                parent.to_string_lossy()
+                            ))));
+                        }
+
+                        return Err(FsError::Io(io::Error::other(err)));
+                    }
+                }
+            }
+
             vec![key]
         };
 
         let mut futs = FuturesUnordered::new();
         let mut errors = vec![];
 
-        for folder_key in keys {
-            debug_assert!(folder_key.ends_with('/'));
+        for mut folder_key in keys {
+            if !folder_key.ends_with('/') {
+                folder_key.push('/');
+            }
 
             let client = self.client.clone();
             let bucket_name = bucket_name.clone();
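Note on the parent-existence check above: it uses a labeled block (`'scope: { ... break 'scope; }`, stable Rust since 1.65) so the HEAD request can be skipped for top-level keys without a helper function. The control flow in isolation, with the S3 call replaced by a hypothetical `exists` callback:

    use std::path::{Path, PathBuf};

    fn parent_must_exist(key: &str, exists: impl Fn(&Path) -> bool) -> Result<(), String> {
        'scope: {
            if let Some(parent) = PathBuf::from(key).parent() {
                // Top-level keys have no parent to validate; jump past the check.
                if parent == Path::new("") {
                    break 'scope;
                }
                if !exists(parent) {
                    return Err(format!(
                        "No such file or directory: {}",
                        parent.to_string_lossy()
                    ));
                }
            }
        }
        Ok(())
    }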
@@ -387,19 +537,27 @@
         Err(FsError::NotSupported)
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     async fn remove_async(&self, path: PathBuf, recursive: bool) -> FsResult<()> {
-        let had_slash = path.ends_with("/");
+        self.flush_background_tasks().await;
+
+        let had_slash = path.to_string_lossy().ends_with('/');
         let (bucket_name, key) = try_get_bucket_name_and_key(path.try_normalize()?)?;
 
         if recursive {
-            let builder = self
-                .client
-                .list_objects_v2()
-                .bucket(&bucket_name)
-                .prefix(format!("{}/", key));
+            let builder =
+                self.client
+                    .list_objects_v2()
+                    .bucket(&bucket_name)
+                    .prefix(if key.is_empty() {
+                        Cow::Borrowed("")
+                    } else {
+                        Cow::Owned(format!("{}/", key))
+                    });
 
             let mut errors = vec![];
             let mut stream = builder.into_paginator().send();
+            let mut ids = vec![];
 
             while let Some(resp) = stream.next().await {
                 let v = match resp {
@@ -413,36 +571,39 @@
                     _ => {}
                 }
 
-                let delete = Delete::builder()
-                    .set_quiet(Some(true))
-                    .set_objects(Some(
-                        v.contents()
-                            .iter()
-                            .filter_map(|it| {
-                                it.key()
-                                    .and_then(|it| ObjectIdentifier::builder().key(it).build().ok())
-                            })
-                            .collect::<Vec<_>>(),
-                    ))
-                    .build()
-                    .map_err(io::Error::other)?;
+                ids.extend(v.contents().iter().filter_map(|it| {
+                    it.key()
+                        .and_then(|it| ObjectIdentifier::builder().key(it).build().ok())
+                }));
+            }
 
-                let resp = self
-                    .client
-                    .delete_objects()
-                    .bucket(&bucket_name)
-                    .delete(delete)
-                    .send()
-                    .await;
+            if ids.is_empty() {
+                return Ok(());
+            }
 
-                let v = match resp {
-                    Ok(v) => v,
-                    Err(err) => return Err(io::Error::other(err).into()),
-                };
+            trace!(ids = ?ids.iter().map(|it| it.key()).collect::<Vec<_>>());
 
-                if !v.errors().is_empty() {
-                    errors.extend_from_slice(v.errors());
-                }
+            let delete = Delete::builder()
+                .set_quiet(Some(true))
+                .set_objects(Some(ids))
+                .build()
+                .map_err(io::Error::other)?;
+
+            let resp = self
+                .client
+                .delete_objects()
+                .bucket(&bucket_name)
+                .delete(delete)
+                .send()
+                .await;
+
+            let v = match resp {
+                Ok(v) => v,
+                Err(err) => return Err(io::Error::other(err).into()),
+            };
+
+            if !v.errors().is_empty() {
+                errors.extend_from_slice(v.errors());
             }
 
             return to_combined_message(errors.into_iter().map(|it| {
@@ -455,11 +616,35 @@
             }));
         }
 
-        let _resp = self
+        if key.is_empty() {
+            return Err(FsError::Io(io::Error::from(io::ErrorKind::InvalidInput)));
+        }
+
+        let key = if had_slash { format!("{}/", key) } else { key };
+        let resp = self
             .client
+            .head_object()
+            .bucket(&bucket_name)
+            .key(&key)
+            .send()
+            .await;
+
+        if let Some(err) = resp.err() {
+            if err
+                .as_service_error()
+                .map(|it| it.is_not_found())
+                .unwrap_or_default()
+            {
+                return Err(FsError::Io(io::Error::from(io::ErrorKind::NotFound)));
+            }
+
+            return Err(FsError::Io(io::Error::other(err)));
+        }
+
+        self.client
             .delete_object()
             .bucket(bucket_name)
-            .key(if had_slash { format!("{}/", key) } else { key })
+            .key(key)
             .send()
             .await
             .map_err(io::Error::other)?;
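Note on the recursive-remove rewrite above: it now drains the paginator into `ids` first and issues a single DeleteObjects call. One caveat the hunk does not address: S3's DeleteObjects API accepts at most 1,000 keys per request, so very large prefixes may need a chunked variant along these lines (a sketch, assuming the same `ObjectIdentifier` type from aws-sdk-s3; `into_batches` is a hypothetical helper):

    use aws_sdk_s3::types::ObjectIdentifier;

    // Split the collected identifiers into S3-sized batches of at most 1,000.
    fn into_batches(ids: Vec<ObjectIdentifier>) -> impl Iterator<Item = Vec<ObjectIdentifier>> {
        let mut ids = ids.into_iter().peekable();
        std::iter::from_fn(move || {
            if ids.peek().is_none() {
                return None;
            }
            Some(ids.by_ref().take(1000).collect()) // DeleteObjects per-request limit
        })
    }

Each batch would then be wrapped in its own `Delete::builder()` call exactly as in the hunk.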
@@ -489,8 +674,53 @@
         Err(FsError::NotSupported)
     }
 
+    #[instrument(level = "trace", skip(self), err(Debug))]
     async fn stat_async(&self, path: PathBuf) -> FsResult<FsStat> {
-        self.open_async(path.try_normalize()?, OpenOptions::read(), None)
+        self.flush_background_tasks().await;
+
+        let path = path.try_normalize()?;
+        let had_slash = path.to_string_lossy().ends_with('/');
+        let (bucket_name, key) = try_get_bucket_name_and_key(path.clone())?;
+        let key_count = if key.is_empty() {
+            Some(1)
+        } else {
+            self.client
+                .list_objects_v2()
+                .max_keys(1)
+                .bucket(bucket_name)
+                .prefix(if had_slash { key } else { format!("{}/", key) })
+                .send()
+                .await
+                .map_err(io::Error::other)?
+                .key_count()
+        };
+
+        if matches!(key_count, Some(v) if v > 0) {
+            return Ok(FsStat {
+                is_file: false,
+                is_directory: true,
+                is_symlink: false,
+                size: 0,
+                mtime: None,
+                atime: None,
+                birthtime: None,
+                dev: 0,
+                ino: 0,
+                mode: 0,
+                nlink: 0,
+                uid: 0,
+                gid: 0,
+                rdev: 0,
+                blksize: 0,
+                blocks: 0,
+                is_block_device: false,
+                is_char_device: false,
+                is_fifo: false,
+                is_socket: false,
+            });
+        }
+
+        self.open_async(path, OpenOptions::read(), None)
             .and_then(|it| it.stat_async())
             .await
     }
@@ -499,6 +729,7 @@
         Err(FsError::NotSupported)
     }
 
+    #[instrument(level = "trace", skip(self), err(Debug))]
     async fn lstat_async(&self, path: PathBuf) -> FsResult<FsStat> {
         self.stat_async(path).await
     }
@@ -515,8 +746,12 @@
         Err(FsError::NotSupported)
     }
 
+    #[instrument(level = "trace", skip(self), err(Debug))]
     async fn read_dir_async(&self, path: PathBuf) -> FsResult<Vec<FsDirEntry>> {
+        self.flush_background_tasks().await;
+
         let (bucket_name, mut key) = try_get_bucket_name_and_key(path.try_normalize()?)?;
+        let is_root = key.is_empty();
 
         debug_assert!(!key.ends_with('/'));
         key.push('/');
@@ -526,7 +761,7 @@
             .list_objects_v2()
             .set_bucket(Some(bucket_name))
             .set_delimiter(Some("/".into()))
-            .set_prefix(Some(key));
+            .set_prefix((!is_root).then_some(key));
 
         let mut entries = vec![];
         let mut stream = builder.into_paginator().send();
@@ -586,13 +821,16 @@
             }
         }
 
-        Ok(entries)
+        Ok(entries).inspect(|it| {
+            trace!(len = it.len());
+        })
     }
 
     fn rename_sync(&self, _oldpath: &Path, _newpath: &Path) -> FsResult<()> {
         Err(FsError::NotSupported)
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     async fn rename_async(&self, _oldpath: PathBuf, _newpath: PathBuf) -> FsResult<()> {
         Err(FsError::NotSupported)
     }
@@ -692,18 +930,30 @@
 enum S3ObjectOpSlot {
 }
 
 struct S3ObjectReadState(Pin<Box<dyn AsyncBufRead>>, usize);
 
-#[derive(Debug)]
 struct FileBackedMmapBuffer {
     cursor: AllowStdIo<Cursor<&'static mut [u8]>>,
     raw: MmapRaw,
-    _file: std::fs::File,
+    file: std::fs::File,
+}
+
+impl std::fmt::Debug for FileBackedMmapBuffer {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_tuple("FileBackedMmapBuffer")
+            .field(&self.file)
+            .finish()
+    }
 }
 
 impl FileBackedMmapBuffer {
     fn new() -> Result<Self> {
-        let file = tempfile().context("could not create a temp file")?;
+        let file = {
+            let f = tempfile().context("could not create a temp file")?;
+
+            f.set_len(MIN_PART_SIZE as u64)?;
+            f
+        };
+
         let raw = MmapOptions::new()
-            .len(MIN_PART_SIZE)
             .map_raw(file.as_raw_fd())
             .context("failed to create a file backed memory buffer")?;
 
@@ -711,11 +961,7 @@
 impl FileBackedMmapBuffer {
             slice::from_raw_parts_mut(raw.as_mut_ptr(), raw.len())
         }));
 
-        Ok(Self {
-            cursor,
-            raw,
-            _file: file,
-        })
+        Ok(Self { cursor, raw, file })
     }
 }
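Note on the buffer change above: the old code asked MmapOptions for `MIN_PART_SIZE` bytes while the freshly created temp file was still zero-length; touching pages past the end of the backing file faults at runtime. The fix sizes the file with `set_len` before mapping and lets `map_raw` inherit the file's length. The corrected pattern in isolation (the 5 MiB value is an assumption standing in for the crate's MIN_PART_SIZE constant):

    use std::os::fd::AsRawFd;

    use anyhow::Context;
    use memmap2::{MmapOptions, MmapRaw};
    use tempfile::tempfile;

    const MIN_PART_SIZE: usize = 5 * 1024 * 1024; // assumption: S3's minimum part size

    fn file_backed_buffer() -> anyhow::Result<(std::fs::File, MmapRaw)> {
        let file = tempfile().context("could not create a temp file")?;

        // Size the file *before* mapping; the mapping length is taken from
        // the file, so no explicit .len(...) is needed anymore.
        file.set_len(MIN_PART_SIZE as u64)?;

        let raw = MmapOptions::new()
            .map_raw(file.as_raw_fd())
            .context("failed to create a file backed memory buffer")?;

        Ok((file, raw))
    }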
@@ -749,6 +995,7 @@
 impl S3ObjectWriteState {
         })
     }
 
+    #[instrument(level = "trace", skip(self), fields(file = ?self.buf.file), ret, err(Debug))]
     fn try_swap_buffer(&mut self) -> Result<FileBackedMmapBuffer> {
         Ok(mem::replace(&mut self.buf, FileBackedMmapBuffer::new()?))
     }
 }
 
@@ -762,7 +1009,7 @@
 enum S3WriteErrorSubject {
 }
 
 impl std::fmt::Display for S3WriteErrorSubject {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
-            Self::MultiPartUploadTask((idx, inner)) => write!(f, "{idx}: {inner}"),
+            Self::MultiPartUploadTask((idx, inner)) => write!(f, "{idx}: {inner:?}"),
             Self::Join(inner) => write!(f, "{inner:?}: {inner}"),
         }
     }
 }
 
@@ -779,7 +1026,7 @@
 type BoxedUploadPartTask = BoxFuture<
     >,
 >;
 
-#[derive(Debug, Default)]
+#[derive(Debug)]
 struct S3MultiPartUploadMethod {
     recent_part_idx: i32,
     upload_id: String,
@@ -787,7 +1034,19 @@
 struct S3MultiPartUploadMethod {
     tasks: FuturesUnordered<BoxedUploadPartTask>,
 }
 
+impl Default for S3MultiPartUploadMethod {
+    fn default() -> Self {
+        Self {
+            recent_part_idx: 1,
+            upload_id: String::default(),
+            parts: Vec::default(),
+            tasks: FuturesUnordered::default(),
+        }
+    }
+}
+
 impl S3MultiPartUploadMethod {
+    #[instrument(level = "trace", skip(self), fields(upload_id = self.upload_id), ret, err(Debug))]
     async fn sync(&mut self) -> FsResult<()> {
         let mut errors = vec![];
 
@@ -825,6 +1084,53 @@
 impl S3MultiPartUploadMethod {
 
         to_combined_message(errors)
     }
+
+    #[instrument(level = "trace", skip_all, fields(upload_id = self.upload_id, last))]
+    fn add_upload_part_task(
+        &mut self,
+        client: aws_sdk_s3::Client,
+        bucket_name: &str,
+        key: &str,
+        buf: &FileBackedMmapBuffer,
+        last: bool,
+    ) {
+        self.tasks.push(
+            tokio::task::spawn({
+                let client = client;
+                let upload_id = self.upload_id.clone();
+                let part_idx = self.recent_part_idx;
+                let bucket_name = bucket_name.to_string();
+                let key = key.to_string();
+                let data = unsafe {
+                    slice::from_raw_parts(
+                        buf.raw.as_ptr(),
+                        buf.cursor.get_ref().position() as usize,
+                    )
+                };
+
+                self.recent_part_idx += 1;
+
+                async move {
+                    trace!(size = data.len());
+                    client
+                        .upload_part()
+                        .bucket(bucket_name)
+                        .key(key)
+                        .upload_id(upload_id)
+                        .part_number(part_idx)
+                        .body(ByteStream::new(data.into()))
+                        .send()
+                        .map(|it| {
+                            trace!(success = it.is_ok());
+                            (part_idx, it)
+                        })
+                        .await
+                }
+                .instrument(trace_span!("upload part", last, part = part_idx))
+            })
+            .boxed(),
+        );
+    }
 }
 
 #[derive(Debug, EnumAsInner)]
@@ -834,6 +1140,7 @@
 enum S3WriteUploadMethod {
 }
 
 impl S3WriteUploadMethod {
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     async fn sync(&mut self) -> FsResult<()> {
         if let Self::MultiPartUpload(multi_part) = self {
             multi_part.sync().await?
@@ -842,9 +1149,16 @@
         Ok(())
     }
 
+    #[instrument(
+        level = "trace",
+        skip(self, state),
+        fields(client, bucket_name, key),
+        ret,
+        err(Debug)
+    )]
     async fn cleanup(
         &mut self,
-        fs: S3Fs,
+        client: aws_sdk_s3::Client,
         bucket_name: String,
         key: String,
         state: S3ObjectWriteState,
@@ -853,41 +1167,12 @@
             Self::MultiPartUpload(multi_part) => {
                 if state.buf.cursor.get_ref().position() > 0 {
                     state.buf.raw.flush_async()?;
-                    multi_part.tasks.push(
-                        tokio::task::spawn({
-                            let upload_id = multi_part.upload_id.clone();
-                            let client = fs.client.clone();
-                            let bucket_name = bucket_name.clone();
-                            let key = key.clone();
-                            let part_idx = multi_part.recent_part_idx;
-                            let data = unsafe {
-                                slice::from_raw_parts(
-                                    state.buf.raw.as_ptr(),
-                                    state.buf.cursor.get_ref().position() as usize,
-                                )
-                            };
-
-                            multi_part.recent_part_idx += 1;
-
-                            async move {
-                                client
-                                    .upload_part()
-                                    .bucket(bucket_name)
-                                    .key(key)
-                                    .upload_id(upload_id)
-                                    .part_number(part_idx)
-                                    .body(ByteStream::new(data.into()))
-                                    .send()
-                                    .map(|it| (part_idx, it))
-                                    .await
-                            }
-                            .instrument(debug_span!(
-                                "upload part",
-                                last = true,
-                                part = part_idx
-                            ))
-                        })
-                        .boxed(),
+                    multi_part.add_upload_part_task(
+                        client.clone(),
+                        &bucket_name,
+                        &key,
+                        &state.buf,
+                        true,
                     );
 
                     multi_part.sync().await?;
@@ -898,16 +1183,46 @@
                 }
 
                 debug_assert!(multi_part.tasks.is_empty());
-                debug_assert!(multi_part.recent_part_idx > 0);
+                debug_assert!(multi_part.recent_part_idx > 1);
 
-                fs.client
+                if multi_part.parts.len() != (multi_part.recent_part_idx - 1) as usize {
+                    error!(
+                        parts.len = multi_part.parts.len(),
+                        recent_part_idx = multi_part.recent_part_idx - 1,
+                        "mismatch with recent part idx"
+                    );
+
+                    client
+                        .abort_multipart_upload()
+                        .bucket(bucket_name)
+                        .key(key)
+                        .upload_id(mem::take(&mut multi_part.upload_id))
+                        .send()
+                        .await
+                        .map_err(io::Error::other)?;
+
+                    return Err(FsError::Io(io::Error::other(
+                        "upload was aborted because some parts were not successfully uploaded",
+                    )));
+                }
+
+                client
                     .complete_multipart_upload()
                     .bucket(bucket_name)
                     .key(key)
                     .upload_id(mem::take(&mut multi_part.upload_id))
                     .multipart_upload(
                         CompletedMultipartUpload::builder()
-                            .set_parts(Some(mem::take(&mut multi_part.parts)))
+                            .set_parts({
+                                let mut parts = mem::take(&mut multi_part.parts);
+
+                                parts.sort_by(|a, b| {
+                                    a.part_number().unwrap().cmp(&b.part_number().unwrap())
+                                });
+
+                                trace!(idx_list = ?parts.iter().map(|it| it.part_number().unwrap()).collect::<Vec<_>>());
+                                Some(parts)
+                            })
                             .build(),
                     )
                     .send()
@@ -916,7 +1231,7 @@
             }
 
             Self::PutObject => {
-                fs.client
+                client
                     .put_object()
                     .bucket(bucket_name)
                     .key(key)
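Note on the completion path above: with part uploads racing on FuturesUnordered they finish in arbitrary order, and S3 rejects a CompleteMultipartUpload whose parts are not in ascending part-number order, hence the `sort_by`. The companion length check refuses to commit an object when some parts never landed; aborting is the safer failure. The ordering invariant in isolation (same CompletedPart type from aws-sdk-s3):

    use aws_sdk_s3::types::CompletedPart;

    fn sorted_parts(mut parts: Vec<CompletedPart>) -> Vec<CompletedPart> {
        // Upload tasks complete out of order; S3 requires the manifest to be
        // strictly ascending by part number.
        parts.sort_by_key(|p| p.part_number());
        parts
    }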
@@ -939,10 +1254,6 @@
 pub struct S3Object {
     fs: S3Fs,
     op_slot: AsyncRefCell<Option<S3ObjectOpSlot>>,
-
-    #[allow(unused)]
-    open_options: OpenOptions,
-
     bucket_name: String,
     key: String,
 }
@@ -967,22 +1278,27 @@
 impl Drop for S3Object {
                 key = key.as_str()
             );
 
-            self.fs.background_tasks.push(
-                tokio::task::spawn(
-                    async move {
-                        if let Err(err) = method.sync().await {
-                            error!(reason = ?err, "sync operation failed");
-                        }
-                        if let Err(err) = method.cleanup(fs, bucket_name, key, state).await {
-                            error!(reason = ?err, "cleanup operation failed");
+            drop(tokio::spawn(async move {
+                fs.background_tasks.write().await.push(
+                    tokio::task::spawn(
+                        async move {
+                            if let Err(err) = method.sync().await {
+                                error!(reason = ?err, "sync operation failed");
+                            }
+                            if let Err(err) = method
+                                .cleanup(fs.client.clone(), bucket_name, key, state)
+                                .await
+                            {
+                                error!(reason = ?err, "cleanup operation failed");
+                            }
                         }
-                    }
-                    .instrument(span),
-                )
-                .map_err(Arc::new)
-                .boxed()
-                .shared(),
-            );
+                        .instrument(span),
+                    )
+                    .map_err(Arc::new)
+                    .boxed()
+                    .shared(),
+                );
+            }));
         }
     }
 }
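Note on the Drop change above: `Drop::drop` is synchronous, so the code cannot `.write().await` the background-task lock directly; instead it spawns a small wrapper task whose only job is to acquire the lock and register the real flush task. The general shape of the pattern:

    use std::sync::Arc;

    use tokio::sync::RwLock;

    struct Handle {
        tasks: Arc<RwLock<Vec<tokio::task::JoinHandle<()>>>>,
    }

    impl Drop for Handle {
        fn drop(&mut self) {
            let tasks = self.tasks.clone();
            // Spawn-and-detach: the runtime drives the wrapper to completion
            // even though the JoinHandle is dropped immediately.
            drop(tokio::spawn(async move {
                tasks.write().await.push(tokio::spawn(async move {
                    // ... the actual flush/cleanup work would run here ...
                }));
            }));
        }
    }

The caveat (shared with the patch) is that this requires a live Tokio runtime at drop time.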
@@ -993,7 +1309,7 @@
 impl deno_io::fs::File for S3Object {
         Err(FsError::NotSupported)
     }
 
-    #[instrument(level = "info", skip_all, fields(self.bucket_name, self.key))]
+    #[instrument(level = "trace", skip_all, fields(self.bucket_name, self.key), err(Debug))]
     async fn read_byob(self: Rc<Self>, mut buf: BufMutView) -> FsResult<(usize, BufMutView)> {
         let mut op_slot = RcRef::map(&self, |r| &r.op_slot).borrow_mut().await;
         let Some(op_slot_mut) = op_slot.as_mut() else {
@@ -1004,8 +1320,11 @@
             .bucket(&self.bucket_name)
             .key(&self.key)
             .send()
-            .await
-            .map_err(io::Error::other)?;
+            .await;
+
+        let Ok(resp) = resp else {
+            return Ok((0, buf));
+        };
 
         let mut body_buf = resp.body.into_async_read();
         let nread = body_buf.read(&mut buf).await?;
@@ -1015,6 +1334,7 @@
             nread,
         )));
 
+        trace!(nread);
         return Ok((nread, buf));
     };
 
@@ -1030,6 +1350,7 @@
             op_slot.take();
         }
 
+        trace!(nread);
         return Ok((nread, buf));
     }
 
@@ -1069,6 +1390,8 @@
 
             state.1 += nread;
             state.0 = Box::pin(body_buf);
+
+            trace!(nread);
             Ok((nread, buf))
         } else {
             op_slot.take();
@@ -1080,7 +1403,7 @@
         Err(FsError::NotSupported)
     }
 
-    #[instrument(level = "info", skip_all, fields(self.bucket_name, self.key))]
+    #[instrument(level = "trace", skip_all, fields(self.bucket_name, self.key, len = buf.len()), err(Debug))]
     async fn write(self: Rc<Self>, buf: BufView) -> FsResult<WriteOutcome> {
         let mut op_slot = RcRef::map(&self, |r| &r.op_slot).borrow_mut().await;
         let Some(op_slot_mut) = op_slot.as_mut() else {
@@ -1102,6 +1425,7 @@
 
         *op_slot = Some(S3ObjectOpSlot::Write(state));
 
+        trace!(nwritten);
         return Ok(written);
     };
 
@@ -1154,38 +1478,15 @@
             }
         };
 
-        method.tasks.push(
-            tokio::task::spawn({
-                let upload_id = method.upload_id.clone();
-                let client = self.fs.client.clone();
-                let bucket_name = self.bucket_name.clone();
-                let key = self.key.clone();
-                let part_idx = method.recent_part_idx;
-
-                method.recent_part_idx += 1;
-
-                async move {
-                    client
-                        .upload_part()
-                        .bucket(bucket_name)
-                        .key(key)
-                        .upload_id(upload_id)
-                        .part_number(part_idx)
-                        .body(ByteStream::new(
-                            unsafe {
-                                slice::from_raw_parts(mmap_buf.raw.as_ptr(), mmap_buf.raw.len())
-                            }
-                            .into(),
-                        ))
-                        .send()
-                        .map(|it| (part_idx, it))
-                        .await
-                }
-                .instrument(debug_span!("upload part", part = part_idx))
-            })
-            .boxed(),
+        method.add_upload_part_task(
+            self.fs.client.clone(),
+            &self.bucket_name,
+            &self.key,
+            &mmap_buf,
+            false,
         );
 
+        trace!(nwritten);
         return Ok(WriteOutcome::Partial {
             nwritten,
             view: buf,
@@ -1193,6 +1494,7 @@
     }
 
     assert_eq!(nwritten, size);
+    trace!(nwritten);
     Ok(WriteOutcome::Full { nwritten })
 }
 
@@ -1200,7 +1502,7 @@
         Err(FsError::NotSupported)
     }
 
-    #[instrument(level = "info", skip_all, fields(self.bucket_name, self.key))]
+    #[instrument(level = "trace", skip_all, fields(self.bucket_name, self.key), err(Debug))]
     async fn write_all(self: Rc<Self>, mut buf: BufView) -> FsResult<()> {
         loop {
             match self.clone().write(buf).await? {
@@ -1217,7 +1519,7 @@
         Err(FsError::NotSupported)
     }
 
-    #[instrument(level = "info", skip_all, fields(self.bucket_name, self.key))]
+    #[instrument(level = "trace", skip_all, fields(self.bucket_name, self.key), err(Debug))]
     async fn read_all_async(self: Rc<Self>) -> FsResult<Vec<u8>> {
         let resp = self
             .fs
@@ -1232,6 +1534,9 @@
             Ok(v) => Ok(v.body.collect().await.map_err(io::Error::other)?.to_vec()),
             Err(err) => Err(io::Error::other(err).into()),
         }
+        .inspect(|it| {
+            trace!(nread = it.len());
+        })
     }
 
     fn chmod_sync(self: Rc<Self>, _pathmode: u32) -> FsResult<()> {
         Err(FsError::NotSupported)
     }
@@ -1320,7 +1625,6 @@
             mtime: resp.last_modified.and_then(to_msec),
             atime: None,
             birthtime: None,
-
             dev: 0,
             ino: 0,
             mode: 0,
@@ -1434,3 +1738,71 @@
 
     Err(io::Error::other(messages.join("\n")).into())
 }
+
+#[cfg(test)]
+mod test {
+    use std::{io, path::PathBuf, sync::Arc};
+
+    use aws_config::BehaviorVersion;
+    use aws_sdk_s3::{self as s3};
+    use aws_smithy_runtime::client::http::test_util::{ReplayEvent, StaticReplayClient};
+    use deno_fs::{FileSystem, OpenOptions};
+    use once_cell::sync::Lazy;
+
+    static OPEN_CREATE: Lazy<OpenOptions> = Lazy::new(|| OpenOptions {
+        read: true,
+        write: true,
+        create: true,
+        truncate: true,
+        append: true,
+        create_new: true,
+        mode: None,
+    });
+
+    fn get_s3_credentials() -> s3::config::SharedCredentialsProvider {
+        s3::config::SharedCredentialsProvider::new(s3::config::Credentials::new(
+            "AKIMEOWMEOW",
+            "+meowmeowmeeeeeeow/",
+            None,
+            None,
+            "meowmeow",
+        ))
+    }
+
+    fn get_s3_fs<I>(events: I) -> (super::S3Fs, StaticReplayClient)
+    where
+        I: IntoIterator<Item = ReplayEvent>,
+    {
+        let client = StaticReplayClient::new(events.into_iter().collect());
+
+        (
+            super::S3Fs {
+                background_tasks: Default::default(),
+                config: Arc::default(),
+                client: s3::Client::from_conf(
+                    s3::Config::builder()
+                        .behavior_version(BehaviorVersion::latest())
+                        .credentials_provider(get_s3_credentials())
+                        .region(s3::config::Region::new("us-east-1"))
+                        .http_client(client.clone())
+                        .build(),
+                ),
+            },
+            client,
+        )
+    }
+
+    #[tokio::test]
+    async fn should_not_be_open_when_object_key_is_empty() {
+        let (fs, _) = get_s3_fs([]);
+
+        assert_eq!(
+            fs.open_async(PathBuf::from("meowmeow"), *OPEN_CREATE, None)
+                .await
+                .err()
+                .unwrap()
+                .kind(),
+            io::ErrorKind::InvalidInput
+        );
+    }
+}
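Note on the test module above: it injects aws-smithy-runtime's StaticReplayClient as the S3 HTTP client, which is what the aws-smithy-protocol-test / test-util additions in Cargo.lock exist for; S3 behavior can then be asserted without the network. The empty-events case above never sends a request, but a canned exchange would be enqueued roughly like this (a sketch; the URI shape is illustrative, assuming http 0.2 as pinned in Cargo.lock):

    use aws_smithy_runtime::client::http::test_util::{ReplayEvent, StaticReplayClient};
    use aws_smithy_types::body::SdkBody;

    fn head_object_replay() -> StaticReplayClient {
        // One ReplayEvent per expected request/response pair, in order.
        StaticReplayClient::new(vec![ReplayEvent::new(
            http::Request::builder()
                .method("HEAD")
                .uri("https://s3.us-east-1.amazonaws.com/meow/meow.bin")
                .body(SdkBody::empty())
                .unwrap(),
            http::Response::builder()
                .status(404) // simulate a missing object
                .body(SdkBody::empty())
                .unwrap(),
        )])
    }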
diff --git a/crates/sb_fs/fs/tmp_fs.rs b/crates/sb_fs/fs/tmp_fs.rs
index 56f39ce1d..1a69e849b 100644
--- a/crates/sb_fs/fs/tmp_fs.rs
+++ b/crates/sb_fs/fs/tmp_fs.rs
@@ -1,3 +1,6 @@
+// TODO: Remove the line below after updating the rust toolchain to v1.81.
+#![allow(clippy::blocks_in_conditions)]
+
 use std::{
     io::{self, SeekFrom},
     path::{Path, PathBuf},
@@ -15,7 +18,7 @@
 use deno_fs::{AccessCheckCb, FsDirEntry, FsFileType, RealFs};
 use deno_io::fs::{File, FsError, FsResult, FsStat};
 use serde::{Deserialize, Serialize};
 use tempfile::TempDir;
-use tracing::instrument;
+use tracing::{instrument, trace};
 
 use super::TryNormalizePath;
 
@@ -94,6 +97,7 @@
 impl Quota {
         }
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     async fn check(&self, len: usize) -> FsResult<()> {
         if self.sync.do_imm.lower() {
             self.sync().await?;
@@ -112,6 +116,7 @@
         Ok(())
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     fn blocking_check(&self, len: usize) -> FsResult<()> {
         if self.sync.do_imm.lower() {
             self.blocking_sync()?;
@@ -130,6 +135,7 @@
         Ok(())
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     async fn sync(&self) -> FsResult<u64> {
         match tokio::task::spawn_blocking(self.make_sync_fn()).await {
             Ok(v) => v,
@@ -137,11 +143,11 @@
         }
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     fn blocking_sync(&self) -> FsResult<u64> {
         self.make_sync_fn()()
     }
 
-    #[instrument(level = "info", skip(self))]
     fn make_sync_fn(&self) -> impl FnOnce() -> FsResult<u64> {
         fn get_dir_size(path: PathBuf) -> io::Result<u64> {
             use std::fs;
@@ -178,6 +184,7 @@
         }
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     async fn try_add_delta(&self, amount: i64) -> FsResult<()> {
         match amount.cmp(&0) {
             std::cmp::Ordering::Greater => {
@@ -195,6 +202,7 @@
         Ok(())
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     fn blocking_try_add_delta(&self, amount: i64) -> FsResult<()> {
         match amount.cmp(&0) {
             std::cmp::Ordering::Greater => {
@@ -237,6 +245,12 @@
 impl deno_fs::FileSystem for TmpFs {
         Err(FsError::NotSupported)
     }
 
+    #[instrument(
+        level = "trace",
+        skip(self, options, access_check),
+        fields(?options, has_access_check = access_check.is_some()),
+        err(Debug)
+    )]
     fn open_sync(
         &self,
         path: &Path,
@@ -245,14 +259,24 @@
     ) -> FsResult<Rc<dyn File>> {
         Ok(Rc::new(TmpObject {
             fs: self.clone(),
-            file: RealFs.open_sync(
-                &self.root.path().join(path.try_normalize()?),
-                options,
-                access_check,
-            )?,
+            file: RealFs
+                .open_sync(
+                    &self.root.path().join(path.try_normalize()?),
+                    options,
+                    access_check,
+                )
+                .inspect(|_| {
+                    trace!(ok = true);
+                })?,
        }))
    }
 
+    #[instrument(
+        level = "trace",
+        skip(self, options, access_check),
+        fields(?options, has_access_check = access_check.is_some()),
+        err(Debug)
+    )]
     async fn open_async<'a>(
         &'a self,
         path: PathBuf,
@@ -267,10 +291,14 @@
             options,
             access_check,
         )
-        .await?,
+        .await
+        .inspect(|_| {
+            trace!(ok = true);
+        })?,
        }))
    }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     fn mkdir_sync(&self, path: &Path, recursive: bool, mode: u32) -> FsResult<()> {
         RealFs.mkdir_sync(
             &self.root.path().join(path.try_normalize()?),
@@ -279,6 +307,7 @@
         )
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     async fn mkdir_async(&self, path: PathBuf, recursive: bool, mode: u32) -> FsResult<()> {
         RealFs
             .mkdir_async(
@@ -323,11 +352,13 @@
         Err(FsError::NotSupported)
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     fn remove_sync(&self, path: &Path, recursive: bool) -> FsResult<()> {
         self.quota.sync.do_opt.raise();
         RealFs.remove_sync(&self.root.path().join(path.try_normalize()?), recursive)
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     async fn remove_async(&self, path: PathBuf, recursive: bool) -> FsResult<()> {
         self.quota.sync.do_opt.raise();
         RealFs
@@ -335,6 +366,7 @@
             .await
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     fn copy_file_sync(&self, oldpath: &Path, newpath: &Path) -> FsResult<()> {
         self.quota
             .blocking_check(self.stat_sync(oldpath)?.size as usize)?;
 
         RealFs.copy_file_sync(
             &self.root.path().join(oldpath.try_normalize()?),
             &self.root.path().join(newpath.try_normalize()?),
         )
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     async fn copy_file_async(&self, oldpath: PathBuf, newpath: PathBuf) -> FsResult<()> {
         self.quota
             .check(self.stat_async(oldpath.clone()).await?.size as usize)
@@ -358,6 +391,7 @@
             .await
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     fn cp_sync(&self, path: &Path, new_path: &Path) -> FsResult<()> {
         self.quota
             .blocking_check(self.stat_sync(path)?.size as usize)?;
 
         RealFs.cp_sync(
             &self.root.path().join(path.try_normalize()?),
             &self.root.path().join(new_path.try_normalize()?),
         )
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     async fn cp_async(&self, path: PathBuf, new_path: PathBuf) -> FsResult<()> {
         self.quota
             .check(self.stat_async(path.clone()).await?.size as usize)
@@ -381,46 +416,62 @@
             .await
     }
 
+    #[instrument(level = "trace", skip(self), err(Debug))]
     fn stat_sync(&self, path: &Path) -> FsResult<FsStat> {
         RealFs.stat_sync(&self.root.path().join(path.try_normalize()?))
     }
 
+    #[instrument(level = "trace", skip(self), err(Debug))]
     async fn stat_async(&self, path: PathBuf) -> FsResult<FsStat> {
         RealFs
             .stat_async(self.root.path().join(path.try_normalize()?))
             .await
     }
 
+    #[instrument(level = "trace", skip(self), err(Debug))]
     fn lstat_sync(&self, path: &Path) -> FsResult<FsStat> {
         RealFs.lstat_sync(&self.root.path().join(path.try_normalize()?))
     }
 
+    #[instrument(level = "trace", skip(self), err(Debug))]
     async fn lstat_async(&self, path: PathBuf) -> FsResult<FsStat> {
         RealFs
             .lstat_async(self.root.path().join(path.try_normalize()?))
             .await
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     fn realpath_sync(&self, path: &Path) -> FsResult<PathBuf> {
         RealFs.realpath_sync(&self.root.path().join(path.try_normalize()?))
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     async fn realpath_async(&self, path: PathBuf) -> FsResult<PathBuf> {
         RealFs
             .realpath_async(self.root.path().join(path.try_normalize()?))
             .await
     }
 
+    #[instrument(level = "trace", skip(self), err(Debug))]
     fn read_dir_sync(&self, path: &Path) -> FsResult<Vec<FsDirEntry>> {
-        RealFs.read_dir_sync(&self.root.path().join(path.try_normalize()?))
+        RealFs
+            .read_dir_sync(&self.root.path().join(path.try_normalize()?))
+            .inspect(|it| {
+                trace!(len = it.len());
+            })
     }
 
+    #[instrument(level = "trace", skip(self), err(Debug))]
     async fn read_dir_async(&self, path: PathBuf) -> FsResult<Vec<FsDirEntry>> {
         RealFs
             .read_dir_async(self.root.path().join(path.try_normalize()?))
             .await
+            .inspect(|it| {
+                trace!(len = it.len());
+            })
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     fn rename_sync(&self, oldpath: &Path, newpath: &Path) -> FsResult<()> {
         RealFs.rename_sync(
             &self.root.path().join(oldpath.try_normalize()?),
@@ -428,6 +479,7 @@
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     async fn rename_async(&self, oldpath: PathBuf, newpath: PathBuf) -> FsResult<()> {
         RealFs
             .rename_async(
@@ -437,6 +489,7 @@
             .await
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     fn link_sync(&self, oldpath: &Path, newpath: &Path) -> FsResult<()> {
         RealFs.link_sync(
             &self.root.path().join(oldpath.try_normalize()?),
@@ -444,6 +497,7 @@
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     async fn link_async(&self, oldpath: PathBuf, newpath: PathBuf) -> FsResult<()> {
         RealFs
             .link_async(
@@ -453,6 +507,7 @@
             .await
     }
 
+    #[instrument(level = "trace", skip(self, file_type), ret, err(Debug))]
     fn symlink_sync(
         &self,
         oldpath: &Path,
@@ -466,6 +521,7 @@
         )
     }
 
+    #[instrument(level = "trace", skip(self, file_type), ret, err(Debug))]
     async fn symlink_async(
         &self,
         oldpath: PathBuf,
@@ -481,16 +537,19 @@
         .await
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     fn read_link_sync(&self, path: &Path) -> FsResult<PathBuf> {
         RealFs.read_link_sync(&self.root.path().join(path.try_normalize()?))
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     async fn read_link_async(&self, path: PathBuf) -> FsResult<PathBuf> {
         RealFs
             .read_link_async(self.root.path().join(path.try_normalize()?))
             .await
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     fn truncate_sync(&self, path: &Path, len: u64) -> FsResult<()> {
         let size = self.stat_sync(path)?.size;
 
@@ -500,6 +559,7 @@
         RealFs.truncate_sync(&self.root.path().join(path.try_normalize()?), len)
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     async fn truncate_async(&self, path: PathBuf, len: u64) -> FsResult<()> {
         let size = self.stat_async(path.clone()).await?.size;
 
@@ -512,6 +572,7 @@
             .await
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     fn utime_sync(
         &self,
         path: &Path,
@@ -529,6 +590,7 @@
         )
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     async fn utime_async(
         &self,
         path: PathBuf,
@@ -548,6 +610,7 @@
         .await
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     fn lutime_sync(
         &self,
         path: &Path,
@@ -565,6 +628,7 @@
         )
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     async fn lutime_async(
         &self,
         path: PathBuf,
@@ -592,14 +656,19 @@
 pub struct TmpObject {
 }
 
 #[async_trait::async_trait(?Send)]
 impl deno_io::fs::File for TmpObject {
+    #[instrument(level = "trace", skip(self, buf), fields(len = buf.len()), ret, err(Debug))]
     fn read_sync(self: Rc<Self>, buf: &mut [u8]) -> FsResult<usize> {
         self.file.clone().read_sync(buf)
     }
 
+    #[instrument(level = "trace", skip_all, err(Debug))]
     async fn read_byob(self: Rc<Self>, buf: BufMutView) -> FsResult<(usize, BufMutView)> {
-        self.file.clone().read_byob(buf).await
+        self.file.clone().read_byob(buf).await.inspect(|it| {
+            trace!(nread = it.0);
+        })
     }
 
+    #[instrument(level = "trace", skip(self, buf), fields(len = buf.len()), ret, err(Debug))]
     fn write_sync(self: Rc<Self>, buf: &[u8]) -> FsResult<usize> {
         self.fs.quota.blocking_check(buf.len())?;
         self.file.clone().write_sync(buf).inspect(|it| {
@@ -607,15 +676,18 @@
         })
     }
 
+    #[instrument(level = "trace", skip(self, buf), fields(len = buf.len()), err(Debug))]
     async fn write(self: Rc<Self>, buf: BufView) -> FsResult<WriteOutcome> {
         self.fs.quota.check(buf.len()).await?;
         self.file.clone().write(buf).await.inspect(|it| match it {
             WriteOutcome::Partial { nwritten, .. } | WriteOutcome::Full { nwritten } => {
                 self.fs.quota.fetch_add(*nwritten, Ordering::Release);
+                trace!(nwritten = *nwritten);
             }
         })
     }
 
+    #[instrument(level = "trace", skip(self, buf), fields(len = buf.len()), ret, err(Debug))]
     fn write_all_sync(self: Rc<Self>, buf: &[u8]) -> FsResult<()> {
         self.fs.quota.blocking_check(buf.len())?;
         self.file.clone().write_all_sync(buf).inspect(|_| {
@@ -623,6 +695,7 @@
         })
     }
 
+    #[instrument(level = "trace", skip(self, buf), fields(len = buf.len()), ret, err(Debug))]
     async fn write_all(self: Rc<Self>, buf: BufView) -> FsResult<()> {
         let len = buf.len();
 
@@ -632,12 +705,18 @@
         })
     }
 
+    #[instrument(level = "trace", skip(self), err(Debug))]
     fn read_all_sync(self: Rc<Self>) -> FsResult<Vec<u8>> {
-        self.file.clone().read_all_sync()
+        self.file.clone().read_all_sync().inspect(|it| {
+            trace!(nread = it.len());
+        })
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     async fn read_all_async(self: Rc<Self>) -> FsResult<Vec<u8>> {
-        self.file.clone().read_all_async().await
+        self.file.clone().read_all_async().await.inspect(|it| {
+            trace!(nread = it.len());
+        })
     }
 
     fn chmod_sync(self: Rc<Self>, _pathmode: u32) -> FsResult<()> {
         Err(FsError::NotSupported)
@@ -648,58 +727,71 @@
         Err(FsError::NotSupported)
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     fn seek_sync(self: Rc<Self>, pos: SeekFrom) -> FsResult<u64> {
         self.file.clone().seek_sync(pos)
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     async fn seek_async(self: Rc<Self>, pos: SeekFrom) -> FsResult<u64> {
         self.file.clone().seek_async(pos).await
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     fn datasync_sync(self: Rc<Self>) -> FsResult<()> {
         self.fs.quota.sync.do_imm.raise();
         self.file.clone().datasync_sync()
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     async fn datasync_async(self: Rc<Self>) -> FsResult<()> {
         self.fs.quota.sync.do_imm.raise();
         self.file.clone().datasync_async().await
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     fn sync_sync(self: Rc<Self>) -> FsResult<()> {
         self.fs.quota.sync.do_imm.raise();
         self.file.clone().sync_sync()
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     async fn sync_async(self: Rc<Self>) -> FsResult<()> {
         self.fs.quota.sync.do_imm.raise();
         self.file.clone().sync_async().await
     }
 
+    #[instrument(level = "trace", skip(self), err(Debug))]
     fn stat_sync(self: Rc<Self>) -> FsResult<FsStat> {
         self.file.clone().stat_sync()
     }
 
+    #[instrument(level = "trace", skip(self), err(Debug))]
     async fn stat_async(self: Rc<Self>) -> FsResult<FsStat> {
         self.file.clone().stat_async().await
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     fn lock_sync(self: Rc<Self>, exclusive: bool) -> FsResult<()> {
         self.file.clone().lock_sync(exclusive)
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     async fn lock_async(self: Rc<Self>, exclusive: bool) -> FsResult<()> {
         self.file.clone().lock_async(exclusive).await
    }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     fn unlock_sync(self: Rc<Self>) -> FsResult<()> {
         self.file.clone().unlock_sync()
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     async fn unlock_async(self: Rc<Self>) -> FsResult<()> {
         self.file.clone().unlock_async().await
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     fn truncate_sync(self: Rc<Self>, len: u64) -> FsResult<()> {
         let size = self.file.clone().stat_sync()?.size;
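Note on the quota discipline visible throughout the TmpFs/TmpObject hunks above: every write path first runs `check`/`blocking_check` against the requested length, delegates to RealFs, and then `fetch_add`s only the bytes that were actually written. Reduced to its core (illustrative types; the real Quota also debounces full directory-size rescans via its `sync` flags):

    use std::sync::atomic::{AtomicUsize, Ordering};

    struct Quota {
        used: AtomicUsize,
        limit: usize,
    }

    impl Quota {
        // Pre-flight: refuse the write if it could exceed the limit.
        fn check(&self, len: usize) -> std::io::Result<()> {
            if self.used.load(Ordering::Acquire) + len > self.limit {
                return Err(std::io::Error::other("tmp quota exceeded"));
            }
            Ok(())
        }

        // Post-write: account only for the bytes that actually landed.
        fn commit(&self, nwritten: usize) {
            self.used.fetch_add(nwritten, Ordering::Release);
        }
    }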
@@ -710,6 +802,7 @@
         self.file.clone().truncate_sync(len)
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     async fn truncate_async(self: Rc<Self>, len: u64) -> FsResult<()> {
         let size = self.file.clone().stat_sync()?.size;
 
@@ -721,6 +814,7 @@
         self.file.clone().truncate_async(len).await
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     fn utime_sync(
         self: Rc<Self>,
         atime_secs: i64,
@@ -733,6 +827,7 @@
             .utime_sync(atime_secs, atime_nanos, mtime_secs, mtime_nanos)
     }
 
+    #[instrument(level = "trace", skip(self), ret, err(Debug))]
     async fn utime_async(
         self: Rc<Self>,
         atime_secs: i64,
diff --git a/crates/sb_fs/tests/.env.template b/crates/sb_fs/tests/.env.template
new file mode 100644
index 000000000..9156187e3
--- /dev/null
+++ b/crates/sb_fs/tests/.env.template
@@ -0,0 +1,7 @@
+S3FS_TEST_SUPABASE_STORAGE=
+S3FS_TEST_APP_NAME=
+S3FS_TEST_BUCKET_NAME=
+S3FS_TEST_ENDPOINT_URL=
+S3FS_TEST_REGION=
+S3FS_TEST_ACCESS_KEY_ID=
+S3FS_TEST_SECRET_ACCESS_KEY=
\ No newline at end of file
diff --git a/crates/sb_fs/tests/.gitignore b/crates/sb_fs/tests/.gitignore
new file mode 100644
index 000000000..2eea525d8
--- /dev/null
+++ b/crates/sb_fs/tests/.gitignore
@@ -0,0 +1 @@
+.env
\ No newline at end of file
diff --git a/crates/sb_fs/tests/fixture/get/index.ts b/crates/sb_fs/tests/fixture/get/index.ts
new file mode 100644
index 000000000..8ecf83e55
--- /dev/null
+++ b/crates/sb_fs/tests/fixture/get/index.ts
@@ -0,0 +1,10 @@
+export default {
+  async fetch(req: Request) {
+    const url = new URL(req.url);
+    const bucketName = Deno.env.get("S3FS_TEST_BUCKET_NAME")!;
+    const key = url.pathname.split("/").slice(2).join("/");
+    const f = await Deno.open(`/s3/${bucketName}/${key}`);
+
+    return new Response(f.readable, { status: 200 });
+  }
+}
\ No newline at end of file
diff --git a/crates/sb_fs/tests/fixture/main_with_s3fs/index.ts b/crates/sb_fs/tests/fixture/main_with_s3fs/index.ts
new file mode 100644
index 000000000..02998074c
--- /dev/null
+++ b/crates/sb_fs/tests/fixture/main_with_s3fs/index.ts
@@ -0,0 +1,88 @@
+console.log('main function started');
+
+export default {
+  fetch(req: Request) {
+    console.log(req.url);
+
+    const url = new URL(req.url);
+    const { pathname } = url;
+    const path_parts = pathname.split("/");
+    const service_name = path_parts[1];
+    const s3FsConfig = {
+      appName: Deno.env.get("S3FS_TEST_APP_NAME") || "meowmeow",
+      endpointUrl: Deno.env.get("S3FS_TEST_ENDPOINT_URL"),
+      forcePathStyle: !!Deno.env.get("S3FS_TEST_ENDPOINT_URL"),
+      region: Deno.env.get("S3FS_TEST_REGION"),
+      credentials: {
+        accessKeyId: Deno.env.get("S3FS_TEST_ACCESS_KEY_ID"),
+        secretAccessKey: Deno.env.get("S3FS_TEST_SECRET_ACCESS_KEY"),
+      },
+      retryConfig: {
+        mode: "standard"
+      },
+    } as const;
+
+    if (!Deno.env.get("S3FS_TEST_BUCKET_NAME")) {
+      return Response.json({ msg: "no bucket name was found for s3fs test" }, { status: 500 })
+    }
+
+    if (!s3FsConfig.credentials.accessKeyId || !s3FsConfig.credentials.secretAccessKey) {
+      return Response.json({ msg: "no credentials were found for s3fs test" }, { status: 500 });
+    }
+
+    if (!service_name || service_name === "") {
+      const error = { msg: "missing function name in request" }
+      return new Response(
+        JSON.stringify(error),
+        { status: 400, headers: { "Content-Type": "application/json" } },
+      )
+    }
+
+    const servicePath = `./tests/fixture/${service_name}`;
+    console.error(`serving the request with ${servicePath}`);
+
+    const createWorker = async () => {
+      const memoryLimitMb = 150;
+      const workerTimeoutMs = 10 * 60 * 1000;
+      const cpuTimeSoftLimitMs = 10 * 60 * 1000;
+      const cpuTimeHardLimitMs = 10 * 60 * 1000;
+      const noModuleCache = true;
+      const importMapPath = null;
+      const envVarsObj = Deno.env.toObject();
+      const envVars = Object.keys(envVarsObj).map(k => [k, envVarsObj[k]]);
+
+      return await EdgeRuntime.userWorkers.create({
+        servicePath,
+        memoryLimitMb,
+        workerTimeoutMs,
+        cpuTimeSoftLimitMs,
+        cpuTimeHardLimitMs,
+        noModuleCache,
+        importMapPath,
+        envVars,
+        s3FsConfig
+      });
+    }
+
+    const callWorker = async () => {
+      try {
+        const worker = await createWorker();
+        return await worker.fetch(req);
+      } catch (e) {
+        console.error(e);
+
+        // if (e instanceof Deno.errors.WorkerRequestCancelled) {
+        //   return await callWorker();
+        // }
+
+        const error = { msg: e.toString() }
+        return new Response(
+          JSON.stringify(error),
+          { status: 500, headers: { "Content-Type": "application/json" } },
+        );
+      }
+    }
+
+    return callWorker();
+  }
+}
diff --git a/crates/sb_fs/tests/fixture/mkdir/index.ts b/crates/sb_fs/tests/fixture/mkdir/index.ts
new file mode 100644
index 000000000..77691d9a5
--- /dev/null
+++ b/crates/sb_fs/tests/fixture/mkdir/index.ts
@@ -0,0 +1,12 @@
+export default {
+  async fetch(req: Request) {
+    const url = new URL(req.url);
+    const bucketName = Deno.env.get("S3FS_TEST_BUCKET_NAME")!;
+    const recursive = url.searchParams.get("recursive") === "true";
+    const key = url.pathname.split("/").slice(2).join("/");
+
+    await Deno.mkdir(`/s3/${bucketName}/${key}`, { recursive });
+
+    return new Response(null, { status: 200 });
+  }
+}
\ No newline at end of file
diff --git a/crates/sb_fs/tests/fixture/read-dir/index.ts b/crates/sb_fs/tests/fixture/read-dir/index.ts
new file mode 100644
index 000000000..86c5af3d1
--- /dev/null
+++ b/crates/sb_fs/tests/fixture/read-dir/index.ts
@@ -0,0 +1,16 @@
+export default {
+  async fetch(req: Request) {
+    const url = new URL(req.url);
+    const bucketName = Deno.env.get("S3FS_TEST_BUCKET_NAME")!;
+    const key = url.pathname.split("/").slice(2).join("/");
+
+    const entries = await Deno.readDir(`/s3/${bucketName}/${key}`);
+    const result = [];
+
+    for await (const entry of entries) {
+      result.push(entry);
+    }
+
+    return Response.json(result);
+  }
+}
\ No newline at end of file
diff --git a/crates/sb_fs/tests/fixture/remove/index.ts b/crates/sb_fs/tests/fixture/remove/index.ts
new file mode 100644
index 000000000..ab4390367
--- /dev/null
+++ b/crates/sb_fs/tests/fixture/remove/index.ts
@@ -0,0 +1,12 @@
+export default {
+  async fetch(req: Request) {
+    const url = new URL(req.url);
+    const bucketName = Deno.env.get("S3FS_TEST_BUCKET_NAME")!;
+    const recursive = url.searchParams.get("recursive") === "true";
+    const key = url.pathname.split("/").slice(2).join("/");
+
+    await Deno.remove(`/s3/${bucketName}/${key}`, { recursive });
+
+    return new Response(null, { status: 200 });
+  }
+}
\ No newline at end of file
diff --git a/crates/sb_fs/tests/fixture/write/index.ts b/crates/sb_fs/tests/fixture/write/index.ts
new file mode 100644
index 000000000..63e89f4c8
--- /dev/null
+++ b/crates/sb_fs/tests/fixture/write/index.ts
@@ -0,0 +1,17 @@
+export default {
+  async fetch(req: Request) {
+    const url = new URL(req.url);
+    const stream = req.body;
+    const bucketName = Deno.env.get("S3FS_TEST_BUCKET_NAME")!;
+    const key = url.pathname.split("/").slice(2).join("/");
+
+    if (!stream || key === "") {
+      return new Response(null, { status: 400 });
+    }
+
+    const f = await Deno.create(`/s3/${bucketName}/${key}`);
+
+    await stream.pipeTo(f.writable);
+    return new Response(null, { status: 200 });
+  }
+}
\ No newline at end of file
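Note on the fixtures above: they are driven end-to-end by the integration suite that follows, with each test booting the `main_with_s3fs` entrypoint through TestBedBuilder and exercising a fixture over plain HTTP. The calling pattern, condensed from the tests below (a sketch using only names that appear in the suite):

    use anyhow::Context;
    use base::utils::test_utils::TestBedBuilder;
    use hyper_v014::Body;

    async fn roundtrip() {
        let tb = TestBedBuilder::new("./tests/fixture/main_with_s3fs")
            .with_oneshot_policy(None)
            .build()
            .await;

        // The first path segment selects the fixture; the rest is the S3 key.
        let resp = tb
            .request(|b| {
                b.uri("/get/meow.bin")
                    .method("GET")
                    .body(Body::empty())
                    .context("can't make request")
            })
            .await
            .unwrap();

        assert!(resp.status().is_success());
        tb.exit(std::time::Duration::from_secs(20)).await;
    }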
diff --git a/crates/sb_fs/tests/integration_tests.rs b/crates/sb_fs/tests/integration_tests.rs
new file mode 100644
index 000000000..955a20608
--- /dev/null
+++ b/crates/sb_fs/tests/integration_tests.rs
@@ -0,0 +1,523 @@
+use std::{collections::HashMap, time::Duration};
+
+use anyhow::Context;
+use base::{server::ServerFlags, utils::test_utils::TestBedBuilder};
+use ctor::ctor;
+use deno_core::serde_json;
+use event_worker::events::{LogLevel, WorkerEvents};
+use hyper_v014::{body::to_bytes, Body, StatusCode};
+use rand::RngCore;
+use serde::Deserialize;
+use serial_test::serial;
+use tokio::sync::mpsc;
+
+const MIB: usize = 1024 * 1024;
+const TESTBED_DEADLINE_SEC: u64 = 20;
+
+#[ctor]
+fn init() {
+    let _ = dotenvy::from_filename("./tests/.env");
+}
+
+fn get_tb_builder() -> TestBedBuilder {
+    TestBedBuilder::new("./tests/fixture/main_with_s3fs").with_oneshot_policy(None)
+}
+
+async fn remove(path: &str, recursive: bool) {
+    let tb = get_tb_builder().build().await;
+    let resp = tb
+        .request(|b| {
+            b.uri(format!("/remove/{}?recursive={}", path, recursive))
+                .method("GET")
+                .body(Body::empty())
+                .context("can't make request")
+        })
+        .await
+        .unwrap();
+
+    assert_eq!(resp.status().as_u16(), StatusCode::OK);
+    tb.exit(Duration::from_secs(TESTBED_DEADLINE_SEC)).await;
+}
+
+async fn test_write_and_get_bytes(bytes: usize) {
+    remove("", true).await;
+
+    let mut arr = vec![0u8; bytes];
+
+    {
+        let tb = get_tb_builder()
+            .with_server_flags(ServerFlags {
+                request_buffer_size: Some(64 * 1024),
+                ..Default::default()
+            })
+            .build()
+            .await;
+
+        rand::thread_rng().fill_bytes(&mut arr);
+
+        let resp = tb
+            .request(|b| {
+                b.uri("/write/meow.bin")
+                    .method("POST")
+                    .body(arr.clone().into())
+                    .context("can't make request")
+            })
+            .await
+            .unwrap();
+
+        assert_eq!(resp.status().as_u16(), StatusCode::OK);
+        tb.exit(Duration::from_secs(TESTBED_DEADLINE_SEC)).await;
+    }
+
+    {
+        let tb = get_tb_builder().build().await;
+        let mut resp = tb
+            .request(|b| {
+                b.uri("/get/meow.bin")
+                    .method("GET")
+                    .body(Body::empty())
+                    .context("can't make request")
+            })
+            .await
+            .unwrap();
+
+        let buf = to_bytes(resp.body_mut()).await.unwrap();
+        let buf = buf.as_ref();
+
+        assert_eq!(resp.status().as_u16(), StatusCode::OK);
+        assert_eq!(arr, buf);
+        tb.exit(Duration::from_secs(TESTBED_DEADLINE_SEC)).await;
+    }
+}
+
+#[cfg_attr(not(dotenv), ignore)]
+#[serial]
+#[tokio::test]
+async fn test_write_and_get_various_bytes() {
+    test_write_and_get_bytes(0).await;
+    test_write_and_get_bytes(1).await;
+    test_write_and_get_bytes(3 * MIB).await;
+    test_write_and_get_bytes(5 * MIB).await;
+    test_write_and_get_bytes(8 * MIB).await;
+    test_write_and_get_bytes(50 * MIB).await;
+}
+
+/// This test ensures that the upload file size limit in the storage settings
+/// section is enforced properly.
+///
+/// Note that the test below assumes an upload file size limit of 50 MiB.
+///
+/// See: https://supabase.com/docs/guides/storage/uploads/file-limits
+#[cfg_attr(not(dotenv), ignore)]
+#[tokio::test]
+#[serial]
+async fn test_write_and_get_over_50_mib() {
+    remove("", true).await;
+
+    {
+        let arr = vec![0u8; 51 * MIB];
+        let tb = get_tb_builder()
+            .with_server_flags(ServerFlags {
+                request_buffer_size: Some(64 * 1024),
+                ..Default::default()
+            })
+            .build()
+            .await;
+
+        let resp = tb
+            .request(|b| {
+                b.uri("/write/meow.bin")
+                    .method("POST")
+                    .body(arr.clone().into())
+                    .context("can't make request")
+            })
+            .await
+            .unwrap();
+
+        assert_eq!(resp.status().as_u16(), StatusCode::OK);
+        tb.exit(Duration::from_secs(TESTBED_DEADLINE_SEC)).await;
+    }
+
+    {
+        let (tx, mut rx) = mpsc::unbounded_channel();
+        let tb = get_tb_builder()
+            .with_worker_event_sender(Some(tx))
+            .build()
+            .await;
+
+        let resp = tb
+            .request(|b| {
+                b.uri("/get/meow.bin")
+                    .method("GET")
+                    .body(Body::empty())
+                    .context("can't make request")
+            })
+            .await
+            .unwrap();
+
+        assert_eq!(resp.status().as_u16(), StatusCode::INTERNAL_SERVER_ERROR);
+
+        tb.exit(Duration::from_secs(TESTBED_DEADLINE_SEC)).await;
+
+        let mut found_not_found_error = false;
+
+        while let Some(ev) = rx.recv().await {
+            let WorkerEvents::Log(ev) = ev.event else {
+                continue;
+            };
+            if ev.level != LogLevel::Error {
+                continue;
+            }
+
+            found_not_found_error = ev.msg.contains("NotFound: entity not found: open '/s3/");
+            if found_not_found_error {
+                break;
+            }
+        }
+
+        assert!(found_not_found_error);
+    }
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct DenoDirEntry {
+    name: String,
+    is_file: bool,
+    is_directory: bool,
+}
+
+impl DenoDirEntry {
+    fn from_json_unchecked(slice: &[u8]) -> HashMap<String, DenoDirEntry> {
+        serde_json::from_slice::<Vec<DenoDirEntry>>(slice)
+            .unwrap()
+            .into_iter()
+            .map(|it| (it.name.clone(), it))
+            .collect()
+    }
+}
+
+#[cfg_attr(not(dotenv), ignore)]
+#[tokio::test]
+#[serial]
+async fn test_mkdir_and_read_dir() {
+    remove("", true).await;
+
+    {
+        let tb = get_tb_builder().build().await;
+        let resp = tb
+            .request(|b| {
+                b.uri("/mkdir/a")
+                    .method("GET")
+                    .body(Body::empty())
+                    .context("can't make request")
+            })
+            .await
+            .unwrap();
+
+        assert_eq!(resp.status().as_u16(), StatusCode::OK);
+        tb.exit(Duration::from_secs(TESTBED_DEADLINE_SEC)).await;
+    }
+
+    {
+        let tb = get_tb_builder().build().await;
+        let mut resp = tb
+            .request(|b| {
+                b.uri("/read-dir")
+                    .method("GET")
+                    .body(Body::empty())
+                    .context("can't make request")
+            })
+            .await
+            .unwrap();
+
+        assert_eq!(resp.status().as_u16(), StatusCode::OK);
+
+        let buf = to_bytes(resp.body_mut()).await.unwrap();
+        let value = DenoDirEntry::from_json_unchecked(&buf);
+
+        assert!(value.contains_key("a"));
+        assert!(value.get("a").unwrap().is_directory);
+
+        tb.exit(Duration::from_secs(TESTBED_DEADLINE_SEC)).await;
+    }
+}
+
+#[cfg_attr(not(dotenv), ignore)]
+#[tokio::test]
+#[serial]
+async fn test_mkdir_recursive_and_read_dir() {
+    remove("", true).await;
+
+    {
+        let tb = get_tb_builder().build().await;
+        let resp = tb
+            .request(|b| {
+                b.uri("/mkdir/a/b/c/meow?recursive=true")
+                    .method("GET")
+                    .body(Body::empty())
+                    .context("can't make request")
+            })
+            .await
+            .unwrap();
+
+        assert_eq!(resp.status().as_u16(), StatusCode::OK);
+        tb.exit(Duration::from_secs(TESTBED_DEADLINE_SEC)).await;
+    }
+
+    {
+        let tb = get_tb_builder().build().await;
+
+        for [dir, expected] in [["", "a"], ["a", "b"], ["a/b", "c"], ["a/b/c", "meow"]] {
+            let mut resp = tb
+                .request(|b| {
+                    b.uri(format!("/read-dir/{}", dir))
+                        .method("GET")
+                        .body(Body::empty())
+                        .context("can't make request")
+                })
+                .await
+                .unwrap();
+
+            assert_eq!(resp.status().as_u16(), StatusCode::OK);
+
+            let buf = to_bytes(resp.body_mut()).await.unwrap();
+            let value = DenoDirEntry::from_json_unchecked(&buf);
+
+            assert!(value.contains_key(expected));
+            assert!(value.get(expected).unwrap().is_directory);
+        }
+
+        tb.exit(Duration::from_secs(TESTBED_DEADLINE_SEC)).await;
+    }
+}
+
+#[cfg_attr(not(dotenv), ignore)]
+#[tokio::test]
+#[serial]
+async fn test_mkdir_with_no_recursive_opt_must_check_parent_path_exists() {
+    remove("", true).await;
+
+    {
+        let tb = get_tb_builder().build().await;
+        let resp = tb
+            .request(|b| {
+                b.uri("/mkdir/a")
+                    .method("GET")
+                    .body(Body::empty())
+                    .context("can't make request")
+            })
+            .await
+            .unwrap();
+
+        assert_eq!(resp.status().as_u16(), StatusCode::OK);
+        tb.exit(Duration::from_secs(TESTBED_DEADLINE_SEC)).await;
+    }
+
+    {
+        let (tx, mut rx) = mpsc::unbounded_channel();
+        let tb = get_tb_builder()
+            .with_worker_event_sender(Some(tx))
+            .build()
+            .await;
+        let resp = tb
+            .request(|b| {
+                b.uri("/mkdir/a/b/c")
+                    .method("GET")
+                    .body(Body::empty())
+                    .context("can't make request")
+            })
+            .await
+            .unwrap();
+
+        assert_eq!(resp.status().as_u16(), StatusCode::INTERNAL_SERVER_ERROR);
+        tb.exit(Duration::from_secs(TESTBED_DEADLINE_SEC)).await;
+
+        let mut found_no_such_file_or_directory_error = false;
+
+        while let Some(ev) = rx.recv().await {
+            let WorkerEvents::Log(ev) = ev.event else {
+                continue;
+            };
+            if ev.level != LogLevel::Error {
+                continue;
+            }
+
+            found_no_such_file_or_directory_error =
+                ev.msg.contains("No such file or directory: a/b");
+
+            if found_no_such_file_or_directory_error {
+                break;
+            }
+        }
+
+        assert!(found_no_such_file_or_directory_error);
+    }
+}
+
+#[cfg_attr(not(dotenv), ignore)]
+#[tokio::test]
+#[serial]
+async fn test_mkdir_recursive_and_remove_recursive() {
+    remove("", true).await;
+
+    {
+        let tb = get_tb_builder().build().await;
+        let resp = tb
+            .request(|b| {
+                b.uri("/mkdir/a/b/c/meow?recursive=true")
+                    .method("GET")
+                    .body(Body::empty())
+                    .context("can't make request")
+            })
+            .await
+            .unwrap();
+
+        assert_eq!(resp.status().as_u16(), StatusCode::OK);
+        tb.exit(Duration::from_secs(TESTBED_DEADLINE_SEC)).await;
+    }
+
+    {
+        let arr = vec![0u8; 11 * MIB];
+        let tb = get_tb_builder()
+            .with_server_flags(ServerFlags {
+                request_buffer_size: Some(64 * 1024),
+                ..Default::default()
+            })
+            .build()
+            .await;
+
+        let resp = tb
+            .request(|b| {
+                b.uri("/write/a/b/c/meeeeow.bin")
+                    .method("POST")
+                    .body(arr.clone().into())
+                    .context("can't make request")
+            })
+            .await
+            .unwrap();
+
+        assert_eq!(resp.status().as_u16(), StatusCode::OK);
+        tb.exit(Duration::from_secs(TESTBED_DEADLINE_SEC)).await;
+    }
+
+    {
+        let tb = get_tb_builder().build().await;
+        let mut resp = tb
+            .request(|b| {
+                b.uri("/read-dir/a/b/c")
+                    .method("GET")
+                    .body(Body::empty())
+                    .context("can't make request")
+            })
+            .await
+            .unwrap();
+
+        assert_eq!(resp.status().as_u16(), StatusCode::OK);
+
+        let buf = to_bytes(resp.body_mut()).await.unwrap();
+        let value = DenoDirEntry::from_json_unchecked(&buf);
+
+        assert_eq!(
+            value.len(),
+            if is_supabase_storage_being_tested() {
+                // .emptyFolderPlaceholder in Supabase Storage
+                3
+            } else {
+                2
+            }
+        );
+
+        assert!(value.contains_key("meow"));
+        assert!(value.get("meow").unwrap().is_directory);
+        assert!(value.contains_key("meeeeow.bin"));
+        assert!(value.get("meeeeow.bin").unwrap().is_file);
+
+        tb.exit(Duration::from_secs(TESTBED_DEADLINE_SEC)).await;
+    }
+
+    remove("a/b/c", true).await;
+    remove("a/b", true).await;
+
+    {
+        let tb = get_tb_builder().build().await;
+        let mut resp = tb
+            .request(|b| {
+                b.uri("/read-dir/a")
+                    .method("GET")
+                    .body(Body::empty())
+                    .context("can't make request")
+            })
+            .await
+            .unwrap();
+
+        assert_eq!(resp.status().as_u16(), StatusCode::OK);
+
+        let buf = to_bytes(resp.body_mut()).await.unwrap();
+        let value = DenoDirEntry::from_json_unchecked(&buf);
+
+        assert_eq!(
+            value.len(),
+            if is_supabase_storage_being_tested() {
+                // .emptyFolderPlaceholder in Supabase Storage
+                1
+            } else {
+                0
+            }
+        );
+
+        tb.exit(Duration::from_secs(TESTBED_DEADLINE_SEC)).await;
+    }
+
+    {
+        let tb = get_tb_builder().build().await;
+        let mut resp = tb
+            .request(|b| {
+                b.uri("/read-dir")
+                    .method("GET")
+                    .body(Body::empty())
+                    .context("can't make request")
+            })
+            .await
+            .unwrap();
+
+        assert_eq!(resp.status().as_u16(), StatusCode::OK);
+
+        let buf = to_bytes(resp.body_mut()).await.unwrap();
+        let value = DenoDirEntry::from_json_unchecked(&buf);
+
+        assert_eq!(value.len(), 1);
+        assert!(value.contains_key("a"));
+        assert!(value.get("a").unwrap().is_directory);
+
+        tb.exit(Duration::from_secs(TESTBED_DEADLINE_SEC)).await;
+    }
+
+    remove("a", true).await;
+
+    {
+        let tb = get_tb_builder().build().await;
+        let mut resp = tb
+            .request(|b| {
+                b.uri("/read-dir")
+                    .method("GET")
+                    .body(Body::empty())
+                    .context("can't make request")
+            })
+            .await
+            .unwrap();
+
+        assert_eq!(resp.status().as_u16(), StatusCode::OK);
+
+        let buf = to_bytes(resp.body_mut()).await.unwrap();
+        let value = DenoDirEntry::from_json_unchecked(&buf);
+
+        assert_eq!(value.len(), 0);
+
+        tb.exit(Duration::from_secs(TESTBED_DEADLINE_SEC)).await;
+    }
+}
+
+fn is_supabase_storage_being_tested() -> bool {
+    std::env::var("S3FS_TEST_SUPABASE_STORAGE").unwrap_or_default() == "true"
+}