From 70083c88bbcb256be3549f2296c920e06714ac54 Mon Sep 17 00:00:00 2001
From: Noah Treuhaft
Date: Thu, 7 Nov 2024 10:57:36 -0500
Subject: [PATCH] Move compiler/optimizer.Optimizer.Parallelize

Put it in compiler/optimizer/parallelize.go with the rest of the parallelization code.
---
 compiler/optimizer/optimizer.go   | 70 ------------------------------
 compiler/optimizer/parallelize.go | 70 ++++++++++++++++++++++++++++++
 2 files changed, 70 insertions(+), 70 deletions(-)

diff --git a/compiler/optimizer/optimizer.go b/compiler/optimizer/optimizer.go
index f8a5b2ba84..a38cd084aa 100644
--- a/compiler/optimizer/optimizer.go
+++ b/compiler/optimizer/optimizer.go
@@ -446,60 +446,6 @@ func (o *Optimizer) sortKey(id ksuid.KSUID) (order.SortKeys, error) {
 	return pool.SortKeys, nil
 }
 
-// Parallelize tries to parallelize the DAG by splitting each source
-// path as much as possible of the sequence into n parallel branches.
-func (o *Optimizer) Parallelize(seq dag.Seq, n int) (dag.Seq, error) {
-	// Compute the number of parallel paths across all input sources to
-	// achieve the desired level of concurrency. At some point, we should
-	// use a semaphore here and let each possible path use the max concurrency.
-	if o.nent == 0 {
-		return seq, nil
-	}
-	concurrency := n / o.nent
-	if concurrency < 2 {
-		concurrency = 2
-	}
-	seq, err := walkEntries(seq, func(seq dag.Seq) (dag.Seq, error) {
-		if len(seq) == 0 {
-			return seq, nil
-		}
-		var front, tail dag.Seq
-		if lister, slicer, rest := matchSource(seq); lister != nil {
-			// We parallelize the scanning to achieve the desired concurrency,
-			// then the step below pulls downstream operators into the parallel
-			// branches when possible, e.g., to parallelize aggregations etc.
-			front.Append(lister)
-			if slicer != nil {
-				front.Append(slicer)
-			}
-			tail = rest
-		} else if scan, ok := seq[0].(*dag.DefaultScan); ok {
-			front.Append(scan)
-			tail = seq[1:]
-		} else {
-			return seq, nil
-		}
-		if len(tail) == 0 {
-			return seq, nil
-		}
-		parallel, err := o.parallelizeScan(tail, concurrency)
-		if err != nil {
-			return nil, err
-		}
-		if parallel == nil {
-			// Leave the source path unmodified.
-			return seq, nil
-		}
-		// Replace the source path with the parallelized gadget.
-		return append(front, parallel...), nil
-	})
-	if err != nil {
-		return nil, err
-	}
-	o.optimizeParallels(seq)
-	return removePassOps(seq), nil
-}
-
 func (o *Optimizer) lookupPool(id ksuid.KSUID) (*lake.Pool, error) {
 	if o.lake == nil {
 		return nil, errors.New("internal error: lake operation cannot be used in non-lake context")
 	}
@@ -508,22 +454,6 @@ func (o *Optimizer) lookupPool(id ksuid.KSUID) (*lake.Pool, error) {
 	return o.lake.OpenPool(o.ctx, id)
 }
 
-func matchSource(seq dag.Seq) (*dag.Lister, *dag.Slicer, dag.Seq) {
-	lister, ok := seq[0].(*dag.Lister)
-	if !ok {
-		return nil, nil, nil
-	}
-	seq = seq[1:]
-	slicer, ok := seq[0].(*dag.Slicer)
-	if ok {
-		seq = seq[1:]
-	}
-	if _, ok := seq[0].(*dag.SeqScan); !ok {
-		panic("parseSource: no SeqScan")
-	}
-	return lister, slicer, seq
-}
-
 // matchFilter attempts to find a filter from the front seq
 // and returns the filter's expression (and the modified seq) so
 // we can lift the filter predicate into the scanner.
diff --git a/compiler/optimizer/parallelize.go b/compiler/optimizer/parallelize.go
index e6d3c02e2e..3cf8a0628f 100644
--- a/compiler/optimizer/parallelize.go
+++ b/compiler/optimizer/parallelize.go
@@ -8,6 +8,76 @@ import (
 	"github.com/brimdata/super/order"
 )
 
+// Parallelize tries to parallelize the DAG by splitting each source
+// path as much as possible of the sequence into n parallel branches.
+func (o *Optimizer) Parallelize(seq dag.Seq, n int) (dag.Seq, error) {
+	// Compute the number of parallel paths across all input sources to
+	// achieve the desired level of concurrency. At some point, we should
+	// use a semaphore here and let each possible path use the max concurrency.
+	if o.nent == 0 {
+		return seq, nil
+	}
+	concurrency := n / o.nent
+	if concurrency < 2 {
+		concurrency = 2
+	}
+	seq, err := walkEntries(seq, func(seq dag.Seq) (dag.Seq, error) {
+		if len(seq) == 0 {
+			return seq, nil
+		}
+		var front, tail dag.Seq
+		if lister, slicer, rest := matchSource(seq); lister != nil {
+			// We parallelize the scanning to achieve the desired concurrency,
+			// then the step below pulls downstream operators into the parallel
+			// branches when possible, e.g., to parallelize aggregations etc.
+			front.Append(lister)
+			if slicer != nil {
+				front.Append(slicer)
+			}
+			tail = rest
+		} else if scan, ok := seq[0].(*dag.DefaultScan); ok {
+			front.Append(scan)
+			tail = seq[1:]
+		} else {
+			return seq, nil
+		}
+		if len(tail) == 0 {
+			return seq, nil
+		}
+		parallel, err := o.parallelizeScan(tail, concurrency)
+		if err != nil {
+			return nil, err
+		}
+		if parallel == nil {
+			// Leave the source path unmodified.
+			return seq, nil
+		}
+		// Replace the source path with the parallelized gadget.
+		return append(front, parallel...), nil
+	})
+	if err != nil {
+		return nil, err
+	}
+	o.optimizeParallels(seq)
+	return removePassOps(seq), nil
+}
+
+func matchSource(seq dag.Seq) (*dag.Lister, *dag.Slicer, dag.Seq) {
+	lister, ok := seq[0].(*dag.Lister)
+	if !ok {
+		return nil, nil, nil
+	}
+	seq = seq[1:]
+	slicer, ok := seq[0].(*dag.Slicer)
+	if ok {
+		seq = seq[1:]
+	}
+	if _, ok := seq[0].(*dag.SeqScan); !ok {
+		panic("parseSource: no SeqScan")
+	}
+	return lister, slicer, seq
+}
+
 func (o *Optimizer) parallelizeScan(seq dag.Seq, replicas int) (dag.Seq, error) {
 	// For now we parallelize only pool scans and no metadata scans.
 	// We can do the latter when we want to scale the performance of metadata.