[Bug]: [Go SDK] periodic.impulse increases Dataflow Backlog time #27707

Closed
2 of 15 tasks
kemkemG0 opened this issue Jul 27, 2023 · 7 comments

Comments

@kemkemG0

kemkemG0 commented Jul 27, 2023

What happened?

We have a slowly updating object that we'd like to use as a SideInput.

We've decided to utilize periodic.Impulse to periodically update the SideInput. We set the startTime to be "now" and the endTime to be "10 years later", with the intention of producing output endlessly.

However, we observed an extreme increase in the Backlog time, ballooning to around 600 weeks. We suspect that this is because we've set the endTime to be 10 years later.

In the screenshot below, the backlog reaches around 500 weeks, i.e. the 10 years I set as endTime.
[Screenshot 2023-07-27 19 17 31]

It appears that Dataflow estimates the end of the job as "10 years later = 500 weeks", which triggers horizontal autoscaling and results in the job using 100 workers.

Below is our simplified sample code, which closely follows this reference.

[Reference] https://github.com/apache/beam/blob/dc1cfe54bfa0d3a22034f3fea463f0284cb2ba83/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go

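package main

import (
	"context"
	"flag"
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/pubsubio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/periodic"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

// The flag variables (topicId, subscription, wProject, wProjectId), checkFlags,
// SideInputGeneratorDoFn, DoFn1 and DoFn2 are omitted from this simplified sample.

func main() {
	flag.Parse()
	beam.Init()
	ctx := context.Background()

	checkFlags(ctx)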

	project := gcpopts.GetProject(ctx)
	p, s := beam.NewPipelineWithRoot()
	pubsubCol := pubsubio.Read(s, project, *topicId, &pubsubio.ReadOptions{
		Subscription: *subscription,
	})

	timeInterval := 1 * time.Minute

	windowedPubsubCol := beam.WindowInto(s, window.NewFixedWindows(timeInterval), pubsubCol)

	// Start now and stop 10 years later so the impulse effectively never ends.
	start := time.Now()
	periodicImp := periodic.Impulse(s, start, start.AddDate(10, 0, 0), timeInterval, false)
	updatedSideInput := beam.ParDo(s, SideInputGeneratorDoFn, periodicImp)
	windowedSideInput := beam.WindowInto(s, window.NewFixedWindows(timeInterval),
		updatedSideInput,
		beam.Trigger(trigger.Repeat(trigger.Always())),
		beam.PanesDiscard(),
	)

	col2 := beam.ParDo(s, &DoFn1{}, windowedPubsubCol, beam.SideInput{Input: windowedSideInput})
	col3 := beam.ParDo(s, &DoFn2{}, col2)
	pubsubio.Write(s, *wProject, *wProjectId, col3)

	if err := beamx.Run(ctx, p); err != nil {
		log.Exitf(ctx, "Failed to execute job: %v", err)
	}
}

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@Abacn
Contributor

Abacn commented Jul 31, 2023

Just a note: Java PeriodicImpulse has the same issue. It's more likely a Dataflow metrics issue.

@kemkemG0
Author

@Abacn
Thank you for your reply.

Then, is there a way to implement the slowly updating side input pattern with the Go SDK?

We have come up with some workaround ideas:

  1. Use another PubSub to simulate the impulse.
  2. Use windowing with beam.Trigger(trigger.Repeat(trigger.AfterProcessingTime().PlusDelay(timeInterval))) and beam.Combine to mimic the impulse.
  3. Store the data in a local file so that each DoFn can access it.

However, we would like to avoid complex implementations.

@Abacn
Copy link
Contributor

Abacn commented Jul 31, 2023

If I understand correctly, this is just an artifact of the Dataflow UI. Does it affect the functionality of PeriodicImpulse?

@victorrgez

Same issue here with Python, but without explicitly specifying an end time. Once Dataflow scales up, it does not scale down. As you can see, the backlog is many, many weeks long even though data freshness is already 0, because everything is being processed at a good pace after catching up with the real backlog.

[three screenshots]

@victorrgez

victorrgez commented Jun 4, 2024

I believe this is because PeriodicImpulse uses ImpulseSeqGenDoFn, which has a restriction_tracker behind the scenes. As a splittable DoFn, it is indeed telling Dataflow that there is a huge backlog yet to process.
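The mechanism can be modeled in a few lines of stdlib Go. This is a simplified model, not Beam's actual tracker: `offsetRange`, `tracker` and `remainingSize` are hypothetical, and the assumption (taken from this comment, not from Dataflow documentation) is that the runner treats the unclaimed remainder of the restriction as backlog.

```go
package main

import "fmt"

// offsetRange is a hypothetical stand-in for an offset-range restriction:
// impulse number i corresponds to offset i in [start, end).
type offsetRange struct {
	start, end int64
}

// tracker is a simplified, hypothetical restriction tracker that claims
// offsets strictly in order.
type tracker struct {
	rest offsetRange
	next int64 // next offset that may be claimed
}

func newTracker(r offsetRange) *tracker {
	return &tracker{rest: r, next: r.start}
}

// tryClaim claims pos if it is the next offset and still inside the range.
func (t *tracker) tryClaim(pos int64) bool {
	if pos != t.next || pos >= t.rest.end {
		return false
	}
	t.next++
	return true
}

// remainingSize counts every unclaimed offset as pending work, including
// impulses that are not due for years.
func (t *tracker) remainingSize() int64 {
	return t.rest.end - t.next
}

func main() {
	// Roughly 10 years of 1-minute impulses.
	tr := newTracker(offsetRange{start: 0, end: 5_260_320})

	// After one hour, 60 impulses have been emitted and claimed.
	for i := int64(0); i < 60; i++ {
		tr.tryClaim(i)
	}
	fmt.Println("remaining impulses:", tr.remainingSize())
}
```

Even after an hour of normal operation the remaining size is still in the millions, because future impulses are counted as work that has not been done yet.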

EDIT (answered in the next comment):
What would be a suitable alternative, then, for slowly changing side inputs?

@victorrgez

@kemkemG0 In case you haven't solved the issue yet: the whole problem is in the restriction_size method of the ImpulseSeqGenRestrictionProvider class. That method tells Dataflow how much backlog remains to be processed, so it keeps telling Dataflow not to downscale because there is still too much to process.

It can be solved "easily" by overriding this method in a subclass:

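from apache_beam.transforms.periodicsequence import ImpulseSeqGenRestrictionProvider

# Hypothetical subclass name; any name works.
class ZeroBacklogRestrictionProvider(ImpulseSeqGenRestrictionProvider):
    def restriction_size(self, unused_element, restriction):
        """
        Removes the backlog issue by reporting that the size left to process is 0.
        """
        return 0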

Then subclass ImpulseSeqGenDoFn so that it uses this RestrictionProvider, and finally subclass PeriodicImpulse so that it uses your ImpulseSeqGenDoFn.
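The Python workaround relies on subclassing and overriding a single method. A rough Go analogue of that idea uses struct embedding: the wrapper inherits everything from the default provider and replaces only the size method. The types below (`offsetRange`, `defaultProvider`, `zeroSizeProvider`, `provider`) are hypothetical and only model the idea; they are not the Beam Go SDK's periodic package, whose internals this thread does not show.

```go
package main

import "fmt"

// offsetRange is a hypothetical impulse restriction: one offset per interval.
type offsetRange struct{ start, end int64 }

// defaultProvider is a hypothetical analogue of the stock restriction
// provider: it reports the whole unprocessed range as the restriction size.
type defaultProvider struct{}

func (defaultProvider) initialRestriction(totalOutputs int64) offsetRange {
	return offsetRange{start: 0, end: totalOutputs}
}

func (defaultProvider) restrictionSize(r offsetRange) float64 {
	return float64(r.end - r.start)
}

// zeroSizeProvider embeds defaultProvider (Go's stand-in for subclassing)
// and overrides only restrictionSize, mirroring the Python workaround.
type zeroSizeProvider struct{ defaultProvider }

func (zeroSizeProvider) restrictionSize(offsetRange) float64 { return 0 }

// provider is the behaviour a backlog estimate would consult.
type provider interface {
	initialRestriction(totalOutputs int64) offsetRange
	restrictionSize(r offsetRange) float64
}

func main() {
	for _, p := range []provider{defaultProvider{}, zeroSizeProvider{}} {
		r := p.initialRestriction(5_260_320)
		fmt.Printf("%T: restriction %v, reported size %v\n", p, r, p.restrictionSize(r))
	}
}
```

Only the reported size changes; the restriction, and therefore the sequence of emitted impulses, stays the same. The cost is that the runner no longer gets any size signal from this transform.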

@victorrgez

closed by #32506

@Abacn Abacn closed this as completed Nov 25, 2024
@github-actions github-actions bot added this to the 2.62.0 Release milestone Nov 25, 2024

4 participants