Skip to content

Commit

Permalink
[#32085][prism] Fix session windowing. (#32086)
Browse files Browse the repository at this point in the history
  • Loading branch information
lostluck authored Aug 6, 2024
1 parent 17283bb commit e3e4454
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 9 deletions.
24 changes: 15 additions & 9 deletions sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func (h *runner) ExecuteTransform(stageID, tid string, t *pipepb.PTransform, com
kc := coders[kcID]
ec := coders[ecID]

data = append(data, gbkBytes(ws, wc, kc, ec, inputData, coders, watermark))
data = append(data, gbkBytes(ws, wc, kc, ec, inputData, coders))
if len(data[0]) == 0 {
panic("no data for GBK")
}
Expand Down Expand Up @@ -290,7 +290,7 @@ func windowingStrategy(comps *pipepb.Components, tid string) *pipepb.WindowingSt
}

// gbkBytes re-encodes gbk inputs in a gbk result.
func gbkBytes(ws *pipepb.WindowingStrategy, wc, kc, vc *pipepb.Coder, toAggregate [][]byte, coders map[string]*pipepb.Coder, watermark mtime.Time) []byte {
func gbkBytes(ws *pipepb.WindowingStrategy, wc, kc, vc *pipepb.Coder, toAggregate [][]byte, coders map[string]*pipepb.Coder) []byte {
// Pick how the timestamp of the aggregated output is computed.
var outputTime func(typex.Window, mtime.Time, mtime.Time) mtime.Time
switch ws.GetOutputTime() {
Expand Down Expand Up @@ -333,9 +333,8 @@ func gbkBytes(ws *pipepb.WindowingStrategy, wc, kc, vc *pipepb.Coder, toAggregat
kd := pullDecoder(kc, coders)
vd := pullDecoder(vc, coders)

// Right, need to get the key coder, and the element coder.
// Cus I'll need to pull out anything the runner knows how to deal with.
// And repeat.
// Aggregate by windows and keys, using the window coder and KV coders.
// We need to extract and split the key bytes from the element bytes.
for _, data := range toAggregate {
// Parse out each element's data, and repeat.
buf := bytes.NewBuffer(data)
Expand Down Expand Up @@ -388,34 +387,41 @@ func gbkBytes(ws *pipepb.WindowingStrategy, wc, kc, vc *pipepb.Coder, toAggregat
}
// Use a decreasing sort (latest to earliest) so we can correct
// the output timestamp to the new end of window immeadiately.
// TODO need to correct this if output time is different.
sort.Slice(ordered, func(i, j int) bool {
return ordered[i].MaxTimestamp() > ordered[j].MaxTimestamp()
})

cur := ordered[0]
sessionData := windows[cur]
delete(windows, cur)
for _, iw := range ordered[1:] {
// If they overlap, then we merge the data.
// Check if the gap between windows is less than the gapSize.
// If not, this window is done, and we start a next window.
if iw.End+gapSize < cur.Start {
// Start a new session.
// Store current data with the current window.
windows[cur] = sessionData
// Use the incoming window instead, and clear it from the map.
cur = iw
sessionData = windows[iw]
delete(windows, cur)
// There's nothing to merge, since we've just started with this windowed data.
continue
}
// Extend the session
// Extend the session with the incoming window, and merge the the incoming window's data.
cur.Start = iw.Start
toMerge := windows[iw]
delete(windows, iw)
for k, kt := range toMerge {
skt := sessionData[k]
// Ensure the output time matches the given function.
skt.time = outputTime(cur, kt.time, skt.time)
skt.key = kt.key
skt.w = cur
skt.values = append(skt.values, kt.values...)
sessionData[k] = skt
}
}
windows[cur] = sessionData
}
// Everything's aggregated!
// Time to turn things into a windowed KV<K, Iterable<V>>
Expand Down
22 changes: 22 additions & 0 deletions sdks/python/apache_beam/runners/portability/prism_runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@
from apache_beam.runners.portability import portable_runner_test
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms import window
from apache_beam.transforms.sql import SqlTransform
from apache_beam.utils import timestamp

# Run as
#
Expand Down Expand Up @@ -178,6 +180,26 @@ def create_options(self):

return options

# Slightly more robust session window test:
# Validates that an inner grouping doesn't duplicate data either.
# Copied also because the timestamp in fn_runner_test.py isn't being
# inferred correctly as seconds for some reason, but as micros.
# The belabored specification is validating the timestamp type works at least.
# See https://github.com/apache/beam/issues/32085
def test_windowing(self):
with self.create_pipeline() as p:
res = (
p
| beam.Create([1, 2, 100, 101, 102, 123])
| beam.Map(
lambda t: window.TimestampedValue(
('k', t), timestamp.Timestamp.of(t).micros))
| beam.WindowInto(beam.transforms.window.Sessions(10))
| beam.GroupByKey()
| beam.Map(lambda k_vs1: (k_vs1[0], sorted(k_vs1[1]))))
assert_that(
res, equal_to([('k', [1, 2]), ('k', [100, 101, 102]), ('k', [123])]))

# Can't read host files from within docker, read a "local" file there.
def test_read(self):
print('name:', __name__)
Expand Down

0 comments on commit e3e4454

Please sign in to comment.