Split parquet buffers (#99)
* Split parquet buffers

* version bump

* fix tests
kylebarron authored Oct 13, 2023
1 parent ee1627b commit 10e8884
Showing 9 changed files with 99 additions and 30 deletions.
22 changes: 22 additions & 0 deletions lonboard/layer.py
@@ -10,6 +10,7 @@

from lonboard.constants import EPSG_4326, EXTENSION_NAME, OGC_84
from lonboard.geoarrow.geopandas_interop import geopandas_to_geoarrow
from lonboard.serialization import infer_rows_per_chunk
from lonboard.traits import ColorAccessor, FloatAccessor, PyarrowTableTrait
from lonboard.viewport import compute_view

@@ -41,6 +42,9 @@ class ScatterplotLayer(BaseLayer):
_layer_type = traitlets.Unicode("scatterplot").tag(sync=True)
_initial_view_state = traitlets.Dict().tag(sync=True)

# Number of rows per chunk for serializing table and accessor columns
_rows_per_chunk = traitlets.Int()

table = PyarrowTableTrait(
allowed_geometry_types={EXTENSION_NAME.POINT, EXTENSION_NAME.MULTIPOINT}
)
@@ -180,6 +184,10 @@ def from_geopandas(cls, gdf: gpd.GeoDataFrame, **kwargs) -> ScatterplotLayer:
def _default_initial_view_state(self):
return compute_view(self.table)

@traitlets.default("_rows_per_chunk")
def _default_rows_per_chunk(self):
return infer_rows_per_chunk(self.table)

@traitlets.validate("get_radius")
def _validate_get_radius_length(self, proposal):
if isinstance(proposal["value"], (pa.ChunkedArray, pa.Array)):
@@ -226,6 +234,9 @@ class PathLayer(BaseLayer):
_layer_type = traitlets.Unicode("path").tag(sync=True)
_initial_view_state = traitlets.Dict().tag(sync=True)

# Number of rows per chunk for serializing table and accessor columns
_rows_per_chunk = traitlets.Int()

table = PyarrowTableTrait(
allowed_geometry_types={
EXTENSION_NAME.LINESTRING,
@@ -331,6 +342,10 @@ def from_geopandas(cls, gdf: gpd.GeoDataFrame, **kwargs) -> PathLayer:
def _default_initial_view_state(self):
return compute_view(self.table)

@traitlets.default("_rows_per_chunk")
def _default_rows_per_chunk(self):
return infer_rows_per_chunk(self.table)

@traitlets.validate("get_color")
def _validate_get_color_length(self, proposal):
if isinstance(proposal["value"], (pa.ChunkedArray, pa.Array)):
@@ -353,6 +368,9 @@ class SolidPolygonLayer(BaseLayer):
_layer_type = traitlets.Unicode("solid-polygon").tag(sync=True)
_initial_view_state = traitlets.Dict().tag(sync=True)

# Number of rows per chunk for serializing table and accessor columns
_rows_per_chunk = traitlets.Int()

table = PyarrowTableTrait(
allowed_geometry_types={EXTENSION_NAME.POLYGON, EXTENSION_NAME.MULTIPOLYGON}
)
@@ -431,6 +449,10 @@ def from_geopandas(cls, gdf: gpd.GeoDataFrame, **kwargs) -> SolidPolygonLayer:
def _default_initial_view_state(self):
return compute_view(self.table)

@traitlets.default("_rows_per_chunk")
def _default_rows_per_chunk(self):
return infer_rows_per_chunk(self.table)

@traitlets.validate("get_elevation")
def _validate_get_elevation_length(self, proposal):
if isinstance(proposal["value"], (pa.ChunkedArray, pa.Array)):
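
The _rows_per_chunk trait repeated across the three layer classes above relies on traitlets' dynamic-default mechanism: the value is computed from the layer's table the first time it is read. A minimal standalone sketch of that pattern, assuming pyarrow and traitlets are installed (Demo and its toy table are illustrative stand-ins, not lonboard classes):

import pyarrow as pa
import traitlets

from lonboard.serialization import infer_rows_per_chunk


class Demo(traitlets.HasTraits):
    # Illustrative stand-in for a lonboard layer class
    table = traitlets.Instance(pa.Table)
    _rows_per_chunk = traitlets.Int()

    @traitlets.default("_rows_per_chunk")
    def _default_rows_per_chunk(self):
        # Runs lazily, the first time _rows_per_chunk is accessed
        return infer_rows_per_chunk(self.table)


demo = Demo(table=pa.table({"value": list(range(1000))}))
# A tiny table is far below the 10MB target, so one chunk holds every row
print(demo._rows_per_chunk)  # 1000
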
54 changes: 35 additions & 19 deletions lonboard/serialization.py
@@ -1,3 +1,4 @@
import math
from io import BytesIO
from typing import List, Tuple, Union

@@ -9,29 +10,38 @@
DEFAULT_PARQUET_COMPRESSION = "ZSTD"
DEFAULT_PARQUET_COMPRESSION_LEVEL = 7
DEFAULT_PARQUET_CHUNK_SIZE = 2**16
# Target chunk size for Arrow (uncompressed) per Parquet chunk
DEFAULT_ARROW_CHUNK_BYTES_SIZE = 10 * 1024 * 1024 # 10MB


def serialize_table_to_parquet(table: pa.Table) -> bytes:
with BytesIO() as bio:
# NOTE: careful about row group size; needs to be always constant
pq.write_table(
table,
bio,
row_group_size=DEFAULT_PARQUET_CHUNK_SIZE,
compression=DEFAULT_PARQUET_COMPRESSION,
compression_level=DEFAULT_PARQUET_COMPRESSION_LEVEL,
)
return bio.getvalue()
def serialize_table_to_parquet(
table: pa.Table, *, max_chunksize: int = DEFAULT_PARQUET_CHUNK_SIZE
) -> List[bytes]:
print(max_chunksize)
buffers: List[bytes] = []
for record_batch in table.to_batches(max_chunksize=max_chunksize):
with BytesIO() as bio:
with pq.ParquetWriter(
bio,
schema=table.schema,
compression=DEFAULT_PARQUET_COMPRESSION,
compression_level=DEFAULT_PARQUET_COMPRESSION_LEVEL,
) as writer:
writer.write_batch(record_batch, row_group_size=record_batch.num_rows)

buffers.append(bio.getvalue())

def serialize_pyarrow_column(data: pa.Array) -> bytes:
return buffers


def serialize_pyarrow_column(data: pa.Array, *, max_chunksize: int) -> List[bytes]:
"""Serialize a pyarrow column to a Parquet file with one column"""
pyarrow_table = pa.table({"value": data})
return serialize_table_to_parquet(pyarrow_table)
return serialize_table_to_parquet(pyarrow_table, max_chunksize=max_chunksize)


def serialize_color_accessor(
data: Union[List[int], Tuple[int], NDArray[np.uint8]], obj=None
data: Union[List[int], Tuple[int], NDArray[np.uint8]], obj
):
if data is None:
return None
@@ -40,23 +50,29 @@ def serialize_color_accessor(
return data

assert isinstance(data, (pa.ChunkedArray, pa.Array))
return serialize_pyarrow_column(data)
return serialize_pyarrow_column(data, max_chunksize=obj._rows_per_chunk)


def serialize_float_accessor(data: Union[int, float, NDArray[np.floating]], obj=None):
def serialize_float_accessor(data: Union[int, float, NDArray[np.floating]], obj):
if data is None:
return None

if isinstance(data, (int, float)):
return data

assert isinstance(data, (pa.ChunkedArray, pa.Array))
return serialize_pyarrow_column(data)
return serialize_pyarrow_column(data, max_chunksize=obj._rows_per_chunk)


def serialize_table(data, obj=None):
def serialize_table(data, obj):
assert isinstance(data, pa.Table), "expected pyarrow table"
return serialize_table_to_parquet(data)
return serialize_table_to_parquet(data, max_chunksize=obj._rows_per_chunk)


def infer_rows_per_chunk(table: pa.Table) -> int:
num_chunks = max(round(table.nbytes / DEFAULT_ARROW_CHUNK_BYTES_SIZE), 1)
rows_per_chunk = math.ceil((table.num_rows / num_chunks))
return rows_per_chunk


COLOR_SERIALIZATION = {"to_json": serialize_color_accessor}
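
With these changes, serialize_table_to_parquet emits one fully self-contained Parquet file per record batch instead of a single file with many row groups, and infer_rows_per_chunk sizes those batches to roughly 10MB of uncompressed Arrow data apiece (for example, a 25MB table with 100,000 rows rounds to 2 chunks of 50,000 rows each). A round-trip sketch of the writer loop, assuming only pyarrow; reassembling in Python is purely illustrative, since the real consumer is the JavaScript frontend:

from io import BytesIO

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"value": list(range(10))})

# Mirror serialize_table_to_parquet: one standalone Parquet file per batch
buffers = []
for record_batch in table.to_batches(max_chunksize=4):
    with BytesIO() as bio:
        with pq.ParquetWriter(
            bio, schema=table.schema, compression="ZSTD", compression_level=7
        ) as writer:
            writer.write_batch(record_batch, row_group_size=record_batch.num_rows)
        buffers.append(bio.getvalue())

assert len(buffers) == 3  # 10 rows at max_chunksize=4 -> chunks of 4, 4, 2

# Each buffer parses on its own; concatenating the pieces recovers the table
round_tripped = pa.concat_tables(pq.read_table(BytesIO(buf)) for buf in buffers)
assert round_tripped.equals(table)
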
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "lonboard"
version = "0.1.0-beta.5"
version = "0.1.0-beta.6"
description = "Extremely fast geospatial data visualization in Python."
authors = ["Kyle Barron <[email protected]>"]
license = "MIT"
20 changes: 13 additions & 7 deletions src/accessor.ts
@@ -1,17 +1,23 @@
import * as arrow from "apache-arrow";
import { parseParquet } from "./parquet";
import { parseParquetBuffers } from "./parquet";
import { useState, useEffect } from "react";

export function useTableBufferState(
wasmReady: boolean,
dataRaw: DataView
dataRaw: DataView[]
): [arrow.Table | null] {
const [dataTable, setDataTable] = useState<arrow.Table | null>(null);
// Only parse the parquet buffer when the data itself or wasmReady has changed
useEffect(() => {
const callback = () => {
if (wasmReady && dataRaw && dataRaw.byteLength > 0) {
setDataTable(parseParquet(dataRaw));
if (wasmReady && dataRaw && dataRaw.length > 0) {
console.log(
`table byte lengths: ${dataRaw.map(
(dataView) => dataView.byteLength
)}`
);

setDataTable(parseParquetBuffers(dataRaw));
}
};
callback();
@@ -27,9 +33,9 @@ export function useAccessorState(wasmReady: boolean, accessorRaw: any): any {
useEffect(() => {
const callback = () => {
setAccessorValue(
accessorRaw instanceof DataView
? wasmReady && accessorRaw.byteLength > 0
? parseParquet(accessorRaw).getChildAt(0)
accessorRaw instanceof Array && accessorRaw?.[0] instanceof DataView
? wasmReady && accessorRaw?.[0].byteLength > 0
? parseParquetBuffers(accessorRaw).getChildAt(0)
: null
: accessorRaw
);
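
The branch added to useAccessorState mirrors the Python-side split above: scalar accessor values travel as plain JSON, while pyarrow arrays arrive as a list of DataViews, one Parquet chunk each. A Python sketch of the sending side (serialize_accessor is a hypothetical helper that condenses serialize_color_accessor and serialize_float_accessor for illustration):

import pyarrow as pa

from lonboard.serialization import serialize_pyarrow_column


def serialize_accessor(data, rows_per_chunk: int):
    # Hypothetical condensation of the two accessor serializers
    if isinstance(data, (int, float)):
        return data  # plain JSON scalar; the JS hook uses it unchanged
    assert isinstance(data, (pa.ChunkedArray, pa.Array))
    # List[bytes]: one self-contained Parquet file per chunk of rows
    return serialize_pyarrow_column(data, max_chunksize=rows_per_chunk)


print(serialize_accessor(1.5, rows_per_chunk=2))  # -> 1.5
chunks = serialize_accessor(pa.array([1.0, 2.0, 3.0]), rows_per_chunk=2)
print(len(chunks))  # -> 2 buffers: chunks of 2 rows and 1 row
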
21 changes: 21 additions & 0 deletions src/parquet.ts
@@ -36,6 +36,27 @@ export function parseParquet(dataView: DataView): arrow.Table {
return arrowTable;
}

/**
* Parse a list of buffers containing Parquet chunks into an Arrow JS table
*
* Each buffer in the list is expected to be a fully self-contained Parquet file
* that can be parsed on its own and contains a single Arrow RecordBatch.
*/
export function parseParquetBuffers(dataViews: DataView[]): arrow.Table {
const batches: arrow.RecordBatch[] = [];
for (const chunkBuffer of dataViews) {
const table = parseParquet(chunkBuffer);
if (table.batches.length !== 1) {
console.warn("Expected one batch");
}
batches.push(...table.batches);
}

return new arrow.Table(batches);
}

export function useParquetWasm(): [boolean] {
const [wasmReady, setWasmReady] = useState<boolean>(false);

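
parseParquetBuffers depends on an invariant the Python writer establishes: each buffer is written with row_group_size equal to its batch length, so every file holds exactly one row group and parses to a single RecordBatch (hence the console.warn when that expectation fails). A sketch checking the invariant from the Python side, assuming pyarrow:

from io import BytesIO

import pyarrow as pa
import pyarrow.parquet as pq

batch = pa.record_batch({"value": pa.array(list(range(8)))})
with BytesIO() as bio:
    with pq.ParquetWriter(bio, schema=batch.schema) as writer:
        writer.write_batch(batch, row_group_size=batch.num_rows)
    buf = bio.getvalue()

# One row group per buffer, so the JS side sees exactly one RecordBatch
assert pq.ParquetFile(BytesIO(buf)).num_row_groups == 1
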
2 changes: 1 addition & 1 deletion src/path-layer.tsx
@@ -21,7 +21,7 @@ function App() {
const [wasmReady] = useParquetWasm();

let [viewState] = useModelState<DataView>("_initial_view_state");
let [dataRaw] = useModelState<DataView>("table");
let [dataRaw] = useModelState<DataView[]>("table");
let [widthUnits] = useModelState("width_units");
let [widthScale] = useModelState("width_scale");
let [widthMinPixels] = useModelState("width_min_pixels");
2 changes: 1 addition & 1 deletion src/scatterplot-layer.tsx
@@ -21,7 +21,7 @@ function App() {
const [wasmReady] = useParquetWasm();

let [viewState] = useModelState<DataView>("_initial_view_state");
let [dataRaw] = useModelState<DataView>("table");
let [dataRaw] = useModelState<DataView[]>("table");
let [radiusUnits] = useModelState("radius_units");
let [radiusScale] = useModelState("radius_scale");
let [radiusMinPixels] = useModelState("radius_min_pixels");
2 changes: 1 addition & 1 deletion src/solid-polygon-layer.tsx
@@ -21,7 +21,7 @@ function App() {
const [wasmReady] = useParquetWasm();

let [viewState] = useModelState<DataView>("_initial_view_state");
let [dataRaw] = useModelState<DataView>("table");
let [dataRaw] = useModelState<DataView[]>("table");
let [filled] = useModelState("filled");
let [extruded] = useModelState("extruded");
let [wireframe] = useModelState("wireframe");
4 changes: 4 additions & 0 deletions tests/test_traits.py
@@ -8,6 +8,8 @@


class ColorAccessorWidget(Widget):
_rows_per_chunk = 2

color = ColorAccessor()


@@ -88,6 +90,8 @@ def test_color_accessor_validation_pyarrow_array_type():


class FloatAccessorWidget(Widget):
_rows_per_chunk = 2

value = FloatAccessor()


