forked from microsoft/semantic-kernel
-
Notifications
You must be signed in to change notification settings - Fork 1
/
XpoDatabase.cs
160 lines (142 loc) · 5.95 KB
/
XpoDatabase.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
// Copyright (c) Microsoft. All rights reserved.
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using DevExpress.Xpo;
namespace Microsoft.SemanticKernel.Connectors.Xpo;
internal sealed class XpoDatabase
{
public XpoDatabase() { }
public Task CreateTableAsync(IDataLayer conn, CancellationToken cancellationToken = default)
{
IDataLayer dl = conn;
using (Session session = new(dl))
{
System.Reflection.Assembly[] assemblies = new System.Reflection.Assembly[] {
typeof(XpoDatabaseEntry).Assembly };
session.UpdateSchema(assemblies);
session.CreateObjectTypeRecords(assemblies);
}
return Task.CompletedTask;
}
public async Task CreateCollectionAsync(IDataLayer conn, string collectionName, CancellationToken cancellationToken = default)
{
if (await this.DoesCollectionExistsAsync(conn, collectionName, cancellationToken).ConfigureAwait(false))
{
// Collection already exists
return;
}
UnitOfWork unitOfWork = new(conn);
XpoDatabaseEntry entry = new(unitOfWork)
{
Collection = collectionName
};
await unitOfWork.CommitChangesAsync(cancellationToken).ConfigureAwait(false);
}
public async Task UpdateAsync(IDataLayer conn,
string collection, string key, string? metadata, string? embedding, string? timestamp, CancellationToken cancellationToken = default)
{
UnitOfWork unitOfWork = new(conn);
var entry = unitOfWork.Query<XpoDatabaseEntry>().FirstOrDefault(x => x.Collection == collection && x.Key == key);
if (entry != null)
{
entry.MetadataString = metadata ?? string.Empty;
entry.EmbeddingString = embedding ?? string.Empty;
entry.Timestamp = timestamp ?? string.Empty;
await unitOfWork.CommitChangesAsync(cancellationToken).ConfigureAwait(false);
}
}
public async Task InsertOrIgnoreAsync(IDataLayer conn,
string collection, string key, string? metadata, string? embedding, string? timestamp, CancellationToken cancellationToken = default)
{
UnitOfWork unitOfWork = new(conn);
if (unitOfWork.Query<XpoDatabaseEntry>().FirstOrDefault(x => x.Collection == collection && x.Key == key) == null)
{
XpoDatabaseEntry entry = new(unitOfWork)
{
Collection = collection,
Key = key,
MetadataString = metadata ?? string.Empty,
EmbeddingString = embedding ?? string.Empty,
Timestamp = timestamp ?? string.Empty
};
await unitOfWork.CommitChangesAsync(cancellationToken).ConfigureAwait(false);
}
}
public async Task<bool> DoesCollectionExistsAsync(IDataLayer conn,
string collectionName,
CancellationToken cancellationToken = default)
{
var collections = await this.GetCollectionsAsync(conn, cancellationToken).ToListAsync(cancellationToken).ConfigureAwait(false);
return collections.Contains(collectionName);
}
public async IAsyncEnumerable<string> GetCollectionsAsync(IDataLayer conn,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
UnitOfWork unitOfWork = new(conn);
var Collections = unitOfWork.Query<XpoDatabaseEntry>().Select(x => x.Collection).Distinct().ToList();
foreach (string collection in Collections)
{
yield return collection;
}
}
public async IAsyncEnumerable<DatabaseEntry> ReadAllAsync(IDataLayer conn,
string collectionName,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
UnitOfWork unitOfWork = new(conn);
var entries = unitOfWork.Query<XpoDatabaseEntry>().Where(x => x.Collection == collectionName).ToList();
foreach (XpoDatabaseEntry entry in entries)
{
yield return entry.ToDatabaseEntry();
}
}
public async Task<DatabaseEntry?> ReadAsync(IDataLayer conn,
string collectionName,
string key,
CancellationToken cancellationToken = default)
{
UnitOfWork unitOfWork = new(conn);
var entry = unitOfWork.Query<XpoDatabaseEntry>().FirstOrDefault(x => x.Collection == collectionName && x.Key == key);
if (entry != null)
{
return entry.ToDatabaseEntry();
}
return null;
}
public Task DeleteCollectionAsync(IDataLayer conn, string collectionName, CancellationToken cancellationToken = default)
{
UnitOfWork unitOfWork = new(conn);
var entries = unitOfWork.Query<XpoDatabaseEntry>().Where(x => x.Collection == collectionName).ToList();
foreach (XpoDatabaseEntry entry in entries)
{
unitOfWork.Delete(entry);
}
unitOfWork.CommitChanges();
return Task.CompletedTask;
}
public Task DeleteAsync(IDataLayer conn, string collectionName, string key, CancellationToken cancellationToken = default)
{
UnitOfWork unitOfWork = new(conn);
var entry = unitOfWork.Query<XpoDatabaseEntry>().FirstOrDefault(x => x.Collection == collectionName && x.Key == key);
if (entry != null)
{
unitOfWork.Delete(entry);
unitOfWork.CommitChanges();
}
return Task.CompletedTask;
}
public Task DeleteEmptyAsync(IDataLayer conn, string collectionName, CancellationToken cancellationToken = default)
{
UnitOfWork unitOfWork = new(conn);
var entries = unitOfWork.Query<XpoDatabaseEntry>().Where(x => x.Collection == collectionName && x.Key == null).ToList();
if (entries != null)
{
unitOfWork.Delete(entries);
return unitOfWork.CommitChangesAsync(cancellationToken);
}
return Task.CompletedTask;
}
}