Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vam: Array Expressions #5462

Merged
merged 2 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions compiler/kernel/vexpr.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ func (b *Builder) compileVamExpr(e dag.Expr) (vamexpr.Evaluator, error) {
return nil, errors.New("null expression not allowed")
}
switch e := e.(type) {
case *dag.ArrayExpr:
return b.compileVamArrayExpr(e)
case *dag.Literal:
val, err := zson.ParseValue(b.zctx(), e.Value)
if err != nil {
Expand Down Expand Up @@ -47,8 +49,6 @@ func (b *Builder) compileVamExpr(e dag.Expr) (vamexpr.Evaluator, error) {
// return b.compileVamRegexpSearch(e)
case *dag.RecordExpr:
return b.compileVamRecordExpr(e)
//case *dag.ArrayExpr:
// return b.compileVamArrayExpr(e)
//case *dag.SetExpr:
// return b.compileVamSetExpr(e)
//case *dag.MapCall:
Expand Down Expand Up @@ -218,3 +218,34 @@ func (b *Builder) compileVamRecordExpr(e *dag.RecordExpr) (vamexpr.Evaluator, er
}
return vamexpr.NewRecordExpr(b.zctx(), elems), nil
}

func (b *Builder) compileVamArrayExpr(e *dag.ArrayExpr) (vamexpr.Evaluator, error) {
elems, err := b.compileVamListElems(e.Elems)
if err != nil {
return nil, err
}
return vamexpr.NewArrayExpr(b.zctx(), elems), nil
}

func (b *Builder) compileVamListElems(elems []dag.VectorElem) ([]vamexpr.ListElem, error) {
var out []vamexpr.ListElem
for _, elem := range elems {
switch elem := elem.(type) {
case *dag.Spread:
e, err := b.compileVamExpr(elem.Expr)
if err != nil {
return nil, err
}
out = append(out, vamexpr.ListElem{Spread: e})
case *dag.VectorValue:
e, err := b.compileVamExpr(elem.Expr)
if err != nil {
return nil, err
}
out = append(out, vamexpr.ListElem{Value: e})
default:
panic(elem)
}
mattnibs marked this conversation as resolved.
Show resolved Hide resolved
}
return out, nil
}
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/brimdata/super
go 1.23

require (
github.com/RoaringBitmap/roaring v1.9.4
github.com/agnivade/levenshtein v1.1.1
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d
github.com/apache/arrow-go/v18 v18.0.0
Expand Down Expand Up @@ -46,6 +47,7 @@ require (
github.com/andybalholm/brotli v1.1.1 // indirect
github.com/apache/thrift v0.21.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.12.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc // indirect
Expand All @@ -64,6 +66,7 @@ require (
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU=
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/RoaringBitmap/roaring v1.9.4 h1:yhEIoH4YezLYT04s1nHehNO64EKFTop/wBhxv2QzDdQ=
github.com/RoaringBitmap/roaring v1.9.4/go.mod h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90=
github.com/agnivade/levenshtein v1.1.1 h1:QY8M92nrzkmr798gCo3kmMyqXFzdQVpxLlGPRBij0P8=
github.com/agnivade/levenshtein v1.1.1/go.mod h1:veldBMzWxcCG2ZvUTKD2kJNRdCk5hVbJomOvKkmgYbo=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down Expand Up @@ -65,6 +67,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bits-and-blooms/bitset v1.12.0 h1:U/q1fAF7xXRhFCrhROzIfffYnu+dlS38vCZtmFVPHmA=
github.com/bits-and-blooms/bitset v1.12.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
Expand Down Expand Up @@ -244,6 +248,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
Expand Down
6 changes: 0 additions & 6 deletions runtime/sam/expr/ztests/array-spread.yaml

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
zed: yield [...a,...b], |[...a,...b]|
zed: yield |[...a,...b]|

input: |
{a:|[2,3]|,b:[0,1],c:"hi"}
{a:[1,2],b:"hi"}

output: |
[2,3,0,1]
|[0,1,2,3]|
[1,2]
|[1,2]|
129 changes: 129 additions & 0 deletions runtime/vam/expr/arrayexpr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package expr

import (
"github.com/brimdata/super"
"github.com/brimdata/super/vector"
"github.com/brimdata/super/zcode"
)

type ListElem struct {
Value Evaluator
Spread Evaluator
}

type ArrayExpr struct {
elems []ListElem
zctx *super.Context
}

func NewArrayExpr(zctx *super.Context, elems []ListElem) *ArrayExpr {
return &ArrayExpr{
elems: elems,
zctx: zctx,
}
}

func (a *ArrayExpr) Eval(this vector.Any) vector.Any {
var vecs []vector.Any
for _, e := range a.elems {
if e.Spread != nil {
vecs = append(vecs, e.Spread.Eval(this))
} else {
vecs = append(vecs, e.Value.Eval(this))
}
}
return vector.Apply(false, a.eval, vecs...)
}

func (a *ArrayExpr) eval(in ...vector.Any) vector.Any {
n := in[0].Len()
if n == 0 {
return vector.NewConst(super.Null, 0, nil)
}
var spreadOffs [][]uint32
var viewIndexes [][]uint32
var vecs []vector.Any
for i, elem := range a.elems {
vec := in[i]
var offsets, index []uint32
if elem.Spread != nil {
vec, offsets, index = a.unwrapSpread(in[i])
if vec == nil {
// drop unspreadable elements.
continue
}
}
vecs = append(vecs, vec)
spreadOffs = append(spreadOffs, offsets)
viewIndexes = append(viewIndexes, index)
}
offsets := []uint32{0}
var tags []uint32
for i := range n {
var size uint32
for tag, spreadOff := range spreadOffs {
if len(spreadOff) == 0 {
tags = append(tags, uint32(tag))
size++
} else {
if index := viewIndexes[tag]; index != nil {
i = index[i]
}
off := spreadOff[i]
for end := spreadOff[i+1]; off < end; off++ {
tags = append(tags, uint32(tag))
size++
}
}
}
offsets = append(offsets, offsets[i]+size)
}
var typ super.Type
var innerVec vector.Any
if len(vecs) == 1 {
typ = vecs[0].Type()
innerVec = vecs[0]
} else {
var all []super.Type
for _, vec := range vecs {
all = append(all, vec.Type())
}
types := super.UniqueTypes(all)
if len(types) == 1 {
typ = types[0]
innerVec = mergeSameTypeVecs(typ, tags, vecs)
} else {
typ = a.zctx.LookupTypeUnion(types)
innerVec = vector.NewUnion(typ.(*super.TypeUnion), tags, vecs, nil)
}
}
return vector.NewArray(a.zctx.LookupTypeArray(typ), offsets, innerVec, nil)
}

func (a *ArrayExpr) unwrapSpread(vec vector.Any) (vector.Any, []uint32, []uint32) {
switch vec := vec.(type) {
case *vector.Array:
return vec.Values, vec.Offsets, nil
case *vector.Set:
return vec.Values, vec.Offsets, nil
case *vector.View:
vals, offsets, _ := a.unwrapSpread(vec.Any)
return vals, offsets, vec.Index
}
return nil, nil, nil
}

func mergeSameTypeVecs(typ super.Type, tags []uint32, vecs []vector.Any) vector.Any {
// XXX This is going to be slow. At some point would nice to write a native
// merge of same type vectors.
counts := make([]uint32, len(vecs))
vb := vector.NewBuilder(typ)
var b zcode.Builder
for _, tag := range tags {
b.Truncate()
vecs[tag].Serialize(&b, counts[tag])
vb.Write(b.Bytes().Body())
counts[tag]++
}
return vb.Build()
}
22 changes: 22 additions & 0 deletions runtime/ztests/expr/array-expr.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
zed: yield [a,b,c]

vector: true

input: |
{a:error("missing"),b:error("quiet"),c:null(error(string))}
{a:[1,2],b:null([int64]),c:[3,4]}
{a:[1,"foo"],b:[2,"bar"],c:[3,"baz"]}
{a:|[1,2]|,b:null(|[int64]|),c:|[3,4]|}
{a:|{"key":"k1"}|,b:null(|{string:string}|),c:|{"key":"k3"}|}
// heterogenous
{a:"foo",b:1,c:127.0.0.1}
{a:"bar",b:2,c:127.0.0.2}

output: |
[error("missing"),error("quiet"),null(error(string))]
[[1,2],null([int64]),[3,4]]
[[1,"foo"],[2,"bar"],[3,"baz"]]
[|[1,2]|,null(|[int64]|),|[3,4]|]
[|{"key":"k1"}|,null(|{string:string}|),|{"key":"k3"}|]
["foo",1,127.0.0.1]
["bar",2,127.0.0.2]
15 changes: 15 additions & 0 deletions runtime/ztests/expr/array-spread.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
zed: yield [...a,...b]

vector: true

input: |
{a:|[1,2]|,b:[0,1,null(int64)],c:"hi"}
{a:|[1,2]|,b:"hi"}
{a:[{x:"foo"},null({x:string})],b:[{x:"bar"}]}
{a:"foo",b:"bar"}

output: |
[1,2,0,1,null(int64)]
[1,2]
[{x:"foo"},null({x:string}),{x:"bar"}]
[]
43 changes: 43 additions & 0 deletions vector/bool.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,46 @@ func NullsOf(v Any) *Bool {
}
panic(v)
}

func setNulls(v Any, nulls *Bool) {
switch v := v.(type) {
case *Array:
v.Nulls = nulls
case *Bytes:
v.Nulls = nulls
case *Bool:
v.Nulls = nulls
case *Const:
v.Nulls = nulls
case *Dict:
v.Nulls = nulls
case *Error:
v.Nulls = nulls
case *Float:
v.Nulls = nulls
case *Int:
v.Nulls = nulls
case *IP:
v.Nulls = nulls
case *Map:
v.Nulls = nulls
case *Named:
setNulls(v.Any, nulls)
case *Net:
v.Nulls = nulls
case *Record:
v.Nulls = nulls
case *Set:
v.Nulls = nulls
case *String:
v.Nulls = nulls
case *TypeValue:
v.Nulls = nulls
case *Uint:
v.Nulls = nulls
case *Union:
v.Nulls = nulls
default:
panic(v)
}
}
Loading