Skip to content

Commit

Permalink
groot/{internal/rcompress,riofs,rtree}: expose API to customize key-c…
Browse files Browse the repository at this point in the history
…ompression

Signed-off-by: Sebastien Binet <[email protected]>
  • Loading branch information
sbinet committed Oct 22, 2024
1 parent 7c6853a commit aab80b1
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 11 deletions.
8 changes: 8 additions & 0 deletions groot/internal/rcompress/rcompress.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ type Settings struct {
Lvl int
}

// SettingsFrom creates a Settings value from the provided compression
// configuration. The configuration carries both the compression algorithm
// and the compression level, packed using ROOT's encoding.
func SettingsFrom(compr int32) Settings {
	var cfg Settings
	cfg.Alg, cfg.Lvl = rootCompressAlgLvl(compr)
	return cfg
}

// DefaultSettings is the default compression algorithm and level used
// in ROOT files and trees: the ZLIB algorithm at the flate.BestSpeed level.
var DefaultSettings = Settings{Alg: ZLIB, Lvl: flate.BestSpeed}
Expand Down
2 changes: 1 addition & 1 deletion groot/riofs/dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ func (dir *tdirectoryFile) Put(name string, obj root.Object) error {
dir.addStreamer(si)
}

key, err := newKeyFrom(dir, name, title, rdict.GoName2Cxx(typename), obj, dir.file)
key, err := newKeyFrom(dir, name, title, rdict.GoName2Cxx(typename), obj, dir.file, nil) // FIXME(sbinet): wire in key-opt ?
if err != nil {
return fmt.Errorf("riofs: could not create key %q for object %T: %w", name, obj, err)
}
Expand Down
2 changes: 1 addition & 1 deletion groot/riofs/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ func (f *File) writeStreamerInfo() error {
return fmt.Errorf("riofs: could not write StreamerInfo list: %w", err)
}

key, err = newKeyFromBuf(&f.dir, "StreamerInfo", sinfos.Title(), sinfos.Class(), 1, buf.Bytes(), f)
key, err = newKeyFromBuf(&f.dir, "StreamerInfo", sinfos.Title(), sinfos.Class(), 1, buf.Bytes(), f, nil)
if err != nil {
return fmt.Errorf("riofs: could not create StreamerInfo key: %w", err)
}
Expand Down
59 changes: 53 additions & 6 deletions groot/riofs/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,27 @@ import (
"go-hep.org/x/hep/groot/rvers"
)

// KeyOption configures how a Key may be created.
//
// A KeyOption modifies the key configuration it receives and reports
// any failure to do so through its returned error.
type KeyOption func(cfg *keyCfg) error

// WithKeyCompression returns a KeyOption that makes a Key use the provided
// compression scheme.
//
// The compression value is decoded with rcompress.SettingsFrom each time
// the option is applied; the option itself never fails.
func WithKeyCompression(compression int32) KeyOption {
	opt := func(kc *keyCfg) error {
		kc.compr = rcompress.SettingsFrom(compression)
		return nil
	}
	return opt
}

// keyCfg holds the settings collected from the KeyOptions used when
// creating a Key.
type keyCfg struct {
// compr is the compression to apply to the key payload.
// When compr.Alg is rcompress.Inherit, the compression of the file is used.
compr rcompress.Settings
}

// newKeyCfg returns the default key configuration, whose compression
// algorithm is rcompress.Inherit.
func newKeyCfg() *keyCfg {
	cfg := new(keyCfg)
	cfg.compr.Alg = rcompress.Inherit
	return cfg
}

// noKeyError is the error returned when a riofs.Key could not be found.
type noKeyError struct {
key string
Expand Down Expand Up @@ -122,20 +143,28 @@ func newKey(dir *tdirectoryFile, name, title, class string, objlen int32, f *Fil
// NewKey creates a new key from the provided serialized object buffer.
// NewKey puts the key and its payload at the end of the provided file f.
// Depending on the file configuration, or on the compression requested through kopts, NewKey may compress the provided object buffer.
func NewKey(dir Directory, name, title, class string, cycle int16, obj []byte, f *File) (Key, error) {
func NewKey(dir Directory, name, title, class string, cycle int16, obj []byte, f *File, kopts ...KeyOption) (Key, error) {
var d *tdirectoryFile
if dir != nil {
d = dir.(*tdirectoryFile)
}
return newKeyFromBuf(d, name, title, class, cycle, obj, f)
return newKeyFromBuf(d, name, title, class, cycle, obj, f, kopts)
}

func newKeyFrom(dir *tdirectoryFile, name, title, class string, obj root.Object, f *File) (Key, error) {
func newKeyFrom(dir *tdirectoryFile, name, title, class string, obj root.Object, f *File, kopts []KeyOption) (Key, error) {
var err error
if dir == nil {
dir = &f.dir
}

kcfg := newKeyCfg()
for _, opt := range kopts {
err = opt(kcfg)
if err != nil {
return Key{}, fmt.Errorf("riofs: could not setup Key option: %w", err)
}
}

keylen := keylenFor(name, title, class, dir, f.end)

buf := rbytes.NewWBuffer(nil, nil, uint32(keylen), dir.file)
Expand Down Expand Up @@ -171,7 +200,12 @@ func newKeyFrom(dir *tdirectoryFile, name, title, class string, obj root.Object,
k.rvers += 1000
}

k.buf, err = rcompress.Compress(nil, buf.Bytes(), k.f.compression)
compress := k.f.compression
if kcfg.compr.Alg != rcompress.Inherit {
compress = kcfg.compr.Compression()
}

k.buf, err = rcompress.Compress(nil, buf.Bytes(), compress)
if err != nil {
return k, fmt.Errorf("riofs: could not compress object %T for key %q: %w", obj, name, err)
}
Expand All @@ -185,12 +219,20 @@ func newKeyFrom(dir *tdirectoryFile, name, title, class string, obj root.Object,
return k, nil
}

func newKeyFromBuf(dir *tdirectoryFile, name, title, class string, cycle int16, buf []byte, f *File) (Key, error) {
func newKeyFromBuf(dir *tdirectoryFile, name, title, class string, cycle int16, buf []byte, f *File, kopts []KeyOption) (Key, error) {
var err error
if dir == nil {
dir = &f.dir
}

kcfg := newKeyCfg()
for _, opt := range kopts {
err = opt(kcfg)
if err != nil {
return Key{}, fmt.Errorf("riofs: could not setup Key option: %w", err)
}
}

keylen := keylenFor(name, title, class, dir, f.end)
objlen := int32(len(buf))
k := Key{
Expand All @@ -212,7 +254,12 @@ func newKeyFromBuf(dir *tdirectoryFile, name, title, class string, cycle int16,
k.rvers += 1000
}

k.buf, err = rcompress.Compress(nil, buf, k.f.compression)
compress := k.f.compression
if kcfg.compr.Alg != rcompress.Inherit {
compress = kcfg.compr.Compression()
}

k.buf, err = rcompress.Compress(nil, buf, compress)
if err != nil {
return k, fmt.Errorf("riofs: could not compress object %s for key %q: %w", class, name, err)
}
Expand Down
4 changes: 2 additions & 2 deletions groot/rtree/basket.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func (b *Basket) grow(n int) {
b.offsets = append(b.offsets, make([]int32, delta)...)
}

func (b *Basket) writeFile(f *riofs.File) (totBytes int64, zipBytes int64, err error) {
func (b *Basket) writeFile(f *riofs.File, compr int32) (totBytes int64, zipBytes int64, err error) {
header := b.header
b.header = true
defer func() {
Expand All @@ -332,7 +332,7 @@ func (b *Basket) writeFile(f *riofs.File) (totBytes int64, zipBytes int64, err e
b.wbuf.WriteArrayI32(b.offsets[:b.nevbuf])
b.wbuf.WriteI32(0)
}
b.key, err = riofs.NewKey(nil, b.key.Name(), b.key.Title(), b.Class(), int16(b.key.Cycle()), b.wbuf.Bytes(), f)
b.key, err = riofs.NewKey(nil, b.key.Name(), b.key.Title(), b.Class(), int16(b.key.Cycle()), b.wbuf.Bytes(), f, riofs.WithKeyCompression(compr))
if err != nil {
return 0, 0, fmt.Errorf("rtree: could not create basket-key: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion groot/rtree/branch.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ func (b *tbranch) flush() error {
}

f := b.tree.getFile()
totBytes, zipBytes, err := b.ctx.bk.writeFile(f)
totBytes, zipBytes, err := b.ctx.bk.writeFile(f, int32(b.compress))
if err != nil {
return fmt.Errorf("could not marshal basket[%d] (branch=%q): %w", b.writeBasket, b.Name(), err)
}
Expand Down

0 comments on commit aab80b1

Please sign in to comment.