diff --git a/Cargo.toml b/Cargo.toml index 8df6ced9e..f98ee82fd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ thiserror = "1" smartstring = { version = "1" } serde_json = { version = "1" } either = "1.9" +deltalake = "0.17.1" [dependencies.polars] features = [ diff --git a/__tests__/examples/delta/sample.table/0-07808ab0-6b13-421c-bb67-ca09945eb281-0.parquet b/__tests__/examples/delta/sample.table/0-07808ab0-6b13-421c-bb67-ca09945eb281-0.parquet new file mode 100644 index 000000000..4187ad570 Binary files /dev/null and b/__tests__/examples/delta/sample.table/0-07808ab0-6b13-421c-bb67-ca09945eb281-0.parquet differ diff --git a/__tests__/examples/delta/sample.table/_delta_log/00000000000000000000.json b/__tests__/examples/delta/sample.table/_delta_log/00000000000000000000.json new file mode 100644 index 000000000..2ad03164b --- /dev/null +++ b/__tests__/examples/delta/sample.table/_delta_log/00000000000000000000.json @@ -0,0 +1,4 @@ +{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} +{"metaData":{"id":"b27252b0-26cc-49c5-b79d-12dd6647ffba","name":null,"description":null,"format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"foo\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"fruits\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"B\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"cars\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"createdTime":1711300448856,"configuration":{}}} +{"add":{"path":"0-07808ab0-6b13-421c-bb67-ca09945eb281-0.parquet","partitionValues":{},"size":1550,"modificationTime":1711300448855,"dataChange":true,"stats":"{\"numRecords\": 5, \"minValues\": {\"foo\": \"5\", \"fruits\": \"apple\", \"B\": 1, \"cars\": \"audi\"}, \"maxValues\": {\"foo\": \"you have a,b,c\", \"fruits\": \"banana\", \"B\": 5, \"cars\": \"beetle\"}, \"nullCount\": {\"foo\": 0, \"fruits\": 0, \"B\": 0, \"cars\": 0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1711300448856,"operation":"CREATE TABLE","operationParameters":{"location":"file:///examples/delta/sample.table","protocol":"{\"minReaderVersion\":1,\"minWriterVersion\":2}","metadata":"{\"configuration\":{},\"createdTime\":1711300448856,\"description\":null,\"format\":{\"options\":{},\"provider\":\"parquet\"},\"id\":\"b27252b0-26cc-49c5-b79d-12dd6647ffba\",\"name\":null,\"partitionColumns\":[],\"schemaString\":\"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[{\\\"name\\\":\\\"foo\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}},{\\\"name\\\":\\\"fruits\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}},{\\\"name\\\":\\\"B\\\",\\\"type\\\":\\\"long\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}},{\\\"name\\\":\\\"cars\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}}]}\"}","mode":"ErrorIfExists"},"clientVersion":"delta-rs.0.17.1"}} \ No newline at end of file diff --git a/__tests__/io.test.ts b/__tests__/io.test.ts index 4e7ba9093..f1a824a14 100644 --- a/__tests__/io.test.ts +++ b/__tests__/io.test.ts @@ -18,6 +18,8 @@ const ipcpath = path.resolve(__dirname, "./examples/foods.ipc"); const jsonpath = path.resolve(__dirname, "./examples/foods.json"); // eslint-disable-next-line no-undef const singlejsonpath = path.resolve(__dirname, "./examples/single_foods.json"); +// eslint-disable-next-line no-undef +const deltapath = path.resolve(__dirname, "./examples/delta/sample.table"); describe("read:csv", () => { it("can read from a csv file", () => { const df = pl.readCSV(csvpath); @@ -235,6 +237,13 @@ describe("scan", () => { }); }); +describe("delta", () => { + test("delta:scan", async () => { + const df = await pl.readDelta(deltapath).collect(); + expect(df.shape).toEqual({ height: 5, width: 4 }); + }); +}); + describe("parquet", () => { beforeEach(() => { pl.readCSV(csvpath).writeParquet(parquetpath); diff --git a/biome.json b/biome.json index bf712a99e..e76001db8 100644 --- a/biome.json +++ b/biome.json @@ -1,5 +1,5 @@ { - "$schema": "https://biomejs.dev/schemas/1.0.0/schema.json", + "$schema": "https://biomejs.dev/schemas/1.6.2/schema.json", "organizeImports": { "enabled": false }, @@ -24,7 +24,8 @@ "ignore": [ "polars/native-polars.js", "./docs/*", - "./bin/*" + "./bin/*", + "**/delta/sample.table/*" ] }, "formatter": { @@ -33,7 +34,8 @@ "ignore": [ "polars/native-polars.js", "./docs/*", - "./bin/*" + "./bin/*", + "**/delta/sample.table/*" ] }, "javascript": { diff --git a/bun.lockb b/bun.lockb index 5f637beae..de32f4a8a 100755 Binary files a/bun.lockb and b/bun.lockb differ diff --git a/package.json b/package.json index ec67b8036..0fb7f479f 100644 --- a/package.json +++ b/package.json @@ -54,7 +54,7 @@ "precommit": "yarn lint && yarn test" }, "devDependencies": { - "@biomejs/biome": "^1.6.1", + "@biomejs/biome": "^1.6.2", "@napi-rs/cli": "^2.18.0", "@types/chance": "^1.1.6", "@types/jest": "^29.5.12", @@ -65,7 +65,7 @@ "ts-jest": "^29.1.2", "ts-node": "^10.9.2", "typedoc": "^0.25.12", - "typescript": "5.4.2" + "typescript": "5.4.3" }, "packageManager": "yarn@4.0.2", "workspaces": [ diff --git a/polars/index.ts b/polars/index.ts index 4abd9caf1..21f86bc74 100644 --- a/polars/index.ts +++ b/polars/index.ts @@ -47,6 +47,7 @@ export namespace pl { export import readJSON = io.readJSON; export import readParquet = io.readParquet; export import readAvro = io.readAvro; + export import readDelta = io.readDelta; export import readCSVStream = io.readCSVStream; export import readJSONStream = io.readJSONStream; diff --git a/polars/io.ts b/polars/io.ts index 4743f4fe9..d50e5b3a9 100644 --- a/polars/io.ts +++ b/polars/io.ts @@ -436,6 +436,32 @@ export function readParquet( throw new Error("must supply either a path or body"); } +interface ReadDeltaOptions { + version?: number; + columns?: string[]; + parallel?: "auto" | "columns" | "row_groups" | "none"; +} + +/** + * Reads into a DataFrame from a Delta lake table. + * @param path - Path or URI to the root of the Delta lake table. + * @param option.version - Version of the Delta lake table. Note: If `version` is not provided, the latest version of delta lake + * table is read. + * @param option.columns - Columns to select. Accepts a list of column names. + * @param options.parallel - Any of 'auto' | 'columns' | 'row_groups' | 'none' + * This determines the direction of parallelism. 'auto' will try to determine the optimal direction. + * Defaults to 'auto' + */ + +export function readDelta( + path: string, + options: ReadDeltaOptions = {}, +): LazyDataFrame { + const parallel = options?.parallel ?? "auto"; + options = { parallel, ...options }; + return _LazyDataFrame(pli.readDelta(path, options)); +} + export interface ReadAvroOptions { columns: string[] | Array | number[]; projection: number; diff --git a/src/dataframe.rs b/src/dataframe.rs index 2d9fa2f9b..75d9efd71 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -1,9 +1,12 @@ +use crate::delta::read_delta_table; +use crate::export::JsLazyFrame; use crate::file::*; use crate::prelude::*; use crate::series::JsSeries; use napi::JsUnknown; use polars::frame::row::{infer_schema, Row}; use polars::frame::NullStrategy; +use polars_io::pl_async::get_runtime; use polars_io::RowIndex; use std::borrow::Borrow; @@ -271,6 +274,29 @@ pub fn read_json( Ok(df.into()) } +#[napi(object)] +pub struct ReadDeltaOptions { + pub version: Option, + pub columns: Option>, + pub parallel: Wrap, +} + +#[napi(catch_unwind)] +pub fn read_delta(path: String, + options: ReadDeltaOptions +) -> napi::Result { + let table: std::prelude::v1::Result = get_runtime().block_on(async { + read_delta_table(&path, options).await + }); + + let ldf:LazyFrame = match table { + Ok(table) => table, + Err(err) => return Err(napi::Error::from_reason(err.to_string())) + }; + + Ok(LazyFrame::from(ldf).into()) +} + #[napi(object)] pub struct ReadParquetOptions { pub columns: Option>, diff --git a/src/delta.rs b/src/delta.rs new file mode 100644 index 000000000..713c9889b --- /dev/null +++ b/src/delta.rs @@ -0,0 +1,52 @@ +use polars_core::prelude::DataFrame; +use polars_io::{parquet::ParquetReader, SerReader}; +use polars_lazy::{dsl::UnionArgs, frame::{IntoLazy, LazyFrame}}; +use polars::prelude::concat; +use deltalake::{DeltaTableBuilder, DeltaTableError}; +use crate::dataframe::ReadDeltaOptions; + +pub async fn read_delta_table(path: &str, + options: ReadDeltaOptions, + ) -> Result { + let mut db = DeltaTableBuilder::from_uri(path) + .with_allow_http(false); + + // if version specified, add it + if options.version.is_some() { + db = db.with_version(options.version.unwrap()); + } + + let dt = db.load().await?; + + // show all active files in the table + let files: Vec<_> = dt.get_file_uris()?.collect(); + + let mut df_collection: Vec = vec![]; + + for file in files.into_iter() { + let base = std::path::Path::new(path); + let file_path = std::path::Path::new(&file); + let full_path = base.join(file_path); + let mut file = std::fs::File::open(full_path).unwrap(); + + let columns = options.columns.clone(); + let parallel = options.parallel.0; + + let df = ParquetReader::new(&mut file) + .with_columns(columns) + .read_parallel(parallel) + .finish().unwrap(); + + df_collection.push(df); + } + + let empty_head = df_collection[0].clone().lazy().limit(0); + + Ok(df_collection.into_iter().fold(empty_head, |acc, df| concat([acc, df.lazy()], + UnionArgs { + rechunk: false, + parallel: false, + ..Default::default() + }).unwrap())) + +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 9671a06e7..e001a4efe 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,6 +28,7 @@ pub mod series; pub mod set; pub mod utils; pub mod sql; +pub mod delta; pub use polars_core; diff --git a/yarn.lock b/yarn.lock index e9d388f29..94585fafe 100644 --- a/yarn.lock +++ b/yarn.lock @@ -408,18 +408,18 @@ __metadata: languageName: node linkType: hard -"@biomejs/biome@npm:^1.6.1": - version: 1.6.1 - resolution: "@biomejs/biome@npm:1.6.1" - dependencies: - "@biomejs/cli-darwin-arm64": 1.6.1 - "@biomejs/cli-darwin-x64": 1.6.1 - "@biomejs/cli-linux-arm64": 1.6.1 - "@biomejs/cli-linux-arm64-musl": 1.6.1 - "@biomejs/cli-linux-x64": 1.6.1 - "@biomejs/cli-linux-x64-musl": 1.6.1 - "@biomejs/cli-win32-arm64": 1.6.1 - "@biomejs/cli-win32-x64": 1.6.1 +"@biomejs/biome@npm:^1.6.2": + version: 1.6.2 + resolution: "@biomejs/biome@npm:1.6.2" + dependencies: + "@biomejs/cli-darwin-arm64": 1.6.2 + "@biomejs/cli-darwin-x64": 1.6.2 + "@biomejs/cli-linux-arm64": 1.6.2 + "@biomejs/cli-linux-arm64-musl": 1.6.2 + "@biomejs/cli-linux-x64": 1.6.2 + "@biomejs/cli-linux-x64-musl": 1.6.2 + "@biomejs/cli-win32-arm64": 1.6.2 + "@biomejs/cli-win32-x64": 1.6.2 dependenciesMeta: "@biomejs/cli-darwin-arm64": optional: true @@ -439,62 +439,62 @@ __metadata: optional: true bin: biome: bin/biome - checksum: 4d012438d5ce52418ce7af4cb2285180afe767b8df2465cf7890fb2f8b01920bf29d7a8d78c5c4379b589f6e735db8f0ad7502b78ec943b65efd0e74eea5abb2 + checksum: 3e2d1b5d54aa3ebac1f758bd7ad524d7e45f95f82436900da592ff8b3a3398863a28b098e6a6799f9af33cb7367c15ef4af10399cd9d28587d0f817e977ed994 languageName: node linkType: hard -"@biomejs/cli-darwin-arm64@npm:1.6.1": - version: 1.6.1 - resolution: "@biomejs/cli-darwin-arm64@npm:1.6.1" +"@biomejs/cli-darwin-arm64@npm:1.6.2": + version: 1.6.2 + resolution: "@biomejs/cli-darwin-arm64@npm:1.6.2" conditions: os=darwin & cpu=arm64 languageName: node linkType: hard -"@biomejs/cli-darwin-x64@npm:1.6.1": - version: 1.6.1 - resolution: "@biomejs/cli-darwin-x64@npm:1.6.1" +"@biomejs/cli-darwin-x64@npm:1.6.2": + version: 1.6.2 + resolution: "@biomejs/cli-darwin-x64@npm:1.6.2" conditions: os=darwin & cpu=x64 languageName: node linkType: hard -"@biomejs/cli-linux-arm64-musl@npm:1.6.1": - version: 1.6.1 - resolution: "@biomejs/cli-linux-arm64-musl@npm:1.6.1" +"@biomejs/cli-linux-arm64-musl@npm:1.6.2": + version: 1.6.2 + resolution: "@biomejs/cli-linux-arm64-musl@npm:1.6.2" conditions: os=linux & cpu=arm64 & libc=musl languageName: node linkType: hard -"@biomejs/cli-linux-arm64@npm:1.6.1": - version: 1.6.1 - resolution: "@biomejs/cli-linux-arm64@npm:1.6.1" +"@biomejs/cli-linux-arm64@npm:1.6.2": + version: 1.6.2 + resolution: "@biomejs/cli-linux-arm64@npm:1.6.2" conditions: os=linux & cpu=arm64 & libc=glibc languageName: node linkType: hard -"@biomejs/cli-linux-x64-musl@npm:1.6.1": - version: 1.6.1 - resolution: "@biomejs/cli-linux-x64-musl@npm:1.6.1" +"@biomejs/cli-linux-x64-musl@npm:1.6.2": + version: 1.6.2 + resolution: "@biomejs/cli-linux-x64-musl@npm:1.6.2" conditions: os=linux & cpu=x64 & libc=musl languageName: node linkType: hard -"@biomejs/cli-linux-x64@npm:1.6.1": - version: 1.6.1 - resolution: "@biomejs/cli-linux-x64@npm:1.6.1" +"@biomejs/cli-linux-x64@npm:1.6.2": + version: 1.6.2 + resolution: "@biomejs/cli-linux-x64@npm:1.6.2" conditions: os=linux & cpu=x64 & libc=glibc languageName: node linkType: hard -"@biomejs/cli-win32-arm64@npm:1.6.1": - version: 1.6.1 - resolution: "@biomejs/cli-win32-arm64@npm:1.6.1" +"@biomejs/cli-win32-arm64@npm:1.6.2": + version: 1.6.2 + resolution: "@biomejs/cli-win32-arm64@npm:1.6.2" conditions: os=win32 & cpu=arm64 languageName: node linkType: hard -"@biomejs/cli-win32-x64@npm:1.6.1": - version: 1.6.1 - resolution: "@biomejs/cli-win32-x64@npm:1.6.1" +"@biomejs/cli-win32-x64@npm:1.6.2": + version: 1.6.2 + resolution: "@biomejs/cli-win32-x64@npm:1.6.2" conditions: os=win32 & cpu=x64 languageName: node linkType: hard @@ -3004,7 +3004,7 @@ __metadata: version: 0.0.0-use.local resolution: "nodejs-polars@workspace:." dependencies: - "@biomejs/biome": ^1.6.1 + "@biomejs/biome": ^1.6.2 "@napi-rs/cli": ^2.18.0 "@types/chance": ^1.1.6 "@types/jest": ^29.5.12 @@ -3015,7 +3015,7 @@ __metadata: ts-jest: ^29.1.2 ts-node: ^10.9.2 typedoc: ^0.25.12 - typescript: 5.4.2 + typescript: 5.4.3 languageName: unknown linkType: soft @@ -3733,23 +3733,23 @@ __metadata: languageName: node linkType: hard -"typescript@npm:5.4.2": - version: 5.4.2 - resolution: "typescript@npm:5.4.2" +"typescript@npm:5.4.3": + version: 5.4.3 + resolution: "typescript@npm:5.4.3" bin: tsc: bin/tsc tsserver: bin/tsserver - checksum: 96d80fde25a09bcb04d399082fb27a808a9e17c2111e43849d2aafbd642d835e4f4ef0de09b0ba795ec2a700be6c4c2c3f62bf4660c05404c948727b5bbfb32a + checksum: d74d731527e35e64d8d2dcf2f897cf8cfbc3428be0ad7c48434218ba4ae41239f53be7c90714089db1068c05cae22436af2ecba71fd36ecc5e7a9118af060198 languageName: node linkType: hard -"typescript@patch:typescript@5.4.2#~builtin": - version: 5.4.2 - resolution: "typescript@patch:typescript@npm%3A5.4.2#~builtin::version=5.4.2&hash=14eedb" +"typescript@patch:typescript@5.4.3#~builtin": + version: 5.4.3 + resolution: "typescript@patch:typescript@npm%3A5.4.3#~builtin::version=5.4.3&hash=14eedb" bin: tsc: bin/tsc tsserver: bin/tsserver - checksum: c1b669146bca5529873aae60870e243fa8140c85f57ca32c42f898f586d73ce4a6b4f6bb02ae312729e214d7f5859a0c70da3e527a116fdf5ad00c9fc733ecc6 + checksum: 3a62fe90aa79d68c9ce38ea5edb2957e62801c733b99f0e5a2b8b50922761f68f7d9a40d28c544b449866e81185cddb93cba2496d0ff3fa52ef5b1f8bcace38c languageName: node linkType: hard