
feat: Implement Spark-compatible CAST between integer types #340

Merged
merged 16 commits into from
May 3, 2024

Conversation

ganeshkumar269
Contributor

Which issue does this PR close?

Closes #311

Rationale for this change

What changes are included in this PR?

  • Handled the cases where casting between integer types differs in behaviour between DataFusion and Spark, e.g. long to int, int to byte.
  • Ignored the cases where the behaviour already matches, e.g. int to long, short to int, and all casts in TRY mode.
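As a rough illustration of why TRY mode needs no special handling, here is a minimal sketch (hypothetical helper, not the PR's actual cast.rs code) of how the three Spark cast modes diverge when narrowing a 64-bit integer to 32 bits:

```rust
// Hypothetical sketch, not the PR's implementation: how Spark's cast modes
// differ when narrowing i64 -> i32.
#[derive(PartialEq, Debug)]
enum CastResult {
    Value(i32),
    Null, // TRY mode turns overflow into NULL
}

fn cast_i64_to_i32(v: i64, mode: &str) -> Result<CastResult, String> {
    let fits = v >= i32::MIN as i64 && v <= i32::MAX as i64;
    match mode {
        "LEGACY" => Ok(CastResult::Value(v as i32)), // silently keeps the low 32 bits
        "TRY" if fits => Ok(CastResult::Value(v as i32)),
        "TRY" => Ok(CastResult::Null),
        "ANSI" if fits => Ok(CastResult::Value(v as i32)),
        "ANSI" => Err(format!("Casting {} to int causes overflow", v)),
        _ => Err("unknown mode".to_string()),
    }
}

fn main() {
    assert_eq!(cast_i64_to_i32(42, "ANSI"), Ok(CastResult::Value(42)));
    assert_eq!(cast_i64_to_i32(3_000_000_000, "TRY"), Ok(CastResult::Null));
    assert_eq!(
        cast_i64_to_i32(3_000_000_000, "LEGACY"),
        Ok(CastResult::Value(-1_294_967_296))
    );
}
```

Only ANSI mode produces an error on overflow, which is why the error-message wording (rather than the cast logic itself) is where the Spark versions differ.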

How are these changes tested?

Added corresponding Scala test cases.

@viirya viirya changed the title Fix/311 feat: Implement Spark-compatible CAST between integer types Apr 28, 2024
@ganeshkumar269
Contributor Author

Hi @viirya @andygrove, first, please let me know whether this PR aligns with expectations for how to fix the issue; if not, kindly provide pointers on how I can move in the right direction.

Also, regarding the errors in the CI pipeline: it looks like the error message for 3.2 is a bit different from 3.3 and 3.4 for overflow cases, so in the Rust code I will have to add a check for the Spark version and return the error message accordingly.
How can I get the Spark version from the Rust side?

@viirya
Member

viirya commented Apr 29, 2024

Also, regarding the errors in the CI pipeline: it looks like the error message for 3.2 is a bit different from 3.3 and 3.4 for overflow cases, so in the Rust code I will have to add a check for the Spark version and return the error message accordingly.
How can I get the Spark version from the Rust side?

For this kind of case, we tend to return the same error message on the native side and handle the difference in the Scala tests.

@andygrove
Member

As @viirya said, we can handle the difference in error-message format in the Scala tests (we already have examples of this in the castTest method).

For versions prior to 3.4 perhaps you could just check for the word "overflow" in the error message.

- cast short to byte *** FAILED *** (439 milliseconds)
  "Execution error: [CAST_OVERFLOW] The value 18716S of the type "SMALLINT" cannot be cast to "TINYINT" due to an overflow. Use `try_cast` to tolerate overflow and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error." did not contain "asting 18716 to tinyint causes overflow" (CometCastSuite.scala:200)
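The suggested word check for pre-3.4 versions could be sketched like this (illustrative helper name; the real assertion lives in the Scala castTest method):

```rust
// Sketch: loosened comparison for Spark < 3.4, requiring only that the word
// "overflow" appears in both messages (overflow_matches is a hypothetical name).
fn overflow_matches(comet_message: &str, spark_message: &str) -> bool {
    comet_message.to_lowercase().contains("overflow")
        && spark_message.to_lowercase().contains("overflow")
}

fn main() {
    assert!(overflow_matches(
        "cannot be cast to \"TINYINT\" due to an overflow",
        "Casting 18716 to tinyint causes overflow"
    ));
    assert!(!overflow_matches(
        "because it is malformed",
        "Casting 18716 to tinyint causes overflow"
    ));
}
```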

ganesh.maddula added 2 commits May 1, 2024 01:46
# Conflicts:
#	spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@ganeshkumar269
Contributor Author

Thanks for the inputs @viirya @andygrove. I have added another check in the assert statement where we compare exception messages.
Does this approach look good, or do I need to separate the "invalid cast" and "overflow" assert statements?

If this change is fine, I will modify the code comments to cover the "overflow" exception as well.

@@ -722,7 +746,8 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
// Comet message is in format `The value 'VALUE' of the type FROM_TYPE cannot be cast to TO_TYPE`
// We just check that the comet message contains the same invalid value as the Spark message
val sparkInvalidValue = sparkMessage.substring(sparkMessage.indexOf(':') + 2)
Member

Now that we are handling multiple types of error here, we should probably check if sparkMessage.indexOf(':') returns a non-zero value before trying to use it.
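One way to guard the lookup, sketched here in Rust with a hypothetical helper name (the actual fix lands in the Scala test suite):

```rust
// Sketch: check the ':' lookup before slicing, so a message without ':'
// falls back to an empty string instead of panicking.
fn extract_invalid_value(spark_message: &str) -> &str {
    match spark_message.find(':') {
        // skip the ":" and the following space, guarding against short messages
        Some(idx) if idx + 2 <= spark_message.len() => &spark_message[idx + 2..],
        _ => "",
    }
}

fn main() {
    assert_eq!(
        extract_invalid_value("invalid input syntax for type numeric: -9223372036854775809"),
        "-9223372036854775809"
    );
    assert_eq!(
        extract_invalid_value("Casting 18716 to tinyint causes overflow"),
        ""
    );
}
```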

@andygrove
Member

Thanks for the inputs @viirya @andygrove. I have added another check in the assert statement where we compare exception messages. Does this approach look good, or do I need to separate the "invalid cast" and "overflow" assert statements?

If this change is fine, I will modify the code comments to cover the "overflow" exception as well.

I think this general approach is OK to handle different types of expected errors. I left a specific comment on the code as well.

ganesh.maddula added 4 commits May 1, 2024 23:13
# Conflicts:
#	core/src/execution/datafusion/expressions/cast.rs
added a check before we fetch the sparkInvalidValue
@ganeshkumar269
Contributor Author

Hi @andygrove, I have added a check before we fetch sparkInvalidValue, defaulting it to EMPTY_STRING if ':' is not present.
I also added comments on why we check for the presence of the "overflow" string.

sparkMessage.substring(sparkMessage.indexOf(':') + 2)
}
assert(
cometMessage.contains(sparkInvalidValue) || cometMessage.contains("overflow"))
Member

If sparkInvalidValue is EMPTY_STRING, won't cometMessage.contains(sparkInvalidValue) always be true?
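Indeed, in most string APIs every string contains the empty string, so the fallback makes the check vacuous. A quick demonstration:

```rust
// Demonstration: contains("") is always true, so defaulting
// sparkInvalidValue to an empty string makes the assertion pass
// for any Comet message whatsoever.
fn main() {
    let comet_message = "The value '18716' of the type SMALLINT cannot be cast to TINYINT";
    assert!(comet_message.contains("")); // vacuously true for any message
}
```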

Contributor Author

@ganeshkumar269 ganeshkumar269 May 2, 2024


You are right, my bad 😅. So in case sparkMessage doesn't have ':', should I assert on just cometMessage.contains("overflow")?
Something like this:

if (sparkMessage.indexOf(':') == -1) assert(cometMessage.contains("overflow"))
else assert(cometMessage.contains(sparkInvalidValue))

Member

Yes, something like that. I haven't reviewed the overflow messages to see if they contain ':' though (in any of the Spark versions 3.2, 3.3, and 3.4).

Contributor Author

@ganeshkumar269 ganeshkumar269 May 3, 2024


It doesn't look like the overflow error message has ':' in it; I ran spark.sql("select cast(9223372036854775807 as int)").show() locally on various Spark versions.

3.4 - [CAST_OVERFLOW] The value 9223372036854775807L of the type "BIGINT" cannot be cast to "INT" due to an overflow. Use try_cast to tolerate overflow and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error
3.3 - The value 9223372036854775807L of the type "BIGINT" cannot be cast to "INT" due to an overflow. Use try_cast to tolerate overflow and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error
3.2 - Casting 9223372036854775807 to int causes overflow

ganesh.maddula added 3 commits May 2, 2024 22:59
@ganeshkumar269 ganeshkumar269 requested a review from andygrove May 3, 2024 17:46
@@ -665,6 +665,30 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
castTest(generateTimestamps(), DataTypes.DateType)
}

test("cast short to byte") {
Member

These methods already exist in main but have different naming, so I think you need to upmerge/rebase against main.

Example:

ignore("cast ShortType to ByteType")

Member

@andygrove andygrove left a comment


LGTM. I'll be happy to approve once upmerged/rebased against the main branch.

Member

@andygrove andygrove left a comment


LGTM. Thanks @ganeshkumar269

@ganeshkumar269
Contributor Author

Thanks @andygrove 🙏🏾, glad to be a contributor to Comet.

@andygrove
Member

@ganeshkumar269 It looks like the error message check needs a little more work. Some tests are failing on Spark 3.3.

This error message does not contain ':' or "overflow". In this specific case, looking for CAST_INVALID_INPUT would be a more robust check.

- cast StringType to LongType *** FAILED *** (416 milliseconds)
  "[CAST_INVALID_INPUT] The value '-9223372036854775809' of the type "STRING" cannot be cast to "BIGINT" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error." 
did not contain "overflow" (CometCastSuite.scala:881)
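One way to make the check robust across both failure modes is to classify the expected error kind rather than substring-matching a value. This is a sketch with a hypothetical helper name, not the code that was ultimately merged:

```rust
// Sketch (illustrative, not the merged fix): classify Spark/Comet error
// messages by kind so the assertion compares kinds, not exact wording.
fn error_kind(msg: &str) -> &'static str {
    if msg.contains("CAST_INVALID_INPUT") || msg.contains("malformed") {
        "invalid-input"
    } else if msg.to_lowercase().contains("overflow") {
        "overflow"
    } else {
        "other"
    }
}

fn main() {
    assert_eq!(
        error_kind("[CAST_INVALID_INPUT] The value '-9223372036854775809' is malformed"),
        "invalid-input"
    );
    assert_eq!(error_kind("Casting 18716 to tinyint causes overflow"), "overflow");
    assert_eq!(error_kind("some other failure"), "other");
}
```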

# Conflicts:
#	core/src/execution/datafusion/expressions/cast.rs
@ganeshkumar269 ganeshkumar269 requested a review from andygrove May 3, 2024 19:47
@andygrove
Member

@ganeshkumar269 My original code for comparing errors in 3.2/3.3 was not very robust. I am also looking at this now to see if I can help improve these checks.

@ganeshkumar269
Contributor Author

@ganeshkumar269 My original code for comparing errors in 3.2/3.3 was not very robust. I am also looking at this now to see if I can help improve these checks.

Is there a way I could help here?

@andygrove
Member

@ganeshkumar269 here is my suggestion:

            if (CometSparkSessionExtensions.isSpark34Plus) {
              // for Spark 3.4 we expect to reproduce the error message exactly
              assert(cometMessage == sparkMessage)
            } else if (CometSparkSessionExtensions.isSpark33Plus) {
              // for Spark 3.3 we just need to strip the prefix from the Comet message
              // before comparing
              val cometMessageModified = cometMessage
                .replace("[CAST_INVALID_INPUT] ", "")
                .replace("[CAST_OVERFLOW] ", "")
              assert(cometMessageModified == sparkMessage)
            } else if (CometSparkSessionExtensions.isSpark32) {
              // for Spark 3.2 we just make sure we are seeing a similar type of error
              if (sparkMessage.contains("causes overflow")) {
                assert(cometMessage.contains("due to an overflow"))
              } else {
                // assume that this is an invalid input message in the form:
                // `invalid input syntax for type numeric: -9223372036854775809`
                // we just check that the Comet message contains the same literal value
                val i = sparkMessage.indexOf(':') + 2
                assert(cometMessage.contains(sparkMessage.substring(i)))
              }
            }

I tested this from the command line for all Spark versions using:

mvn test -Pspark-3.2 -DwildcardSuites=org.apache.comet.CometCastSuite -Dspotless.check.skip=true

@andygrove andygrove merged commit b39ed88 into apache:main May 3, 2024
28 checks passed
himadripal pushed a commit to himadripal/datafusion-comet that referenced this pull request Sep 7, 2024
* handled cast for long to short

* handled cast for all overflow cases

* ran make format

* added check for overflow exception for 3.4 below.

* added comments to on why we do overflow check.
added a check before we fetch the sparkInvalidValue

* -1 instead of 0, -1 indicates the provided character is not present

* ran mvn spotless:apply

* check for presence of ':' and have asserts accordingly

* reusing exising test functions

* added one more check in assert when ':' is not present

* redo the compare logic as per andy's suggestions.

---------

Co-authored-by: ganesh.maddula <[email protected]>