Skip to content

Commit

Permalink
added test tables and version tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mightyshazam committed Jan 27, 2024
1 parent f7ab0fb commit 113eeec
Show file tree
Hide file tree
Showing 420 changed files with 827 additions and 244 deletions.
12 changes: 11 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,14 @@ bin/
obj/
/.vs
/.vscode
target/
target/

tests/data/checkpoints_tombstones/expired/
tests/data/checkpoints_tombstones/metadata_broken/
tests/data/checkpoints_tombstones/metadata_false/
tests/data/checkpoints_tombstones/metadata_true/
tests/data/checkpoints_with_expired_logs/
tests/data/read_null_partitions_from_checkpoint/
tests/data/action_reconciliation/
tests/data/simple_table_with_no_checkpoint/
tests/data/simple_table_with_no_checkpoint_2/
2 changes: 1 addition & 1 deletion src/DeltaLake/Bridge/CancellationToken.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace DeltaLake.Bridge
/// <summary>
/// Core-owned cancellation token.
/// </summary>
internal class CancellationToken : SafeHandle
internal sealed class CancellationToken : SafeHandle
{
private readonly List<IDisposable> _cancellationRegistrations = new List<IDisposable>();

Expand Down
3 changes: 2 additions & 1 deletion src/DeltaLake/Bridge/Interop/Interop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ internal enum DeltaTableErrorCode : uint
Kernel = 30,
MetaDataError = 31,
NotInitialized = 32,
OperationCanceled = 33,
}

internal partial struct CancellationToken
Expand Down Expand Up @@ -266,7 +267,7 @@ internal static unsafe partial class Methods
public static extern void table_update_incremental([NativeTypeName("struct Runtime *")] Runtime* runtime, [NativeTypeName("struct RawDeltaTable *")] RawDeltaTable* table, [NativeTypeName("TableEmptyCallback")] IntPtr callback);

[DllImport("delta_rs_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
public static extern void table_load_version([NativeTypeName("struct Runtime *")] Runtime* runtime, [NativeTypeName("struct RawDeltaTable *")] RawDeltaTable* table, [NativeTypeName("int64_t")] long version, [NativeTypeName("TableEmptyCallback")] IntPtr callback);
public static extern void table_load_version([NativeTypeName("struct Runtime *")] Runtime* runtime, [NativeTypeName("struct RawDeltaTable *")] RawDeltaTable* table, [NativeTypeName("int64_t")] long version, [NativeTypeName("const struct CancellationToken *")] CancellationToken* cancellation_token, [NativeTypeName("TableEmptyCallback")] IntPtr callback);

[DllImport("delta_rs_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
public static extern void table_load_with_datetime([NativeTypeName("struct Runtime *")] Runtime* runtime, [NativeTypeName("struct RawDeltaTable *")] RawDeltaTable* table, [NativeTypeName("int64_t")] long ts_milliseconds, [NativeTypeName("TableEmptyCallback")] IntPtr callback);
Expand Down
5 changes: 2 additions & 3 deletions src/DeltaLake/Bridge/Map.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;

Expand All @@ -16,7 +15,7 @@ private unsafe Map(Interop.Map* inner, Runtime runtime)

public unsafe Interop.Map* Ref { get; }

public static unsafe Map FromDictionary(Runtime runtime, IDictionary<string, string> source)
public static unsafe Map FromDictionary(Runtime runtime, IReadOnlyCollection<KeyValuePair<string, string>> source)
{
var map = Interop.Methods.map_new(runtime.Ptr, (nuint)source.Count);
foreach (var (key, value) in source)
Expand All @@ -29,7 +28,7 @@ public static unsafe Map FromDictionary(Runtime runtime, IDictionary<string, str
return new Map(map, runtime);
}

public static unsafe Map FromOptionalDictionary(Runtime runtime, IDictionary<string, string?> source)
public static unsafe Map FromOptionalDictionary(Runtime runtime, IReadOnlyCollection<KeyValuePair<string, string?>> source)
{
var map = Interop.Methods.map_new(runtime.Ptr, (nuint)source.Count);
foreach (var (key, value) in source)
Expand Down
15 changes: 6 additions & 9 deletions src/DeltaLake/Bridge/Runtime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,21 @@ public Runtime(DeltaLake.Runtime.RuntimeOptions options)
/// <inheritdoc />
public override unsafe bool IsInvalid => false;

public async Task<Table> NewTableAsync(string tableUri, DeltaLake.Table.TableOptions options)
public async Task<Table> LoadTableAsync(string tableUri, DeltaLake.Table.TableOptions options)
{
var buffer = ArrayPool<byte>.Shared.Rent(System.Text.Encoding.UTF8.GetByteCount(tableUri));
var encodedLength = System.Text.Encoding.UTF8.GetBytes(tableUri, buffer);
try
{
return await NewTableAsync(buffer.AsMemory(0, encodedLength), options).ConfigureAwait(false);
return await LoadTableAsync(buffer.AsMemory(0, encodedLength), options).ConfigureAwait(false);
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}

internal async Task<Table> NewTableAsync(Memory<byte> tableUri, DeltaLake.Table.TableOptions options)
internal async Task<Table> LoadTableAsync(Memory<byte> tableUri, DeltaLake.Table.TableOptions options)
{
var tsc = new TaskCompletionSource<Table>();
unsafe
Expand All @@ -86,9 +86,7 @@ internal async Task<Table> NewTableAsync(Memory<byte> tableUri, DeltaLake.Table.
{
if (fail != null)
{
var errorMessage = System.Text.Encoding.UTF8.GetString(fail->error.data, (int)fail->error.size);
tsc.TrySetException(new InvalidOperationException(errorMessage));
Interop.Methods.error_free(Ptr, fail);
tsc.TrySetException(DeltaLakeException.FromDeltaTableError(Ptr, fail));
}
else
{
Expand Down Expand Up @@ -135,6 +133,7 @@ internal async Task<Table> CreateTableAsync(DeltaLake.Table.TableCreateOptions o
partition_by = scope.ArrayPointer(options.PartitionBy.Select(x => scope.ByteArray(x)).ToArray()),
partition_count = (nuint)options.PartitionBy.Count,
mode = saveMode.Ref,
name = scope.ByteArray(options.Name),
description = scope.ByteArray(options.Description),
configuration = scope.OptionalDictionary(this, options.Configuration ?? new Dictionary<string, string?>()),
custom_metadata = scope.Dictionary(this, options.CustomMetadata ?? new Dictionary<string, string>()),
Expand All @@ -147,9 +146,7 @@ internal async Task<Table> CreateTableAsync(DeltaLake.Table.TableCreateOptions o
{
if (fail != null)
{
var errorMessage = System.Text.Encoding.UTF8.GetString(fail->error.data, (int)fail->error.size);
tsc.TrySetException(new InvalidOperationException(errorMessage));
Interop.Methods.error_free(Ptr, fail);
tsc.TrySetException(DeltaLakeException.FromDeltaTableError(Ptr, fail));
}
else
{
Expand Down
17 changes: 14 additions & 3 deletions src/DeltaLake/Bridge/Scope.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using DeltaLake.Table;

namespace DeltaLake.Bridge
{
Expand All @@ -28,6 +27,18 @@ public Interop.ByteArrayRef ByteArray(byte[]? bytes)
return val.Ref;
}

/// <summary>
/// Create a byte array ref.
/// </summary>
/// <param name="bytes">Bytes to create from.</param>
/// <returns>Created byte array ref.</returns>
public Interop.ByteArrayRef ByteArray(Memory<byte> bytes)
{
var val = new ByteArrayRef(bytes);
toKeepAlive.Add(val);
return val.Ref;
}

/// <summary>
/// Create a UTF-8 byte array ref.
/// </summary>
Expand Down Expand Up @@ -116,7 +127,7 @@ public Interop.ByteArrayRef NewlineDelimited(IEnumerable<string>? values)
/// <param name="runtime">Instance of runtime</param>
/// <param name="value">Dictionary with nullable values</param>
/// <returns></returns>
public unsafe Interop.Map* OptionalDictionary(Runtime runtime, Dictionary<string, string?> value)
public unsafe Interop.Map* OptionalDictionary(Runtime runtime, IReadOnlyCollection<KeyValuePair<string, string?>> value)
{
var map = Map.FromOptionalDictionary(runtime, value);
toKeepAlive.Add(map);
Expand All @@ -129,7 +140,7 @@ public Interop.ByteArrayRef NewlineDelimited(IEnumerable<string>? values)
/// <param name="runtime">Instance of runtime</param>
/// <param name="value">Dictionary</param>
/// <returns></returns>
public unsafe Interop.Map* Dictionary(Runtime runtime, Dictionary<string, string> value)
public unsafe Interop.Map* Dictionary(Runtime runtime, IReadOnlyCollection<KeyValuePair<string, string>> value)
{
var map = Map.FromDictionary(runtime, value);
toKeepAlive.Add(map);
Expand Down
55 changes: 21 additions & 34 deletions src/DeltaLake/Bridge/Table.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
using System.Threading.Tasks;
using DeltaLake.Bridge.Interop;
using DeltaLake.Table;
using ICancellationToken = System.Threading.CancellationToken;

namespace DeltaLake.Bridge
{
/// <summary>
///
/// Reference to unmanaged delta table
/// </summary>
internal sealed class Table : SafeHandle
{
Expand Down Expand Up @@ -41,39 +42,33 @@ internal unsafe Table(Runtime runtime, Interop.RawDeltaTable* inner)
/// Returns the current version of the table
/// </summary>
/// <returns></returns>
public async Task LoadVersionAsync(long version)
public async Task LoadVersionAsync(long version, ICancellationToken cancellationToken)
{
var tsc = new TaskCompletionSource<bool>();
unsafe
using (var scope = new Scope())
{
var funcHandle = default(GCHandle);
object? funcPointer = null;
(funcHandle, funcPointer) = Runtime.FunctionPointer<Interop.TableEmptyCallback>((fail) =>
unsafe
{
try
Interop.Methods.table_load_version(
_runtime.Ptr,
_ptr,
version,
scope.CancellationToken(cancellationToken),
scope.FunctionPointer<Interop.TableEmptyCallback>((fail) =>
{
if (fail != null)
{
tsc.TrySetException(new InvalidOperationException());
Interop.Methods.error_free(_runtime.Ptr, fail);
tsc.TrySetException(DeltaLakeException.FromDeltaTableError(_runtime.Ptr, fail));
}
else
{
tsc.TrySetResult(true);
}
}
finally
{
if (funcHandle.IsAllocated)
{
funcHandle.Free();
}
}
});
Interop.Methods.table_load_version(_runtime.Ptr, _ptr, version, funcHandle.AddrOfPinnedObject());
}
}));
}

await tsc.Task.ConfigureAwait(false);
await tsc.Task.ConfigureAwait(false);
}
}

/// <summary>
Expand Down Expand Up @@ -151,21 +146,13 @@ protected override unsafe bool ReleaseHandle()

private unsafe string[] GetStringArray(GenericOrError genericOrError)
{
try
if (genericOrError.error != null)
{
if (genericOrError.error != null)
{
try
{
var errorMessage = System.Text.Encoding.UTF8.GetString(genericOrError.error->error.data, (int)genericOrError.error->error.size);
throw new InvalidOperationException(errorMessage);
}
finally
{
Interop.Methods.error_free(_runtime.Ptr, genericOrError.error);
}
}
throw DeltaLakeException.FromDeltaTableError(_runtime.Ptr, genericOrError.error);
}

try
{
if (genericOrError.bytes == null)
{
return Array.Empty<string>();
Expand Down
76 changes: 0 additions & 76 deletions src/DeltaLake/Bridge/UnsafeMemoryManager.cs

This file was deleted.

2 changes: 2 additions & 0 deletions src/DeltaLake/Bridge/include/delta-lake-bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ typedef enum DeltaTableErrorCode {
Kernel = 30,
MetaDataError = 31,
NotInitialized = 32,
OperationCanceled = 33,
} DeltaTableErrorCode;

typedef struct CancellationToken CancellationToken;
Expand Down Expand Up @@ -192,6 +193,7 @@ void table_update_incremental(struct Runtime *runtime,
void table_load_version(struct Runtime *runtime,
struct RawDeltaTable *table,
int64_t version,
const struct CancellationToken *cancellation_token,
TableEmptyCallback callback);

void table_load_with_datetime(struct Runtime *runtime,
Expand Down
Loading

0 comments on commit 113eeec

Please sign in to comment.