Merge branch 'streamingfast:feature/parquet' into feature/parquet
coutug authored Nov 21, 2024
2 parents 13ed98e + bcd8ee2 commit 6a8e88f
Showing 24 changed files with 938 additions and 406 deletions.
66 changes: 0 additions & 66 deletions bundler/writer/parquet_rows.go

This file was deleted.

24 changes: 11 additions & 13 deletions bundler/writer/parquet_writer.go
@@ -11,9 +11,11 @@ import (
"github.com/parquet-go/parquet-go"
"github.com/streamingfast/bstream"
"github.com/streamingfast/dstore"
"github.com/streamingfast/logging"
"github.com/streamingfast/substreams-sink-files/parquetx"
pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
"go.uber.org/multierr"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/dynamicpb"
@@ -23,18 +25,8 @@ var _ Writer = (*ParquetWriter)(nil)

// ParquetWriter implements our internal interface for writing Parquet data to files
// directly.
//
// FIXME: There is definitely a lot of knowledge to share between [ParquetWriter] here
// and the [encoder.ProtoToParquet] struct. Indeed, both need to determine some kind of information
// about the message structure to be able to work properly. The writer, for example, needs to find the
// correct Message to derive a "table" schema from. The [encoder.ProtoToParquet] needs similar information,
// but this time to extract "rows" to pass to the [ParquetWriter] here.
//
// In fact, the more I think about it, Writer and Encoder should co-exist to decouple I/O from the actual
// line format used (CSV, JSONL, TSV, etc.). But in the case of Parquet, there is no sense in having that
// relationship, as the writer and the encoder are coupled and can only go hand-in-hand (e.g. there will
// never be a [ParquetWriter] with a different encoder implementation).
type ParquetWriter struct {
options *ParquetWriterOptions
descriptor protoreflect.MessageDescriptor
tables []parquetx.TableResult
tablesByName map[string]parquetx.TableResult
@@ -44,8 +36,13 @@ type ParquetWriter struct {
rowsBufferByTableName map[string]*parquet.RowBuffer[any]
}

func NewParquetWriter(descriptor protoreflect.MessageDescriptor) (*ParquetWriter, error) {
tables, rowExtractor := parquetx.FindTablesInMessageDescriptor(descriptor)
func NewParquetWriter(descriptor protoreflect.MessageDescriptor, logger *zap.Logger, tracer logging.Tracer, opts ...ParquetWriterOption) (*ParquetWriter, error) {
options, err := NewParquetWriterOptions(opts)
if err != nil {
return nil, fmt.Errorf("invalid parquet writer options: %w", err)
}

tables, rowExtractor := parquetx.FindTablesInMessageDescriptor(descriptor, options.DefaultColumnCompression, logger, tracer)
if len(tables) == 0 {
return nil, fmt.Errorf("no tables found in message descriptor")
}
@@ -56,6 +53,7 @@ func NewParquetWriter(descriptor protoreflect.MessageDescriptor) (*ParquetWriter
}

return &ParquetWriter{
options: options,
descriptor: descriptor,
tables: tables,
tablesByName: tablesByName,
63 changes: 63 additions & 0 deletions bundler/writer/parquet_writer_options.go
@@ -0,0 +1,63 @@
package writer

import (
"fmt"
"strings"

"github.com/bobg/go-generics/v2/maps"
"github.com/bobg/go-generics/v2/slices"
pbparquet "github.com/streamingfast/substreams-sink-files/pb/parquet"
)

// ParquetWriterOptions holds the configuration options for the Parquet writer.
// It's the fully resolved, well-typed version of ParquetWriterUserOptions, which
// is the user-facing configuration.
type ParquetWriterOptions struct {
DefaultColumnCompression *pbparquet.Compression
}

// ParquetWriterUserOptions holds the user-facing, string-based configuration options for the Parquet writer.
type ParquetWriterUserOptions struct {
DefaultColumnCompression string
}

func NewParquetWriterOptions(opts []ParquetWriterOption) (*ParquetWriterOptions, error) {
userOptions := &ParquetWriterUserOptions{}
for _, opt := range opts {
opt.apply(userOptions)
}

options := &ParquetWriterOptions{}
if userOptions.DefaultColumnCompression != "" {
compression, found := pbparquet.Compression_value[strings.ToUpper(userOptions.DefaultColumnCompression)]
if !found {
return nil, fmt.Errorf("invalid compression type %q, accepted compression values are %v", userOptions.DefaultColumnCompression, slices.Map(maps.Keys(pbparquet.Compression_value), strings.ToLower))
}

options.DefaultColumnCompression = ptr(pbparquet.Compression(compression))
}

return options, nil
}

// ParquetWriterOption configures a ParquetWriterUserOptions.
type ParquetWriterOption interface {
apply(*ParquetWriterUserOptions)
}

type optionFunc func(*ParquetWriterUserOptions)

func (f optionFunc) apply(o *ParquetWriterUserOptions) {
f(o)
}

// ParquetDefaultColumnCompression sets the default column compression for the Parquet writer.
func ParquetDefaultColumnCompression(compression string) ParquetWriterOption {
return optionFunc(func(o *ParquetWriterUserOptions) {
o.DefaultColumnCompression = compression
})
}

func ptr[T any](v T) *T {
return &v
}
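
A minimal usage sketch of the options API above, assuming the writer package import path shown in this commit; the main wrapper and printed output are illustrative, not part of this commit:

package main

import (
	"fmt"
	"log"

	"github.com/streamingfast/substreams-sink-files/bundler/writer"
)

func main() {
	// Resolve the user-facing string option into the typed ParquetWriterOptions.
	// Unknown compression names (anything not in pbparquet.Compression_value,
	// matched case-insensitively) are rejected with the accepted values listed.
	options, err := writer.NewParquetWriterOptions([]writer.ParquetWriterOption{
		writer.ParquetDefaultColumnCompression("zstd"),
	})
	if err != nil {
		log.Fatal(err)
	}

	// DefaultColumnCompression is a *pbparquet.Compression; it stays nil when
	// no default compression option was provided.
	fmt.Println(options.DefaultColumnCompression)
}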
6 changes: 5 additions & 1 deletion cmd/substreams-sink-files/run.go
@@ -51,6 +51,8 @@ var SyncRunCmd = Command(syncRunE,
Default value for the buffer is 64 MiB.
`))

addCommonParquetFlags(flags)
}),
ExamplePrefixed("substreams-sink-files run",
"mainnet.eth.streamingfast.io:443 substreams.spkg map_transfers '.transfers[]' ./localdata",
@@ -137,12 +139,14 @@ func syncRunE(cmd *cobra.Command, args []string) error {
}

case encoderType == "parquet":
flagValues := readCommonParquetFlags(cmd)

msgDesc, err := outputProtoreflectMessageDescriptor(sinker)
if err != nil {
return fmt.Errorf("output module message descriptor: %w", err)
}

parquetWriter, err := writer.NewParquetWriter(msgDesc)
parquetWriter, err := writer.NewParquetWriter(msgDesc, zlog, tracer, flagValues.AsParquetWriterOptions()...)
if err != nil {
return fmt.Errorf("new parquet writer: %w", err)
}
50 changes: 50 additions & 0 deletions cmd/substreams-sink-files/shared_flags.go
@@ -0,0 +1,50 @@
package main

import (
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/streamingfast/cli"
"github.com/streamingfast/cli/sflags"
"github.com/streamingfast/substreams-sink-files/bundler/writer"
)

// addCommonParquetFlags adds the common flags for the Parquet encoder. The flags added by this function are:
// - parquet-default-column-compression
func addCommonParquetFlags(flags *pflag.FlagSet) {
flags.String("parquet-default-column-compression", "", cli.FlagDescription(`
The default column compression to use for all columns of the tables that are going to be created and that don't
have a specific column compression set.
If set and a Protobuf field doesn't have a column-specific compression extension, the default compression will be
used for that column. If the field has a specific compression set, the field's column-specific compression takes precedence.
Available values are:
- uncompressed
- snappy
- gzip
- lz4_raw
- brotli
- zstd
Note that this setting is only used when the encoder is set to 'parquet'.
`))
}

type parquetCommonFlagValues struct {
DefaultColumnCompression string
}

func (f parquetCommonFlagValues) AsParquetWriterOptions() []writer.ParquetWriterOption {
writerOptions := []writer.ParquetWriterOption{}
if f.DefaultColumnCompression != "" {
writerOptions = append(writerOptions, writer.ParquetDefaultColumnCompression(f.DefaultColumnCompression))
}

return writerOptions
}

func readCommonParquetFlags(cmd *cobra.Command) parquetCommonFlagValues {
return parquetCommonFlagValues{
DefaultColumnCompression: sflags.MustGetString(cmd, "parquet-default-column-compression"),
}
}
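
A hypothetical sketch of how another Cobra command in this package could reuse these helpers; exampleCmd is illustrative and not part of this commit:

package main

import (
	"github.com/spf13/cobra"

	"github.com/streamingfast/substreams-sink-files/bundler/writer"
)

var exampleCmd = &cobra.Command{
	Use: "example",
	RunE: func(cmd *cobra.Command, args []string) error {
		// Returns an empty option list when --parquet-default-column-compression
		// was not set, letting the writer fall back to its built-in defaults.
		opts := readCommonParquetFlags(cmd).AsParquetWriterOptions()

		// Validates the compression name early, before any writer is built.
		_, err := writer.NewParquetWriterOptions(opts)
		return err
	},
}

func init() {
	// Registers --parquet-default-column-compression on the command's flag set.
	addCommonParquetFlags(exampleCmd.Flags())
}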
10 changes: 9 additions & 1 deletion cmd/substreams-sink-files/tools.go
@@ -7,16 +7,21 @@ import (
"strings"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/streamingfast/cli"
. "github.com/streamingfast/cli"
sink "github.com/streamingfast/substreams-sink"
"github.com/streamingfast/substreams-sink-files/bundler/writer"
"github.com/streamingfast/substreams-sink-files/parquetx"
)

var ToolsParquet = Group("parquet", "Parquet related tools",
Command(toolsParquetSchemaE,
"schema <manifest> [<output_module>]",
"Generate a parquet schema from a proto message",
Flags(func(flags *pflag.FlagSet) {
addCommonParquetFlags(flags)
}),
RangeArgs(1, 2),
),
)
@@ -44,7 +49,10 @@ func toolsParquetSchemaE(cmd *cobra.Command, args []string) error {
descriptor, err := outputProtoreflectMessageDescriptor(sinker)
cli.NoError(err, "Failed to extract message descriptor from output module")

tables, _ := parquetx.FindTablesInMessageDescriptor(descriptor)
parquetWriterOptions, err := writer.NewParquetWriterOptions(readCommonParquetFlags(cmd).AsParquetWriterOptions())
cli.NoError(err, "Failed to create parquet writer options")

tables, _ := parquetx.FindTablesInMessageDescriptor(descriptor, parquetWriterOptions.DefaultColumnCompression, zlog, tracer)
if len(tables) == 0 {
fmt.Printf("No tables found or inferred in message descriptor %q\n", descriptor.FullName())
os.Exit(1)