Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support reading delta tables #190

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ thiserror = "1"
smartstring = { version = "1" }
serde_json = { version = "1" }
either = "1.9"
deltalake = "0.17.1"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit hesitant to add this in as a dependency.

this pulls in the entire arrow-rs project as well as many other heavy dependencies.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes sense, I will keep looking for alternatives. Thx

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, I think the "ideal" approach would be first to upstream some work to delta-rs and expose a javascript library. Then we could use the JS library directly like py-polars does, and do a bunch of the logic on the JS side using apache-arrow as an optional dependency.

If the apache-arrow npm package becomes a point of contention, we could likely bypass it by dynamically linking to the delta js bindings directly.

Copy link
Collaborator Author

@Bidek56 Bidek56 Mar 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delta-rs team is busy with C# implementation now. :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was effort to use delta-sharing using JS and Danfo.js.


[dependencies.polars]
features = [
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"b27252b0-26cc-49c5-b79d-12dd6647ffba","name":null,"description":null,"format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"foo\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"fruits\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"B\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"cars\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"createdTime":1711300448856,"configuration":{}}}
{"add":{"path":"0-07808ab0-6b13-421c-bb67-ca09945eb281-0.parquet","partitionValues":{},"size":1550,"modificationTime":1711300448855,"dataChange":true,"stats":"{\"numRecords\": 5, \"minValues\": {\"foo\": \"5\", \"fruits\": \"apple\", \"B\": 1, \"cars\": \"audi\"}, \"maxValues\": {\"foo\": \"you have a,b,c\", \"fruits\": \"banana\", \"B\": 5, \"cars\": \"beetle\"}, \"nullCount\": {\"foo\": 0, \"fruits\": 0, \"B\": 0, \"cars\": 0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}
{"commitInfo":{"timestamp":1711300448856,"operation":"CREATE TABLE","operationParameters":{"location":"file:///examples/delta/sample.table","protocol":"{\"minReaderVersion\":1,\"minWriterVersion\":2}","metadata":"{\"configuration\":{},\"createdTime\":1711300448856,\"description\":null,\"format\":{\"options\":{},\"provider\":\"parquet\"},\"id\":\"b27252b0-26cc-49c5-b79d-12dd6647ffba\",\"name\":null,\"partitionColumns\":[],\"schemaString\":\"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[{\\\"name\\\":\\\"foo\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}},{\\\"name\\\":\\\"fruits\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}},{\\\"name\\\":\\\"B\\\",\\\"type\\\":\\\"long\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}},{\\\"name\\\":\\\"cars\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}}]}\"}","mode":"ErrorIfExists"},"clientVersion":"delta-rs.0.17.1"}}
9 changes: 9 additions & 0 deletions __tests__/io.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ const ipcpath = path.resolve(__dirname, "./examples/foods.ipc");
const jsonpath = path.resolve(__dirname, "./examples/foods.json");
// eslint-disable-next-line no-undef
const singlejsonpath = path.resolve(__dirname, "./examples/single_foods.json");
// eslint-disable-next-line no-undef
const deltapath = path.resolve(__dirname, "./examples/delta/sample.table");
describe("read:csv", () => {
it("can read from a csv file", () => {
const df = pl.readCSV(csvpath);
Expand Down Expand Up @@ -235,6 +237,13 @@ describe("scan", () => {
});
});

describe("delta", () => {
test("delta:scan", async () => {
const df = await pl.readDelta(deltapath).collect();
expect(df.shape).toEqual({ height: 5, width: 4 });
});
});

describe("parquet", () => {
beforeEach(() => {
pl.readCSV(csvpath).writeParquet(parquetpath);
Expand Down
8 changes: 5 additions & 3 deletions biome.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"$schema": "https://biomejs.dev/schemas/1.0.0/schema.json",
"$schema": "https://biomejs.dev/schemas/1.6.2/schema.json",
"organizeImports": {
"enabled": false
},
Expand All @@ -24,7 +24,8 @@
"ignore": [
"polars/native-polars.js",
"./docs/*",
"./bin/*"
"./bin/*",
"**/delta/sample.table/*"
]
},
"formatter": {
Expand All @@ -33,7 +34,8 @@
"ignore": [
"polars/native-polars.js",
"./docs/*",
"./bin/*"
"./bin/*",
"**/delta/sample.table/*"
]
},
"javascript": {
Expand Down
Binary file modified bun.lockb
Binary file not shown.
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
"precommit": "yarn lint && yarn test"
},
"devDependencies": {
"@biomejs/biome": "^1.6.1",
"@biomejs/biome": "^1.6.2",
"@napi-rs/cli": "^2.18.0",
"@types/chance": "^1.1.6",
"@types/jest": "^29.5.12",
Expand All @@ -65,7 +65,7 @@
"ts-jest": "^29.1.2",
"ts-node": "^10.9.2",
"typedoc": "^0.25.12",
"typescript": "5.4.2"
"typescript": "5.4.3"
},
"packageManager": "[email protected]",
"workspaces": [
Expand Down
1 change: 1 addition & 0 deletions polars/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ export namespace pl {
export import readJSON = io.readJSON;
export import readParquet = io.readParquet;
export import readAvro = io.readAvro;
export import readDelta = io.readDelta;

export import readCSVStream = io.readCSVStream;
export import readJSONStream = io.readJSONStream;
Expand Down
26 changes: 26 additions & 0 deletions polars/io.ts
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,32 @@ export function readParquet(
throw new Error("must supply either a path or body");
}

interface ReadDeltaOptions {
version?: number;
columns?: string[];
parallel?: "auto" | "columns" | "row_groups" | "none";
}

/**
* Reads into a DataFrame from a Delta lake table.
* @param path - Path or URI to the root of the Delta lake table.
* @param option.version - Version of the Delta lake table. Note: If `version` is not provided, the latest version of delta lake
* table is read.
* @param option.columns - Columns to select. Accepts a list of column names.
* @param options.parallel - Any of 'auto' | 'columns' | 'row_groups' | 'none'
* This determines the direction of parallelism. 'auto' will try to determine the optimal direction.
* Defaults to 'auto'
*/

export function readDelta(
path: string,
options: ReadDeltaOptions = {},
): LazyDataFrame {
const parallel = options?.parallel ?? "auto";
options = { parallel, ...options };
return _LazyDataFrame(pli.readDelta(path, options));
}

export interface ReadAvroOptions {
columns: string[] | Array<string> | number[];
projection: number;
Expand Down
26 changes: 26 additions & 0 deletions src/dataframe.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use crate::delta::read_delta_table;
use crate::export::JsLazyFrame;
use crate::file::*;
use crate::prelude::*;
use crate::series::JsSeries;
use napi::JsUnknown;
use polars::frame::row::{infer_schema, Row};
use polars::frame::NullStrategy;
use polars_io::pl_async::get_runtime;
use polars_io::RowIndex;

use std::borrow::Borrow;
Expand Down Expand Up @@ -271,6 +274,29 @@ pub fn read_json(
Ok(df.into())
}

#[napi(object)]
pub struct ReadDeltaOptions {
pub version: Option<i64>,
pub columns: Option<Vec<String>>,
pub parallel: Wrap<ParallelStrategy>,
}

#[napi(catch_unwind)]
pub fn read_delta(path: String,
options: ReadDeltaOptions
) -> napi::Result<JsLazyFrame> {
let table: std::prelude::v1::Result<LazyFrame, deltalake::DeltaTableError> = get_runtime().block_on(async {
read_delta_table(&path, options).await
});

let ldf:LazyFrame = match table {
Ok(table) => table,
Err(err) => return Err(napi::Error::from_reason(err.to_string()))
};

Ok(LazyFrame::from(ldf).into())
}

#[napi(object)]
pub struct ReadParquetOptions {
pub columns: Option<Vec<String>>,
Expand Down
52 changes: 52 additions & 0 deletions src/delta.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use polars_core::prelude::DataFrame;
use polars_io::{parquet::ParquetReader, SerReader};
use polars_lazy::{dsl::UnionArgs, frame::{IntoLazy, LazyFrame}};
use polars::prelude::concat;
use deltalake::{DeltaTableBuilder, DeltaTableError};
use crate::dataframe::ReadDeltaOptions;

pub async fn read_delta_table(path: &str,
options: ReadDeltaOptions,
) -> Result<LazyFrame, DeltaTableError> {
let mut db = DeltaTableBuilder::from_uri(path)
.with_allow_http(false);

// if version specified, add it
if options.version.is_some() {
db = db.with_version(options.version.unwrap());
}

let dt = db.load().await?;

// show all active files in the table
let files: Vec<_> = dt.get_file_uris()?.collect();

let mut df_collection: Vec<DataFrame> = vec![];

for file in files.into_iter() {
let base = std::path::Path::new(path);
let file_path = std::path::Path::new(&file);
let full_path = base.join(file_path);
let mut file = std::fs::File::open(full_path).unwrap();

let columns = options.columns.clone();
let parallel = options.parallel.0;

let df = ParquetReader::new(&mut file)
.with_columns(columns)
.read_parallel(parallel)
.finish().unwrap();

df_collection.push(df);
}

let empty_head = df_collection[0].clone().lazy().limit(0);

Ok(df_collection.into_iter().fold(empty_head, |acc, df| concat([acc, df.lazy()],
UnionArgs {
rechunk: false,
parallel: false,
..Default::default()
}).unwrap()))

}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub mod series;
pub mod set;
pub mod utils;
pub mod sql;
pub mod delta;

pub use polars_core;

Expand Down
94 changes: 47 additions & 47 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -408,18 +408,18 @@ __metadata:
languageName: node
linkType: hard

"@biomejs/biome@npm:^1.6.1":
version: 1.6.1
resolution: "@biomejs/biome@npm:1.6.1"
dependencies:
"@biomejs/cli-darwin-arm64": 1.6.1
"@biomejs/cli-darwin-x64": 1.6.1
"@biomejs/cli-linux-arm64": 1.6.1
"@biomejs/cli-linux-arm64-musl": 1.6.1
"@biomejs/cli-linux-x64": 1.6.1
"@biomejs/cli-linux-x64-musl": 1.6.1
"@biomejs/cli-win32-arm64": 1.6.1
"@biomejs/cli-win32-x64": 1.6.1
"@biomejs/biome@npm:^1.6.2":
version: 1.6.2
resolution: "@biomejs/biome@npm:1.6.2"
dependencies:
"@biomejs/cli-darwin-arm64": 1.6.2
"@biomejs/cli-darwin-x64": 1.6.2
"@biomejs/cli-linux-arm64": 1.6.2
"@biomejs/cli-linux-arm64-musl": 1.6.2
"@biomejs/cli-linux-x64": 1.6.2
"@biomejs/cli-linux-x64-musl": 1.6.2
"@biomejs/cli-win32-arm64": 1.6.2
"@biomejs/cli-win32-x64": 1.6.2
dependenciesMeta:
"@biomejs/cli-darwin-arm64":
optional: true
Expand All @@ -439,62 +439,62 @@ __metadata:
optional: true
bin:
biome: bin/biome
checksum: 4d012438d5ce52418ce7af4cb2285180afe767b8df2465cf7890fb2f8b01920bf29d7a8d78c5c4379b589f6e735db8f0ad7502b78ec943b65efd0e74eea5abb2
checksum: 3e2d1b5d54aa3ebac1f758bd7ad524d7e45f95f82436900da592ff8b3a3398863a28b098e6a6799f9af33cb7367c15ef4af10399cd9d28587d0f817e977ed994
languageName: node
linkType: hard

"@biomejs/cli-darwin-arm64@npm:1.6.1":
version: 1.6.1
resolution: "@biomejs/cli-darwin-arm64@npm:1.6.1"
"@biomejs/cli-darwin-arm64@npm:1.6.2":
version: 1.6.2
resolution: "@biomejs/cli-darwin-arm64@npm:1.6.2"
conditions: os=darwin & cpu=arm64
languageName: node
linkType: hard

"@biomejs/cli-darwin-x64@npm:1.6.1":
version: 1.6.1
resolution: "@biomejs/cli-darwin-x64@npm:1.6.1"
"@biomejs/cli-darwin-x64@npm:1.6.2":
version: 1.6.2
resolution: "@biomejs/cli-darwin-x64@npm:1.6.2"
conditions: os=darwin & cpu=x64
languageName: node
linkType: hard

"@biomejs/cli-linux-arm64-musl@npm:1.6.1":
version: 1.6.1
resolution: "@biomejs/cli-linux-arm64-musl@npm:1.6.1"
"@biomejs/cli-linux-arm64-musl@npm:1.6.2":
version: 1.6.2
resolution: "@biomejs/cli-linux-arm64-musl@npm:1.6.2"
conditions: os=linux & cpu=arm64 & libc=musl
languageName: node
linkType: hard

"@biomejs/cli-linux-arm64@npm:1.6.1":
version: 1.6.1
resolution: "@biomejs/cli-linux-arm64@npm:1.6.1"
"@biomejs/cli-linux-arm64@npm:1.6.2":
version: 1.6.2
resolution: "@biomejs/cli-linux-arm64@npm:1.6.2"
conditions: os=linux & cpu=arm64 & libc=glibc
languageName: node
linkType: hard

"@biomejs/cli-linux-x64-musl@npm:1.6.1":
version: 1.6.1
resolution: "@biomejs/cli-linux-x64-musl@npm:1.6.1"
"@biomejs/cli-linux-x64-musl@npm:1.6.2":
version: 1.6.2
resolution: "@biomejs/cli-linux-x64-musl@npm:1.6.2"
conditions: os=linux & cpu=x64 & libc=musl
languageName: node
linkType: hard

"@biomejs/cli-linux-x64@npm:1.6.1":
version: 1.6.1
resolution: "@biomejs/cli-linux-x64@npm:1.6.1"
"@biomejs/cli-linux-x64@npm:1.6.2":
version: 1.6.2
resolution: "@biomejs/cli-linux-x64@npm:1.6.2"
conditions: os=linux & cpu=x64 & libc=glibc
languageName: node
linkType: hard

"@biomejs/cli-win32-arm64@npm:1.6.1":
version: 1.6.1
resolution: "@biomejs/cli-win32-arm64@npm:1.6.1"
"@biomejs/cli-win32-arm64@npm:1.6.2":
version: 1.6.2
resolution: "@biomejs/cli-win32-arm64@npm:1.6.2"
conditions: os=win32 & cpu=arm64
languageName: node
linkType: hard

"@biomejs/cli-win32-x64@npm:1.6.1":
version: 1.6.1
resolution: "@biomejs/cli-win32-x64@npm:1.6.1"
"@biomejs/cli-win32-x64@npm:1.6.2":
version: 1.6.2
resolution: "@biomejs/cli-win32-x64@npm:1.6.2"
conditions: os=win32 & cpu=x64
languageName: node
linkType: hard
Expand Down Expand Up @@ -3004,7 +3004,7 @@ __metadata:
version: 0.0.0-use.local
resolution: "nodejs-polars@workspace:."
dependencies:
"@biomejs/biome": ^1.6.1
"@biomejs/biome": ^1.6.2
"@napi-rs/cli": ^2.18.0
"@types/chance": ^1.1.6
"@types/jest": ^29.5.12
Expand All @@ -3015,7 +3015,7 @@ __metadata:
ts-jest: ^29.1.2
ts-node: ^10.9.2
typedoc: ^0.25.12
typescript: 5.4.2
typescript: 5.4.3
languageName: unknown
linkType: soft

Expand Down Expand Up @@ -3733,23 +3733,23 @@ __metadata:
languageName: node
linkType: hard

"typescript@npm:5.4.2":
version: 5.4.2
resolution: "typescript@npm:5.4.2"
"typescript@npm:5.4.3":
version: 5.4.3
resolution: "typescript@npm:5.4.3"
bin:
tsc: bin/tsc
tsserver: bin/tsserver
checksum: 96d80fde25a09bcb04d399082fb27a808a9e17c2111e43849d2aafbd642d835e4f4ef0de09b0ba795ec2a700be6c4c2c3f62bf4660c05404c948727b5bbfb32a
checksum: d74d731527e35e64d8d2dcf2f897cf8cfbc3428be0ad7c48434218ba4ae41239f53be7c90714089db1068c05cae22436af2ecba71fd36ecc5e7a9118af060198
languageName: node
linkType: hard

"typescript@patch:[email protected].2#~builtin<compat/typescript>":
version: 5.4.2
resolution: "typescript@patch:typescript@npm%3A5.4.2#~builtin<compat/typescript>::version=5.4.2&hash=14eedb"
"typescript@patch:[email protected].3#~builtin<compat/typescript>":
version: 5.4.3
resolution: "typescript@patch:typescript@npm%3A5.4.3#~builtin<compat/typescript>::version=5.4.3&hash=14eedb"
bin:
tsc: bin/tsc
tsserver: bin/tsserver
checksum: c1b669146bca5529873aae60870e243fa8140c85f57ca32c42f898f586d73ce4a6b4f6bb02ae312729e214d7f5859a0c70da3e527a116fdf5ad00c9fc733ecc6
checksum: 3a62fe90aa79d68c9ce38ea5edb2957e62801c733b99f0e5a2b8b50922761f68f7d9a40d28c544b449866e81185cddb93cba2496d0ff3fa52ef5b1f8bcace38c
languageName: node
linkType: hard

Expand Down