[Bug]: TextIO read record twice for large delimited files #29146

Closed
2 of 16 tasks
vatanrathi opened this issue Oct 26, 2023 · 33 comments · Fixed by #30026

@vatanrathi

What happened?

The Beam TextIO reader reads the same record twice for large (~45GB) delimited (.DAT) files. No issue occurs when the same file is split into smaller files that are processed together.
This behaviour is not intermittent or random: the same records appear twice in every run.

wc -l test_tx_20211101.dat
78132206 test_tx_20211101.dat 

The record count of 78132206 was also verified by loading the file in Spark, which gives the same count.
Note: records are delimited with CRLF (^M$)
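
Because `wc -l` only counts `\n` bytes, it can help to confirm that every line ending in the file really is a CRLF pair. The following is a minimal plain-Java sketch (not Beam code) that tallies CRLF pairs, bare LF and bare CR bytes in a stream. The class and field names are hypothetical and chosen only for illustration.

```java
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;

class LineEndingCount {
    /** Tally of each terminator style found in the stream. */
    static final class Counts {
        long crlf;   // "\r\n" pairs
        long bareLf; // "\n" not preceded by "\r"
        long bareCr; // "\r" not followed by "\n"
    }

    static Counts count(InputStream in) throws IOException {
        Counts c = new Counts();
        boolean prevCr = false; // was the previous byte a '\r'?
        int b;
        while ((b = in.read()) != -1) {
            if (b == '\n') {
                if (prevCr) c.crlf++; else c.bareLf++;
                prevCr = false;
            } else {
                if (prevCr) c.bareCr++; // the earlier '\r' was not part of a CRLF pair
                prevCr = (b == '\r');
            }
        }
        if (prevCr) c.bareCr++; // file ends with a lone '\r'
        return c;
    }

    public static void main(String[] args) throws IOException {
        // args[0]: path to a local copy of the file (assumption: the file is readable locally)
        try (InputStream in = new BufferedInputStream(Files.newInputStream(Paths.get(args[0])), 1 << 20)) {
            Counts c = count(in);
            System.out.printf("CRLF=%d bareLF=%d bareCR=%d%n", c.crlf, c.bareLf, c.bareCr);
        }
    }
}
```

If the bare LF and bare CR counts are both zero, the CRLF count should equal the `wc -l` figure above.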

Pipeline used to read the file:

// Read every line, stamp it with the current time, then count all records globally.
input.apply("Read lines", TextIO.read().from(<s3 file path>))
    .apply("timestamp", WithTimestamps.of(e -> Instant.now()))
    .setCoder(StringUtf8Coder.of())
    .apply(Combine.globally(Count.<String>combineFn()).withoutDefaults())
    .apply(ParDo.of(new LogOutput<>("Pcollection count: ")));

The same code was executed with Beam 2.23.0 + AWS SDK v1 and with Beam 2.48.0 + AWS SDK v2:

beam 2.23.0 + aws sdk1 --> Pcollection count: 78132206
beam 2.48.0 + aws sdk2 --> Pcollection count: 78132208

To rule out a corrupted record, I deleted the first of the two records that appeared twice. The record following it then appeared twice, and the second duplicate also shifted down to the next record. This implies that something goes wrong while reading at a certain byte offset.

**Scenario:** records at lines 33554433 and 67108865 appeared twice
Line# item
1
.
.
33554433 abc
.
.
.
67108865 xyz
.
.
78132206

In the second test, after the record at line 33554433 was deleted, the records now at lines 33554433 (previously line 33554434) and 67108865 (previously line 67108866) were read twice.

It seems records are read correctly up to record 33554432, and then record 33554433 is read twice.
Note: 67108865 - 33554433 = 33554432

Record#33554433 --> Line 33554433 of 78132206 ; Word 871941986 of 2028601968; Byte 21374174185 of 49770215222
Record#67108865 --> Line 67108865 of 78132206 ; Word 1743499355 of 2028601968; Byte 42748346369 of 49770215222
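
The arithmetic behind these numbers can be checked with a short sketch. It only uses the figures quoted above: the number of records read correctly before each duplicate is an exact power of two (2^25 and 2^26), and the total byte count divides evenly by the total line count. The class and constant names are hypothetical.

```java
class DuplicateOffsets {
    static final long TOTAL_LINES = 78_132_206L;
    static final long TOTAL_BYTES = 49_770_215_222L;
    static final long FIRST_DUP_LINE = 33_554_433L;
    static final long SECOND_DUP_LINE = 67_108_865L;

    /** True when n is a positive power of two. */
    static boolean isPowerOfTwo(long n) {
        return n > 0 && (n & (n - 1)) == 0;
    }

    public static void main(String[] args) {
        long before1 = FIRST_DUP_LINE - 1;  // records read before the first duplicate
        long before2 = SECOND_DUP_LINE - 1; // records read before the second duplicate

        // prints "true 2^25"
        System.out.println(isPowerOfTwo(before1) + " 2^" + Long.numberOfTrailingZeros(before1));
        // prints "true 2^26"
        System.out.println(isPowerOfTwo(before2) + " 2^" + Long.numberOfTrailingZeros(before2));
        // prints "637 remainder 0": average bytes per line, CRLF included
        System.out.println(TOTAL_BYTES / TOTAL_LINES + " remainder " + TOTAL_BYTES % TOTAL_LINES);
    }
}
```

The 637-byte figure is only an average derived from the totals; it does not by itself show that every record has the same length.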

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@vatanrathi
Author

@mosche @aromanenko-dev Can you please suggest what might be going on here? Otherwise, please assign this to someone who can help.

@johnjcasey
Contributor

johnjcasey commented Oct 27, 2023

Is it possible for you to narrow down which Beam version introduced this, by replaying the test on more Beam versions? Identifying the version that introduced it will greatly help diagnostics.

@vatanrathi
Author

@johnjcasey Beam versions before 2.48 had a performance issue (#25991) with large files, which is stopping me from testing this file. I tried running on 2.40, and it is still running after 18 hours. Can you suggest any other way to provide the stats you are after?

@johnjcasey
Contributor

I'm trying to understand which beam version introduced this issue. 2.23 - 2.48 is a very large range of changes to try and debug.

Is it feasible to share this file? I could try and reproduce this with a non S3 filesystem.

@vatanrathi
Author

I can't share the file as it contains company-sensitive data.
I feel this is related to the AWS SDK v2.

@johnjcasey
Contributor

I assume not, but is it possible to load this file into GCS, and try from there? just to try and isolate the issue?

@vatanrathi
Author

I processed the same file with all versions starting from 2.40 and can confirm that the issue first appears in 2.43.0.
Can we identify the breaking change?

@aromanenko-dev
Contributor

It would be great to narrow down this issue and create a reproducible use case.

@johnjcasey
Contributor

What is the typical width of a line in your file? I'm going to try and generate a scenario that reproduces this

@johnjcasey
Contributor

Also, I have a commit in 2.43 that is at least a plausible cause for this bug. #23196

@vatanrathi are you able to run a custom version of beam against your file, to verify if this is the faulty commit or not?

@vatanrathi
Author

Hi @johnjcasey, yes, I should be able to run the test with a custom version.
Please share it.

@johnjcasey
Contributor

Here is the branch with the revert. It is otherwise 2.43.0, so it should be a fair comparison. https://github.com/johnjcasey/beam/tree/2.43.0-no-fileio-improvement

@johnjcasey
Contributor

Also, is it possible to create a file that has the same structure, but without private data? I'm trying to brainstorm ways to make it so I can debug locally.

@johnjcasey
Contributor

Hi @vatanrathi, did you have any luck with that branch?

@vatanrathi
Author

@johnjcasey I have been busy with something else and apologise for the delay. I will get it tested over the weekend. Just to confirm: are your changes only in the Java core module?

@vatanrathi
Author

vatanrathi commented Dec 4, 2023

@johnjcasey I can confirm that the duplicate issue is resolved with a custom Java core package built from your branch. So it seems the bug was introduced by the change mentioned above. Could you let me know how we can take this further so that a fix is available in a main Beam release?

@johnjcasey
Contributor

That is good and bad news. Thanks for all your help debugging this. Someone from my team will be looking at this in depth.

It would be incredibly helpful if we could try and repro this locally. Is there a way you could create a similar file that repros the issue, without containing sensitive data?
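
One possible way to build such a file is to generate synthetic CRLF-terminated records that each begin with their own line number, so any record read twice can be identified from the output. The sketch below is plain Java with hypothetical names and a made-up record layout (`<line number>|<random letters>`); the padding range and seed are assumptions, not properties of the real file. Whether a file produced this way reproduces the bug is not established in this thread.

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Random;

class SyntheticDatFile {
    /**
     * Writes `records` CRLF-terminated lines. Each line starts with its
     * 1-based line number, followed by '|' and minPad..maxPad random letters.
     */
    static void write(OutputStream out, long records, int minPad, int maxPad, long seed)
            throws IOException {
        Random rnd = new Random(seed); // fixed seed keeps the file reproducible
        for (long i = 1; i <= records; i++) {
            StringBuilder sb = new StringBuilder();
            sb.append(i).append('|');
            int pad = minPad + rnd.nextInt(maxPad - minPad + 1);
            for (int j = 0; j < pad; j++) {
                sb.append((char) ('a' + rnd.nextInt(26)));
            }
            sb.append("\r\n");
            out.write(sb.toString().getBytes(StandardCharsets.US_ASCII));
        }
    }

    public static void main(String[] args) throws IOException {
        // args[0]: output path, args[1]: number of records to write
        long records = Long.parseLong(args[1]);
        try (OutputStream out =
                new BufferedOutputStream(Files.newOutputStream(Paths.get(args[0])), 1 << 20)) {
            write(out, records, 600, 660, 42L);
        }
    }
}
```

To cover the first reported duplicate, the record count would need to exceed 33554433, which at roughly 640 bytes per record gives a file of over 20 GB.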

@vatanrathi
Author

Hi @johnjcasey, is there an update on the bug fix? Unfortunately, I was unable to generate a test file for you.

@johnjcasey
Contributor

We are planning to address it in Q1. We will need to generate a test file on our end to replicate this first, which will be hard. Can you tell us more about your file? How big was it, how many lines, etc.?

@shunping
Contributor

Could we also know if the line has a fixed size or not?

@shunping
Contributor

@vatanrathi To isolate the problem, could you add .withDelimiter(new byte[] { '\r', '\n' }) when calling TextIO.read() and run your pipeline with the original release 2.43.0?
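
For context, TextIO by default treats `\n`, `\r`, or `\r\n` as a line ending, whereas `withDelimiter` splits only on the exact byte sequence given. The following plain-Java sketch (not Beam's implementation; all names are hypothetical) illustrates exact-delimiter splitting on a small in-memory buffer.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class DelimiterSplit {
    /** Splits data on every exact occurrence of delimiter; a trailing unterminated record is kept. */
    static List<String> split(byte[] data, byte[] delimiter) {
        List<String> records = new ArrayList<>();
        int start = 0; // start of the current record
        int i = 0;
        while (i + delimiter.length <= data.length) {
            if (matches(data, i, delimiter)) {
                records.add(new String(data, start, i - start, StandardCharsets.UTF_8));
                i += delimiter.length;
                start = i;
            } else {
                i++;
            }
        }
        if (start < data.length) {
            records.add(new String(data, start, data.length - start, StandardCharsets.UTF_8));
        }
        return records;
    }

    /** True when delimiter occurs in data starting at pos. */
    static boolean matches(byte[] data, int pos, byte[] delimiter) {
        for (int k = 0; k < delimiter.length; k++) {
            if (data[pos + k] != delimiter[k]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        byte[] data = "a\rb\r\nc\r\n".getBytes(StandardCharsets.UTF_8);
        // With an exact CRLF delimiter the lone '\r' stays inside the first record,
        // so this yields two records: "a\rb" and "c".
        System.out.println(split(data, new byte[] {'\r', '\n'}).size()); // prints 2
    }
}
```

A reader using the default rules would instead split the same bytes into three records, because the lone `\r` also counts as a line ending.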

@shunping
Contributor

shunping commented Jan 2, 2024

Hi @vatanrathi, hope you enjoyed the holidays. I am checking to see if you have tried my comment (#29146 (comment))?

In addition to the questions about the file in use (the size of the file, whether the line has a fixed size), could you also let us know which runner (e.g. direct runner, dataflow runner, flink runner etc) you were using to run Beam?

@johnjcasey johnjcasey removed their assignment Jan 3, 2024
@vatanrathi
Author

Hi @shunping, I haven't tried your suggested approach yet, as our non-prod AWS systems are down until 8 Jan. I will do it once they are back online and will confirm the results.
Also, it is not a fixed-width file, and the runner is Spark.

@vatanrathi
Author

49770215222

File size 49.78 GB. Total records are 78132206 including header

@shunping
Contributor

shunping commented Jan 4, 2024

Thanks for your reply! Please keep me posted on how it goes with my suggestion. I am actively working on this issue right now, and I will let you know if I have other questions or find out anything.

@shunping
Contributor

Hi @vatanrathi, have you had a chance to run the pipeline with my suggestion in #29146 (comment)?

Also could you let me know what version of spark you are running the pipeline on?

@vatanrathi
Author

Hi @shunping, yes, I tried adding the suggested delimiter, but my pipelines failed to generate any output Parquet files after that change.
I am running Spark 3.1.1.

@shunping
Contributor

shunping commented Jan 11, 2024

Hi @vatanrathi, I think there could be a problem when we split the file, but I need some more info from you to confirm my guess.

Are you able to run Beam from my branch https://github.com/shunping/beam/tree/add-log-to-2.43.0 (where I added a logging message in FileBasedSource), and send me back some (if not all) of the output logs? At a minimum, I would need the log messages that contain "Beam-issue-29146", "split", or "Split" to debug the issue further.

Thanks for your help!

@vatanrathi
Author

Hi @shunping, I got the required logs, but I am unable to share them. My org doesn't allow any kind of data to be shared externally, including on GitHub. Is there specific info you are after that I can check in the logs?
I know it's not very helpful.

@shunping
Contributor

shunping commented Jan 13, 2024

@vatanrathi, no worries! That's totally understandable.

Meanwhile, I am able to reproduce the issue in my environment and I have a fix for you to try.
Could you run your pipeline with Beam in this branch https://github.com/shunping/beam/tree/fix-dup-rows-2.43?

Thanks!

@vatanrathi
Author

@shunping That's great! I will get it tested today and will confirm.

@vatanrathi
Author

@shunping I tested the core module built from your branch, and it returns the correct result without any duplicates.

@shunping
Contributor

Great! Thanks for verifying the fix, @vatanrathi. I will push it to the master branch later this week.
