diff --git a/src/DeltaLake/Bridge/ForwardedLog.cs b/src/DeltaLake/Bridge/ForwardedLog.cs deleted file mode 100644 index 0da1574..0000000 --- a/src/DeltaLake/Bridge/ForwardedLog.cs +++ /dev/null @@ -1,84 +0,0 @@ -using System; -using System.Collections; -using System.Collections.Generic; -using System.Linq; -using Microsoft.Extensions.Logging; - -namespace DeltaLake.Bridge -{ - /// - /// Representation of log state for a Core log. - /// - /// Log level. - /// Log target. - /// Log message. - /// Ms since Unix epoch. - /// JSON fields, or null to not include. The keys are the field names - /// and the values are raw JSON strings. - internal sealed record ForwardedLog( - LogLevel Level, - string Target, - string Message, - ulong TimestampMilliseconds, - IReadOnlyDictionary? JsonFields) : IReadOnlyList> - { - // Unfortunately DateTime.UnixEpoch not in standard library in all versions we need - private static readonly DateTime UnixEpoch = new(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc); - - /// - /// Gets the timestamp for this log. - /// - public DateTime Timestamp => UnixEpoch.AddMilliseconds(TimestampMilliseconds); - - /// - public int Count => 5; - - /// - public KeyValuePair this[int index] - { - get - { - switch (index) - { - case 0: - return new("Level", Level); - case 1: - return new("Target", Target); - case 2: - return new("Message", Message); - case 3: - return new("Timestamp", Timestamp); - case 4: - return new("JsonFields", JsonFields); - default: -#pragma warning disable CA2201 // We intentionally use this usually-internal-use-only exception - throw new IndexOutOfRangeException(nameof(index)); -#pragma warning restore CA2201 - } - } - } - - /// - public IEnumerator> GetEnumerator() - { - for (int i = 0; i < Count; ++i) - { - yield return this[i]; - } - } - - /// - public override string ToString() - { - var message = $"[sdk_core::{Target}] {Message}"; - if (JsonFields is { } jsonFields) - { - message += " " + string.Join(", ", jsonFields.Select(kv => $"{kv.Key}={kv.Value}")); - } - return message; - } - - /// - IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); - } -} \ No newline at end of file diff --git a/src/DeltaLake/Bridge/RecordBatchReader.cs b/src/DeltaLake/Bridge/RecordBatchReader.cs index e5c40fa..895ee06 100644 --- a/src/DeltaLake/Bridge/RecordBatchReader.cs +++ b/src/DeltaLake/Bridge/RecordBatchReader.cs @@ -1,4 +1,3 @@ -using System; using System.Collections.Generic; using System.Threading.Tasks; using Apache.Arrow; @@ -6,15 +5,13 @@ namespace DeltaLake.Bridge { - internal class RecordBatchReader : IArrowArrayStream + internal sealed class RecordBatchReader : IArrowArrayStream { - private readonly IEnumerable _recordBatches; private readonly IEnumerator _enumerator; public RecordBatchReader(IEnumerable recordBatches, Schema schema) { - _recordBatches = recordBatches; - _enumerator = _recordBatches.GetEnumerator(); + _enumerator = recordBatches.GetEnumerator(); Schema = schema; } @@ -32,7 +29,9 @@ public ValueTask ReadNextRecordBatchAsync(System.Threading.Cancella return ValueTask.FromResult(_enumerator.Current); } - return ValueTask.FromResult(default(RecordBatch)); +#pragma warning disable CS8625 // Cannot convert null literal to non-nullable reference type. + return ValueTask.FromResult(default); +#pragma warning restore CS8625 // Cannot convert null literal to non-nullable reference type. } } } \ No newline at end of file diff --git a/src/DeltaLake/Bridge/Runtime.cs b/src/DeltaLake/Bridge/Runtime.cs index 3a58770..617b015 100644 --- a/src/DeltaLake/Bridge/Runtime.cs +++ b/src/DeltaLake/Bridge/Runtime.cs @@ -6,7 +6,6 @@ using System.Text.Json; using System.Threading.Tasks; using Apache.Arrow.C; -using Microsoft.Extensions.Logging; namespace DeltaLake.Bridge { @@ -15,15 +14,6 @@ namespace DeltaLake.Bridge /// internal sealed class Runtime : SafeHandle { - /* - private static readonly Func ForwardLogMessageFormatter = - LogMessageFormatter; - - private readonly bool forwardLoggerIncludeFields;*/ - private readonly GCHandle? forwardLoggerCallback; - - private ILogger? forwardLogger; - /// /// Initializes a new instance of the class. /// @@ -234,13 +224,8 @@ internal unsafe void FreeByteArray(Interop.ByteArray* byteArray) /// protected override unsafe bool ReleaseHandle() { - forwardLogger = null; - forwardLoggerCallback?.Free(); Interop.Methods.runtime_free(Ptr); return true; } - - private static string LogMessageFormatter(ForwardedLog state, Exception? error) => - state.ToString(); } } \ No newline at end of file diff --git a/src/DeltaLake/Bridge/Table.cs b/src/DeltaLake/Bridge/Table.cs index 6576f1c..94b8705 100644 --- a/src/DeltaLake/Bridge/Table.cs +++ b/src/DeltaLake/Bridge/Table.cs @@ -448,7 +448,7 @@ public async Task HistoryAsync(ulong limit, ICancellationToken cancellat Methods.history( _runtime.Ptr, _ptr, - (UIntPtr)limit, + new UIntPtr(limit), scope.CancellationToken(cancellationToken), scope.FunctionPointer((success, fail) => { @@ -506,7 +506,7 @@ public async Task UpdateIncrementalAsync(ICancellationToken cancellationToken) await tsc.Task.ConfigureAwait(false); } - public Metadata Metadata() + public DeltaLake.Table.TableMetadata Metadata() { unsafe { @@ -518,11 +518,11 @@ public Metadata Metadata() try { - return DeltaLake.Table.Metadata.FromUnmanaged(result.metadata); + return DeltaLake.Table.TableMetadata.FromUnmanaged(result.metadata); } finally { - var release = (delegate* unmanaged)result.metadata->release; + var release = (delegate* unmanaged)result.metadata->release; release(result.metadata); } } diff --git a/src/DeltaLake/Bridge/include/delta-lake-bridge.h b/src/DeltaLake/Bridge/include/delta-lake-bridge.h index 0dde21c..cda9cb6 100644 --- a/src/DeltaLake/Bridge/include/delta-lake-bridge.h +++ b/src/DeltaLake/Bridge/include/delta-lake-bridge.h @@ -258,16 +258,16 @@ void dynamic_array_free(struct Runtime *runtime, const struct DynamicArray *arra struct PartitionFilterList *partition_filter_list_new(uintptr_t capacity); -bool partition_filter_list_add_binary(struct PartitionFilterList *list, - const struct ByteArrayRef *key, - enum PartitionFilterBinaryOp op, - const struct ByteArrayRef *value); - -bool partition_filter_list_add_set(struct PartitionFilterList *list, - const struct ByteArrayRef *key, - enum PartitionFilterBinaryOp op, - const struct ByteArrayRef *value, - uintptr_t value_count); +bool partition_filter_list_add_binary(struct PartitionFilterList *_list, + const struct ByteArrayRef *_key, + enum PartitionFilterBinaryOp _op, + const struct ByteArrayRef *_value); + +bool partition_filter_list_add_set(struct PartitionFilterList *_list, + const struct ByteArrayRef *_key, + enum PartitionFilterBinaryOp _op, + const struct ByteArrayRef *_value, + uintptr_t _value_count); void partition_filter_list_free(struct PartitionFilterList *list); diff --git a/src/DeltaLake/Bridge/src/error.rs b/src/DeltaLake/Bridge/src/error.rs index 994a8b5..3b86345 100644 --- a/src/DeltaLake/Bridge/src/error.rs +++ b/src/DeltaLake/Bridge/src/error.rs @@ -1,6 +1,6 @@ use deltalake::datafusion::sql::sqlparser::parser::ParserError; -use crate::{runtime::Runtime, ByteArray, ByteArrayRef}; +use crate::{runtime::Runtime, ByteArray}; #[repr(C)] pub struct DeltaTableError { @@ -8,21 +8,6 @@ pub struct DeltaTableError { error: ByteArray, } -#[repr(C)] -pub struct DeltaTableErrorRef { - code: DeltaTableErrorCode, - error: ByteArrayRef, -} - -impl DeltaTableErrorRef { - pub(crate) fn new(code: DeltaTableErrorCode, error: &str) -> Self { - Self { - code, - error: ByteArrayRef::from_str(error), - } - } -} - #[repr(C)] pub enum DeltaTableErrorCode { Utf8 = 0, @@ -145,18 +130,6 @@ impl DeltaTableError { Self::new(_runtime, code, &error_string) } - pub(crate) fn from_cancellation() -> Self { - DeltaTableError { - code: DeltaTableErrorCode::OperationCanceled, - error: ByteArray { - data: std::ptr::null(), - size: 0, - cap: 0, - disable_free: true, - }, - } - } - pub(crate) fn into_raw(self) -> *mut DeltaTableError { Box::into_raw(Box::new(self)) } diff --git a/src/DeltaLake/Bridge/src/lib.rs b/src/DeltaLake/Bridge/src/lib.rs index 2ff7cae..a8bc6f1 100644 --- a/src/DeltaLake/Bridge/src/lib.rs +++ b/src/DeltaLake/Bridge/src/lib.rs @@ -66,6 +66,7 @@ impl KeyValuePair { ManuallyDrop::new(mapped).as_mut_ptr() } + #[allow(dead_code)] pub(crate) fn from_hash_map(input: HashMap) -> *mut *mut Self { ManuallyDrop::new( input @@ -90,13 +91,9 @@ impl KeyValuePair { impl Drop for KeyValuePair { fn drop(&mut self) { unsafe { - let _ = String::from_raw_parts(self.key as *mut u8, self.key_length, self.key_length); + let _ = String::from_raw_parts(self.key, self.key_length, self.key_length); if !self.value.is_null() { - let _ = String::from_raw_parts( - self.value as *mut u8, - self.value_length, - self.value_length, - ); + let _ = String::from_raw_parts(self.value, self.value_length, self.value_length); } } } @@ -168,33 +165,10 @@ pub struct ByteArrayRef { } impl ByteArrayRef { - fn from_str(s: &str) -> ByteArrayRef { - ByteArrayRef { - data: s.as_ptr(), - size: s.len(), - } - } - - fn from_string(s: &String) -> ByteArrayRef { - ByteArrayRef { - data: s.as_ptr(), - size: s.len(), - } - } - fn to_slice(&self) -> &[u8] { unsafe { std::slice::from_raw_parts(self.data, self.size) } } - #[allow(clippy::mut_from_ref)] - fn to_slice_mut(&self) -> &mut [u8] { - unsafe { std::slice::from_raw_parts_mut(self.data as *mut u8, self.size) } - } - - fn to_vec(&self) -> Vec { - self.to_slice().to_vec() - } - fn to_str(&self) -> &str { // Trust caller to send UTF8. Even if we did do a checked call here with // error, the caller can still have a bad pointer or something else @@ -215,14 +189,6 @@ impl ByteArrayRef { } } - fn to_option_vec(&self) -> Option> { - if self.size == 0 { - None - } else { - Some(self.to_vec()) - } - } - fn to_option_str(&self) -> Option<&str> { if self.size == 0 { None @@ -234,20 +200,6 @@ impl ByteArrayRef { fn to_option_string(&self) -> Option { self.to_option_str().map(str::to_string) } - - fn to_str_map_on_newlines(&self) -> HashMap<&str, &str> { - let strs: Vec<&str> = self.to_str().split('\n').collect(); - strs.chunks_exact(2) - .map(|pair| (pair[0], pair[1])) - .collect() - } - - fn to_string_map_on_newlines(&self) -> HashMap { - self.to_str_map_on_newlines() - .iter() - .map(|(k, v)| (k.to_string(), v.to_string())) - .collect() - } } #[repr(C)] diff --git a/src/DeltaLake/Bridge/src/schema.rs b/src/DeltaLake/Bridge/src/schema.rs index 5f93399..43a8df4 100644 --- a/src/DeltaLake/Bridge/src/schema.rs +++ b/src/DeltaLake/Bridge/src/schema.rs @@ -26,6 +26,7 @@ pub enum PartitionFilterSetOp { } pub struct PartitionFilterList { pub(crate) filters: Vec, + #[allow(dead_code)] disable_free: bool, } @@ -39,21 +40,21 @@ pub extern "C" fn partition_filter_list_new(capacity: usize) -> *mut PartitionFi #[no_mangle] pub extern "C" fn partition_filter_list_add_binary( - list: *mut PartitionFilterList, - key: *const ByteArrayRef, - op: PartitionFilterBinaryOp, - value: *const ByteArrayRef, + _list: *mut PartitionFilterList, + _key: *const ByteArrayRef, + _op: PartitionFilterBinaryOp, + _value: *const ByteArrayRef, ) -> bool { - unimplemented!() + todo!() } #[no_mangle] pub extern "C" fn partition_filter_list_add_set( - list: *mut PartitionFilterList, - key: *const ByteArrayRef, - op: PartitionFilterBinaryOp, - value: *const ByteArrayRef, - value_count: usize, + _list: *mut PartitionFilterList, + _key: *const ByteArrayRef, + _op: PartitionFilterBinaryOp, + _value: *const ByteArrayRef, + _value_count: usize, ) -> bool { unimplemented!() } diff --git a/src/DeltaLake/Bridge/src/sql.rs b/src/DeltaLake/Bridge/src/sql.rs index 3dfa975..a79d36b 100644 --- a/src/DeltaLake/Bridge/src/sql.rs +++ b/src/DeltaLake/Bridge/src/sql.rs @@ -1,34 +1,22 @@ -use std::collections::VecDeque; - use arrow::{ - datatypes::{Schema, SchemaRef}, + datatypes::SchemaRef, error::ArrowError, record_batch::{RecordBatch, RecordBatchReader}, }; use deltalake::datafusion::{ physical_plan::SendableRecordBatchStream, - sql::{ - parser::{DFParser, Statement as DFStatement}, - sqlparser::{ - ast::{Assignment, Expr, Ident, TableAlias, TableFactor, Values}, - dialect::{Dialect, GenericDialect}, - keywords::Keyword, - parser::{Parser, ParserError}, - tokenizer::{Token, TokenWithLocation, Tokenizer}, - }, + sql::sqlparser::{ + ast::{Assignment, Expr, Ident, TableAlias, TableFactor, Values}, + dialect::{Dialect, GenericDialect}, + keywords::Keyword, + parser::{Parser, ParserError}, + tokenizer::{Token, Tokenizer}, }, }; -use futures::{FutureExt, StreamExt}; -use tokio::{runtime::Handle, task::spawn_blocking}; +use futures::StreamExt; use crate::error::{DeltaTableError, DeltaTableErrorCode}; -macro_rules! parser_err { - ($MSG:expr) => { - Err(ParserError::ParserError($MSG.to_string())) - }; -} - macro_rules! make_update { ($update:ident, $predicate:ident, $assignments:ident) => {{ if let Some(predicate) = $predicate { @@ -45,7 +33,6 @@ macro_rules! make_update { } pub struct DeltaLakeParser<'a> { - sql: &'a str, parser: Parser<'a>, } @@ -63,81 +50,9 @@ impl<'a> DeltaLakeParser<'a> { let tokens = tokenizer.tokenize()?; Ok(Self { - sql, parser: Parser::new(dialect).with_tokens(tokens), }) } - /// Parse a sql string into one or [`Statement`]s using the - /// [`GenericDialect`]. - pub fn parse_sql(sql: impl AsRef) -> Result, ParserError> { - let dialect: &GenericDialect = &GenericDialect {}; - DeltaLakeParser::parse_sql_with_dialect(sql.as_ref(), dialect) - } - - /// Parse a SQL string and produce one or more [`Statement`]s with - /// with the specified dialect. - pub fn parse_sql_with_dialect( - sql: &str, - dialect: &dyn Dialect, - ) -> Result, ParserError> { - let mut parser = DeltaLakeParser::new_with_dialect(sql, dialect)?; - let mut stmts = VecDeque::new(); - let mut expecting_statement_delimiter = false; - loop { - // ignore empty statements (between successive statement delimiters) - while parser.parser.consume_token(&Token::SemiColon) { - expecting_statement_delimiter = false; - } - - if parser.parser.peek_token() == Token::EOF { - break; - } - if expecting_statement_delimiter { - return parser.expected("end of statement", parser.parser.peek_token()); - } - - let statement = parser.parse_statement()?; - stmts.push_back(statement); - expecting_statement_delimiter = true; - } - - Ok(stmts) - } - - /// Report an unexpected token - fn expected(&self, expected: &str, found: TokenWithLocation) -> Result { - parser_err!(format!("Expected {expected}, found: {found}")) - } - - /// Parse a new expression - pub fn parse_statement(&mut self) -> Result { - match self.parser.peek_token().token { - Token::Word(w) => { - match w.keyword { - Keyword::MERGE => { - self.parser.next_token(); - self.parse_merge() - } - _ => { - // use the native parser - // TODO fix for multiple statememnts and keeping parsers in sync - let mut df = DFParser::new(self.sql)?; - let stmt = df.parse_statement()?; - self.parser.parse_statement()?; - Ok(Statement::Datafusion(stmt)) - } - } - } - _ => { - // use the native parser - // TODO fix for multiple statememnts and keeping parsers in sync - let mut df = DFParser::new(self.sql)?; - let stmt = df.parse_statement()?; - self.parser.parse_statement()?; - Ok(Statement::Datafusion(stmt)) - } - } - } pub fn parse_merge(&mut self) -> Result { let into = self.parser.parse_keyword(Keyword::INTO); @@ -311,7 +226,6 @@ impl<'a> DeltaLakeParser<'a> { #[derive(Debug, Clone, PartialEq, Eq)] pub enum Statement { /// Datafusion AST node (from datafusion-sql) - Datafusion(DFStatement), MergeStatement { into: bool, // Specifies the table to merge diff --git a/src/DeltaLake/Bridge/src/table.rs b/src/DeltaLake/Bridge/src/table.rs index fb33255..c9d594b 100644 --- a/src/DeltaLake/Bridge/src/table.rs +++ b/src/DeltaLake/Bridge/src/table.rs @@ -34,7 +34,7 @@ use crate::{ schema::PartitionFilterList, sql::{extract_table_factor_alias, DataFrameStreamIterator, DeltaLakeParser, Statement}, ByteArray, ByteArrayRef, CancellationToken, Dictionary, DynamicArray, KeyNullableValuePair, - KeyValuePair, Map, + Map, }; pub struct RawDeltaTable { @@ -707,20 +707,6 @@ pub extern "C" fn table_merge( .into_raw(), ); }, - _ => unsafe { - callback( - std::ptr::null(), - DeltaTableError::new( - &mut *runtime, - DeltaTableErrorCode::Generic, - &format!( - "invalid sql statement. expected merge. received: {}", - query_str - ), - ) - .into_raw(), - ); - }, }; } @@ -1563,7 +1549,7 @@ mod tests { )", ); let mut res = parser.expect("this should parser"); - let stmt = res.parse_statement().expect("the statement should parse"); + let stmt = res.parse_merge().expect("the statement should parse"); match stmt { Statement::MergeStatement { into: _, @@ -1574,7 +1560,6 @@ mod tests { } => { assert!(!clauses.is_empty()); } - _ => panic!("expected statement"), } } } diff --git a/src/DeltaLake/Table/DeltaTable.cs b/src/DeltaLake/Table/DeltaTable.cs index 76e86f8..3e98f09 100644 --- a/src/DeltaLake/Table/DeltaTable.cs +++ b/src/DeltaLake/Table/DeltaTable.cs @@ -154,7 +154,6 @@ public async Task InsertAsync( public async Task HistoryAsync(ulong? limit, CancellationToken cancellationToken) { await _table.HistoryAsync(limit ?? 0, cancellationToken).ConfigureAwait(false); - // TODO: Deserialize the results } /// @@ -195,8 +194,8 @@ public async Task MergeAsync(string query, IReadOnlyCollection reco /// /// Returns table metadata /// - /// - public Metadata Metadata() + /// + public TableMetadata Metadata() { return _table.Metadata(); } diff --git a/src/DeltaLake/Table/InsertOptions.cs b/src/DeltaLake/Table/InsertOptions.cs index 3ace6ae..aae03ad 100644 --- a/src/DeltaLake/Table/InsertOptions.cs +++ b/src/DeltaLake/Table/InsertOptions.cs @@ -1,7 +1,3 @@ -using System.Collections; -using System.Collections.Generic; -using Apache.Arrow; - namespace DeltaLake.Table { /// diff --git a/src/DeltaLake/Table/ProtocolInfo.cs b/src/DeltaLake/Table/ProtocolInfo.cs index a0a1787..9f02d76 100644 --- a/src/DeltaLake/Table/ProtocolInfo.cs +++ b/src/DeltaLake/Table/ProtocolInfo.cs @@ -1,9 +1,20 @@ namespace DeltaLake.Table { + /// + /// Protocol information for a delta table + /// +#pragma warning disable CA1815 // Override equals and operator equals on value types public readonly struct ProtocolInfo +#pragma warning restore CA1815 // Override equals and operator equals on value types { + /// + /// Table minimum reader version + /// public int MinimumReaderVersion { get; init; } + /// + /// Table minimum writer version + /// public int MinimumWriterVersion { get; init; } } } \ No newline at end of file diff --git a/src/DeltaLake/Table/RestoreOptions.cs b/src/DeltaLake/Table/RestoreOptions.cs index 5fcb255..0f060d2 100644 --- a/src/DeltaLake/Table/RestoreOptions.cs +++ b/src/DeltaLake/Table/RestoreOptions.cs @@ -31,6 +31,6 @@ public class RestoreOptions /// /// Custom metadata /// - public Dictionary CustomMetadata { get; set; } + public Dictionary CustomMetadata { get; init; } = new Dictionary(); } } \ No newline at end of file diff --git a/src/DeltaLake/Table/Metadata.cs b/src/DeltaLake/Table/TableMetadata.cs similarity index 95% rename from src/DeltaLake/Table/Metadata.cs rename to src/DeltaLake/Table/TableMetadata.cs index caea1e2..307cacf 100644 --- a/src/DeltaLake/Table/Metadata.cs +++ b/src/DeltaLake/Table/TableMetadata.cs @@ -7,7 +7,7 @@ namespace DeltaLake.Table /// /// Metadata for the table /// - public class Metadata + public class TableMetadata { private static readonly Dictionary EmptySettings = new(); @@ -56,9 +56,9 @@ public class Metadata /// public IReadOnlyDictionary Configuration { get; init; } = EmptySettings; - internal unsafe static Metadata FromUnmanaged(Bridge.Interop.TableMetadata* metadata) + internal unsafe static TableMetadata FromUnmanaged(Bridge.Interop.TableMetadata* metadata) { - return new DeltaLake.Table.Metadata + return new DeltaLake.Table.TableMetadata { Id = Marshal.PtrToStringUTF8(new IntPtr(metadata->id)) ?? string.Empty, Name = Marshal.PtrToStringUTF8(new IntPtr(metadata->name)),