diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml index 906238c..99089f0 100644 --- a/crates/paimon/Cargo.toml +++ b/crates/paimon/Cargo.toml @@ -30,8 +30,9 @@ version.workspace = true bitflags = "2.6.0" chrono = { version = "0.4.38", features = ["serde"] } serde = { version = "1", features = ["derive"] } -serde_with = "3.8.3" serde_bytes = "0.11.15" +serde_json = "1.0.120" +serde_with = "3.8.3" snafu = "0.8.3" typed-builder = "^0.19" opendal = "0.48" diff --git a/crates/paimon/src/spec/snapshot.rs b/crates/paimon/src/spec/snapshot.rs index c232431..44ffed2 100644 --- a/crates/paimon/src/spec/snapshot.rs +++ b/crates/paimon/src/spec/snapshot.rs @@ -16,6 +16,7 @@ // under the License. use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use typed_builder::TypedBuilder; /// Snapshot for paimon. @@ -38,7 +39,21 @@ pub struct Snapshot { /// a manifest recording all index files of this table #[builder(default = None)] index_manifest: Option, + /// user who committed this snapshot commit_user: String, + /// Mainly for snapshot deduplication. + /// + /// If multiple snapshots have the same commitIdentifier, reading from any of these snapshots + /// must produce the same table. + /// + /// If snapshot A has a smaller commitIdentifier than snapshot B, then snapshot A must be + /// committed before snapshot B, and thus snapshot A must contain older records than snapshot B. + commit_identifier: i64, + /// timestamp of this snapshot + time_millis: u64, + /// log offsets of all changes occurred in this snapshot + #[builder(default = None)] + log_offsets: Option>, /// record count of all changes occurred in this snapshot #[builder(default = None)] total_record_count: Option, @@ -105,6 +120,24 @@ impl Snapshot { &self.commit_user } + /// Get the commit time of this snapshot. + #[inline] + pub fn time_millis(&self) -> u64 { + self.time_millis + } + + /// Get the commit identifier of this snapshot. + #[inline] + pub fn commit_identifier(&self) -> i64 { + self.commit_identifier + } + + /// Get the log offsets of this snapshot. + #[inline] + pub fn log_offsets(&self) -> Option<&HashMap> { + self.log_offsets.as_ref() + } + /// Get the total record count of this snapshot. #[inline] pub fn total_record_count(&self) -> Option { @@ -135,3 +168,217 @@ impl Snapshot { self.statistics.as_deref() } } + +#[cfg(test)] +mod tests { + use super::*; + use serde_json; + + #[test] + fn test_snapshot_creation() { + let snapshot = Snapshot::builder() + .version(3) + .id(1) + .schema_id(0) + .base_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0".to_string()) + .delta_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1".to_string()) + .commit_user("cf568e07-05ad-4943-b4bd-37461bc58729".to_string()) + .commit_identifier(9223372036854775807) + .time_millis(1721287833568) + .build(); + + assert_eq!(snapshot.version(), 3); + assert_eq!(snapshot.id(), 1); + assert_eq!(snapshot.schema_id(), 0); + assert_eq!( + snapshot.base_manifest_list(), + "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0" + ); + assert_eq!( + snapshot.delta_manifest_list(), + "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1" + ); + assert_eq!( + snapshot.commit_user(), + "cf568e07-05ad-4943-b4bd-37461bc58729" + ); + assert_eq!(snapshot.commit_identifier(), 9223372036854775807); + assert_eq!(snapshot.time_millis(), 1721287833568); + assert!(snapshot.changelog_manifest_list().is_none()); + assert!(snapshot.index_manifest().is_none()); + assert!(snapshot.log_offsets().is_none()); + assert!(snapshot.total_record_count().is_none()); + assert!(snapshot.delta_record_count().is_none()); + assert!(snapshot.changelog_record_count().is_none()); + assert!(snapshot.watermark().is_none()); + assert!(snapshot.statistics().is_none()); + } + + #[test] + fn test_snapshot_with_optional_fields() { + let snapshot = Snapshot::builder() + .version(3) + .id(1) + .schema_id(0) + .base_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0".to_string()) + .delta_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1".to_string()) + .changelog_manifest_list(Some( + "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-2".to_string(), + )) + .index_manifest(Some( + "index-manifest-55e3e815-08ab-4a70-a808-a8df5d275cb4-0".to_string(), + )) + .commit_user("cf568e07-05ad-4943-b4bd-37461bc58729".to_string()) + .commit_identifier(9223372036854775807) + .time_millis(1721287833568) + .total_record_count(Some(1)) + .delta_record_count(Some(1)) + .changelog_record_count(Some(0)) + .watermark(Some(-9223372036854775808)) + .statistics(Some("statistics_v2".to_string())) + .build(); + + assert_eq!(snapshot.version(), 3); + assert_eq!(snapshot.id(), 1); + assert_eq!(snapshot.schema_id(), 0); + assert_eq!( + snapshot.base_manifest_list(), + "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0" + ); + assert_eq!( + snapshot.delta_manifest_list(), + "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1" + ); + assert_eq!( + snapshot.changelog_manifest_list(), + Some("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-2") + ); + assert_eq!( + snapshot.index_manifest(), + Some("index-manifest-55e3e815-08ab-4a70-a808-a8df5d275cb4-0") + ); + assert_eq!( + snapshot.commit_user(), + "cf568e07-05ad-4943-b4bd-37461bc58729" + ); + assert_eq!(snapshot.commit_identifier(), 9223372036854775807); + assert_eq!(snapshot.time_millis(), 1721287833568); + assert_eq!( + snapshot.changelog_manifest_list(), + Some("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-2") + ); + assert_eq!(snapshot.total_record_count(), Some(1)); + assert_eq!(snapshot.delta_record_count(), Some(1)); + assert_eq!(snapshot.changelog_record_count(), Some(0)); + assert_eq!(snapshot.watermark(), Some(-9223372036854775808)); + assert_eq!(snapshot.statistics(), Some("statistics_v2")); + } + + #[test] + fn test_snapshot_with_none_fields() { + let snapshot = Snapshot::builder() + .version(3) + .id(1) + .schema_id(0) + .base_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0".to_string()) + .delta_manifest_list("manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1".to_string()) + .changelog_manifest_list(None) + .index_manifest(None) + .commit_user("cf568e07-05ad-4943-b4bd-37461bc58729".to_string()) + .commit_identifier(9223372036854775807) + .time_millis(1721287833568) + .total_record_count(None) + .delta_record_count(None) + .changelog_record_count(None) + .watermark(None) + .statistics(None) + .build(); + + assert_eq!(snapshot.version(), 3); + assert_eq!(snapshot.id(), 1); + assert_eq!(snapshot.schema_id(), 0); + assert_eq!( + snapshot.base_manifest_list(), + "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0" + ); + assert_eq!( + snapshot.delta_manifest_list(), + "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1" + ); + assert_eq!( + snapshot.commit_user(), + "cf568e07-05ad-4943-b4bd-37461bc58729" + ); + assert_eq!(snapshot.commit_identifier(), 9223372036854775807); + assert_eq!(snapshot.time_millis(), 1721287833568); + assert!(snapshot.changelog_manifest_list().is_none()); + assert!(snapshot.index_manifest().is_none()); + assert!(snapshot.log_offsets().is_none()); + assert!(snapshot.total_record_count().is_none()); + assert!(snapshot.delta_record_count().is_none()); + assert!(snapshot.changelog_record_count().is_none()); + assert!(snapshot.watermark().is_none()); + assert!(snapshot.statistics().is_none()); + } + + #[test] + fn test_snapshot_serialization_deserialization() { + let data = r#" + { + "version" : 3, + "id" : 1, + "schemaId" : 0, + "baseManifestList" : "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0", + "deltaManifestList" : "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1", + "changelogManifestList" : null, + "indexManifest" : "index-manifest-55e3e815-08ab-4a70-a808-a8df5d275cb4-0", + "commitUser" : "cf568e07-05ad-4943-b4bd-37461bc58729", + "commitIdentifier" : 9223372036854775807, + "timeMillis" : 1721287833568, + "logOffsets" : { }, + "totalRecordCount" : 1, + "deltaRecordCount" : 1, + "changelogRecordCount" : 0, + "watermark" : -9223372036854775808 + } + "#; + + let snapshot: Snapshot = + serde_json::from_str(data).expect("Failed to deserialize Snapshot"); + + assert_eq!(snapshot.version(), 3); + assert_eq!(snapshot.id(), 1); + assert_eq!(snapshot.schema_id(), 0); + assert_eq!( + snapshot.base_manifest_list(), + "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-0" + ); + assert_eq!( + snapshot.delta_manifest_list(), + "manifest-list-4091b92b-01d3-4b91-8e47-8c9f61847d2f-1" + ); + assert_eq!(snapshot.changelog_manifest_list(), None); + assert_eq!( + snapshot.index_manifest(), + Some("index-manifest-55e3e815-08ab-4a70-a808-a8df5d275cb4-0") + ); + assert_eq!( + snapshot.commit_user(), + "cf568e07-05ad-4943-b4bd-37461bc58729" + ); + assert_eq!(snapshot.commit_identifier(), 9223372036854775807); + assert_eq!(snapshot.time_millis(), 1721287833568); + assert!(snapshot.changelog_manifest_list().is_none()); + assert_eq!(snapshot.total_record_count(), Some(1)); + assert_eq!(snapshot.delta_record_count(), Some(1)); + assert_eq!(snapshot.changelog_record_count(), Some(0)); + assert_eq!(snapshot.watermark(), Some(-9223372036854775808)); + + let serialized = serde_json::to_string(&snapshot).expect("Failed to serialize Snapshot"); + + let deserialized: Snapshot = + serde_json::from_str(&serialized).expect("Failed to deserialize serialized Snapshot"); + + assert_eq!(snapshot, deserialized); + } +}