Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized.test_flink_metrics is flaky #20976

Closed
damccorm opened this issue Jun 4, 2022 · 1 comment

Comments

@damccorm
Copy link
Contributor

damccorm commented Jun 4, 2022

Sample error:
https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Commit/2869/testReport/junit/apache_beam.runners.portability.flink_runner_test/FlinkRunnerTestOptimized/test_flink_metrics/


Error Message
AssertionError: Items in the second set but not the first: 'stateful).beam.metric:statecache:hit:
11' 'stateful).beam.metric:statecache:put: 1' 'stateful).beam.metric:statecache:miss: 1' 'stateful).beam.metric:statecache:get_total:
120' 'stateful).beam.metric:statecache:size: 10' 'stateful).beam.metric:statecache:get: 12' 'stateful).beam.metric:statecache:evict:
0' 'stateful).beam.metric:statecache:capacity: 123' 'stateful).beam.metric:statecache:put_total: 10'
'stateful).beam.metric:statecache:evict_total: 0' 'counter: 110' 'stateful).beam.metric:statecache:miss_total:
10' 'stateful).beam.metric:statecache:hit_total: 110'
Stacktrace
self = <apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized
testMethod=test_flink_metrics>

    def test_flink_metrics(self):
      """Run a simple DoFn that
increments a counter and verifies state
      caching metrics. Verifies that its expected value is
written to a
      temporary file by the FileReporter"""
    
      counter_name = 'elem_counter'

     state_spec = userstate.BagStateSpec('state', VarIntCoder())
    
      class DoFn(beam.DoFn):

       def __init__(self):
          self.counter = Metrics.counter(self.__class__, counter_name)

         _LOGGER.info('counter: %s' % self.counter.metric_name)
    
        def process(self, kv,
state=beam.DoFn.StateParam(state_spec)):
          # Trigger materialization
          list(state.read())

         state.add(1)
          self.counter.inc()
    
      options = self.create_options()
 
    # Test only supports parallelism of 1
      options._all_options['parallelism'] = 1
      # Create
multiple bundles to test cache metrics
      options._all_options['max_bundle_size'] = 10
      options._all_options['max_bundle_time_millis']
= 95130590130
      experiments = options.view_as(DebugOptions).experiments or []
      experiments.append('state_cache_size=123')

     options.view_as(DebugOptions).experiments = experiments
      with Pipeline(self.get_runner(),
options) as p:
        # pylint: disable=expression-not-assigned
        (
            p
      
     | "create" >> beam.Create(list(range(0, 110)))
            | "mapper" >> beam.Map(lambda x: (x
% 10, 'val'))
            | "stateful" >> beam.ParDo(DoFn()))
    
      lines_expected = {'counter:
110'}
      if options.view_as(StandardOptions).streaming:
        lines_expected.update([
     
      # Gauges for the last finished bundle
            'stateful.beam.metric:statecache:capacity:
123',
            'stateful.beam.metric:statecache:size: 10',
            'stateful.beam.metric:statecache:get:
20',
            'stateful.beam.metric:statecache:miss: 0',
            'stateful.beam.metric:statecache:hit:
20',
            'stateful.beam.metric:statecache:put: 0',
            'stateful.beam.metric:statecache:evict:
0',
            # Counters
            'stateful.beam.metric:statecache:get_total: 220',
       
    'stateful.beam.metric:statecache:miss_total: 10',
            'stateful.beam.metric:statecache:hit_total:
210',
            'stateful.beam.metric:statecache:put_total: 10',
            'stateful.beam.metric:statecache:evict_total:
0',
        ])
      else:
        # Batch has a different processing model. All values for
   
    # a key are processed at once.
        lines_expected.update([
            # Gauges
        
   'stateful).beam.metric:statecache:capacity: 123',
            # For the first key, the cache token
will not be set yet.
            # It's lazily initialized after first access in StateRequestHandlers

           'stateful).beam.metric:statecache:size: 10',
            # We have 11 here because there
are 110 / 10 elements per key
            'stateful).beam.metric:statecache:get: 12',
           
'stateful).beam.metric:statecache:miss: 1',
            'stateful).beam.metric:statecache:hit: 11',

           # State is flushed back once per key
            'stateful).beam.metric:statecache:put:
1',
            'stateful).beam.metric:statecache:evict: 0',
            # Counters
            'stateful).beam.metric:statecache:get_total:
120',
            'stateful).beam.metric:statecache:miss_total: 10',
            'stateful).beam.metric:statecache:hit_total:
110',
            'stateful).beam.metric:statecache:put_total: 10',
            'stateful).beam.metric:statecache:evict_total:
0',
        ])
      lines_actual = set()
      with open(self.test_metrics_path, 'r') as f:
  
     for line in f:
          for metric_str in lines_expected:
            metric_name = metric_str.split()[0]

           if metric_str in line:
              lines_actual.add(metric_str)
            elif metric_name
in line:
              lines_actual.add(line)
>     self.assertSetEqual(lines_actual, lines_expected)
E
    AssertionError: Items in the second set but not the first:
E     'stateful).beam.metric:statecache:hit:
11'
E     'stateful).beam.metric:statecache:put: 1'
E     'stateful).beam.metric:statecache:miss:
1'
E     'stateful).beam.metric:statecache:get_total: 120'
E     'stateful).beam.metric:statecache:size:
10'
E     'stateful).beam.metric:statecache:get: 12'
E     'stateful).beam.metric:statecache:evict:
0'
E     'stateful).beam.metric:statecache:capacity: 123'
E     'stateful).beam.metric:statecache:put_total:
10'
E     'stateful).beam.metric:statecache:evict_total: 0'
E     'counter: 110'
E     'stateful).beam.metric:statecache:miss_total:
10'
E     'stateful).beam.metric:statecache:hit_total: 110'

apache_beam/runners/portability/flink_runner_test.py:390:
AssertionError


Imported from Jira BEAM-12019. Original Jira may contain additional context.
Reported by: tvalentyn.

@damccorm
Copy link
Contributor Author

I think this is fixed. If not, it should get auto-flagged by our tooling anyways, so this should be safe to close

@github-actions github-actions bot added this to the 2.61.0 Release milestone Oct 31, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant