diff --git a/.gitignore b/.gitignore index b4a4e8823..3995a772d 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,4 @@ openapi-generator-cli.jar .env ceramic_cicddb.sqlite* event-svc/model_error_counts.csv +/.history diff --git a/Cargo.lock b/Cargo.lock index c2e72d249..ce30677af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,12 +17,6 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" -[[package]] -name = "adler32" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" - [[package]] name = "aead" version = "0.5.2" @@ -139,7 +133,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8158b4878c67837e5413721cc44298e6a2d88d39203175ea025e51892a16ba4c" dependencies = [ "num_enum 0.7.3", - "strum 0.26.3", + "strum", ] [[package]] @@ -650,34 +644,6 @@ version = "1.0.89" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6" -[[package]] -name = "apache-avro" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ceb7c683b2f8f40970b70e39ff8be514c95b96fcb9c4af87e1ed2cb2e10801a0" -dependencies = [ - "bzip2", - "crc32fast", - "digest 0.10.7", - "lazy_static", - "libflate", - "log", - "num-bigint", - "quad-rand", - "rand 0.8.5", - "regex-lite", - "serde", - "serde_json", - "snap", - "strum 0.25.0", - "strum_macros 0.25.3", - "thiserror", - "typed-builder", - "uuid 1.10.0", - "xz2", - "zstd 0.12.4", -] - [[package]] name = "ark-ff" version = "0.3.0" @@ -843,9 +809,9 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "53.0.0" +version = "53.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03675e42d1560790f3524800e41403b40d0da1c793fe9528929fde06d8c7649a" +checksum = "91f2dfd1a7ec0aca967dfaa616096aec49779adc8eccec005e2f5e4111b1192a" dependencies = [ "arrow-array", "arrow-buffer", @@ -858,9 +824,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "53.0.0" +version = "53.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd2bf348cf9f02a5975c5962c7fa6dee107a2009a7b41ac5fb1a027e12dc033f" +checksum = "d39387ca628be747394890a6e47f138ceac1aa912eab64f02519fed24b637af8" dependencies = [ "ahash 0.8.11", "arrow-buffer", @@ -875,9 +841,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "53.0.0" +version = "53.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3092e37715f168976012ce52273c3989b5793b0db5f06cbaa246be25e5f0924d" +checksum = "9e51e05228852ffe3eb391ce7178a0f97d2cf80cc6ef91d3c4a6b3cb688049ec" dependencies = [ "bytes 1.7.2", "half 2.4.1", @@ -886,9 +852,9 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "53.0.0" +version = "53.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ce1018bb710d502f9db06af026ed3561552e493e989a79d0d0f5d9cf267a785" +checksum = "d09aea56ec9fa267f3f3f6cdab67d8a9974cbba90b3aa38c8fe9d0bb071bd8c1" dependencies = [ "arrow-array", "arrow-buffer", @@ -900,7 +866,7 @@ dependencies = [ "chrono", "comfy-table", "half 2.4.1", - "lexical-core", + "lexical-core 1.0.2", "num", "ryu", ] @@ -920,15 +886,15 @@ dependencies = [ "csv", "csv-core", "lazy_static", - "lexical-core", + "lexical-core 0.8.5", "regex", ] [[package]] name = "arrow-data" -version = "53.0.0" +version = "53.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e4ac0c4ee79150afe067dc4857154b3ee9c1cd52b5f40d59a77306d0ed18d65" +checksum = "b98ae0af50890b494cebd7d6b04b35e896205c1d1df7b29a6272c5d0d0249ef5" dependencies = [ "arrow-buffer", "arrow-schema", @@ -938,9 +904,9 @@ dependencies = [ [[package]] name = "arrow-flight" -version = "53.0.0" +version = "53.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b915fb36d935b969894d7909ad417c67ddeadebbbd57c3c168edf64721a37d31" +checksum = "703620bf755500804893dc4b42982b8a33ee20d7c20c9c3ab3490a1d0f7cf641" dependencies = [ "arrow-arith", "arrow-array", @@ -966,9 +932,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "53.0.0" +version = "53.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb307482348a1267f91b0912e962cd53440e5de0f7fb24c5f7b10da70b38c94a" +checksum = "0ed91bdeaff5a1c00d28d8f73466bcb64d32bbd7093b5a30156b4b9f4dba3eee" dependencies = [ "arrow-array", "arrow-buffer", @@ -993,7 +959,7 @@ dependencies = [ "chrono", "half 2.4.1", "indexmap 2.5.0", - "lexical-core", + "lexical-core 0.8.5", "num", "serde", "serde_json", @@ -1001,9 +967,9 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "53.0.0" +version = "53.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "644046c479d80ae8ed02a7f1e1399072ea344ca6a7b0e293ab2d5d9ed924aa3b" +checksum = "2883d7035e0b600fb4c30ce1e50e66e53d8656aa729f2bfa4b51d359cf3ded52" dependencies = [ "arrow-array", "arrow-buffer", @@ -1016,9 +982,9 @@ dependencies = [ [[package]] name = "arrow-row" -version = "53.0.0" +version = "53.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a29791f8eb13b340ce35525b723f5f0df17ecb955599e11f65c2a94ab34e2efb" +checksum = "552907e8e587a6fde4f8843fd7a27a576a260f65dab6c065741ea79f633fc5be" dependencies = [ "ahash 0.8.11", "arrow-array", @@ -1030,15 +996,15 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "53.0.0" +version = "53.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c85320a3a2facf2b2822b57aa9d6d9d55edb8aee0b6b5d3b8df158e503d10858" +checksum = "539ada65246b949bd99ffa0881a9a15a4a529448af1a07a9838dd78617dafab1" [[package]] name = "arrow-select" -version = "53.0.0" +version = "53.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cc7e6b582e23855fd1625ce46e51647aa440c20ea2e71b1d748e0839dd73cba" +checksum = "6259e566b752da6dceab91766ed8b2e67bf6270eb9ad8a6e07a33c1bede2b125" dependencies = [ "ahash 0.8.11", "arrow-array", @@ -1050,9 +1016,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "53.0.0" +version = "53.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0775b6567c66e56ded19b87a954b6b1beffbdd784ef95a3a2b03f59570c1d230" +checksum = "f3179ccbd18ebf04277a095ba7321b93fd1f774f18816bd5f6b3ce2f594edb6c" dependencies = [ "arrow-array", "arrow-buffer", @@ -1216,8 +1182,8 @@ dependencies = [ "pin-project-lite", "tokio", "xz2", - "zstd 0.13.2", - "zstd-safe 7.2.1", + "zstd", + "zstd-safe", ] [[package]] @@ -1525,306 +1491,6 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" -[[package]] -name = "aws-config" -version = "1.5.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e95816a168520d72c0e7680c405a5a8c1fb6a035b4bc4b9d7b0de8e1a941697" -dependencies = [ - "aws-credential-types", - "aws-runtime", - "aws-sdk-sso", - "aws-sdk-ssooidc", - "aws-sdk-sts", - "aws-smithy-async", - "aws-smithy-http", - "aws-smithy-json", - "aws-smithy-runtime", - "aws-smithy-runtime-api", - "aws-smithy-types", - "aws-types", - "bytes 1.7.2", - "fastrand 2.1.1", - "hex", - "http 0.2.12", - "ring 0.17.8", - "time 0.3.36", - "tokio", - "tracing", - "url", - "zeroize", -] - -[[package]] -name = "aws-credential-types" -version = "1.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60e8f6b615cb5fc60a98132268508ad104310f0cfb25a1c22eee76efdf9154da" -dependencies = [ - "aws-smithy-async", - "aws-smithy-runtime-api", - "aws-smithy-types", - "zeroize", -] - -[[package]] -name = "aws-runtime" -version = "1.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a10d5c055aa540164d9561a0e2e74ad30f0dcf7393c3a92f6733ddf9c5762468" -dependencies = [ - "aws-credential-types", - "aws-sigv4", - "aws-smithy-async", - "aws-smithy-http", - "aws-smithy-runtime", - "aws-smithy-runtime-api", - "aws-smithy-types", - "aws-types", - "bytes 1.7.2", - "fastrand 2.1.1", - "http 0.2.12", - "http-body 0.4.6", - "once_cell", - "percent-encoding 2.3.1", - "pin-project-lite", - "tracing", - "uuid 1.10.0", -] - -[[package]] -name = "aws-sdk-sso" -version = "1.39.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11822090cf501c316c6f75711d77b96fba30658e3867a7762e5e2f5d32d31e81" -dependencies = [ - "aws-credential-types", - "aws-runtime", - "aws-smithy-async", - "aws-smithy-http", - "aws-smithy-json", - "aws-smithy-runtime", - "aws-smithy-runtime-api", - "aws-smithy-types", - "aws-types", - "bytes 1.7.2", - "http 0.2.12", - "once_cell", - "regex-lite", - "tracing", -] - -[[package]] -name = "aws-sdk-ssooidc" -version = "1.40.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78a2a06ff89176123945d1bbe865603c4d7101bea216a550bb4d2e4e9ba74d74" -dependencies = [ - "aws-credential-types", - "aws-runtime", - "aws-smithy-async", - "aws-smithy-http", - "aws-smithy-json", - "aws-smithy-runtime", - "aws-smithy-runtime-api", - "aws-smithy-types", - "aws-types", - "bytes 1.7.2", - "http 0.2.12", - "once_cell", - "regex-lite", - "tracing", -] - -[[package]] -name = "aws-sdk-sts" -version = "1.39.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a20a91795850826a6f456f4a48eff1dfa59a0e69bdbf5b8c50518fd372106574" -dependencies = [ - "aws-credential-types", - "aws-runtime", - "aws-smithy-async", - "aws-smithy-http", - "aws-smithy-json", - "aws-smithy-query", - "aws-smithy-runtime", - "aws-smithy-runtime-api", - "aws-smithy-types", - "aws-smithy-xml", - "aws-types", - "http 0.2.12", - "once_cell", - "regex-lite", - "tracing", -] - -[[package]] -name = "aws-sigv4" -version = "1.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc8db6904450bafe7473c6ca9123f88cc11089e41a025408f992db4e22d3be68" -dependencies = [ - "aws-credential-types", - "aws-smithy-http", - "aws-smithy-runtime-api", - "aws-smithy-types", - "bytes 1.7.2", - "form_urlencoded", - "hex", - "hmac", - "http 0.2.12", - "http 1.1.0", - "once_cell", - "percent-encoding 2.3.1", - "sha2 0.10.8", - "time 0.3.36", - "tracing", -] - -[[package]] -name = "aws-smithy-async" -version = "1.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62220bc6e97f946ddd51b5f1361f78996e704677afc518a4ff66b7a72ea1378c" -dependencies = [ - "futures-util", - "pin-project-lite", - "tokio", -] - -[[package]] -name = "aws-smithy-http" -version = "0.60.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c8bc3e8fdc6b8d07d976e301c02fe553f72a39b7a9fea820e023268467d7ab6" -dependencies = [ - "aws-smithy-runtime-api", - "aws-smithy-types", - "bytes 1.7.2", - "bytes-utils", - "futures-core", - "http 0.2.12", - "http-body 0.4.6", - "once_cell", - "percent-encoding 2.3.1", - "pin-project-lite", - "pin-utils", - "tracing", -] - -[[package]] -name = "aws-smithy-json" -version = "0.60.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4683df9469ef09468dad3473d129960119a0d3593617542b7d52086c8486f2d6" -dependencies = [ - "aws-smithy-types", -] - -[[package]] -name = "aws-smithy-query" -version = "0.60.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2fbd61ceb3fe8a1cb7352e42689cec5335833cd9f94103a61e98f9bb61c64bb" -dependencies = [ - "aws-smithy-types", - "urlencoding", -] - -[[package]] -name = "aws-smithy-runtime" -version = "1.7.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a065c0fe6fdbdf9f11817eb68582b2ab4aff9e9c39e986ae48f7ec576c6322db" -dependencies = [ - "aws-smithy-async", - "aws-smithy-http", - "aws-smithy-runtime-api", - "aws-smithy-types", - "bytes 1.7.2", - "fastrand 2.1.1", - "h2 0.3.26", - "http 0.2.12", - "http-body 0.4.6", - "http-body 1.0.1", - "httparse", - "hyper 0.14.30", - "hyper-rustls 0.24.2", - "once_cell", - "pin-project-lite", - "pin-utils", - "rustls 0.21.12", - "tokio", - "tracing", -] - -[[package]] -name = "aws-smithy-runtime-api" -version = "1.7.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e086682a53d3aa241192aa110fa8dfce98f2f5ac2ead0de84d41582c7e8fdb96" -dependencies = [ - "aws-smithy-async", - "aws-smithy-types", - "bytes 1.7.2", - "http 0.2.12", - "http 1.1.0", - "pin-project-lite", - "tokio", - "tracing", - "zeroize", -] - -[[package]] -name = "aws-smithy-types" -version = "1.2.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "147100a7bea70fa20ef224a6bad700358305f5dc0f84649c53769761395b355b" -dependencies = [ - "base64-simd", - "bytes 1.7.2", - "bytes-utils", - "futures-core", - "http 0.2.12", - "http 1.1.0", - "http-body 0.4.6", - "http-body 1.0.1", - "http-body-util", - "itoa", - "num-integer", - "pin-project-lite", - "pin-utils", - "ryu", - "serde", - "time 0.3.36", - "tokio", - "tokio-util", -] - -[[package]] -name = "aws-smithy-xml" -version = "0.60.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab0b0166827aa700d3dc519f72f8b3a91c35d0b8d042dc5d643a91e6f80648fc" -dependencies = [ - "xmlparser", -] - -[[package]] -name = "aws-types" -version = "1.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5221b91b3e441e6675310829fd8984801b772cb1546ef6c0e54dec9f1ac13fef" -dependencies = [ - "aws-credential-types", - "aws-smithy-async", - "aws-smithy-runtime-api", - "aws-smithy-types", - "rustc_version 0.4.1", - "tracing", -] - [[package]] name = "axum" version = "0.6.20" @@ -1995,16 +1661,6 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" -[[package]] -name = "base64-simd" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "339abbe78e73178762e23bea9dfd08e697eb3f3301cd4be981c0f78ba5859195" -dependencies = [ - "outref", - "vsimd", -] - [[package]] name = "base64-url" version = "2.0.2" @@ -2279,16 +1935,6 @@ dependencies = [ "serde", ] -[[package]] -name = "bytes-utils" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7dafe3a8757b027e2be6e4e5601ed563c55989fcf1546e933c66c8eb3a058d35" -dependencies = [ - "bytes 1.7.2", - "either", -] - [[package]] name = "bzip2" version = "0.4.4" @@ -2626,8 +2272,8 @@ dependencies = [ "ceramic-car", "ceramic-core", "ceramic-event", - "ceramic-flight", "ceramic-metrics", + "ceramic-pipeline", "ceramic-sql", "ceramic-validation", "cid 0.11.1", @@ -2669,11 +2315,7 @@ dependencies = [ "arrow-array", "arrow-flight", "arrow-schema", - "async-stream", - "async-trait", "ceramic-arrow-test", - "ceramic-core", - "ceramic-event", "ceramic-pipeline", "cid 0.11.1", "datafusion", @@ -2681,10 +2323,8 @@ dependencies = [ "expect-test", "futures", "http 1.1.0", - "int-enum", - "ipld-core", "mockall", - "serde", + "object_store", "test-log", "tokio", "tokio-stream", @@ -2818,45 +2458,13 @@ dependencies = [ "tracing-subscriber", ] -[[package]] -name = "ceramic-olap" -version = "0.41.1" -dependencies = [ - "anyhow", - "arrow", - "ceramic-core", - "ceramic-flight", - "ceramic-metrics", - "ceramic-pipeline", - "cid 0.11.1", - "clap 4.5.19", - "datafusion", - "expect-test", - "futures", - "git-version", - "hyper 0.14.30", - "json-patch", - "multibase 0.9.1", - "multihash 0.19.1", - "multihash-codetable", - "multihash-derive 0.9.0", - "names", - "object_store", - "prometheus-client", - "serde", - "serde_json", - "signal-hook", - "signal-hook-tokio", - "test-log", - "tokio", - "tracing", -] - [[package]] name = "ceramic-one" version = "0.41.1" dependencies = [ "anyhow", + "arrow-cast", + "arrow-flight", "async-stream", "async-trait", "ceramic-anchor-remote", @@ -2876,8 +2484,7 @@ dependencies = [ "ceramic-sql", "cid 0.11.1", "clap 4.5.19", - "datafusion-cli", - "datafusion-functions-json", + "datafusion", "expect-test", "futures", "git-version", @@ -2906,6 +2513,7 @@ dependencies = [ "tokio-metrics", "tokio-prometheus-client", "tokio-stream", + "tonic 0.12.3", "tracing", ] @@ -2953,13 +2561,27 @@ name = "ceramic-pipeline" version = "0.41.1" dependencies = [ "anyhow", - "arrow-flight", + "arrow", + "arrow-schema", + "async-stream", + "async-trait", + "ceramic-arrow-test", + "ceramic-core", + "ceramic-event", "cid 0.11.1", "datafusion", - "datafusion-federation", - "datafusion-flight-sql-table-provider", + "datafusion-functions-json", + "expect-test", + "futures", + "int-enum", + "ipld-core", + "json-patch", "object_store", - "tonic 0.12.3", + "serde", + "serde_json", + "test-log", + "tokio", + "tracing", "url", ] @@ -3010,12 +2632,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" -[[package]] -name = "cfg_aliases" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" - [[package]] name = "chacha20" version = "0.9.1" @@ -3057,9 +2673,9 @@ dependencies = [ [[package]] name = "chrono-tz" -version = "0.9.0" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93698b29de5e97ad0ae26447b344c482a7284c737d9ddc5f9e52b74a336671bb" +checksum = "cd6dd8046d00723a59a2f8c5f295c515b9bb9a331ee4f8f3d4dd49e428acd3b6" dependencies = [ "chrono", "chrono-tz-build", @@ -3068,12 +2684,11 @@ dependencies = [ [[package]] name = "chrono-tz-build" -version = "0.3.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c088aee841df9c3041febbb73934cfc39708749bf96dc827e3359cd39ef11b1" +checksum = "e94fea34d77a245229e7746bd2beb786cd2a896f306ff491fb8cecb3074b10a7" dependencies = [ "parse-zoneinfo", - "phf", "phf_codegen", ] @@ -3207,15 +2822,6 @@ dependencies = [ "cc", ] -[[package]] -name = "clipboard-win" -version = "5.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15efe7a882b08f34e38556b14f2fb3daa98769d06c7f0c1b076dfd0d983bc892" -dependencies = [ - "error-code", -] - [[package]] name = "codespan-reporting" version = "0.11.1" @@ -3244,8 +2850,8 @@ version = "7.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b34115915337defe99b2aff5c2ce6771e5fbc4079f4b506301f5cf394c8452f7" dependencies = [ - "strum 0.26.3", - "strum_macros 0.26.4", + "strum", + "strum_macros", "unicode-width", ] @@ -3732,12 +3338,6 @@ dependencies = [ "syn 2.0.79", ] -[[package]] -name = "dary_heap" -version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04d2cd9c18b9f454ed67da600630b021a8a80bf33f8c95896ab33aaf1c26b728" - [[package]] name = "dashmap" version = "6.1.0" @@ -3785,7 +3385,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ee907b081e45e1d14e1f327e89ef134f91fcebad0bfc2dc229fa9f6044379682" dependencies = [ "ahash 0.8.11", - "apache-avro", "arrow", "arrow-array", "arrow-ipc", @@ -3819,7 +3418,6 @@ dependencies = [ "indexmap 2.5.0", "itertools 0.13.0", "log", - "num-traits", "num_cpus", "object_store", "parking_lot", @@ -3834,7 +3432,7 @@ dependencies = [ "url", "uuid 1.10.0", "xz2", - "zstd 0.13.2", + "zstd", ] [[package]] @@ -3852,34 +3450,6 @@ dependencies = [ "parking_lot", ] -[[package]] -name = "datafusion-cli" -version = "42.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e679e47247db7ee964bb9d57ae0f3fff5e8412c2245663bcb4d269691407472f" -dependencies = [ - "arrow", - "async-trait", - "aws-config", - "aws-credential-types", - "aws-sdk-sso", - "aws-sdk-ssooidc", - "aws-sdk-sts", - "clap 4.5.19", - "datafusion", - "dirs", - "env_logger 0.11.5", - "futures", - "mimalloc", - "object_store", - "parking_lot", - "parquet", - "regex", - "rustyline", - "tokio", - "url", -] - [[package]] name = "datafusion-common" version = "42.0.0" @@ -3887,7 +3457,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a84f8e76330c582a6b8ada0b2c599ca46cfe46b7585e458fc3f4092bc722a18" dependencies = [ "ahash 0.8.11", - "apache-avro", "arrow", "arrow-array", "arrow-buffer", @@ -3954,8 +3523,8 @@ dependencies = [ "paste", "serde_json", "sqlparser", - "strum 0.26.3", - "strum_macros 0.26.4", + "strum", + "strum_macros", ] [[package]] @@ -3977,31 +3546,13 @@ dependencies = [ "arrow-json", "async-stream", "async-trait", - "datafusion", - "futures", -] - -[[package]] -name = "datafusion-flight-sql-server" -version = "0.4.0" -source = "git+https://github.com/datafusion-contrib/datafusion-federation.git?branch=main#e1084a92f7bc75a07cb1a9840429bca3842bd593" -dependencies = [ - "arrow", - "arrow-flight", - "async-trait", - "datafusion", - "datafusion-federation", - "datafusion-substrait", + "datafusion", "futures", - "log", - "once_cell", - "prost 0.13.3", - "tonic 0.12.3", ] [[package]] -name = "datafusion-flight-sql-table-provider" -version = "0.3.0" +name = "datafusion-flight-sql-server" +version = "0.4.0" source = "git+https://github.com/datafusion-contrib/datafusion-federation.git?branch=main#e1084a92f7bc75a07cb1a9840429bca3842bd593" dependencies = [ "arrow", @@ -4009,7 +3560,11 @@ dependencies = [ "async-trait", "datafusion", "datafusion-federation", + "datafusion-substrait", "futures", + "log", + "once_cell", + "prost 0.13.3", "tonic 0.12.3", ] @@ -4251,7 +3806,7 @@ dependencies = [ "log", "regex", "sqlparser", - "strum 0.26.3", + "strum", ] [[package]] @@ -4494,15 +4049,6 @@ dependencies = [ "subtle", ] -[[package]] -name = "dirs" -version = "5.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44c45a9d03d6676652bcb5e724c7e988de1acad23a711b5217ab9cbecbec2225" -dependencies = [ - "dirs-sys", -] - [[package]] name = "dirs-next" version = "2.0.0" @@ -4513,18 +4059,6 @@ dependencies = [ "dirs-sys-next", ] -[[package]] -name = "dirs-sys" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c" -dependencies = [ - "libc", - "option-ext", - "redox_users", - "windows-sys 0.48.0", -] - [[package]] name = "dirs-sys-next" version = "0.1.2" @@ -4729,12 +4263,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "endian-type" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" - [[package]] name = "enum-as-inner" version = "0.6.1" @@ -4754,7 +4282,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4f2c92ceda6ceec50f43169f9ee8424fe2db276791afde7b2cd8bc084cb376ab" dependencies = [ "log", - "regex", ] [[package]] @@ -4789,7 +4316,6 @@ dependencies = [ "anstream", "anstyle", "env_filter", - "humantime 2.1.0", "log", ] @@ -4809,12 +4335,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "error-code" -version = "3.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5d9305ccc6942a704f4335694ecd3de2ea531b114ac2d51f5f843750787a92f" - [[package]] name = "etcetera" version = "0.8.0" @@ -4900,17 +4420,6 @@ dependencies = [ "bytes 1.7.2", ] -[[package]] -name = "fd-lock" -version = "4.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e5768da2206272c81ef0b5e951a41862938a6070da63bcea197899942d3b947" -dependencies = [ - "cfg-if", - "rustix 0.38.37", - "windows-sys 0.52.0", -] - [[package]] name = "ff" version = "0.12.1" @@ -4993,15 +4502,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "fluent-uri" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17c704e9dbe1ddd863da1e6ff3567795087b1eb201ce80d8fa81162e1516500d" -dependencies = [ - "bitflags 1.3.2", -] - [[package]] name = "flume" version = "0.10.14" @@ -5824,9 +5324,7 @@ dependencies = [ "futures-util", "http 0.2.12", "hyper 0.14.30", - "log", "rustls 0.21.12", - "rustls-native-certs 0.6.3", "tokio", "tokio-rustls 0.24.1", ] @@ -5842,7 +5340,7 @@ dependencies = [ "hyper 1.4.1", "hyper-util", "rustls 0.23.13", - "rustls-native-certs 0.8.0", + "rustls-native-certs", "rustls-pki-types", "tokio", "tokio-rustls 0.26.0", @@ -5933,7 +5431,7 @@ dependencies = [ "iana-time-zone-haiku", "js-sys", "wasm-bindgen", - "windows-core 0.52.0", + "windows-core", ] [[package]] @@ -6373,7 +5871,7 @@ checksum = "02e23549143ef50eddffd46ba8cd0229b0a4500aef7518cf2eb0f41c9a09d22b" dependencies = [ "ahash 0.8.11", "bitvec 1.0.1", - "lexical-parse-float", + "lexical-parse-float 0.8.5", "num-bigint", "num-traits", "smallvec", @@ -6540,9 +6038,9 @@ dependencies = [ [[package]] name = "json-patch" -version = "2.0.0" +version = "3.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b1fb8864823fad91877e6caea0baca82e49e8db50f8e5c9f9a453e27d3330fc" +checksum = "863726d7afb6bc2590eeff7135d923545e5e964f004c2ccf8716c25e70a86f08" dependencies = [ "jsonptr", "serde", @@ -6571,11 +6069,10 @@ dependencies = [ [[package]] name = "jsonptr" -version = "0.4.7" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c6e529149475ca0b2820835d3dce8fcc41c6b943ca608d32f35b449255e4627" +checksum = "5dea2b27dd239b2556ed7a25ba842fe47fd602e7fc7433c2a8d6106d4d9edd70" dependencies = [ - "fluent-uri", "serde", "serde_json", ] @@ -6712,7 +6209,7 @@ version = "6.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7aefb36fd43fef7003334742cbf77b243fcd36418a1d1bdd480d613a67968f6" dependencies = [ - "lexical-core", + "lexical-core 0.8.5", ] [[package]] @@ -6721,11 +6218,24 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2cde5de06e8d4c2faabc400238f9ae1c74d5412d03a7bd067645ccbc47070e46" dependencies = [ - "lexical-parse-float", - "lexical-parse-integer", - "lexical-util", - "lexical-write-float", - "lexical-write-integer", + "lexical-parse-float 0.8.5", + "lexical-parse-integer 0.8.6", + "lexical-util 0.8.5", + "lexical-write-float 0.8.5", + "lexical-write-integer 0.8.5", +] + +[[package]] +name = "lexical-core" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0431c65b318a590c1de6b8fd6e72798c92291d27762d94c9e6c37ed7a73d8458" +dependencies = [ + "lexical-parse-float 1.0.2", + "lexical-parse-integer 1.0.2", + "lexical-util 1.0.3", + "lexical-write-float 1.0.2", + "lexical-write-integer 1.0.2", ] [[package]] @@ -6734,8 +6244,19 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "683b3a5ebd0130b8fb52ba0bdc718cc56815b6a097e28ae5a6997d0ad17dc05f" dependencies = [ - "lexical-parse-integer", - "lexical-util", + "lexical-parse-integer 0.8.6", + "lexical-util 0.8.5", + "static_assertions", +] + +[[package]] +name = "lexical-parse-float" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb17a4bdb9b418051aa59d41d65b1c9be5affab314a872e5ad7f06231fb3b4e0" +dependencies = [ + "lexical-parse-integer 1.0.2", + "lexical-util 1.0.3", "static_assertions", ] @@ -6745,7 +6266,17 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d0994485ed0c312f6d965766754ea177d07f9c00c9b82a5ee62ed5b47945ee9" dependencies = [ - "lexical-util", + "lexical-util 0.8.5", + "static_assertions", +] + +[[package]] +name = "lexical-parse-integer" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5df98f4a4ab53bf8b175b363a34c7af608fe31f93cc1fb1bf07130622ca4ef61" +dependencies = [ + "lexical-util 1.0.3", "static_assertions", ] @@ -6759,56 +6290,62 @@ dependencies = [ ] [[package]] -name = "lexical-write-float" -version = "0.8.5" +name = "lexical-util" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "accabaa1c4581f05a3923d1b4cfd124c329352288b7b9da09e766b0668116862" +checksum = "85314db53332e5c192b6bca611fb10c114a80d1b831ddac0af1e9be1b9232ca0" dependencies = [ - "lexical-util", - "lexical-write-integer", "static_assertions", ] [[package]] -name = "lexical-write-integer" +name = "lexical-write-float" version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1b6f3d1f4422866b68192d62f77bc5c700bee84f3069f2469d7bc8c77852446" +checksum = "accabaa1c4581f05a3923d1b4cfd124c329352288b7b9da09e766b0668116862" dependencies = [ - "lexical-util", + "lexical-util 0.8.5", + "lexical-write-integer 0.8.5", "static_assertions", ] [[package]] -name = "libc" -version = "0.2.159" +name = "lexical-write-float" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "561d97a539a36e26a9a5fad1ea11a3039a67714694aaa379433e580854bc3dc5" +checksum = "6e7c3ad4e37db81c1cbe7cf34610340adc09c322871972f74877a712abc6c809" +dependencies = [ + "lexical-util 1.0.3", + "lexical-write-integer 1.0.2", + "static_assertions", +] [[package]] -name = "libflate" -version = "2.1.0" +name = "lexical-write-integer" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45d9dfdc14ea4ef0900c1cddbc8dcd553fbaacd8a4a282cf4018ae9dd04fb21e" +checksum = "e1b6f3d1f4422866b68192d62f77bc5c700bee84f3069f2469d7bc8c77852446" dependencies = [ - "adler32", - "core2", - "crc32fast", - "dary_heap", - "libflate_lz77", + "lexical-util 0.8.5", + "static_assertions", ] [[package]] -name = "libflate_lz77" -version = "2.1.0" +name = "lexical-write-integer" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6e0d73b369f386f1c44abd9c570d5318f55ccde816ff4b562fa452e5182863d" +checksum = "eb89e9f6958b83258afa3deed90b5de9ef68eef090ad5086c791cd2345610162" dependencies = [ - "core2", - "hashbrown 0.14.5", - "rle-decode-fast", + "lexical-util 1.0.3", + "static_assertions", ] +[[package]] +name = "libc" +version = "0.2.159" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "561d97a539a36e26a9a5fad1ea11a3039a67714694aaa379433e580854bc3dc5" + [[package]] name = "libgit2-sys" version = "0.15.2+1.6.4" @@ -6907,16 +6444,6 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" -[[package]] -name = "libmimalloc-sys" -version = "0.1.39" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23aa6811d3bd4deb8a84dde645f943476d13b248d818edcf8ce0b2f37f036b44" -dependencies = [ - "cc", - "libc", -] - [[package]] name = "libp2p" version = "0.53.2" @@ -7715,15 +7242,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "mimalloc" -version = "0.1.43" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68914350ae34959d83f732418d51e2427a794055d0b9529f48259ac07af65633" -dependencies = [ - "libmimalloc-sys", -] - [[package]] name = "mime" version = "0.3.17" @@ -8068,15 +7586,6 @@ version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "650eef8c711430f1a879fdd01d4745a7deea475becfb90269c06775983bbf086" -[[package]] -name = "nibble_vec" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43" -dependencies = [ - "smallvec", -] - [[package]] name = "nix" version = "0.24.3" @@ -8101,18 +7610,6 @@ dependencies = [ "pin-utils", ] -[[package]] -name = "nix" -version = "0.28.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab2156c4fce2f8df6c499cc1c763e4394b7482525bf2a9701c9d79d215f519e4" -dependencies = [ - "bitflags 2.6.0", - "cfg-if", - "cfg_aliases", - "libc", -] - [[package]] name = "nohash-hasher" version = "0.2.0" @@ -8324,9 +7821,9 @@ dependencies = [ [[package]] name = "object_store" -version = "0.11.0" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25a0c4b3a0e31f8b66f71ad8064521efa773910196e2cde791436f13409f3b45" +checksum = "6eb4c22c6154a1e759d7099f9ffad7cc5ef8245f9efbab4a41b92623079c82f3" dependencies = [ "async-trait", "base64 0.22.1", @@ -8343,7 +7840,6 @@ dependencies = [ "rand 0.8.5", "reqwest 0.12.8", "ring 0.17.8", - "rustls-pemfile 2.2.0", "serde", "serde_json", "snafu", @@ -8510,12 +8006,6 @@ dependencies = [ "tokio-stream", ] -[[package]] -name = "option-ext" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" - [[package]] name = "ordered-float" version = "2.10.1" @@ -8534,12 +8024,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "outref" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a" - [[package]] name = "overload" version = "0.1.1" @@ -8667,7 +8151,7 @@ dependencies = [ "thrift", "tokio", "twox-hash", - "zstd 0.13.2", + "zstd", "zstd-sys", ] @@ -9411,12 +8895,6 @@ dependencies = [ "prost 0.13.3", ] -[[package]] -name = "quad-rand" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b76f1009795ca44bb5aaae8fd3f18953e209259c33d9b059b1f53d58ab7511db" - [[package]] name = "quic-rpc" version = "0.3.2" @@ -9566,16 +9044,6 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" -[[package]] -name = "radix_trie" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd" -dependencies = [ - "endian-type", - "nibble_vec", -] - [[package]] name = "rand" version = "0.7.3" @@ -9793,12 +9261,6 @@ dependencies = [ "regex-syntax 0.8.5", ] -[[package]] -name = "regex-lite" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a" - [[package]] name = "regex-syntax" version = "0.6.29" @@ -9890,7 +9352,7 @@ dependencies = [ "pin-project-lite", "quinn", "rustls 0.23.13", - "rustls-native-certs 0.8.0", + "rustls-native-certs", "rustls-pemfile 2.2.0", "rustls-pki-types", "serde", @@ -9991,12 +9453,6 @@ dependencies = [ "opaque-debug", ] -[[package]] -name = "rle-decode-fast" -version = "1.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422" - [[package]] name = "rlp" version = "0.5.2" @@ -10213,18 +9669,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "rustls-native-certs" -version = "0.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" -dependencies = [ - "openssl-probe", - "rustls-pemfile 1.0.4", - "schannel", - "security-framework", -] - [[package]] name = "rustls-native-certs" version = "0.8.0" @@ -10301,28 +9745,6 @@ dependencies = [ "wait-timeout", ] -[[package]] -name = "rustyline" -version = "14.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7803e8936da37efd9b6d4478277f4b2b9bb5cdb37a113e8d63222e58da647e63" -dependencies = [ - "bitflags 2.6.0", - "cfg-if", - "clipboard-win", - "fd-lock", - "home", - "libc", - "log", - "memchr", - "nix 0.28.0", - "radix_trie", - "unicode-segmentation", - "unicode-width", - "utf8parse", - "windows-sys 0.52.0", -] - [[package]] name = "rw-stream-sink" version = "0.4.0" @@ -10613,9 +10035,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.128" +version = "1.0.132" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ff5456707a1de34e7e37f2a6fd3d3f808c318259cbd01ab6377795054b483d8" +checksum = "d726bfaff4b320266d395898905d0eba0345aae23b54aee3a737e260fd46db03" dependencies = [ "itoa", "memchr", @@ -11715,32 +11137,13 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" -[[package]] -name = "strum" -version = "0.25.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" - [[package]] name = "strum" version = "0.26.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" dependencies = [ - "strum_macros 0.26.4", -] - -[[package]] -name = "strum_macros" -version = "0.25.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23dc1fa9ac9c169a78ba62f0b841814b7abae11bdd047b9c58f893439e309ea0" -dependencies = [ - "heck 0.4.1", - "proc-macro2", - "quote", - "rustversion", - "syn 2.0.79", + "strum_macros", ] [[package]] @@ -11973,6 +11376,7 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3dffced63c2b5c7be278154d76b479f9f9920ed34e7574201407f0b14e2bbb93" dependencies = [ + "env_logger 0.11.5", "test-log-macros", "tracing-subscriber", ] @@ -12586,26 +11990,6 @@ version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6af6ae20167a9ece4bcb41af5b80f8a1f1df981f6391189ce00fd257af04126a" -[[package]] -name = "typed-builder" -version = "0.16.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34085c17941e36627a879208083e25d357243812c30e7d7387c3b954f30ade16" -dependencies = [ - "typed-builder-macro", -] - -[[package]] -name = "typed-builder-macro" -version = "0.16.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f03ca4cb38206e2bef0700092660bb74d696f808514dae47fa1467cbfe26e96e" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.79", -] - [[package]] name = "typenum" version = "1.17.0" @@ -12837,7 +12221,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314" dependencies = [ "getrandom 0.2.15", - "serde", ] [[package]] @@ -12918,12 +12301,6 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" -[[package]] -name = "vsimd" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" - [[package]] name = "wait-timeout" version = "0.2.0" @@ -13142,7 +12519,7 @@ version = "0.51.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca229916c5ee38c2f2bc1e9d8f04df975b4bd93f9955dc69fabb5d91270045c9" dependencies = [ - "windows-core 0.51.1", + "windows-core", "windows-targets 0.48.5", ] @@ -13155,15 +12532,6 @@ dependencies = [ "windows-targets 0.48.5", ] -[[package]] -name = "windows-core" -version = "0.52.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" -dependencies = [ - "windows-targets 0.52.6", -] - [[package]] name = "windows-registry" version = "0.2.0" @@ -13437,12 +12805,6 @@ version = "0.8.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af4e2e2f7cba5a093896c1e150fbfe177d1883e7448200efb81d40b9d339ef26" -[[package]] -name = "xmlparser" -version = "0.13.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" - [[package]] name = "xmltree" version = "0.10.3" @@ -13542,32 +12904,13 @@ dependencies = [ "syn 2.0.79", ] -[[package]] -name = "zstd" -version = "0.12.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a27595e173641171fc74a1232b7b1c7a7cb6e18222c11e9dfb9888fa424c53c" -dependencies = [ - "zstd-safe 6.0.6", -] - [[package]] name = "zstd" version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fcf2b778a664581e31e389454a7072dab1647606d44f7feea22cd5abb9c9f3f9" dependencies = [ - "zstd-safe 7.2.1", -] - -[[package]] -name = "zstd-safe" -version = "6.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee98ffd0b48ee95e6c5168188e44a54550b1564d9d530ee21d5f0eaed1069581" -dependencies = [ - "libc", - "zstd-sys", + "zstd-safe", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 933d78792..65731740f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,6 @@ members = [ "kubo-rpc-server", "metadata", "metrics", - "olap", "one", "p2p", "pipeline", @@ -43,6 +42,7 @@ ahash = "0.8" anyhow = { version = "1" } arrow = { version = "53", features = ["prettyprint"] } arrow-array = "53" +arrow-cast = "53" arrow-flight = { version = "53", features = ["flight-sql-experimental"] } arrow-ipc = "53" arrow-schema = "53" @@ -90,10 +90,6 @@ crossterm = "0.25" ctrlc = "3.2.2" dag-jose = "0.2" datafusion = "42" -datafusion-federation = { git = "https://github.com/datafusion-contrib/datafusion-federation.git", features = [ - "sql", -], branch = "main" } -datafusion-flight-sql-table-provider = { git = "https://github.com/datafusion-contrib/datafusion-federation.git", branch = "main" } datafusion-flight-sql-server = { git = "https://github.com/datafusion-contrib/datafusion-federation.git", branch = "main" } deadqueue = "0.2.3" derivative = "2.2" @@ -148,6 +144,7 @@ multihash-derive = { version = "0.9" } names = { version = "0.14.0", default-features = false } nix = "0.26" num_enum = "0.5.7" +object_store = { version = "0.11", features = ["aws"] } once_cell = "1.17.1" opentelemetry = "0.22" opentelemetry-otlp = "0.15" diff --git a/event-svc/Cargo.toml b/event-svc/Cargo.toml index 796c028ac..321c641f8 100644 --- a/event-svc/Cargo.toml +++ b/event-svc/Cargo.toml @@ -18,8 +18,8 @@ ceramic-core.workspace = true ceramic-event.workspace = true ceramic-metrics.workspace = true ceramic-sql.workspace = true -ceramic-flight.workspace = true ceramic-validation.workspace = true +ceramic-pipeline.workspace = true cid.workspace = true futures.workspace = true hex.workspace = true diff --git a/event-svc/src/event/feed.rs b/event-svc/src/event/feed.rs index af9326b3a..13e93e80b 100644 --- a/event-svc/src/event/feed.rs +++ b/event-svc/src/event/feed.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use ceramic_flight::{server::ConclusionFeed, ConclusionEvent}; +use ceramic_pipeline::{ConclusionEvent, ConclusionFeed}; use futures::future::try_join_all; use crate::EventService; diff --git a/event-svc/src/event/service.rs b/event-svc/src/event/service.rs index cf7c6bf2f..bd23caeae 100644 --- a/event-svc/src/event/service.rs +++ b/event-svc/src/event/service.rs @@ -11,7 +11,7 @@ use super::{ }; use async_trait::async_trait; use ceramic_core::{EventId, Network, NodeId, SerializeExt}; -use ceramic_flight::{ConclusionData, ConclusionEvent, ConclusionInit, ConclusionTime}; +use ceramic_pipeline::{ConclusionData, ConclusionEvent, ConclusionInit, ConclusionTime}; use ceramic_sql::sqlite::SqlitePool; use cid::Cid; use futures::stream::BoxStream; diff --git a/event-svc/src/tests/event.rs b/event-svc/src/tests/event.rs index 767ca4de5..04ae1cd21 100644 --- a/event-svc/src/tests/event.rs +++ b/event-svc/src/tests/event.rs @@ -5,8 +5,7 @@ use anyhow::Error; use bytes::Bytes; use ceramic_api::{ApiItem, EventService as ApiEventService}; use ceramic_core::NodeId; -use ceramic_flight::server::ConclusionFeed as _; -use ceramic_flight::ConclusionEvent; +use ceramic_pipeline::{ConclusionEvent, ConclusionFeed as _}; use ceramic_sql::sqlite::SqlitePool; use cid::{Cid, CidGeneric}; use expect_test::expect; diff --git a/flight/Cargo.toml b/flight/Cargo.toml index a6cf33aca..46bb40902 100644 --- a/flight/Cargo.toml +++ b/flight/Cargo.toml @@ -13,30 +13,25 @@ arrow.workspace = true arrow-array.workspace = true arrow-flight.workspace = true arrow-schema.workspace = true -async-stream.workspace = true -async-trait.workspace = true -ceramic-event.workspace = true -ceramic-core.workspace = true ceramic-pipeline.workspace = true cid.workspace = true datafusion-flight-sql-server.workspace = true datafusion.workspace = true expect-test.workspace = true futures.workspace = true -int-enum.workspace = true -ipld-core.workspace = true -serde.workspace = true tonic.workspace = true tracing.workspace = true [dev-dependencies] ceramic-arrow-test.workspace = true +ceramic-pipeline.workspace = true expect-test.workspace = true tokio = { workspace = true, features = ["macros", "rt"] } test-log.workspace = true http.workspace = true tokio-stream = { workspace = true, features = ["net"] } mockall.workspace = true +object_store.workspace = true [package.metadata.cargo-machete] ignored = [ diff --git a/flight/src/lib.rs b/flight/src/lib.rs index f235ef75e..f824c2520 100644 --- a/flight/src/lib.rs +++ b/flight/src/lib.rs @@ -1,10 +1,4 @@ //! Implementation of a FlightSQL server for exposing Ceramic data. #![warn(missing_docs)] -mod conversion; -mod types; - pub mod server; - -pub use conversion::*; -pub use types::*; diff --git a/flight/src/server.rs b/flight/src/server.rs index a4586ae1d..99f2e06d6 100644 --- a/flight/src/server.rs +++ b/flight/src/server.rs @@ -1,50 +1,26 @@ //! Implementation of the FlightSQL server. -use std::any::Any; use std::net::SocketAddr; -use std::sync::Arc; use arrow_flight::flight_service_server::FlightServiceServer; -use arrow_schema::{DataType, Field, Schema, SchemaRef}; -use async_stream::try_stream; -use ceramic_pipeline::cid_string::{CidString, CidStringList}; -use datafusion::catalog::{Session, TableProvider}; -use datafusion::common::exec_datafusion_err; -use datafusion::datasource::TableType; -use datafusion::execution::config::SessionConfig; use datafusion::execution::context::{SQLOptions, SessionContext}; -use datafusion::logical_expr::{Expr, ScalarUDF, TableProviderFilterPushDown}; -use datafusion::physical_expr::EquivalenceProperties; -use datafusion::physical_plan::stream::RecordBatchStreamAdapter; -use datafusion::physical_plan::{ExecutionMode, ExecutionPlan, PlanProperties}; -use datafusion::scalar::ScalarValue; use datafusion_flight_sql_server::service::FlightSqlService; -use futures::{Future, TryStreamExt}; +use futures::Future; use tonic::transport::server::Router; use tonic::transport::Server; -use tracing::{debug, info, instrument, Level}; - -use crate::{conclusion_events_to_record_batch, ConclusionEvent}; +use tracing::info; /// Start FlightSQL server, blocks until server has shutdown pub async fn run( - feed: Arc, + ctx: SessionContext, addr: SocketAddr, shutdown_signal: impl Future, ) -> anyhow::Result<()> { - let srv = new_server(feed)?; + let srv = new_server(ctx)?; run_service(addr, srv, shutdown_signal).await } /// Constructs a new server and can be started. -pub fn new_server( - feed: Arc, -) -> anyhow::Result { - let ctx = SessionContext::new_with_config( - SessionConfig::new().with_default_catalog_and_schema("ceramic", "v0"), - ); - ctx.register_table("conclusion_feed", Arc::new(FeedTable::new(feed)))?; - ctx.register_udf(ScalarUDF::new_from_impl(CidString::new())); - ctx.register_udf(ScalarUDF::new_from_impl(CidStringList::new())); +pub fn new_server(ctx: SessionContext) -> anyhow::Result { let svc = FlightServiceServer::new( FlightSqlService::new(ctx.state()).with_sql_options(Some( // Disable all access except read only queries. @@ -65,249 +41,3 @@ async fn run_service( info!(%addr, "FlightSQL server listening"); Ok(svr.serve_with_shutdown(addr, shutdown_signal).await?) } - -/// A ConclusionFeed provides access to [`ConclusionEvent`]s. -#[async_trait::async_trait] -pub trait ConclusionFeed: Send + Sync { - /// Produce a set of conclusion events up to the limit with an index greater than the highwater_mark - /// Event must be returned in index order. - async fn conclusion_events_since( - &self, - highwater_mark: i64, - limit: i64, - ) -> anyhow::Result>; -} -#[async_trait::async_trait] -impl ConclusionFeed for Arc { - async fn conclusion_events_since( - &self, - highwater_mark: i64, - limit: i64, - ) -> anyhow::Result> { - self.as_ref() - .conclusion_events_since(highwater_mark, limit) - .await - } -} - -// Implements the [`TableProvider`] trait producing a [`FeedExec`] instance when the table is -// scanned, which in turn calls into the [`ConclusionFeed`] to get the actual events. -struct FeedTable { - feed: Arc, - schema: SchemaRef, -} - -impl FeedTable { - fn new(feed: Arc) -> Self { - Self { - feed, - schema: Arc::new(Schema::new(vec![ - Field::new("index", DataType::UInt64, false), - Field::new("event_type", DataType::UInt8, false), - Field::new("stream_cid", DataType::Binary, false), - Field::new("stream_type", DataType::UInt8, false), - Field::new("controller", DataType::Utf8, false), - Field::new( - // NOTE: The entire dimensions map may be null or values for a given key may - // be null. No other aspect of dimensions may be null. - "dimensions", - DataType::Map( - Field::new( - "entries", - DataType::Struct( - vec![ - Field::new("key", DataType::Utf8, false), - Field::new( - "value", - DataType::Dictionary( - Box::new(DataType::Int32), - Box::new(DataType::Binary), - ), - true, - ), - ] - .into(), - ), - false, - ) - .into(), - false, - ), - true, - ), - Field::new("event_cid", DataType::Binary, false), - Field::new("data", DataType::Binary, true), - Field::new( - "previous", - DataType::List(Arc::new(Field::new("item", DataType::Binary, false))), - true, - ), - ])), - } - } - fn highwater_mark_from_expr(expr: &Expr) -> Option { - let find_highwater_mark = |col: &Expr, lit: &Expr| { - col.try_as_col() - .map_or(false, |column| column.name == "index") - .then(|| { - if let Expr::Literal(ScalarValue::UInt64(highwater_mark)) = lit { - highwater_mark.to_owned() - } else { - None - } - }) - .flatten() - }; - match expr { - Expr::BinaryExpr(expr) => match expr.op { - datafusion::logical_expr::Operator::Gt => { - find_highwater_mark(expr.left.as_ref(), expr.right.as_ref()) - } - datafusion::logical_expr::Operator::LtEq => { - find_highwater_mark(expr.right.as_ref(), expr.left.as_ref()) - } - _ => None, - }, - _ => None, - } - } -} -#[async_trait::async_trait] -impl TableProvider for FeedTable { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - self.schema.clone() - } - - fn table_type(&self) -> TableType { - TableType::Base - } - #[instrument(skip(self,_state), ret(level = Level::DEBUG))] - async fn scan( - &self, - _state: &dyn Session, - projection: Option<&Vec>, - filters: &[Expr], - limit: Option, - ) -> datafusion::common::Result> { - let schema = projection - .map(|projection| self.schema.project(projection)) - .transpose()? - .map(Arc::new) - .unwrap_or_else(|| self.schema.clone()); - debug!(?schema, "projected schema"); - Ok(Arc::new(FeedExec { - feed: self.feed.clone(), - schema: schema.clone(), - projection: projection.cloned(), - properties: PlanProperties::new( - EquivalenceProperties::new(schema), - datafusion::physical_plan::Partitioning::UnknownPartitioning(1), - ExecutionMode::Bounded, - ), - highwater_mark: filters - .iter() - .filter_map(Self::highwater_mark_from_expr) - .next() - .map(|hm| hm as i64) - .unwrap_or(0), - limit: limit.map(|l| l as i64), - })) - } - #[instrument(skip(self), ret(level = Level::DEBUG))] - fn supports_filters_pushdown( - &self, - filters: &[&Expr], - ) -> datafusion::common::Result> { - Ok(filters - .iter() - .map(|expr| Self::highwater_mark_from_expr(expr)) - .map(|highwater_mark| { - if highwater_mark.is_some() { - TableProviderFilterPushDown::Exact - } else { - TableProviderFilterPushDown::Unsupported - } - }) - .collect()) - } -} - -// Implements the [`ExecutionPlan`] trait in terms of a [`ConclusionFeed`]. -// This allows calls to scan the `conclusion_feed` table to be mapped to calls into the -// [`ConclusionFeed`]. -#[derive(Debug)] -struct FeedExec { - feed: Arc, - schema: SchemaRef, - projection: Option>, - properties: PlanProperties, - highwater_mark: i64, - limit: Option, -} - -impl datafusion::physical_plan::DisplayAs for FeedExec { - fn fmt_as( - &self, - _t: datafusion::physical_plan::DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - // TODO add useful information about predicates etc - write!(f, "FeedExec") - } -} - -impl ExecutionPlan for FeedExec { - fn name(&self) -> &str { - "FeedExec" - } - - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn properties(&self) -> &datafusion::physical_plan::PlanProperties { - &self.properties - } - - fn children(&self) -> Vec<&Arc> { - vec![] - } - - fn with_new_children( - self: Arc, - _children: Vec>, - ) -> datafusion::error::Result> { - Ok(self) - } - - fn execute( - &self, - _partition: usize, - _context: Arc, - ) -> datafusion::error::Result { - // Set a reasonable default limit - const DEFAULT_LIMIT: i64 = 10_000; - let feed = self.feed.clone(); - let projection = self.projection.clone(); - let highwater_mark = self.highwater_mark; - let limit = self.limit.unwrap_or(DEFAULT_LIMIT); - let stream = try_stream! { - let events = feed.conclusion_events_since(highwater_mark,limit).await?; - let batch = conclusion_events_to_record_batch(&events)?; - let batch = projection - .map(|projection| batch.project(&projection)) - .transpose()? - .unwrap_or_else(|| batch); - yield batch; - }; - let stream = stream.map_err(|err: anyhow::Error| exec_datafusion_err!("{err}")); - Ok(Box::pin(RecordBatchStreamAdapter::new( - self.schema.clone(), - stream, - ))) - } -} diff --git a/flight/src/types.rs b/flight/src/types.rs deleted file mode 100644 index c1dc88dc7..000000000 --- a/flight/src/types.rs +++ /dev/null @@ -1,143 +0,0 @@ -use anyhow::{anyhow, Result}; -use ceramic_core::METAMODEL_STREAM_ID; -use ceramic_event::{unvalidated, StreamId, StreamIdType}; -use cid::Cid; -use int_enum::IntEnum; -use ipld_core::ipld::Ipld; -use serde::{Deserialize, Serialize}; - -/// A Ceramic event annotated with conclusions about the event. -/// -/// Conclusions included for all events: -/// 1. An event's signature has been verified -/// 2. An event's previous events will have an index less than the event's index. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum ConclusionEvent { - /// An event that contains data for a stream. - Data(ConclusionData), - /// An event that contains temporal information for a stream. - Time(ConclusionTime), -} - -impl AsRef for ConclusionEvent { - fn as_ref(&self) -> &ConclusionEvent { - self - } -} - -impl ConclusionEvent { - pub(crate) fn event_type_as_int(&self) -> u8 { - match self { - ConclusionEvent::Data(_) => 0, - ConclusionEvent::Time(_) => 1, - } - } -} - -/// ConclusionInit is static metadata about a stream. -/// All events within a stream have the same ConclusionInit. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ConclusionInit { - /// The CID of the init event of the stream. Can be used as a unique identifier for the stream. - /// This is not the StreamId as it does not contain the StreamType. - pub stream_cid: Cid, - /// The type of the stream. - pub stream_type: u8, - /// DID controller of the stream. - pub controller: String, - /// Order set of key value pairs that annotate the stream. - pub dimensions: Vec<(String, Vec)>, -} - -/// ConclusionData represents a Ceramic event that contained data. -/// -/// Additionally we have concluded to which stream the event belongs and its associated metadata. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ConclusionData { - /// Index of the event. See [`ConclusionEvent`] for invariants about the index. - pub index: u64, - /// The CID of the event itself. Can be used as a unique identifier for the event. - pub event_cid: Cid, - /// The stream metadata of the event. - pub init: ConclusionInit, - /// Ordered list of previous events this event references. - pub previous: Vec, - /// Raw bytes of the event data encoded as dag-json. - pub data: Vec, -} - -/// ConclusionTime represents a Ceramic event that contains time relevant information. -/// -/// Additionally we have concluded to which stream the event belongs and its associated metadata. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ConclusionTime { - /// Index of the event. See [`ConclusionEvent`] for invariants about the index. - pub index: u64, - /// The CID of the event itself. Can be used as a unique identifier for the event. - pub event_cid: Cid, - /// The stream metadata of the event. - pub init: ConclusionInit, - /// Ordered list of previous events this event references. - pub previous: Vec, - //TODO Add temporal conclusions, i.e the block timestamp of this event -} - -impl<'a> TryFrom<&'a unvalidated::Event> for ConclusionInit { - type Error = anyhow::Error; - - fn try_from(event: &'a unvalidated::Event) -> Result { - // Extract the init payload from the event - let init_payload = event - .init_payload() - .ok_or_else(|| anyhow!("malformed event: no init payload found"))?; - - // Get the model from the init header - // The model indicates the creator of the stream - let model = init_payload.header().model(); - - // Convert the model to a StreamId - let stream_id = StreamId::try_from(model)?; - - // Determine the stream type: - // If the stream_id matches the metamodel, it's a Model stream - // Otherwise, it's a ModelInstanceDocument stream - let stream_type = if stream_id == METAMODEL_STREAM_ID { - StreamIdType::Model - } else { - StreamIdType::ModelInstanceDocument - }; - - // Construct and return the ConclusionInit - Ok(ConclusionInit { - stream_cid: *event.stream_cid(), - stream_type: stream_type.int_value() as u8, - controller: init_payload - .header() - .controllers() - .first() - .ok_or_else(|| anyhow!("no controller found"))? - .to_string(), - dimensions: vec![ - ("model".to_string(), init_payload.header().model().to_vec()), - ( - "controller".to_string(), - init_payload - .header() - .controllers() - .first() - .cloned() - .unwrap_or_default() - .into_bytes(), - ), - ( - "context".to_string(), - init_payload - .header() - .context() - .map(|unique| unique.to_vec()) - .unwrap_or_default(), - ), - ], - }) - } -} diff --git a/flight/tests/server.rs b/flight/tests/server.rs index 4a04df5f8..702610a88 100644 --- a/flight/tests/server.rs +++ b/flight/tests/server.rs @@ -6,9 +6,9 @@ use arrow_array::RecordBatch; use arrow_flight::{sql::client::FlightSqlServiceClient, FlightInfo}; use arrow_schema::Schema; use ceramic_arrow_test::pretty_feed_from_batch; -use ceramic_flight::{ - server::{new_server, ConclusionFeed}, - ConclusionData, ConclusionEvent, ConclusionInit, ConclusionTime, +use ceramic_flight::server::new_server; +use ceramic_pipeline::{ + ConclusionData, ConclusionEvent, ConclusionFeed, ConclusionInit, ConclusionTime, }; use cid::Cid; use expect_test::expect; @@ -32,7 +32,14 @@ pub async fn channel(addr: &SocketAddr) -> Channel { } async fn start_server(feed: MockFeed) -> FlightSqlServiceClient { - let server = new_server(Arc::new(feed)).unwrap(); + let ctx = ceramic_pipeline::session_from_config(ceramic_pipeline::Config { + conclusion_feed: feed.into(), + object_store: Arc::new(object_store::memory::InMemory::new()), + object_store_bucket_name: "test_bucket".to_string(), + }) + .await + .unwrap(); + let server = new_server(ctx).unwrap(); // let OS choose a free port let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); diff --git a/olap/Cargo.toml b/olap/Cargo.toml deleted file mode 100644 index 7a8bc5534..000000000 --- a/olap/Cargo.toml +++ /dev/null @@ -1,40 +0,0 @@ -[package] -name = "ceramic-olap" -description = "Standalone process that aggregates Ceramic events into a data lake for OLAP workloads" -version.workspace = true -edition.workspace = true -authors.workspace = true -license.workspace = true -repository.workspace = true - -[dependencies] -anyhow.workspace = true -arrow.workspace = true -ceramic-metrics.workspace = true -ceramic-pipeline.workspace = true -cid.workspace = true -clap.workspace = true -datafusion.workspace = true -futures.workspace = true -git-version = "0.3.9" -hyper.workspace = true -json-patch = "2.0.0" -multibase.workspace = true -multihash.workspace = true -multihash-codetable = { version = "0.1.3", features = ["sha2"] } -multihash-derive = "0.9.0" -names.workspace = true -object_store = { version = "0.11", features = ["aws"] } -prometheus-client.workspace = true -serde_json.workspace = true -serde.workspace = true -signal-hook = "0.3.17" -signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] } -tokio = { workspace = true, features = ["fs", "rt-multi-thread"] } -tracing.workspace = true - -[dev-dependencies] -test-log.workspace = true -ceramic-flight.workspace = true -expect-test.workspace = true -ceramic-core.workspace = true diff --git a/olap/src/lib.rs b/olap/src/lib.rs deleted file mode 100644 index 94c4b353a..000000000 --- a/olap/src/lib.rs +++ /dev/null @@ -1,274 +0,0 @@ -//! OLAP Aggregator process for aggregating Ceramic Model Instance Document streams. -#![warn(missing_docs)] - -mod aggregator; -mod metrics; - -use anyhow::{anyhow, Result}; -use ceramic_metrics::config::Config as MetricsConfig; -use clap::{Args, Parser, Subcommand, ValueEnum}; -use futures::StreamExt as _; -use multihash::Multihash; -use multihash_codetable::Code; -use multihash_derive::Hasher as _; -use signal_hook::consts::signal::*; -use signal_hook_tokio::Signals; -use tokio::{io::AsyncReadExt as _, sync::oneshot}; -use tracing::{debug, info, warn}; - -#[derive(Parser, Debug)] -#[command(author, version, about, long_about = None)] -struct Cli { - #[command(subcommand)] - command: Command, -} - -#[derive(Subcommand, Debug)] -enum Command { - /// Run a daemon process - Daemon(Box), -} - -#[derive(Args, Debug)] -struct DaemonOpts { - /// Endpoint of a Flight SQL server for the conclusion feed. - #[arg( - short, - long, - default_value = "http://127.0.0.1:5102", - env = "CERAMIC_OLAP_FLIGHT_SQL_ENDPOINT" - )] - flight_sql_endpoint: String, - - /// Bind address of the metrics endpoint. - #[arg( - short, - long, - default_value = "127.0.0.1:9465", - env = "CERAMIC_OLAP_METRICS_BIND_ADDRESS" - )] - metrics_bind_address: String, - - /// When true metrics will be exported - #[arg(long, default_value_t = false, env = "CERAMIC_OLAP_METRICS")] - metrics: bool, - - /// When true traces will be exported - #[arg(long, default_value_t = false, env = "CERAMIC_OLAP_TRACING")] - tracing: bool, - - /// AWS S3 bucket name. - /// When configured the aggregator will support storing data in S3 compatible object stores. - /// - /// Credentials are read from the environment: - /// - /// * AWS_ACCESS_KEY_ID -> access_key_id - /// * AWS_SECRET_ACCESS_KEY -> secret_access_key - /// * AWS_DEFAULT_REGION -> region - /// * AWS_ENDPOINT -> endpoint - /// * AWS_SESSION_TOKEN -> token - /// * AWS_ALLOW_HTTP -> set to "true" to permit HTTP connections without TLS - /// - #[arg(long, env = "CERAMIC_OLAP_AWS_BUCKET")] - aws_bucket: String, - - #[command(flatten)] - log_opts: LogOpts, -} - -#[derive(Args, Debug)] -struct LogOpts { - /// Specify the format of log events. - #[arg(long, default_value = "multi-line", env = "CERAMIC_OLAP_LOG_FORMAT")] - log_format: LogFormat, -} - -impl LogOpts { - fn format(&self) -> ceramic_metrics::config::LogFormat { - match self.log_format { - LogFormat::SingleLine => ceramic_metrics::config::LogFormat::SingleLine, - LogFormat::MultiLine => ceramic_metrics::config::LogFormat::MultiLine, - LogFormat::Json => ceramic_metrics::config::LogFormat::Json, - } - } -} - -#[derive(ValueEnum, Debug, Clone, Default)] -enum LogFormat { - /// Format log events on multiple lines using ANSI colors. - #[default] - MultiLine, - /// Format log events on a single line using ANSI colors. - SingleLine, - /// Format log events newline delimited JSON objects. - /// No ANSI colors are used. - Json, -} - -impl From<&DaemonOpts> for aggregator::Config { - fn from(value: &DaemonOpts) -> Self { - Self { - flight_sql_endpoint: value.flight_sql_endpoint.clone(), - aws_s3_bucket: value.aws_bucket.clone(), - } - } -} - -/// Run the ceramic one binary process -pub async fn run() -> Result<()> { - let args = Cli::parse(); - match args.command { - Command::Daemon(opts) => daemon(*opts).await, - } -} - -async fn daemon(opts: DaemonOpts) -> Result<()> { - let info = Info::new().await?; - - let mut metrics_config = MetricsConfig { - export: opts.metrics, - tracing: opts.tracing, - log_format: opts.log_opts.format(), - ..Default::default() - }; - info.apply_to_metrics_config(&mut metrics_config); - - // Currently only an info metric is recorded so we do not need to keep the handle to the - // Metrics struct. That will change once we add more metrics. - let _metrics = ceramic_metrics::MetricsHandle::register(|registry| { - crate::metrics::Metrics::register(info.clone(), registry) - }); - - // Logging Tracing and metrics are initialized here, - // debug,info etc will not work until after this line - let metrics_handle = ceramic_metrics::MetricsHandle::new(metrics_config.clone()) - .await - .expect("failed to initialize metrics"); - - // Start metrics server - debug!( - bind_address = opts.metrics_bind_address, - "starting prometheus metrics server" - ); - let (tx_metrics_server_shutdown, metrics_server_handle) = - metrics::start(&opts.metrics_bind_address.parse()?).map_err(|e| { - anyhow!( - "Failed to start metrics server using address: {}. {}", - opts.metrics_bind_address, - e - ) - })?; - - let (tx, rx) = tokio::sync::oneshot::channel::<()>(); - let signals = Signals::new([SIGHUP, SIGTERM, SIGINT, SIGQUIT])?; - let handle = signals.handle(); - debug!("starting signal handler task"); - let signals_handle = tokio::spawn(handle_signals(signals, tx)); - - // Start aggregator - aggregator::run(&opts, async move { - let _ = rx.await; - }) - .await?; - - // Shutdown metrics server and collection handler - tx_metrics_server_shutdown - .send(()) - .expect("should be able to send metrics shutdown message"); - if let Err(err) = metrics_server_handle.await? { - warn!(%err, "metrics server task error") - } - metrics_handle.shutdown(); - debug!("metrics server stopped"); - - // Wait for signal handler to finish - handle.close(); - signals_handle.await?; - debug!("signal handler stopped"); - - Ok(()) -} - -async fn handle_signals(mut signals: Signals, shutdown: oneshot::Sender<()>) { - let mut shutdown = Some(shutdown); - while let Some(signal) = signals.next().await { - debug!(?signal, "signal received"); - if let Some(shutdown) = shutdown.take() { - info!("sending shutdown message"); - shutdown - .send(()) - .expect("should be able to send shutdown message"); - } - } -} - -/// Static information about the current process. -#[derive(Debug, Clone)] -pub struct Info { - /// Name of the service. - pub service_name: String, - /// Semantic version of the build. - pub version: String, - /// Description of git commit. - pub build: String, - /// Unique name generated for this invocation of the process. - pub instance_id: String, - /// Multibase encoded multihash of the current running executable. - pub exe_hash: String, -} - -impl Info { - async fn new() -> Result { - let exe_hash = multibase::encode( - multibase::Base::Base64Url, - current_exe_hash().await?.to_bytes(), - ); - Ok(Self { - service_name: env!("CARGO_PKG_NAME").to_string(), - build: git_version::git_version!( - prefix = "git:", - cargo_prefix = "cargo:", - fallback = "unknown" - ) - .to_string(), - version: env!("CARGO_PKG_VERSION").to_string(), - instance_id: names::Generator::default().next().unwrap(), - exe_hash, - }) - } - fn apply_to_metrics_config(&self, cfg: &mut MetricsConfig) { - cfg.service_name.clone_from(&self.service_name); - cfg.version.clone_from(&self.version); - cfg.build.clone_from(&self.build); - cfg.instance_id.clone_from(&self.instance_id); - } -} - -async fn current_exe_hash() -> Result> { - if cfg!(debug_assertions) { - // Debug builds can be 1GB+, so do we not want to spend the time to hash them. - // Return a fake hash. - Ok(Multihash::<32>::wrap( - // Identity hash code - 0, - // Spells debug when base64 url encoded with some leading padding. - &[00, 117, 230, 238, 130], - ) - .expect("hardcoded digest should fit in 32 bytes")) - } else { - let exe_path = std::env::current_exe()?; - let mut hasher = multihash_codetable::Sha2_256::default(); - let mut f = tokio::fs::File::open(exe_path).await?; - let mut buffer = vec![0; 4096]; - - loop { - let bytes_read = f.read(&mut buffer[..]).await?; - if bytes_read == 0 { - break; - } - hasher.update(&buffer[..bytes_read]); - } - let hash = hasher.finalize(); - Ok(Multihash::<32>::wrap(Code::Sha2_256.into(), hash)?) - } -} diff --git a/olap/src/main.rs b/olap/src/main.rs deleted file mode 100644 index c790eca32..000000000 --- a/olap/src/main.rs +++ /dev/null @@ -1,8 +0,0 @@ -#[tokio::main(flavor = "multi_thread")] -async fn main() -> anyhow::Result<()> { - ceramic_olap::run().await.map_err(|e| { - // this should use stderr if we error before tracing is hooked up - tracing::error!("Error running command: {:#}", e); - e - }) -} diff --git a/olap/src/metrics.rs b/olap/src/metrics.rs deleted file mode 100644 index b7a2f89dc..000000000 --- a/olap/src/metrics.rs +++ /dev/null @@ -1,72 +0,0 @@ -//! Metrics server -use std::{convert::Infallible, net::SocketAddr}; - -use anyhow::Result; -use hyper::{ - http::HeaderValue, - service::{make_service_fn, service_fn}, - Body, Request, Response, -}; -use prometheus_client::{encoding::EncodeLabelSet, metrics::info::Info, registry::Registry}; -use tokio::{sync::oneshot, task::JoinHandle}; - -#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] -struct InfoLabels { - service_name: String, - version: String, - build: String, - instance_id: String, - exe_hash: String, -} - -impl From for InfoLabels { - fn from(info: crate::Info) -> Self { - Self { - service_name: info.service_name, - version: info.version, - build: info.build, - instance_id: info.instance_id, - exe_hash: info.exe_hash, - } - } -} - -pub struct Metrics {} - -impl Metrics { - pub fn register(info: crate::Info, registry: &mut Registry) -> Self { - let sub_registry = registry.sub_registry_with_prefix("ceramic"); - - let info: Info = Info::new(info.into()); - sub_registry.register("one", "Information about the ceramic-one process", info); - - Self {} - } -} - -async fn handle(_req: Request) -> Result, Infallible> { - let data = ceramic_metrics::MetricsHandle::encode(); - let mut resp = Response::new(Body::from(data)); - resp.headers_mut().insert( - "Content-Type", - // Use OpenMetrics content type so prometheus knows to parse it accordingly - HeaderValue::from_static("application/openmetrics-text"), - ); - Ok(resp) -} - -pub type MetricsServerTask = JoinHandle>; - -/// Start metrics server. -/// Sending on the returned channel will cause the server to shutdown gracefully. -pub fn start(addr: &SocketAddr) -> Result<(oneshot::Sender<()>, MetricsServerTask)> { - let (tx, rx) = oneshot::channel::<()>(); - let service = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle)) }); - - let server = hyper::Server::try_bind(addr)? - .serve(service) - .with_graceful_shutdown(async { - rx.await.ok(); - }); - Ok((tx, tokio::spawn(server))) -} diff --git a/one/Cargo.toml b/one/Cargo.toml index e2a723590..9ed1e794c 100644 --- a/one/Cargo.toml +++ b/one/Cargo.toml @@ -10,6 +10,8 @@ publish = false [dependencies] anyhow.workspace = true +arrow-cast.workspace = true +arrow-flight.workspace = true async-stream.workspace = true async-trait.workspace = true ceramic-anchor-remote.workspace = true @@ -29,8 +31,7 @@ ceramic-pipeline.workspace = true ceramic-sql.workspace = true cid.workspace = true clap.workspace = true -datafusion-cli = "42.0.0" -datafusion-functions-json = "0.42.0" +datafusion.workspace = true futures.workspace = true git-version = "0.3" home = "0.5" @@ -45,7 +46,7 @@ multihash-codetable.workspace = true multihash-derive.workspace = true multihash.workspace = true names.workspace = true -object_store = { version = "0.11", features = ["aws"] } +object_store.workspace = true prometheus-client.workspace = true recon.workspace = true serde_ipld_dagcbor.workspace = true @@ -56,6 +57,7 @@ tokio-metrics = { version = "0.3.1", features = ["rt"] } tokio-prometheus-client = "0.1" tokio-stream = { workspace = true, features = ["io-util"] } tokio.workspace = true +tonic.workspace = true tracing.workspace = true [target.'cfg(not(target_env = "msvc"))'.dependencies] diff --git a/one/src/daemon.rs b/one/src/daemon.rs index f23a3a6a0..28b749e56 100644 --- a/one/src/daemon.rs +++ b/one/src/daemon.rs @@ -15,13 +15,14 @@ use ceramic_kubo_rpc::Multiaddr; use ceramic_metrics::{config::Config as MetricsConfig, MetricsHandle}; use ceramic_p2p::{load_identity, DiskStorage, Keychain, Libp2pConfig}; use clap::Args; +use object_store::aws::AmazonS3Builder; use recon::{FullInterests, Recon, ReconInterestProvider}; use signal_hook::consts::signal::*; use signal_hook_tokio::Signals; use std::sync::Arc; use swagger::{auth::MakeAllowAllAuthenticator, EmptyContext}; use tokio::sync::broadcast; -use tracing::{debug, info, warn}; +use tracing::{debug, error, info, warn}; #[derive(Args, Debug)] pub struct DaemonOpts { @@ -236,6 +237,34 @@ pub struct DaemonOpts { env = "CERAMIC_ONE_ETHEREUM_RPC_URLS" )] ethereum_rpc_urls: Vec, + + /// Enable the aggregator, requires Flight SQL and S3 bucket to be defined. + #[arg( + long, + requires = "flight_sql_bind_address", + requires = "s3_bucket", + env = "CERAMIC_ONE_AGGREGATOR" + )] + aggregator: Option, + + /// Name of the S3 bucket where Ceramic stores published data tables. + /// Requires using the experimental-features flag + /// + /// Credentials are read from the environment: + /// + /// * AWS_ACCESS_KEY_ID -> access_key_id + /// * AWS_SECRET_ACCESS_KEY -> secret_access_key + /// * AWS_DEFAULT_REGION -> region + /// * AWS_ENDPOINT -> endpoint + /// * AWS_SESSION_TOKEN -> token + /// * AWS_ALLOW_HTTP -> set to "true" to permit HTTP connections without TLS + /// + #[arg( + long, + requires = "experimental_features", + env = "CERAMIC_ONE_S3_BUCKET" + )] + s3_bucket: Option, } async fn get_eth_rpc_providers( @@ -495,18 +524,50 @@ pub async fn run(opts: DaemonOpts) -> Result<()> { let handle = signals.handle(); // Start Flight server - let flight_handle = if let Some(addr) = opts.flight_sql_bind_address { + let (flight_handle, aggregator_handle) = if let Some(addr) = opts.flight_sql_bind_address { let addr = addr.parse()?; let feed = event_svc.clone(); - let mut shutdown_signal = shutdown_signal.resubscribe(); - Some(tokio::spawn(async move { - ceramic_flight::server::run(feed, addr, async move { - let _ = shutdown_signal.recv().await; + let bucket = opts + .s3_bucket + .ok_or_else(|| anyhow!("s3_bucket option is required when exposing flight sql"))?; + let ctx = ceramic_pipeline::session_from_config(ceramic_pipeline::Config { + conclusion_feed: feed, + object_store: Arc::new( + AmazonS3Builder::from_env() + .with_bucket_name(&bucket) + .build()?, + ), + object_store_bucket_name: bucket, + }) + .await?; + + // Start aggregator + let aggregator_handle = if opts.aggregator.unwrap_or_default() { + let mut ss = shutdown_signal.resubscribe(); + let ctx = ctx.clone(); + Some(tokio::spawn(async move { + if let Err(err) = ceramic_pipeline::aggregator::run(ctx, async move { + let _ = ss.recv().await; + }) + .await + { + error!(%err, "aggregator task failed"); + } + })) + } else { + None + }; + + let mut ss = shutdown_signal.resubscribe(); + let flight_handle = tokio::spawn(async move { + ceramic_flight::server::run(ctx, addr, async move { + let _ = ss.recv().await; }) .await - })) + }); + (aggregator_handle, Some(flight_handle)) } else { - None + (None, None) }; // Start anchoring if remote anchor service URL is provided @@ -619,6 +680,10 @@ pub async fn run(opts: DaemonOpts) -> Result<()> { let _ = flight_handle.await; } + if let Some(aggregator_handle) = aggregator_handle { + let _ = aggregator_handle.await; + } + if let Some(anchor_service_handle) = anchor_service_handle { let _ = anchor_service_handle.await; } diff --git a/one/src/query.rs b/one/src/query.rs index 71c65d196..e01fc2e21 100644 --- a/one/src/query.rs +++ b/one/src/query.rs @@ -1,6 +1,21 @@ +use std::{sync::Arc, time::Duration}; + +use anyhow::{bail, Context, Result}; +use arrow_cast::{cast_with_options, CastOptions}; +use arrow_flight::{ + sql::{client::FlightSqlServiceClient, CommandGetDbSchemas, CommandGetTables}, + FlightInfo, +}; use clap::Args; -use datafusion_cli::{exec::exec_from_repl, print_options::PrintOptions}; -use object_store::aws::AmazonS3Builder; +use clap::Subcommand; +use core::str; +use datafusion::arrow::{ + array::{ArrayRef, Datum as _, RecordBatch, StringArray}, + datatypes::Schema, + util::pretty::pretty_format_batches, +}; +use futures::TryStreamExt; +use tonic::transport::{Channel, Endpoint}; #[derive(Args, Debug)] pub struct QueryOpts { @@ -13,44 +28,230 @@ pub struct QueryOpts { )] query_endpoint: String, - /// S3 bucket name. - /// When configured the aggregator will support storing data in S3 compatible object stores. - /// - /// Credentials are read from the environment: - /// - /// * AWS_ACCESS_KEY_ID -> access_key_id - /// * AWS_SECRET_ACCESS_KEY -> secret_access_key - /// * AWS_DEFAULT_REGION -> region - /// * AWS_ENDPOINT -> endpoint - /// * AWS_SESSION_TOKEN -> token - /// * AWS_ALLOW_HTTP -> set to "true" to permit HTTP connections without TLS - /// - #[arg(long, env = "CERAMIC_ONE_AWS_BUCKET")] - s3_bucket: String, + #[clap(subcommand)] + cmd: Command, } -pub async fn run(opts: QueryOpts) -> anyhow::Result<()> { - let mut ctx = ceramic_pipeline::session_from_config(opts).await?; +/// Different available commands. +#[derive(Debug, Subcommand)] +enum Command { + /// Get catalogs. + Catalogs, + /// Get db schemas for a catalog. + DbSchemas { + /// Name of a catalog. + /// + /// Required. + catalog: String, + /// Specifies a filter pattern for schemas to search for. + /// When no schema_filter is provided, the pattern will not be used to narrow the search. + /// In the pattern string, two special characters can be used to denote matching rules: + /// - "%" means to match any substring with 0 or more characters. + /// - "_" means to match any one character. + #[clap(short, long)] + db_schema_filter: Option, + }, + /// Get tables for a catalog. + Tables { + /// Name of a catalog. + /// + /// Required. + catalog: String, + /// Specifies a filter pattern for schemas to search for. + /// When no schema_filter is provided, the pattern will not be used to narrow the search. + /// In the pattern string, two special characters can be used to denote matching rules: + /// - "%" means to match any substring with 0 or more characters. + /// - "_" means to match any one character. + #[clap(short, long)] + db_schema_filter: Option, + /// Specifies a filter pattern for tables to search for. + /// When no table_filter is provided, all tables matching other filters are searched. + /// In the pattern string, two special characters can be used to denote matching rules: + /// - "%" means to match any substring with 0 or more characters. + /// - "_" means to match any one character. + #[clap(short, long)] + table_filter: Option, + /// Specifies a filter of table types which must match. + /// The table types depend on vendor/implementation. It is usually used to separate tables from views or system tables. + /// TABLE, VIEW, and SYSTEM TABLE are commonly supported. + #[clap(long)] + table_types: Vec, + }, + /// Get table types. + TableTypes, + + /// Execute given statement. + StatementQuery { + /// SQL query. + /// + /// Required. + query: String, + }, + + /// Prepare given statement and then execute it. + PreparedStatementQuery { + /// SQL query. + /// + /// Required. + /// + /// Can contains placeholders like `$1`. + /// + /// Example: `SELECT * FROM t WHERE x = $1` + query: String, + + /// Additional parameters. + /// + /// Can be given multiple times. Names and values are separated by '='. Values will be + /// converted to the type that the server reported for the prepared statement. + /// + /// Example: `-p $1=42` + #[clap(short, value_parser = parse_key_val)] + params: Vec<(String, String)>, + }, +} + +pub async fn run(opts: QueryOpts) -> Result<()> { + let mut client = setup_client(opts.query_endpoint) + .await + .context("setup client")?; + + let flight_info = match opts.cmd { + Command::Catalogs => client.get_catalogs().await.context("get catalogs")?, + Command::DbSchemas { + catalog, + db_schema_filter, + } => client + .get_db_schemas(CommandGetDbSchemas { + catalog: Some(catalog), + db_schema_filter_pattern: db_schema_filter, + }) + .await + .context("get db schemas")?, + Command::Tables { + catalog, + db_schema_filter, + table_filter, + table_types, + } => client + .get_tables(CommandGetTables { + catalog: Some(catalog), + db_schema_filter_pattern: db_schema_filter, + table_name_filter_pattern: table_filter, + table_types, + // Schema is returned as ipc encoded bytes. + // We do not support returning the schema as there is no trivial mechanism + // to display the information to the user. + include_schema: false, + }) + .await + .context("get tables")?, + Command::TableTypes => client.get_table_types().await.context("get table types")?, + Command::StatementQuery { query } => client + .execute(query, None) + .await + .context("execute statement")?, + Command::PreparedStatementQuery { query, params } => { + let mut prepared_stmt = client + .prepare(query, None) + .await + .context("prepare statement")?; - datafusion_functions_json::register_all(&mut ctx)?; + if !params.is_empty() { + prepared_stmt + .set_parameters( + construct_record_batch_from_params( + ¶ms, + prepared_stmt + .parameter_schema() + .context("get parameter schema")?, + ) + .context("construct parameters")?, + ) + .context("bind parameters")?; + } - let mut print_options = PrintOptions { - format: datafusion_cli::print_format::PrintFormat::Automatic, - quiet: false, - maxrows: datafusion_cli::print_options::MaxRows::Unlimited, - color: true, + prepared_stmt + .execute() + .await + .context("execute prepared statement")? + } }; - exec_from_repl(&ctx, &mut print_options).await.unwrap(); + let batches = execute_flight(&mut client, flight_info) + .await + .context("read flight data")?; + + let res = pretty_format_batches(batches.as_slice()).context("format results")?; + println!("{res}"); Ok(()) } -impl From for ceramic_pipeline::Config { - fn from(value: QueryOpts) -> Self { - Self { - flight_sql_endpoint: value.query_endpoint, - aws_s3_builder: AmazonS3Builder::from_env().with_bucket_name(&value.s3_bucket), - } +async fn execute_flight( + client: &mut FlightSqlServiceClient, + info: FlightInfo, +) -> Result> { + let schema = Arc::new(Schema::try_from(info.clone()).context("valid schema")?); + let mut batches = Vec::with_capacity(info.endpoint.len() + 1); + batches.push(RecordBatch::new_empty(schema)); + + for endpoint in info.endpoint { + let Some(ticket) = &endpoint.ticket else { + bail!("did not get ticket"); + }; + + let mut flight_data = client.do_get(ticket.clone()).await.context("do get")?; + + let mut endpoint_batches: Vec<_> = (&mut flight_data) + .try_collect() + .await + .context("collect data stream")?; + batches.append(&mut endpoint_batches); } + + Ok(batches) +} + +fn construct_record_batch_from_params( + params: &[(String, String)], + parameter_schema: &Schema, +) -> Result { + let mut items = Vec::<(&String, ArrayRef)>::new(); + + for (name, value) in params { + let field = parameter_schema.field_with_name(name)?; + let value_as_array = StringArray::new_scalar(value); + let casted = cast_with_options( + value_as_array.get().0, + field.data_type(), + &CastOptions::default(), + )?; + items.push((name, casted)) + } + + Ok(RecordBatch::try_from_iter(items)?) +} + +async fn setup_client(endpoint: String) -> Result> { + let endpoint = Endpoint::new(endpoint) + .context("create endpoint")? + .connect_timeout(Duration::from_secs(20)) + .timeout(Duration::from_secs(20)) + .tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait + .tcp_keepalive(Option::Some(Duration::from_secs(3600))) + .http2_keep_alive_interval(Duration::from_secs(300)) + .keep_alive_timeout(Duration::from_secs(20)) + .keep_alive_while_idle(true); + + let channel = endpoint.connect().await.context("connect to endpoint")?; + + Ok(FlightSqlServiceClient::new(channel)) +} + +/// Parse a single key-value pair +fn parse_key_val(s: &str) -> Result<(String, String), String> { + let pos = s + .find('=') + .ok_or_else(|| format!("invalid KEY=value: no `=` found in `{s}`"))?; + Ok((s[..pos].to_owned(), s[pos + 1..].to_owned())) } diff --git a/pipeline/Cargo.toml b/pipeline/Cargo.toml index 3d689b9e9..69c084217 100644 --- a/pipeline/Cargo.toml +++ b/pipeline/Cargo.toml @@ -10,11 +10,27 @@ repository.workspace = true [dependencies] anyhow.workspace = true -arrow-flight.workspace = true +arrow-schema.workspace = true +arrow.workspace = true +async-stream.workspace = true +async-trait.workspace = true +ceramic-core.workspace = true +ceramic-event.workspace = true cid.workspace = true -datafusion-federation.workspace = true -datafusion-flight-sql-table-provider.workspace = true datafusion.workspace = true -object_store = { version = "0.11", features = ["aws"] } -tonic.workspace = true +datafusion-functions-json = "0.42.0" +expect-test.workspace = true +futures.workspace = true +int-enum.workspace = true +ipld-core.workspace = true +json-patch = "3.0.1" +object_store.workspace = true +serde.workspace = true +serde_json.workspace = true +tokio.workspace = true +tracing.workspace = true url.workspace = true + +[dev-dependencies] +test-log = "0.2.16" +ceramic-arrow-test.workspace = true diff --git a/olap/src/aggregator/ceramic_patch.rs b/pipeline/src/aggregator/ceramic_patch.rs similarity index 100% rename from olap/src/aggregator/ceramic_patch.rs rename to pipeline/src/aggregator/ceramic_patch.rs diff --git a/olap/src/aggregator/mod.rs b/pipeline/src/aggregator/mod.rs similarity index 97% rename from olap/src/aggregator/mod.rs rename to pipeline/src/aggregator/mod.rs index 65f1899d9..956cc8b76 100644 --- a/olap/src/aggregator/mod.rs +++ b/pipeline/src/aggregator/mod.rs @@ -21,32 +21,12 @@ use datafusion::{ physical_plan::collect_partitioned, sql::TableReference, }; -use object_store::aws::AmazonS3Builder; use std::{future::Future, sync::Arc}; use tracing::{debug, error}; use crate::Result; -pub struct Config { - pub flight_sql_endpoint: String, - pub aws_s3_bucket: String, -} - -impl From for ceramic_pipeline::Config { - fn from(value: Config) -> Self { - Self { - flight_sql_endpoint: value.flight_sql_endpoint, - aws_s3_builder: AmazonS3Builder::from_env().with_bucket_name(value.aws_s3_bucket), - } - } -} - -pub async fn run( - config: impl Into, - shutdown_signal: impl Future, -) -> Result<()> { - let config = config.into(); - let ctx = ceramic_pipeline::session_from_config(config).await?; +pub async fn run(ctx: SessionContext, shutdown_signal: impl Future) -> Result<()> { run_continuous_stream(ctx, shutdown_signal, 10000).await?; Ok(()) } @@ -257,11 +237,6 @@ mod tests { use arrow::{array::RecordBatch, util::pretty::pretty_format_batches}; use ceramic_core::StreamIdType; - use ceramic_flight::{ - conclusion_events_to_record_batch, ConclusionData, ConclusionEvent, ConclusionInit, - ConclusionTime, - }; - use ceramic_pipeline::cid_string::CidString; use cid::Cid; use datafusion::{ catalog_common::{CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider}, @@ -275,6 +250,11 @@ mod tests { use expect_test::expect; use test_log::test; + use crate::{ + cid_string::CidString, conclusion_events_to_record_batch, schemas, ConclusionData, + ConclusionEvent, ConclusionInit, ConclusionTime, + }; + async fn do_test(conclusion_feed: RecordBatch) -> anyhow::Result { do_pass(init_ctx().await?, conclusion_feed).await } @@ -287,7 +267,7 @@ mod tests { constraints: Constraints::empty(), input: LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, - schema: Arc::new(ceramic_pipeline::schemas::doc_state().try_into().unwrap()), + schema: Arc::new(schemas::doc_state().try_into().unwrap()), }) .into(), if_not_exists: false, @@ -312,7 +292,7 @@ mod tests { constraints: Constraints::empty(), input: LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, - schema: Arc::new(ceramic_pipeline::schemas::doc_state().try_into().unwrap()), + schema: Arc::new(schemas::doc_state().try_into().unwrap()), }) .into(), if_not_exists: false, diff --git a/flight/src/conversion.rs b/pipeline/src/conclusion/event.rs similarity index 74% rename from flight/src/conversion.rs rename to pipeline/src/conclusion/event.rs index f0b90ac92..a0ab96de8 100644 --- a/flight/src/conversion.rs +++ b/pipeline/src/conclusion/event.rs @@ -1,12 +1,154 @@ -use crate::types::*; -use anyhow::Result; -use arrow::array::{ +use std::sync::Arc; + +use anyhow::{anyhow, Result}; +use arrow::datatypes::{DataType, Field, Int32Type}; +use ceramic_core::METAMODEL_STREAM_ID; +use ceramic_event::{unvalidated, StreamId, StreamIdType}; +use cid::Cid; +use datafusion::arrow::array::RecordBatch; +use datafusion::arrow::array::{ ArrayRef, BinaryBuilder, BinaryDictionaryBuilder, ListBuilder, MapBuilder, MapFieldNames, PrimitiveBuilder, StringBuilder, StructArray, UInt64Builder, UInt8Builder, }; -use arrow::datatypes::{DataType, Field, Int32Type}; -use arrow::record_batch::RecordBatch; -use std::sync::Arc; +use int_enum::IntEnum; +use ipld_core::ipld::Ipld; +use serde::{Deserialize, Serialize}; + +/// A Ceramic event annotated with conclusions about the event. +/// +/// Conclusions included for all events: +/// 1. An event's signature has been verified +/// 2. An event's previous events will have an index less than the event's index. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum ConclusionEvent { + /// An event that contains data for a stream. + Data(ConclusionData), + /// An event that contains temporal information for a stream. + Time(ConclusionTime), +} + +impl AsRef for ConclusionEvent { + fn as_ref(&self) -> &ConclusionEvent { + self + } +} + +impl ConclusionEvent { + pub(crate) fn event_type_as_int(&self) -> u8 { + match self { + ConclusionEvent::Data(_) => 0, + ConclusionEvent::Time(_) => 1, + } + } +} + +/// ConclusionInit is static metadata about a stream. +/// All events within a stream have the same ConclusionInit. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ConclusionInit { + /// The CID of the init event of the stream. Can be used as a unique identifier for the stream. + /// This is not the StreamId as it does not contain the StreamType. + pub stream_cid: Cid, + /// The type of the stream. + pub stream_type: u8, + /// DID controller of the stream. + pub controller: String, + /// Order set of key value pairs that annotate the stream. + pub dimensions: Vec<(String, Vec)>, +} + +/// ConclusionData represents a Ceramic event that contained data. +/// +/// Additionally we have concluded to which stream the event belongs and its associated metadata. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ConclusionData { + /// Index of the event. See [`ConclusionEvent`] for invariants about the index. + pub index: u64, + /// The CID of the event itself. Can be used as a unique identifier for the event. + pub event_cid: Cid, + /// The stream metadata of the event. + pub init: ConclusionInit, + /// Ordered list of previous events this event references. + pub previous: Vec, + /// Raw bytes of the event data encoded as dag-json. + pub data: Vec, +} + +/// ConclusionTime represents a Ceramic event that contains time relevant information. +/// +/// Additionally we have concluded to which stream the event belongs and its associated metadata. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ConclusionTime { + /// Index of the event. See [`ConclusionEvent`] for invariants about the index. + pub index: u64, + /// The CID of the event itself. Can be used as a unique identifier for the event. + pub event_cid: Cid, + /// The stream metadata of the event. + pub init: ConclusionInit, + /// Ordered list of previous events this event references. + pub previous: Vec, + //TODO Add temporal conclusions, i.e the block timestamp of this event +} + +impl<'a> TryFrom<&'a unvalidated::Event> for ConclusionInit { + type Error = anyhow::Error; + + fn try_from(event: &'a unvalidated::Event) -> Result { + // Extract the init payload from the event + let init_payload = event + .init_payload() + .ok_or_else(|| anyhow!("malformed event: no init payload found"))?; + + // Get the model from the init header + // The model indicates the creator of the stream + let model = init_payload.header().model(); + + // Convert the model to a StreamId + let stream_id = StreamId::try_from(model)?; + + // Determine the stream type: + // If the stream_id matches the metamodel, it's a Model stream + // Otherwise, it's a ModelInstanceDocument stream + let stream_type = if stream_id == METAMODEL_STREAM_ID { + StreamIdType::Model + } else { + StreamIdType::ModelInstanceDocument + }; + + // Construct and return the ConclusionInit + Ok(ConclusionInit { + stream_cid: *event.stream_cid(), + stream_type: stream_type.int_value() as u8, + controller: init_payload + .header() + .controllers() + .first() + .ok_or_else(|| anyhow!("no controller found"))? + .to_string(), + dimensions: vec![ + ("model".to_string(), init_payload.header().model().to_vec()), + ( + "controller".to_string(), + init_payload + .header() + .controllers() + .first() + .cloned() + .unwrap_or_default() + .into_bytes(), + ), + ( + "context".to_string(), + init_payload + .header() + .context() + .map(|unique| unique.to_vec()) + .unwrap_or_default(), + ), + ], + }) + } +} /// Construct a [`RecordBatch`] from an iterator of [`ConclusionEvent`]s. pub struct ConclusionEventBuilder { @@ -136,8 +278,10 @@ impl<'a> Extend<&'a ConclusionEvent> for ConclusionEventBuilder { /// /// ``` /// use anyhow::Result; -/// use ceramic_flight::{ConclusionEvent, ConclusionData, ConclusionInit}; -/// use ceramic_flight::conclusion_events_to_record_batch; +/// use ceramic_pipeline::{ +/// conclusion_events_to_record_batch, +/// ConclusionEvent, ConclusionData, ConclusionInit +/// }; /// use cid::Cid; /// /// fn main() -> Result<()> { diff --git a/pipeline/src/conclusion/mod.rs b/pipeline/src/conclusion/mod.rs new file mode 100644 index 000000000..5ad6a725f --- /dev/null +++ b/pipeline/src/conclusion/mod.rs @@ -0,0 +1,8 @@ +mod event; +mod table; + +pub use event::{ + conclusion_events_to_record_batch, ConclusionData, ConclusionEvent, ConclusionInit, + ConclusionTime, +}; +pub use table::{ConclusionFeed, FeedTable}; diff --git a/pipeline/src/conclusion/table.rs b/pipeline/src/conclusion/table.rs new file mode 100644 index 000000000..1121ac432 --- /dev/null +++ b/pipeline/src/conclusion/table.rs @@ -0,0 +1,267 @@ +use std::{any::Any, sync::Arc}; + +use arrow_schema::{DataType, Field, Schema, SchemaRef}; +use async_stream::try_stream; +use datafusion::{ + catalog::{Session, TableProvider}, + common::exec_datafusion_err, + datasource::TableType, + logical_expr::{Expr, TableProviderFilterPushDown}, + physical_expr::EquivalenceProperties, + physical_plan::{ + stream::RecordBatchStreamAdapter, ExecutionMode, ExecutionPlan, PlanProperties, + }, + scalar::ScalarValue, +}; +use futures::TryStreamExt as _; +use tracing::{debug, instrument, Level}; + +use crate::conclusion::conclusion_events_to_record_batch; + +use super::ConclusionEvent; + +/// A ConclusionFeed provides access to [`ConclusionEvent`]s. +#[async_trait::async_trait] +pub trait ConclusionFeed: std::fmt::Debug + Send + Sync { + /// Produce a set of conclusion events up to the limit with an index greater than the highwater_mark + /// Event must be returned in index order. + async fn conclusion_events_since( + &self, + highwater_mark: i64, + limit: i64, + ) -> anyhow::Result>; +} +#[async_trait::async_trait] +impl ConclusionFeed for Arc { + async fn conclusion_events_since( + &self, + highwater_mark: i64, + limit: i64, + ) -> anyhow::Result> { + self.as_ref() + .conclusion_events_since(highwater_mark, limit) + .await + } +} + +// Implements the [`TableProvider`] trait producing a [`FeedExec`] instance when the table is +// scanned, which in turn calls into the [`ConclusionFeed`] to get the actual events. +pub struct FeedTable { + feed: Arc, + schema: SchemaRef, +} + +impl FeedTable { + pub fn new(feed: Arc) -> Self { + Self { + feed, + schema: Arc::new(Schema::new(vec![ + Field::new("index", DataType::UInt64, false), + Field::new("event_type", DataType::UInt8, false), + Field::new("stream_cid", DataType::Binary, false), + Field::new("stream_type", DataType::UInt8, false), + Field::new("controller", DataType::Utf8, false), + Field::new( + // NOTE: The entire dimensions map may be null or values for a given key may + // be null. No other aspect of dimensions may be null. + "dimensions", + DataType::Map( + Field::new( + "entries", + DataType::Struct( + vec![ + Field::new("key", DataType::Utf8, false), + Field::new( + "value", + DataType::Dictionary( + Box::new(DataType::Int32), + Box::new(DataType::Binary), + ), + true, + ), + ] + .into(), + ), + false, + ) + .into(), + false, + ), + true, + ), + Field::new("event_cid", DataType::Binary, false), + Field::new("data", DataType::Binary, true), + Field::new( + "previous", + DataType::List(Arc::new(Field::new("item", DataType::Binary, false))), + true, + ), + ])), + } + } + fn highwater_mark_from_expr(expr: &Expr) -> Option { + let find_highwater_mark = |col: &Expr, lit: &Expr| { + col.try_as_col() + .map_or(false, |column| column.name == "index") + .then(|| { + if let Expr::Literal(ScalarValue::UInt64(highwater_mark)) = lit { + highwater_mark.to_owned() + } else { + None + } + }) + .flatten() + }; + match expr { + Expr::BinaryExpr(expr) => match expr.op { + datafusion::logical_expr::Operator::Gt => { + find_highwater_mark(expr.left.as_ref(), expr.right.as_ref()) + } + datafusion::logical_expr::Operator::LtEq => { + find_highwater_mark(expr.right.as_ref(), expr.left.as_ref()) + } + _ => None, + }, + _ => None, + } + } +} +#[async_trait::async_trait] +impl TableProvider for FeedTable { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + #[instrument(skip(self,_state), ret(level = Level::DEBUG))] + async fn scan( + &self, + _state: &dyn Session, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + ) -> datafusion::common::Result> { + let schema = projection + .map(|projection| self.schema.project(projection)) + .transpose()? + .map(Arc::new) + .unwrap_or_else(|| self.schema.clone()); + debug!(?schema, "projected schema"); + Ok(Arc::new(FeedExec { + feed: self.feed.clone(), + schema: schema.clone(), + projection: projection.cloned(), + properties: PlanProperties::new( + EquivalenceProperties::new(schema), + datafusion::physical_plan::Partitioning::UnknownPartitioning(1), + ExecutionMode::Bounded, + ), + highwater_mark: filters + .iter() + .filter_map(Self::highwater_mark_from_expr) + .next() + .map(|hm| hm as i64) + .unwrap_or(0), + limit: limit.map(|l| l as i64), + })) + } + #[instrument(skip(self), ret(level = Level::DEBUG))] + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> datafusion::common::Result> { + Ok(filters + .iter() + .map(|expr| Self::highwater_mark_from_expr(expr)) + .map(|highwater_mark| { + if highwater_mark.is_some() { + TableProviderFilterPushDown::Exact + } else { + TableProviderFilterPushDown::Unsupported + } + }) + .collect()) + } +} + +// Implements the [`ExecutionPlan`] trait in terms of a [`ConclusionFeed`]. +// This allows calls to scan the `conclusion_feed` table to be mapped to calls into the +// [`ConclusionFeed`]. +#[derive(Debug)] +struct FeedExec { + feed: Arc, + schema: SchemaRef, + projection: Option>, + properties: PlanProperties, + highwater_mark: i64, + limit: Option, +} + +impl datafusion::physical_plan::DisplayAs for FeedExec { + fn fmt_as( + &self, + _t: datafusion::physical_plan::DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + // TODO add useful information about predicates etc + write!(f, "FeedExec") + } +} + +impl ExecutionPlan for FeedExec { + fn name(&self) -> &str { + "FeedExec" + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn properties(&self) -> &datafusion::physical_plan::PlanProperties { + &self.properties + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> datafusion::error::Result> { + Ok(self) + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> datafusion::error::Result { + // Set a reasonable default limit + const DEFAULT_LIMIT: i64 = 10_000; + let feed = self.feed.clone(); + let projection = self.projection.clone(); + let highwater_mark = self.highwater_mark; + let limit = self.limit.unwrap_or(DEFAULT_LIMIT); + let stream = try_stream! { + let events = feed.conclusion_events_since(highwater_mark,limit).await?; + let batch = conclusion_events_to_record_batch(&events)?; + let batch = projection + .map(|projection| batch.project(&projection)) + .transpose()? + .unwrap_or_else(|| batch); + yield batch; + }; + let stream = stream.map_err(|err: anyhow::Error| exec_datafusion_err!("{err}")); + Ok(Box::pin(RecordBatchStreamAdapter::new( + self.schema.clone(), + stream, + ))) + } +} diff --git a/pipeline/src/config.rs b/pipeline/src/config.rs index 8e4428a67..0ca21fecc 100644 --- a/pipeline/src/config.rs +++ b/pipeline/src/config.rs @@ -1,10 +1,15 @@ -use object_store::aws::AmazonS3Builder; +use std::sync::Arc; + +use object_store::ObjectStore; /// Configuration for pipeline session. -pub struct Config { - /// Endpoint of a Flight SQL server for the conclusion feed. - pub flight_sql_endpoint: String, +pub struct Config { + /// Define how the conclusion feed will be accessed. + pub conclusion_feed: Arc, + + /// Bucket name in which to store objects. + pub object_store_bucket_name: String, - /// AWS S3 configuration - pub aws_s3_builder: AmazonS3Builder, + /// Access to an object store. + pub object_store: Arc, } diff --git a/pipeline/src/lib.rs b/pipeline/src/lib.rs index a439e42f7..c6525787a 100644 --- a/pipeline/src/lib.rs +++ b/pipeline/src/lib.rs @@ -1,64 +1,63 @@ //! Pipeline provides a set of tables of Ceramic events and transformations between them. +pub mod aggregator; pub mod cid_string; +mod conclusion; #[warn(missing_docs)] mod config; pub mod schemas; -use std::{any::Any, sync::Arc}; +use std::sync::Arc; -use anyhow::{anyhow, Result}; -use arrow_flight::sql::client::FlightSqlServiceClient; -use cid_string::{CidString, CidStringList}; +use anyhow::Result; use datafusion::{ - catalog::{CatalogProvider, SchemaProvider}, datasource::{file_format::parquet::ParquetFormat, listing::ListingOptions}, - error::DataFusionError, - execution::context::SessionContext, + execution::{config::SessionConfig, context::SessionContext}, functions_aggregate::first_last::LastValue, logical_expr::{col, AggregateUDF, ScalarUDF}, }; -use datafusion_federation::sql::{SQLFederationProvider, SQLSchemaProvider}; -use datafusion_flight_sql_table_provider::FlightSQLExecutor; -use object_store::aws::AmazonS3ConfigKey; -use tonic::transport::Endpoint; use url::Url; +use cid_string::{CidString, CidStringList}; + +pub use conclusion::{ + conclusion_events_to_record_batch, ConclusionData, ConclusionEvent, ConclusionFeed, + ConclusionInit, ConclusionTime, +}; pub use config::Config; +/// Report the list of tables in the pipeline +pub fn tables() -> Vec { + vec!["conclusion_feed".to_string(), "doc_state".to_string()] +} /// Constructs a [`SessionContext`] configured with all tables in the pipeline. -pub async fn session_from_config(config: impl Into) -> Result { - let config: Config = config.into(); +pub async fn session_from_config( + config: impl Into>, +) -> Result { + let config: Config = config.into(); - // Create federated datafusion state - let state = datafusion_federation::default_session_state(); - let client = new_client(config.flight_sql_endpoint.clone()).await?; - let executor = Arc::new(FlightSQLExecutor::new(config.flight_sql_endpoint, client)); - let provider = Arc::new(SQLFederationProvider::new(executor)); - let schema_provider = Arc::new( - SQLSchemaProvider::new_with_tables(provider, vec!["conclusion_feed".to_string()]).await?, - ); + let session_config = SessionConfig::new() + .with_default_catalog_and_schema("ceramic", "v0") + .with_information_schema(true); - // Create datafusion context - let ctx = SessionContext::new_with_state(state); + let mut ctx = SessionContext::new_with_config(session_config); + ctx.register_table( + "conclusion_feed", + Arc::new(conclusion::FeedTable::new(config.conclusion_feed)), + )?; // Register various UDxFs ctx.register_udaf(AggregateUDF::new_from_impl(LastValue::default())); ctx.register_udf(ScalarUDF::new_from_impl(CidString::new())); ctx.register_udf(ScalarUDF::new_from_impl(CidStringList::new())); + // Register JSON functions + datafusion_functions_json::register_all(&mut ctx)?; + // Register s3 object store - let bucket = config - .aws_s3_builder - .get_config_value(&AmazonS3ConfigKey::Bucket) - .ok_or_else(|| anyhow!("AWS S3 bucket must be specified"))?; - let s3 = config.aws_s3_builder.build()?; let mut url = Url::parse("s3://")?; - url.set_host(Some(&bucket))?; - ctx.register_object_store(&url, Arc::new(s3)); - - // Register federated catalog - ctx.register_catalog("ceramic", Arc::new(SQLCatalog { schema_provider })); + url.set_host(Some(&config.object_store_bucket_name))?; + ctx.register_object_store(&url, config.object_store); // Configure doc_state listing table let file_format = ParquetFormat::default().with_enable_pruning(true); @@ -80,31 +79,3 @@ pub async fn session_from_config(config: impl Into) -> Result Result> { - let endpoint = Endpoint::new(dsn).map_err(tx_error_to_df)?; - let channel = endpoint.connect().await.map_err(tx_error_to_df)?; - Ok(FlightSqlServiceClient::new(channel)) -} - -fn tx_error_to_df(err: tonic::transport::Error) -> DataFusionError { - DataFusionError::External(format!("failed to connect: {err:?}").into()) -} -struct SQLCatalog { - schema_provider: Arc, -} - -impl CatalogProvider for SQLCatalog { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema_names(&self) -> Vec { - vec!["v0".to_string()] - } - - fn schema(&self, _name: &str) -> Option> { - Some(self.schema_provider.clone()) - } -}