From e77004836b2b636cb802f0eeefd09c9239acdcd6 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 14 Jun 2024 14:05:23 -0600 Subject: [PATCH 01/16] reimplement xxhash64 --- core/src/execution/datafusion/spark_hash.rs | 149 +++++++++++++++++++- 1 file changed, 143 insertions(+), 6 deletions(-) diff --git a/core/src/execution/datafusion/spark_hash.rs b/core/src/execution/datafusion/spark_hash.rs index a31752d4a..fde0b1b25 100644 --- a/core/src/execution/datafusion/spark_hash.rs +++ b/core/src/execution/datafusion/spark_hash.rs @@ -21,8 +21,7 @@ use arrow::{ compute::take, datatypes::{ArrowNativeTypeOp, UInt16Type, UInt32Type, UInt64Type, UInt8Type}, }; -use std::{hash::Hasher, sync::Arc}; -use twox_hash::XxHash64; +use std::sync::Arc; use datafusion::{ arrow::{ @@ -99,12 +98,134 @@ pub(crate) fn spark_compatible_murmur3_hash>(data: T, seed: u32) } } +const CHUNK_SIZE: usize = 32; + +pub const PRIME_1: u64 = 11_400_714_785_074_694_791; +pub const PRIME_2: u64 = 14_029_467_366_897_019_727; +pub const PRIME_3: u64 = 1_609_587_929_392_839_161; +pub const PRIME_4: u64 = 9_650_029_242_287_828_579; +pub const PRIME_5: u64 = 2_870_177_450_012_600_261; + #[inline] pub(crate) fn spark_compatible_xxhash64>(data: T, seed: u64) -> u64 { - // TODO: Rewrite with a stateless hasher to reduce stack allocation? - let mut hasher = XxHash64::with_seed(seed); - hasher.write(data.as_ref()); - hasher.finish() + let data: &[u8] = data.as_ref(); + let length_bytes = data.len(); + + // XxCore::with_seed + let mut v1 = seed.wrapping_add(PRIME_1).wrapping_add(PRIME_2); + let mut v2 = seed.wrapping_add(PRIME_2); + let mut v3 = seed; + let mut v4 = seed.wrapping_sub(PRIME_1); + + // XxCore::ingest_chunks + #[inline(always)] + fn ingest_one_number(mut current_value: u64, mut value: u64) -> u64 { + value = value.wrapping_mul(PRIME_2); + current_value = current_value.wrapping_add(value); + current_value = current_value.rotate_left(31); + current_value.wrapping_mul(PRIME_1) + } + + // process chunks of 32 bytes + let mut offset_u64 = 0; + let ptr_u64 = data.as_ptr() as *const u64; + unsafe { + while offset_u64 * CHUNK_SIZE + CHUNK_SIZE <= length_bytes { + v1 = ingest_one_number(v1, ptr_u64.add(offset_u64).read_unaligned().to_le()); + v2 = ingest_one_number(v2, ptr_u64.add(offset_u64 + 1).read_unaligned().to_le()); + v3 = ingest_one_number(v3, ptr_u64.add(offset_u64 + 2).read_unaligned().to_le()); + v4 = ingest_one_number(v4, ptr_u64.add(offset_u64 + 3).read_unaligned().to_le()); + offset_u64 += 4; + } + } + let total_len = offset_u64 as u64 * 8_u64; + + let mut hash = if total_len >= CHUNK_SIZE as u64 { + // We have processed at least one full chunk + // XxCore::finish + #[allow(unknown_lints, clippy::needless_late_init)] // keeping things parallel + let mut hash; + + hash = v1.rotate_left(1); + hash = hash.wrapping_add(v2.rotate_left(7)); + hash = hash.wrapping_add(v3.rotate_left(12)); + hash = hash.wrapping_add(v4.rotate_left(18)); + + #[inline(always)] + fn mix_one(mut hash: u64, mut value: u64) -> u64 { + value = value.wrapping_mul(PRIME_2); + value = value.rotate_left(31); + value = value.wrapping_mul(PRIME_1); + hash ^= value; + hash = hash.wrapping_mul(PRIME_1); + hash.wrapping_add(PRIME_4) + } + + hash = mix_one(hash, v1); + hash = mix_one(hash, v2); + hash = mix_one(hash, v3); + hash = mix_one(hash, v4); + + hash + } else { + seed.wrapping_add(PRIME_5) + }; + + hash = hash.wrapping_add(total_len); + + // process u64s + while offset_u64 * 8 + 8 < length_bytes { + let mut k1 = unsafe { + ptr_u64 + .add(offset_u64) + .read_unaligned() + .to_le() + .wrapping_mul(PRIME_2) + }; + k1 = k1.rotate_left(31); + k1 = k1.wrapping_mul(PRIME_1); + hash ^= k1; + hash = hash.rotate_left(27); + hash = hash.wrapping_mul(PRIME_1); + hash = hash.wrapping_add(PRIME_4); + offset_u64 += 1; + } + + // process u32s + let ptr_u32 = data[offset_u64 * 8..].as_ptr() as *const u32; + let length_bytes = length_bytes - offset_u64 * 8; + let mut offset_u32 = 0; + while offset_u32 * 4 + 4 < length_bytes { + let k1 = unsafe { + u64::from(ptr_u32.add(offset_u32).read_unaligned().to_le()).wrapping_mul(PRIME_1) + }; + hash ^= k1; + hash = hash.rotate_left(23); + hash = hash.wrapping_mul(PRIME_2); + hash = hash.wrapping_add(PRIME_3); + offset_u32 += 1; + } + + // process u8s + let data = &data[offset_u32 * 4..]; + let length_bytes = length_bytes - offset_u32 * 4; + let mut offset_u8 = 0; + while offset_u8 < length_bytes { + let k1 = u64::from(data[offset_u8]).wrapping_mul(PRIME_5); + hash ^= k1; + hash = hash.rotate_left(11); + hash = hash.wrapping_mul(PRIME_1); + offset_u8 += 1; + } + + // The final intermixing + hash ^= hash >> 33; + hash = hash.wrapping_mul(PRIME_2); + hash ^= hash >> 29; + hash = hash.wrapping_mul(PRIME_3); + hash ^= hash >> 32; + + hash } macro_rules! hash_array { @@ -504,13 +625,16 @@ pub(crate) fn pmod(hash: u32, n: usize) -> usize { #[cfg(test)] mod tests { + use super::spark_compatible_xxhash64; use arrow::array::{Float32Array, Float64Array}; + use std::hash::Hasher; use std::sync::Arc; use crate::execution::datafusion::spark_hash::{ create_murmur3_hashes, create_xxhash64_hashes, pmod, }; use datafusion::arrow::array::{ArrayRef, Int32Array, Int64Array, Int8Array, StringArray}; + use twox_hash::XxHash64; macro_rules! test_hashes_internal { ($hash_method: ident, $input: expr, $initial_seeds: expr, $expected: expr) => { @@ -564,6 +688,19 @@ mod tests { test_hashes_with_nulls!(create_xxhash64_hashes, T, values, expected, u64); } + #[test] + fn test_xxhash64() { + check_xxhash64("12345678123456781234567812345678", 42_u64); + } + + fn check_xxhash64(data: &str, seed: u64) { + let mut hasher = XxHash64::with_seed(seed); + hasher.write(data.as_ref()); + let hash1 = hasher.finish(); + let hash2 = spark_compatible_xxhash64(data, seed); + assert_eq!(hash1, hash2); + } + #[test] fn test_i8() { test_murmur3_hash::( From 47bb4c1c918f930b8b5abeb7be0ea075aa9bc514 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 14 Jun 2024 14:09:08 -0600 Subject: [PATCH 02/16] move twox_hash to build dep --- core/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index 04a45f6ca..10c595e64 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -82,7 +82,6 @@ once_cell = "1.18.0" regex = "1.9.6" crc32fast = "1.3.2" simd-adler32 = "0.3.7" -twox-hash = "1.6.3" [build-dependencies] prost-build = "0.9.0" @@ -94,6 +93,7 @@ jni = { version = "0.21", features = ["invocation"] } lazy_static = "1.4" assertables = "7" hex = "0.4.3" +twox-hash = "1.6.3" [features] default = [] From 6fd69309cadb1597330839df3caf0458182e5d6f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 14 Jun 2024 14:20:42 -0600 Subject: [PATCH 03/16] bug fix --- core/Cargo.toml | 2 +- core/src/execution/datafusion/spark_hash.rs | 15 ++++++++++----- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index 04a45f6ca..10c595e64 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -82,7 +82,6 @@ once_cell = "1.18.0" regex = "1.9.6" crc32fast = "1.3.2" simd-adler32 = "0.3.7" -twox-hash = "1.6.3" [build-dependencies] prost-build = "0.9.0" @@ -94,6 +93,7 @@ jni = { version = "0.21", features = ["invocation"] } lazy_static = "1.4" assertables = "7" hex = "0.4.3" +twox-hash = "1.6.3" [features] default = [] diff --git a/core/src/execution/datafusion/spark_hash.rs b/core/src/execution/datafusion/spark_hash.rs index fde0b1b25..8afbb923e 100644 --- a/core/src/execution/datafusion/spark_hash.rs +++ b/core/src/execution/datafusion/spark_hash.rs @@ -138,7 +138,7 @@ pub(crate) fn spark_compatible_xxhash64>(data: T, seed: u64) -> u offset_u64 += 4; } } - let total_len = offset_u64 as u64 * 8_u64; + let total_len = data.len() as u64; let mut hash = if total_len >= CHUNK_SIZE as u64 { // We have processed at least one full chunk @@ -171,7 +171,7 @@ pub(crate) fn spark_compatible_xxhash64>(data: T, seed: u64) -> u seed.wrapping_add(PRIME_5) }; - hash = hash.wrapping_add(total_len); + hash = hash.wrapping_add(total_len as u64); // process u64s while offset_u64 * 8 + 8 < length_bytes { @@ -192,7 +192,8 @@ pub(crate) fn spark_compatible_xxhash64>(data: T, seed: u64) -> u } // process u32s - let ptr_u32 = data[offset_u64 * 8..].as_ptr() as *const u32; + let data = &data[offset_u64 * 8..]; + let ptr_u32 = data.as_ptr() as *const u32; let length_bytes = length_bytes - offset_u64 * 8; let mut offset_u32 = 0; while offset_u32 * 4 + 4 < length_bytes { @@ -598,7 +599,7 @@ pub(crate) fn create_murmur3_hashes<'a>( } /// Creates xxhash64 hash values for every row, based on the values in the -/// columns. +/// columns.7 /// /// The number of rows to hash is determined by `hashes_buffer.len()`. /// `hashes_buffer` should be pre-sized appropriately @@ -690,7 +691,11 @@ mod tests { #[test] fn test_xxhash64() { - check_xxhash64("12345678123456781234567812345678", 42_u64); + // check_xxhash64("12345678123456781234567812345678", 42_u64); + check_xxhash64("12345678123456781234567812345678a", 42_u64); + // check_xxhash64("12345678123456781234567812345678aab", 42_u64); + // check_xxhash64("a", 42_u64); + // check_xxhash64("aab", 42_u64); } fn check_xxhash64(data: &str, seed: u64) { From 07f5b5599e380b518cb04205ed648b480debb951 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 14 Jun 2024 14:28:17 -0600 Subject: [PATCH 04/16] more tests --- core/src/execution/datafusion/spark_hash.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/core/src/execution/datafusion/spark_hash.rs b/core/src/execution/datafusion/spark_hash.rs index 8afbb923e..c73fdc2f1 100644 --- a/core/src/execution/datafusion/spark_hash.rs +++ b/core/src/execution/datafusion/spark_hash.rs @@ -691,11 +691,12 @@ mod tests { #[test] fn test_xxhash64() { - // check_xxhash64("12345678123456781234567812345678", 42_u64); + // TODO fuzz testing + check_xxhash64("12345678123456781234567812345678", 42_u64); check_xxhash64("12345678123456781234567812345678a", 42_u64); - // check_xxhash64("12345678123456781234567812345678aab", 42_u64); - // check_xxhash64("a", 42_u64); - // check_xxhash64("aab", 42_u64); + check_xxhash64("12345678123456781234567812345678aab", 42_u64); + check_xxhash64("a", 42_u64); + check_xxhash64("aab", 42_u64); } fn check_xxhash64(data: &str, seed: u64) { From 91c9aea63b00552b800f3e787027e721090a9947 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 14 Jun 2024 14:43:43 -0600 Subject: [PATCH 05/16] clippy --- core/src/execution/datafusion/spark_hash.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/execution/datafusion/spark_hash.rs b/core/src/execution/datafusion/spark_hash.rs index c73fdc2f1..4d4384c8d 100644 --- a/core/src/execution/datafusion/spark_hash.rs +++ b/core/src/execution/datafusion/spark_hash.rs @@ -171,7 +171,7 @@ pub(crate) fn spark_compatible_xxhash64>(data: T, seed: u64) -> u seed.wrapping_add(PRIME_5) }; - hash = hash.wrapping_add(total_len as u64); + hash = hash.wrapping_add(total_len); // process u64s while offset_u64 * 8 + 8 < length_bytes { From 50539c43b2856cd59a98d35b6595348dca702c4e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 18 Jun 2024 09:10:18 -0600 Subject: [PATCH 06/16] bug fix --- core/src/execution/datafusion/spark_hash.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/execution/datafusion/spark_hash.rs b/core/src/execution/datafusion/spark_hash.rs index 4d4384c8d..6e172d9cf 100644 --- a/core/src/execution/datafusion/spark_hash.rs +++ b/core/src/execution/datafusion/spark_hash.rs @@ -174,7 +174,7 @@ pub(crate) fn spark_compatible_xxhash64>(data: T, seed: u64) -> u hash = hash.wrapping_add(total_len); // process u64s - while offset_u64 * 8 + 8 < length_bytes { + while offset_u64 * 8 + 8 <= length_bytes { let mut k1 = unsafe { ptr_u64 .add(offset_u64) @@ -196,7 +196,7 @@ pub(crate) fn spark_compatible_xxhash64>(data: T, seed: u64) -> u let ptr_u32 = data.as_ptr() as *const u32; let length_bytes = length_bytes - offset_u64 * 8; let mut offset_u32 = 0; - while offset_u32 * 4 + 4 < length_bytes { + while offset_u32 * 4 + 4 <= length_bytes { let k1 = unsafe { u64::from(ptr_u32.add(offset_u32).read_unaligned().to_le()).wrapping_mul(PRIME_1) }; From d1b975e6ef5565caba4ea8c787c6031d47820654 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 18 Jun 2024 09:28:17 -0600 Subject: [PATCH 07/16] attribution --- NOTICE.txt | 21 +++++++++++++++++++++ core/src/execution/datafusion/spark_hash.rs | 10 +++++----- 2 files changed, 26 insertions(+), 5 deletions(-) create mode 100644 NOTICE.txt diff --git a/NOTICE.txt b/NOTICE.txt new file mode 100644 index 000000000..b88f04ca2 --- /dev/null +++ b/NOTICE.txt @@ -0,0 +1,21 @@ +This product includes software from the twox-hash project + * Copyright https://github.com/shepmaster/twox-hash + * Licensed under the MIT License; + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + THE SOFTWARE. \ No newline at end of file diff --git a/core/src/execution/datafusion/spark_hash.rs b/core/src/execution/datafusion/spark_hash.rs index 6e172d9cf..791261fcb 100644 --- a/core/src/execution/datafusion/spark_hash.rs +++ b/core/src/execution/datafusion/spark_hash.rs @@ -106,6 +106,10 @@ pub const PRIME_3: u64 = 1_609_587_929_392_839_161; pub const PRIME_4: u64 = 9_650_029_242_287_828_579; pub const PRIME_5: u64 = 2_870_177_450_012_600_261; +/// Custom implementation of xxhash64 based on code from https://github.com/shepmaster/twox-hash +/// but optimized for our use case by removing any intermediate buffering, which is +/// not required because we are operating on data that is already in memory. +/// This results in a 40-50% speedup. #[inline] pub(crate) fn spark_compatible_xxhash64>(data: T, seed: u64) -> u64 { let data: &[u8] = data.as_ref(); @@ -142,11 +146,7 @@ pub(crate) fn spark_compatible_xxhash64>(data: T, seed: u64) -> u let mut hash = if total_len >= CHUNK_SIZE as u64 { // We have processed at least one full chunk - // XxCore::finish - #[allow(unknown_lints, clippy::needless_late_init)] // keeping things parallel - let mut hash; - - hash = v1.rotate_left(1); + let mut hash = v1.rotate_left(1); hash = hash.wrapping_add(v2.rotate_left(7)); hash = hash.wrapping_add(v3.rotate_left(12)); hash = hash.wrapping_add(v4.rotate_left(18)); From 5869553f78873c315dc2be50868f3d0404dabd55 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 18 Jun 2024 09:39:13 -0600 Subject: [PATCH 08/16] improve test --- core/src/execution/datafusion/spark_hash.rs | 24 +++++++++++++-------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/core/src/execution/datafusion/spark_hash.rs b/core/src/execution/datafusion/spark_hash.rs index 791261fcb..372944c19 100644 --- a/core/src/execution/datafusion/spark_hash.rs +++ b/core/src/execution/datafusion/spark_hash.rs @@ -599,7 +599,7 @@ pub(crate) fn create_murmur3_hashes<'a>( } /// Creates xxhash64 hash values for every row, based on the values in the -/// columns.7 +/// columns. /// /// The number of rows to hash is determined by `hashes_buffer.len()`. /// `hashes_buffer` should be pre-sized appropriately @@ -635,6 +635,7 @@ mod tests { create_murmur3_hashes, create_xxhash64_hashes, pmod, }; use datafusion::arrow::array::{ArrayRef, Int32Array, Int64Array, Int8Array, StringArray}; + use rand::Rng; use twox_hash::XxHash64; macro_rules! test_hashes_internal { @@ -691,20 +692,25 @@ mod tests { #[test] fn test_xxhash64() { - // TODO fuzz testing - check_xxhash64("12345678123456781234567812345678", 42_u64); - check_xxhash64("12345678123456781234567812345678a", 42_u64); - check_xxhash64("12345678123456781234567812345678aab", 42_u64); - check_xxhash64("a", 42_u64); - check_xxhash64("aab", 42_u64); + let mut rng = rand::thread_rng(); + for len in 1..128 { + for _ in 0..10 { + let data: Vec = (0..len).map(|_| rng.gen()).collect(); + let seed = rng.gen(); + check_xxhash64(&data, seed); + } + } } - fn check_xxhash64(data: &str, seed: u64) { + fn check_xxhash64(data: &[u8], seed: u64) { let mut hasher = XxHash64::with_seed(seed); hasher.write(data.as_ref()); let hash1 = hasher.finish(); let hash2 = spark_compatible_xxhash64(data, seed); - assert_eq!(hash1, hash2); + if hash1 != hash2 { + panic!("input: {} with seed {seed} produced incorrect hash (comet={hash2}, twox-hash={hash1})", + data.iter().map(|byte| format!("{:02x}", byte)).collect::()) + } } #[test] From edcde273ea9cf2adbfe70787f59d0c3e62884f65 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 18 Jun 2024 10:05:20 -0600 Subject: [PATCH 09/16] bug fix --- core/src/execution/datafusion/spark_hash.rs | 28 ++++++++++++++------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/core/src/execution/datafusion/spark_hash.rs b/core/src/execution/datafusion/spark_hash.rs index 372944c19..acdfb014f 100644 --- a/core/src/execution/datafusion/spark_hash.rs +++ b/core/src/execution/datafusion/spark_hash.rs @@ -131,15 +131,24 @@ pub(crate) fn spark_compatible_xxhash64>(data: T, seed: u64) -> u } // process chunks of 32 bytes - let mut offset_u64 = 0; + let mut offset_u64_4 = 0; let ptr_u64 = data.as_ptr() as *const u64; unsafe { - while offset_u64 * CHUNK_SIZE + CHUNK_SIZE <= length_bytes { - v1 = ingest_one_number(v1, ptr_u64.add(offset_u64).read_unaligned().to_le()); - v2 = ingest_one_number(v2, ptr_u64.add(offset_u64 + 1).read_unaligned().to_le()); - v3 = ingest_one_number(v3, ptr_u64.add(offset_u64 + 2).read_unaligned().to_le()); - v4 = ingest_one_number(v4, ptr_u64.add(offset_u64 + 3).read_unaligned().to_le()); - offset_u64 += 4; + while offset_u64_4 * CHUNK_SIZE + CHUNK_SIZE <= length_bytes { + v1 = ingest_one_number(v1, ptr_u64.add(offset_u64_4 * 4).read_unaligned().to_le()); + v2 = ingest_one_number( + v2, + ptr_u64.add(offset_u64_4 * 4 + 1).read_unaligned().to_le(), + ); + v3 = ingest_one_number( + v3, + ptr_u64.add(offset_u64_4 * 4 + 2).read_unaligned().to_le(), + ); + v4 = ingest_one_number( + v4, + ptr_u64.add(offset_u64_4 * 4 + 3).read_unaligned().to_le(), + ); + offset_u64_4 += 1; } } let total_len = data.len() as u64; @@ -174,6 +183,7 @@ pub(crate) fn spark_compatible_xxhash64>(data: T, seed: u64) -> u hash = hash.wrapping_add(total_len); // process u64s + let mut offset_u64 = offset_u64_4 * 4; while offset_u64 * 8 + 8 <= length_bytes { let mut k1 = unsafe { ptr_u64 @@ -691,12 +701,12 @@ mod tests { } #[test] - fn test_xxhash64() { + fn test_xxhash64_random() { let mut rng = rand::thread_rng(); for len in 1..128 { for _ in 0..10 { let data: Vec = (0..len).map(|_| rng.gen()).collect(); - let seed = rng.gen(); + let seed = 42_u64; //rng.gen(); check_xxhash64(&data, seed); } } From 0ba913477e918197728ab8cf903a804e65f2e1ad Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 18 Jun 2024 10:07:47 -0600 Subject: [PATCH 10/16] test with random seed --- core/src/execution/datafusion/spark_hash.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/execution/datafusion/spark_hash.rs b/core/src/execution/datafusion/spark_hash.rs index acdfb014f..79d439a92 100644 --- a/core/src/execution/datafusion/spark_hash.rs +++ b/core/src/execution/datafusion/spark_hash.rs @@ -109,7 +109,7 @@ pub const PRIME_5: u64 = 2_870_177_450_012_600_261; /// Custom implementation of xxhash64 based on code from https://github.com/shepmaster/twox-hash /// but optimized for our use case by removing any intermediate buffering, which is /// not required because we are operating on data that is already in memory. -/// This results in a 40-50% speedup. +/// This results in a 40% speedup. #[inline] pub(crate) fn spark_compatible_xxhash64>(data: T, seed: u64) -> u64 { let data: &[u8] = data.as_ref(); @@ -703,10 +703,10 @@ mod tests { #[test] fn test_xxhash64_random() { let mut rng = rand::thread_rng(); - for len in 1..128 { + for len in 0..128 { for _ in 0..10 { let data: Vec = (0..len).map(|_| rng.gen()).collect(); - let seed = 42_u64; //rng.gen(); + let seed = rng.gen(); check_xxhash64(&data, seed); } } From 3c20411dc07a062700d4384afb8b0a6f160c0f82 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 18 Jun 2024 10:46:56 -0600 Subject: [PATCH 11/16] remove comment --- core/src/execution/datafusion/spark_hash.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/execution/datafusion/spark_hash.rs b/core/src/execution/datafusion/spark_hash.rs index 79d439a92..5d25395c5 100644 --- a/core/src/execution/datafusion/spark_hash.rs +++ b/core/src/execution/datafusion/spark_hash.rs @@ -109,7 +109,6 @@ pub const PRIME_5: u64 = 2_870_177_450_012_600_261; /// Custom implementation of xxhash64 based on code from https://github.com/shepmaster/twox-hash /// but optimized for our use case by removing any intermediate buffering, which is /// not required because we are operating on data that is already in memory. -/// This results in a 40% speedup. #[inline] pub(crate) fn spark_compatible_xxhash64>(data: T, seed: u64) -> u64 { let data: &[u8] = data.as_ref(); From e979ee96327ff63989356d7d26d9b3ce355ea221 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 19 Jun 2024 02:53:18 -0600 Subject: [PATCH 12/16] more updated to license/notice --- LICENSE.txt | 24 ++++++++++++++++++++++++ NOTICE.txt | 25 ++++++------------------- 2 files changed, 30 insertions(+), 19 deletions(-) diff --git a/LICENSE.txt b/LICENSE.txt index d74c6b599..7e89ce369 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -201,6 +201,7 @@ See the License for the specific language governing permissions and limitations under the License. +-------------------------------------------------------------------------------- This project includes code from Apache Aurora. @@ -210,3 +211,26 @@ This project includes code from Apache Aurora. Copyright: 2016 The Apache Software Foundation. Home page: https://aurora.apache.org/ License: http://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + +This project includes software from the twox-hash project (MIT License) +https://github.com/shepmaster/twox-hash + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + THE SOFTWARE. diff --git a/NOTICE.txt b/NOTICE.txt index b88f04ca2..655222a26 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -1,21 +1,8 @@ -This product includes software from the twox-hash project - * Copyright https://github.com/shepmaster/twox-hash - * Licensed under the MIT License; +Apache DataFusion Comet +Copyright 2024 The Apache Software Foundation - Permission is hereby granted, free of charge, to any person obtaining a copy - of this software and associated documentation files (the "Software"), to deal - in the Software without restriction, including without limitation the rights - to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - copies of the Software, and to permit persons to whom the Software is - furnished to do so, subject to the following conditions: +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). - The above copyright notice and this permission notice shall be included in - all copies or substantial portions of the Software. - - THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - THE SOFTWARE. \ No newline at end of file +This product includes software from the twox-hash project (MIT License) +https://github.com/shepmaster/twox-hash \ No newline at end of file From 45187bd2e688cbc02b26a7bc351596b74b248955 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 19 Jun 2024 08:26:55 -0600 Subject: [PATCH 13/16] remove redundant variable --- core/src/execution/datafusion/spark_hash.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/execution/datafusion/spark_hash.rs b/core/src/execution/datafusion/spark_hash.rs index 5d25395c5..aa32f3879 100644 --- a/core/src/execution/datafusion/spark_hash.rs +++ b/core/src/execution/datafusion/spark_hash.rs @@ -150,9 +150,8 @@ pub(crate) fn spark_compatible_xxhash64>(data: T, seed: u64) -> u offset_u64_4 += 1; } } - let total_len = data.len() as u64; - let mut hash = if total_len >= CHUNK_SIZE as u64 { + let mut hash = if length_bytes >= CHUNK_SIZE { // We have processed at least one full chunk let mut hash = v1.rotate_left(1); hash = hash.wrapping_add(v2.rotate_left(7)); @@ -179,7 +178,7 @@ pub(crate) fn spark_compatible_xxhash64>(data: T, seed: u64) -> u seed.wrapping_add(PRIME_5) }; - hash = hash.wrapping_add(total_len); + hash = hash.wrapping_add(length_bytes as u64); // process u64s let mut offset_u64 = offset_u64_4 * 4; From 420a87b8468a693b34ba468104c0379be0a919ae Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 19 Jun 2024 08:37:21 -0600 Subject: [PATCH 14/16] refactor to move xxhash64 into separate file --- .../execution/datafusion/expressions/mod.rs | 1 + .../datafusion/expressions/xxhash64.rs | 188 ++++++++++++++++++ core/src/execution/datafusion/spark_hash.rs | 168 +--------------- 3 files changed, 191 insertions(+), 166 deletions(-) create mode 100644 core/src/execution/datafusion/expressions/xxhash64.rs diff --git a/core/src/execution/datafusion/expressions/mod.rs b/core/src/execution/datafusion/expressions/mod.rs index 5d5f58e0c..d91e25980 100644 --- a/core/src/execution/datafusion/expressions/mod.rs +++ b/core/src/execution/datafusion/expressions/mod.rs @@ -43,6 +43,7 @@ pub mod temporal; pub mod unbound; mod utils; pub mod variance; +pub mod xxhash64; #[derive(Debug, Hash, PartialEq, Clone, Copy)] pub enum EvalMode { diff --git a/core/src/execution/datafusion/expressions/xxhash64.rs b/core/src/execution/datafusion/expressions/xxhash64.rs new file mode 100644 index 000000000..3bc52cc18 --- /dev/null +++ b/core/src/execution/datafusion/expressions/xxhash64.rs @@ -0,0 +1,188 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// xxhash64 implementation + +const CHUNK_SIZE: usize = 32; + +const PRIME_1: u64 = 11_400_714_785_074_694_791; +const PRIME_2: u64 = 14_029_467_366_897_019_727; +const PRIME_3: u64 = 1_609_587_929_392_839_161; +const PRIME_4: u64 = 9_650_029_242_287_828_579; +const PRIME_5: u64 = 2_870_177_450_012_600_261; + +/// Custom implementation of xxhash64 based on code from https://github.com/shepmaster/twox-hash +/// but optimized for our use case by removing any intermediate buffering, which is +/// not required because we are operating on data that is already in memory. +#[inline] +pub(crate) fn spark_compatible_xxhash64>(data: T, seed: u64) -> u64 { + let data: &[u8] = data.as_ref(); + let length_bytes = data.len(); + + // XxCore::with_seed + let mut v1 = seed.wrapping_add(PRIME_1).wrapping_add(PRIME_2); + let mut v2 = seed.wrapping_add(PRIME_2); + let mut v3 = seed; + let mut v4 = seed.wrapping_sub(PRIME_1); + + // XxCore::ingest_chunks + #[inline(always)] + fn ingest_one_number(mut current_value: u64, mut value: u64) -> u64 { + value = value.wrapping_mul(PRIME_2); + current_value = current_value.wrapping_add(value); + current_value = current_value.rotate_left(31); + current_value.wrapping_mul(PRIME_1) + } + + // process chunks of 32 bytes + let mut offset_u64_4 = 0; + let ptr_u64 = data.as_ptr() as *const u64; + unsafe { + while offset_u64_4 * CHUNK_SIZE + CHUNK_SIZE <= length_bytes { + v1 = ingest_one_number(v1, ptr_u64.add(offset_u64_4 * 4).read_unaligned().to_le()); + v2 = ingest_one_number( + v2, + ptr_u64.add(offset_u64_4 * 4 + 1).read_unaligned().to_le(), + ); + v3 = ingest_one_number( + v3, + ptr_u64.add(offset_u64_4 * 4 + 2).read_unaligned().to_le(), + ); + v4 = ingest_one_number( + v4, + ptr_u64.add(offset_u64_4 * 4 + 3).read_unaligned().to_le(), + ); + offset_u64_4 += 1; + } + } + + let mut hash = if length_bytes >= CHUNK_SIZE { + // We have processed at least one full chunk + let mut hash = v1.rotate_left(1); + hash = hash.wrapping_add(v2.rotate_left(7)); + hash = hash.wrapping_add(v3.rotate_left(12)); + hash = hash.wrapping_add(v4.rotate_left(18)); + + #[inline(always)] + fn mix_one(mut hash: u64, mut value: u64) -> u64 { + value = value.wrapping_mul(PRIME_2); + value = value.rotate_left(31); + value = value.wrapping_mul(PRIME_1); + hash ^= value; + hash = hash.wrapping_mul(PRIME_1); + hash.wrapping_add(PRIME_4) + } + + hash = mix_one(hash, v1); + hash = mix_one(hash, v2); + hash = mix_one(hash, v3); + hash = mix_one(hash, v4); + + hash + } else { + seed.wrapping_add(PRIME_5) + }; + + hash = hash.wrapping_add(length_bytes as u64); + + // process u64s + let mut offset_u64 = offset_u64_4 * 4; + while offset_u64 * 8 + 8 <= length_bytes { + let mut k1 = unsafe { + ptr_u64 + .add(offset_u64) + .read_unaligned() + .to_le() + .wrapping_mul(PRIME_2) + }; + k1 = k1.rotate_left(31); + k1 = k1.wrapping_mul(PRIME_1); + hash ^= k1; + hash = hash.rotate_left(27); + hash = hash.wrapping_mul(PRIME_1); + hash = hash.wrapping_add(PRIME_4); + offset_u64 += 1; + } + + // process u32s + let data = &data[offset_u64 * 8..]; + let ptr_u32 = data.as_ptr() as *const u32; + let length_bytes = length_bytes - offset_u64 * 8; + let mut offset_u32 = 0; + while offset_u32 * 4 + 4 <= length_bytes { + let k1 = unsafe { + u64::from(ptr_u32.add(offset_u32).read_unaligned().to_le()).wrapping_mul(PRIME_1) + }; + hash ^= k1; + hash = hash.rotate_left(23); + hash = hash.wrapping_mul(PRIME_2); + hash = hash.wrapping_add(PRIME_3); + offset_u32 += 1; + } + + // process u8s + let data = &data[offset_u32 * 4..]; + let length_bytes = length_bytes - offset_u32 * 4; + let mut offset_u8 = 0; + while offset_u8 < length_bytes { + let k1 = u64::from(data[offset_u8]).wrapping_mul(PRIME_5); + hash ^= k1; + hash = hash.rotate_left(11); + hash = hash.wrapping_mul(PRIME_1); + offset_u8 += 1; + } + + // The final intermixing + hash ^= hash >> 33; + hash = hash.wrapping_mul(PRIME_2); + hash ^= hash >> 29; + hash = hash.wrapping_mul(PRIME_3); + hash ^= hash >> 32; + + hash +} + +#[cfg(test)] +mod test { + use super::spark_compatible_xxhash64; + use rand::Rng; + use std::hash::Hasher; + use twox_hash::XxHash64; + + #[test] + fn test_xxhash64_random() { + let mut rng = rand::thread_rng(); + for len in 0..128 { + for _ in 0..10 { + let data: Vec = (0..len).map(|_| rng.gen()).collect(); + let seed = rng.gen(); + check_xxhash64(&data, seed); + } + } + } + + fn check_xxhash64(data: &[u8], seed: u64) { + let mut hasher = XxHash64::with_seed(seed); + hasher.write(data.as_ref()); + let hash1 = hasher.finish(); + let hash2 = spark_compatible_xxhash64(data, seed); + if hash1 != hash2 { + panic!("input: {} with seed {seed} produced incorrect hash (comet={hash2}, twox-hash={hash1})", + data.iter().map(|byte| format!("{:02x}", byte)).collect::()) + } + } +} diff --git a/core/src/execution/datafusion/spark_hash.rs b/core/src/execution/datafusion/spark_hash.rs index aa32f3879..6a8e2d239 100644 --- a/core/src/execution/datafusion/spark_hash.rs +++ b/core/src/execution/datafusion/spark_hash.rs @@ -34,6 +34,8 @@ use datafusion::{ error::{DataFusionError, Result}, }; +use crate::execution::datafusion::expressions::xxhash64::spark_compatible_xxhash64; + #[inline] pub(crate) fn spark_compatible_murmur3_hash>(data: T, seed: u32) -> u32 { #[inline] @@ -98,145 +100,6 @@ pub(crate) fn spark_compatible_murmur3_hash>(data: T, seed: u32) } } -const CHUNK_SIZE: usize = 32; - -pub const PRIME_1: u64 = 11_400_714_785_074_694_791; -pub const PRIME_2: u64 = 14_029_467_366_897_019_727; -pub const PRIME_3: u64 = 1_609_587_929_392_839_161; -pub const PRIME_4: u64 = 9_650_029_242_287_828_579; -pub const PRIME_5: u64 = 2_870_177_450_012_600_261; - -/// Custom implementation of xxhash64 based on code from https://github.com/shepmaster/twox-hash -/// but optimized for our use case by removing any intermediate buffering, which is -/// not required because we are operating on data that is already in memory. -#[inline] -pub(crate) fn spark_compatible_xxhash64>(data: T, seed: u64) -> u64 { - let data: &[u8] = data.as_ref(); - let length_bytes = data.len(); - - // XxCore::with_seed - let mut v1 = seed.wrapping_add(PRIME_1).wrapping_add(PRIME_2); - let mut v2 = seed.wrapping_add(PRIME_2); - let mut v3 = seed; - let mut v4 = seed.wrapping_sub(PRIME_1); - - // XxCore::ingest_chunks - #[inline(always)] - fn ingest_one_number(mut current_value: u64, mut value: u64) -> u64 { - value = value.wrapping_mul(PRIME_2); - current_value = current_value.wrapping_add(value); - current_value = current_value.rotate_left(31); - current_value.wrapping_mul(PRIME_1) - } - - // process chunks of 32 bytes - let mut offset_u64_4 = 0; - let ptr_u64 = data.as_ptr() as *const u64; - unsafe { - while offset_u64_4 * CHUNK_SIZE + CHUNK_SIZE <= length_bytes { - v1 = ingest_one_number(v1, ptr_u64.add(offset_u64_4 * 4).read_unaligned().to_le()); - v2 = ingest_one_number( - v2, - ptr_u64.add(offset_u64_4 * 4 + 1).read_unaligned().to_le(), - ); - v3 = ingest_one_number( - v3, - ptr_u64.add(offset_u64_4 * 4 + 2).read_unaligned().to_le(), - ); - v4 = ingest_one_number( - v4, - ptr_u64.add(offset_u64_4 * 4 + 3).read_unaligned().to_le(), - ); - offset_u64_4 += 1; - } - } - - let mut hash = if length_bytes >= CHUNK_SIZE { - // We have processed at least one full chunk - let mut hash = v1.rotate_left(1); - hash = hash.wrapping_add(v2.rotate_left(7)); - hash = hash.wrapping_add(v3.rotate_left(12)); - hash = hash.wrapping_add(v4.rotate_left(18)); - - #[inline(always)] - fn mix_one(mut hash: u64, mut value: u64) -> u64 { - value = value.wrapping_mul(PRIME_2); - value = value.rotate_left(31); - value = value.wrapping_mul(PRIME_1); - hash ^= value; - hash = hash.wrapping_mul(PRIME_1); - hash.wrapping_add(PRIME_4) - } - - hash = mix_one(hash, v1); - hash = mix_one(hash, v2); - hash = mix_one(hash, v3); - hash = mix_one(hash, v4); - - hash - } else { - seed.wrapping_add(PRIME_5) - }; - - hash = hash.wrapping_add(length_bytes as u64); - - // process u64s - let mut offset_u64 = offset_u64_4 * 4; - while offset_u64 * 8 + 8 <= length_bytes { - let mut k1 = unsafe { - ptr_u64 - .add(offset_u64) - .read_unaligned() - .to_le() - .wrapping_mul(PRIME_2) - }; - k1 = k1.rotate_left(31); - k1 = k1.wrapping_mul(PRIME_1); - hash ^= k1; - hash = hash.rotate_left(27); - hash = hash.wrapping_mul(PRIME_1); - hash = hash.wrapping_add(PRIME_4); - offset_u64 += 1; - } - - // process u32s - let data = &data[offset_u64 * 8..]; - let ptr_u32 = data.as_ptr() as *const u32; - let length_bytes = length_bytes - offset_u64 * 8; - let mut offset_u32 = 0; - while offset_u32 * 4 + 4 <= length_bytes { - let k1 = unsafe { - u64::from(ptr_u32.add(offset_u32).read_unaligned().to_le()).wrapping_mul(PRIME_1) - }; - hash ^= k1; - hash = hash.rotate_left(23); - hash = hash.wrapping_mul(PRIME_2); - hash = hash.wrapping_add(PRIME_3); - offset_u32 += 1; - } - - // process u8s - let data = &data[offset_u32 * 4..]; - let length_bytes = length_bytes - offset_u32 * 4; - let mut offset_u8 = 0; - while offset_u8 < length_bytes { - let k1 = u64::from(data[offset_u8]).wrapping_mul(PRIME_5); - hash ^= k1; - hash = hash.rotate_left(11); - hash = hash.wrapping_mul(PRIME_1); - offset_u8 += 1; - } - - // The final intermixing - hash ^= hash >> 33; - hash = hash.wrapping_mul(PRIME_2); - hash ^= hash >> 29; - hash = hash.wrapping_mul(PRIME_3); - hash ^= hash >> 32; - - hash -} - macro_rules! hash_array { ($array_type: ident, $column: ident, $hashes: ident, $hash_method: ident) => { let array = $column.as_any().downcast_ref::<$array_type>().unwrap(); @@ -634,17 +497,13 @@ pub(crate) fn pmod(hash: u32, n: usize) -> usize { #[cfg(test)] mod tests { - use super::spark_compatible_xxhash64; use arrow::array::{Float32Array, Float64Array}; - use std::hash::Hasher; use std::sync::Arc; use crate::execution::datafusion::spark_hash::{ create_murmur3_hashes, create_xxhash64_hashes, pmod, }; use datafusion::arrow::array::{ArrayRef, Int32Array, Int64Array, Int8Array, StringArray}; - use rand::Rng; - use twox_hash::XxHash64; macro_rules! test_hashes_internal { ($hash_method: ident, $input: expr, $initial_seeds: expr, $expected: expr) => { @@ -698,29 +557,6 @@ mod tests { test_hashes_with_nulls!(create_xxhash64_hashes, T, values, expected, u64); } - #[test] - fn test_xxhash64_random() { - let mut rng = rand::thread_rng(); - for len in 0..128 { - for _ in 0..10 { - let data: Vec = (0..len).map(|_| rng.gen()).collect(); - let seed = rng.gen(); - check_xxhash64(&data, seed); - } - } - } - - fn check_xxhash64(data: &[u8], seed: u64) { - let mut hasher = XxHash64::with_seed(seed); - hasher.write(data.as_ref()); - let hash1 = hasher.finish(); - let hash2 = spark_compatible_xxhash64(data, seed); - if hash1 != hash2 { - panic!("input: {} with seed {seed} produced incorrect hash (comet={hash2}, twox-hash={hash1})", - data.iter().map(|byte| format!("{:02x}", byte)).collect::()) - } - } - #[test] fn test_i8() { test_murmur3_hash::( From e57c8d6dfdc0a1d6c3c95738ae9931d07a199cb3 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 19 Jun 2024 08:42:01 -0600 Subject: [PATCH 15/16] refactor --- .../datafusion/expressions/xxhash64.rs | 40 +++++++++---------- 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/core/src/execution/datafusion/expressions/xxhash64.rs b/core/src/execution/datafusion/expressions/xxhash64.rs index 3bc52cc18..94b9e04ba 100644 --- a/core/src/execution/datafusion/expressions/xxhash64.rs +++ b/core/src/execution/datafusion/expressions/xxhash64.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -/// xxhash64 implementation +//! xxhash64 implementation const CHUNK_SIZE: usize = 32; @@ -33,21 +33,11 @@ pub(crate) fn spark_compatible_xxhash64>(data: T, seed: u64) -> u let data: &[u8] = data.as_ref(); let length_bytes = data.len(); - // XxCore::with_seed let mut v1 = seed.wrapping_add(PRIME_1).wrapping_add(PRIME_2); let mut v2 = seed.wrapping_add(PRIME_2); let mut v3 = seed; let mut v4 = seed.wrapping_sub(PRIME_1); - // XxCore::ingest_chunks - #[inline(always)] - fn ingest_one_number(mut current_value: u64, mut value: u64) -> u64 { - value = value.wrapping_mul(PRIME_2); - current_value = current_value.wrapping_add(value); - current_value = current_value.rotate_left(31); - current_value.wrapping_mul(PRIME_1) - } - // process chunks of 32 bytes let mut offset_u64_4 = 0; let ptr_u64 = data.as_ptr() as *const u64; @@ -77,16 +67,6 @@ pub(crate) fn spark_compatible_xxhash64>(data: T, seed: u64) -> u hash = hash.wrapping_add(v3.rotate_left(12)); hash = hash.wrapping_add(v4.rotate_left(18)); - #[inline(always)] - fn mix_one(mut hash: u64, mut value: u64) -> u64 { - value = value.wrapping_mul(PRIME_2); - value = value.rotate_left(31); - value = value.wrapping_mul(PRIME_1); - hash ^= value; - hash = hash.wrapping_mul(PRIME_1); - hash.wrapping_add(PRIME_4) - } - hash = mix_one(hash, v1); hash = mix_one(hash, v2); hash = mix_one(hash, v3); @@ -156,6 +136,24 @@ pub(crate) fn spark_compatible_xxhash64>(data: T, seed: u64) -> u hash } +#[inline(always)] +fn ingest_one_number(mut current_value: u64, mut value: u64) -> u64 { + value = value.wrapping_mul(PRIME_2); + current_value = current_value.wrapping_add(value); + current_value = current_value.rotate_left(31); + current_value.wrapping_mul(PRIME_1) +} + +#[inline(always)] +fn mix_one(mut hash: u64, mut value: u64) -> u64 { + value = value.wrapping_mul(PRIME_2); + value = value.rotate_left(31); + value = value.wrapping_mul(PRIME_1); + hash ^= value; + hash = hash.wrapping_mul(PRIME_1); + hash.wrapping_add(PRIME_4) +} + #[cfg(test)] mod test { use super::spark_compatible_xxhash64; From 96c2bcfbd2be9656b3d65cbfd27f6fa1800196e0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 21 Jun 2024 13:39:19 -0600 Subject: [PATCH 16/16] add copyright --- LICENSE.txt | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/LICENSE.txt b/LICENSE.txt index 7e89ce369..1afea9f49 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -214,9 +214,13 @@ License: http://www.apache.org/licenses/LICENSE-2.0 -------------------------------------------------------------------------------- -This project includes software from the twox-hash project (MIT License) +This project includes software from the twox-hash project https://github.com/shepmaster/twox-hash +The MIT License (MIT) + +Copyright (c) 2015 Jake Goulding + Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights