Skip to content

Commit

Permalink
Move compiler/optimizer.Optimizer.Parallelize
Browse files Browse the repository at this point in the history
Put it in compiler/optimizer/parallelize.go with the rest of the
parallelization code.
  • Loading branch information
nwt committed Nov 7, 2024
1 parent 76ef358 commit 70083c8
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 70 deletions.
70 changes: 0 additions & 70 deletions compiler/optimizer/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,60 +446,6 @@ func (o *Optimizer) sortKey(id ksuid.KSUID) (order.SortKeys, error) {
return pool.SortKeys, nil
}

// Parallelize attempts to rewrite every source path in seq so that the
// operators following the source run in parallel branches.  The requested
// concurrency n is divided among the optimizer's o.nent entries, with a
// minimum of two branches per path.  Paths that cannot be parallelized are
// left untouched.  It returns the rewritten sequence with pass operators
// removed, or a nil sequence and the error if a rewrite fails.
func (o *Optimizer) Parallelize(seq dag.Seq, n int) (dag.Seq, error) {
	if o.nent == 0 {
		return seq, nil
	}
	// Split the desired concurrency across all input sources.  Eventually
	// a semaphore should be used here instead so that each possible path
	// can use the maximum concurrency.
	replicas := n / o.nent
	if replicas < 2 {
		replicas = 2
	}
	rewrite := func(path dag.Seq) (dag.Seq, error) {
		if len(path) == 0 {
			return path, nil
		}
		// head collects the source operators that stay in front of the
		// parallel gadget; rest is what we try to parallelize.
		var head, rest dag.Seq
		lister, slicer, afterSource := matchSource(path)
		switch {
		case lister != nil:
			// Parallelize the scanning here to reach the desired
			// concurrency; optimizeParallels below then pulls
			// downstream operators (e.g., aggregations) into the
			// parallel branches when possible.
			head.Append(lister)
			if slicer != nil {
				head.Append(slicer)
			}
			rest = afterSource
		default:
			scan, ok := path[0].(*dag.DefaultScan)
			if !ok {
				return path, nil
			}
			head.Append(scan)
			rest = path[1:]
		}
		if len(rest) == 0 {
			return path, nil
		}
		branches, err := o.parallelizeScan(rest, replicas)
		if err != nil {
			return nil, err
		}
		if branches == nil {
			// Nothing to parallelize; keep the source path as it was.
			return path, nil
		}
		// Swap the source path for the parallelized gadget.
		return append(head, branches...), nil
	}
	out, err := walkEntries(seq, rewrite)
	if err != nil {
		return nil, err
	}
	o.optimizeParallels(out)
	return removePassOps(out), nil
}

func (o *Optimizer) lookupPool(id ksuid.KSUID) (*lake.Pool, error) {
if o.lake == nil {
return nil, errors.New("internal error: lake operation cannot be used in non-lake context")
Expand All @@ -508,22 +454,6 @@ func (o *Optimizer) lookupPool(id ksuid.KSUID) (*lake.Pool, error) {
return o.lake.OpenPool(o.ctx, id)
}

// matchSource recognizes a source at the front of seq consisting of a
// *dag.Lister, an optional *dag.Slicer, and then a *dag.SeqScan.
//
// If seq is empty or does not begin with a *dag.Lister, it returns
// (nil, nil, nil).  Otherwise it returns the lister, the slicer (nil when
// absent), and the remainder of seq beginning at the *dag.SeqScan.
//
// It panics if the lister (and optional slicer) is not followed by a
// *dag.SeqScan, including when nothing follows at all.
func matchSource(seq dag.Seq) (*dag.Lister, *dag.Slicer, dag.Seq) {
	if len(seq) == 0 {
		return nil, nil, nil
	}
	lister, ok := seq[0].(*dag.Lister)
	if !ok {
		return nil, nil, nil
	}
	seq = seq[1:]
	var slicer *dag.Slicer
	if len(seq) > 0 {
		if s, ok := seq[0].(*dag.Slicer); ok {
			slicer = s
			seq = seq[1:]
		}
	}
	// Check the length before indexing so that a truncated source fails
	// with the intended message rather than an index-out-of-range panic.
	if len(seq) == 0 {
		panic("matchSource: no SeqScan")
	}
	if _, ok := seq[0].(*dag.SeqScan); !ok {
		panic("matchSource: no SeqScan")
	}
	return lister, slicer, seq
}

// matchFilter attempts to find a filter from the front seq
// and returns the filter's expression (and the modified seq) so
// we can lift the filter predicate into the scanner.
Expand Down
70 changes: 70 additions & 0 deletions compiler/optimizer/parallelize.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,76 @@ import (
"github.com/brimdata/super/order"
)

// Parallelize attempts to rewrite every source path in seq so that the
// operators following the source run in parallel branches.  The requested
// concurrency n is divided among the optimizer's o.nent entries, with a
// minimum of two branches per path.  Paths that cannot be parallelized are
// left untouched.  It returns the rewritten sequence with pass operators
// removed, or a nil sequence and the error if a rewrite fails.
func (o *Optimizer) Parallelize(seq dag.Seq, n int) (dag.Seq, error) {
	if o.nent == 0 {
		return seq, nil
	}
	// Split the desired concurrency across all input sources.  Eventually
	// a semaphore should be used here instead so that each possible path
	// can use the maximum concurrency.
	replicas := n / o.nent
	if replicas < 2 {
		replicas = 2
	}
	rewrite := func(path dag.Seq) (dag.Seq, error) {
		if len(path) == 0 {
			return path, nil
		}
		// head collects the source operators that stay in front of the
		// parallel gadget; rest is what we try to parallelize.
		var head, rest dag.Seq
		lister, slicer, afterSource := matchSource(path)
		switch {
		case lister != nil:
			// Parallelize the scanning here to reach the desired
			// concurrency; optimizeParallels below then pulls
			// downstream operators (e.g., aggregations) into the
			// parallel branches when possible.
			head.Append(lister)
			if slicer != nil {
				head.Append(slicer)
			}
			rest = afterSource
		default:
			scan, ok := path[0].(*dag.DefaultScan)
			if !ok {
				return path, nil
			}
			head.Append(scan)
			rest = path[1:]
		}
		if len(rest) == 0 {
			return path, nil
		}
		branches, err := o.parallelizeScan(rest, replicas)
		if err != nil {
			return nil, err
		}
		if branches == nil {
			// Nothing to parallelize; keep the source path as it was.
			return path, nil
		}
		// Swap the source path for the parallelized gadget.
		return append(head, branches...), nil
	}
	out, err := walkEntries(seq, rewrite)
	if err != nil {
		return nil, err
	}
	o.optimizeParallels(out)
	return removePassOps(out), nil
}

// matchSource recognizes a source at the front of seq consisting of a
// *dag.Lister, an optional *dag.Slicer, and then a *dag.SeqScan.
//
// If seq is empty or does not begin with a *dag.Lister, it returns
// (nil, nil, nil).  Otherwise it returns the lister, the slicer (nil when
// absent), and the remainder of seq beginning at the *dag.SeqScan.
//
// It panics if the lister (and optional slicer) is not followed by a
// *dag.SeqScan, including when nothing follows at all.
func matchSource(seq dag.Seq) (*dag.Lister, *dag.Slicer, dag.Seq) {
	if len(seq) == 0 {
		return nil, nil, nil
	}
	lister, ok := seq[0].(*dag.Lister)
	if !ok {
		return nil, nil, nil
	}
	seq = seq[1:]
	var slicer *dag.Slicer
	if len(seq) > 0 {
		if s, ok := seq[0].(*dag.Slicer); ok {
			slicer = s
			seq = seq[1:]
		}
	}
	// Check the length before indexing so that a truncated source fails
	// with the intended message rather than an index-out-of-range panic.
	if len(seq) == 0 {
		panic("matchSource: no SeqScan")
	}
	if _, ok := seq[0].(*dag.SeqScan); !ok {
		panic("matchSource: no SeqScan")
	}
	return lister, slicer, seq
}

func (o *Optimizer) parallelizeScan(seq dag.Seq, replicas int) (dag.Seq, error) {
// For now we parallelize only pool scans and no metadata scans.
// We can do the latter when we want to scale the performance of metadata.
Expand Down

0 comments on commit 70083c8

Please sign in to comment.