Skip to content

Commit

Permalink
decouple id completely
Browse files Browse the repository at this point in the history
  • Loading branch information
Modest-as committed Apr 1, 2023
1 parent 55600ab commit 9663630
Show file tree
Hide file tree
Showing 12 changed files with 213 additions and 314 deletions.
16 changes: 12 additions & 4 deletions EventSauce.MongoDB/MongoDBSauceStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ public MongoDBSauceStore(IMongoCollection<BsonDocument> collection)
_collection = collection;
}

public async Task<IEnumerable<SaucyEvent<TAggregateId>>> ReadEvents<TAggregateId>(TAggregateId id) where TAggregateId : SaucyAggregateId
public async Task<IEnumerable<SaucyEvent<TAggregateId>>> ReadEvents<TAggregateId>(TAggregateId id)
{
try
{
const string fieldName = nameof(SaucyEvent<TAggregateId>.AggregateId);

using var cursor = await _collection.FindAsync(x => x[fieldName] == id.Id);
var serializer = BsonSerializer.SerializerRegistry.GetSerializer<TAggregateId>();

var value = serializer.ToBsonValue(id);

using var cursor = await _collection.FindAsync(x => x[fieldName] == value);

var result = new List<SaucyEvent<TAggregateId>>();

Expand All @@ -40,7 +44,7 @@ public async Task<IEnumerable<SaucyEvent<TAggregateId>>> ReadEvents<TAggregateId
}
}

public async Task AppendEvent<TAggregateId>(SaucyEvent<TAggregateId> sourceEvent, SaucyAggregateId? performedBy) where TAggregateId : SaucyAggregateId
public async Task AppendEvent<TAggregateId>(SaucyEvent<TAggregateId> sourceEvent, object? performedBy)
{
try
{
Expand All @@ -51,9 +55,13 @@ public async Task AppendEvent<TAggregateId>(SaucyEvent<TAggregateId> sourceEvent

var bsonDocument = sourceEvent.ToBsonDocument();

bsonDocument.Add("_id", ObjectId.GenerateNewId());

if (performedBy != null)
{
bsonDocument.Add("_performedBy", performedBy.Id);
var serializer = BsonSerializer.SerializerRegistry.GetSerializer(performedBy.GetType());

bsonDocument.Add("_performedBy", serializer.ToBsonValue(performedBy));
}

await _collection.InsertOneAsync(bsonDocument);
Expand Down
80 changes: 18 additions & 62 deletions EventSauce.MongoDB/MongoDBSauceStoreFactory.cs
Original file line number Diff line number Diff line change
@@ -1,35 +1,33 @@
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Driver;
using System;
using System.Reflection;
using MongoDB.Bson.Serialization.Conventions;

namespace EventSauce.MongoDB
{
public class MongoDBSauceStoreFactory
{
private readonly Assembly[] _assemblies;
private readonly MongoClientSettings _clientSettings;
private readonly string _database;
private readonly string _collection;

public MongoDBSauceStoreFactory(
Assembly[] assemblies,
string connectionString,
string database) : this(assemblies, MongoClientSettings.FromConnectionString(connectionString), database, "events") { }
string database) : this(MongoClientSettings.FromConnectionString(connectionString), database, "events")
{
}

public MongoDBSauceStoreFactory(
Assembly[] assemblies,
MongoClientSettings clientSettings,
string database) : this(assemblies, clientSettings, database, "events") { }
string database) : this(clientSettings, database, "events")
{
}

public MongoDBSauceStoreFactory(
Assembly[] assemblies,
MongoClientSettings clientSettings,
string database,
string collection)
{
_assemblies = assemblies;
_clientSettings = clientSettings;
_database = database;
_collection = collection;
Expand All @@ -41,28 +39,14 @@ private void FindEvents()
{
try
{
var pack = new ConventionPack
{
new IgnoreExtraElementsConvention(true)
};

var genericEventType = typeof(SaucyEvent<>);

foreach (var assembly in _assemblies)
{
foreach (var type in assembly.GetTypes())
{
if (IsSubclassOfRawGeneric(genericEventType, type))
{
var map = new BsonClassMap(type);

map.AutoMap();
map.SetIgnoreExtraElements(true);

BsonClassMap.RegisterClassMap(map);
}

if (type.IsSubclassOf(typeof(SaucyAggregateId)))
{
BsonSerializer.RegisterSerializer(type, new SaucyAggregateIdSerializer(type));
}
}
}
ConventionRegistry.Register("Sauce Conventions", pack, type => IsSubclassOfRawGeneric(genericEventType, type));
}
catch (Exception ex)
{
Expand Down Expand Up @@ -97,7 +81,7 @@ private IMongoCollection<BsonDocument> CreateCollection()

var collection = database.GetCollection<BsonDocument>(_collection, settings);

CreateIndex(collection, nameof(SaucyEvent<SaucyAggregateId>.AggregateId), false);
CreateIndex(collection, nameof(SaucyEvent<object>.AggregateId), false);

return collection;
}
Expand All @@ -115,42 +99,14 @@ private static void CreateIndex(IMongoCollection<BsonDocument> collection, strin
collection.Indexes.CreateOne(indexModel);
}

private class SaucyAggregateIdSerializer : IBsonSerializer
private static bool IsSubclassOfRawGeneric(Type generic, Type? toCheck)
{
private readonly Type _type;

public SaucyAggregateIdSerializer(Type type)
{
_type = type;
}

public object Deserialize(BsonDeserializationContext context, BsonDeserializationArgs args)
while (toCheck != null && toCheck != typeof(object))
{
var id = BsonSerializer.Deserialize<Guid?>(context.Reader);

var ctor = _type.GetConstructor(new[] { typeof(Guid) });

var instance = ctor!.Invoke(new object[] { id });

return instance;
}

public void Serialize(BsonSerializationContext context, BsonSerializationArgs args, object value)
{
var aggregateId = (SaucyAggregateId)value;

BsonSerializer.Serialize(context.Writer, aggregateId.Id);
}

public Type ValueType => typeof(SaucyAggregateId);
}

private static bool IsSubclassOfRawGeneric(Type generic, Type? toCheck) {
while (toCheck != null && toCheck != typeof(object)) {

var current = toCheck.IsGenericType ? toCheck.GetGenericTypeDefinition() : toCheck;

if (generic == current) {
if (generic == current)
{
return true;
}

Expand Down
37 changes: 13 additions & 24 deletions EventSauce.Postgre/PostgreSauceStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,74 +13,63 @@ internal class PostgreSauceStore : ISauceStore
private readonly string _tableName;

private readonly Dictionary<string, Type> _eventTypes;
private readonly Dictionary<string, Type> _aggregateTypes;

private readonly JsonSerializerOptions _options;

public PostgreSauceStore(
NpgsqlConnection connection,
string tableName,
Dictionary<string, Type> eventTypes,
Dictionary<string, Type> aggregateTypes,
JsonSerializerOptions options)
{
_connection = connection;
_tableName = tableName;
_eventTypes = eventTypes;
_aggregateTypes = aggregateTypes;
_options = options;
}

public async Task<IEnumerable<SaucyEvent<TAggregateId>>> ReadEvents<TAggregateId>(TAggregateId id) where TAggregateId : SaucyAggregateId
public async Task<IEnumerable<SaucyEvent<TAggregateId>>> ReadEvents<TAggregateId>(TAggregateId id)
{
try
{
var sql = GetCommand(PostgreSqlCommands.SelectEvent);

await using var command = new NpgsqlCommand(sql, _connection);

command.Parameters.AddWithValue("aggregate_id", id.Id);
command.Parameters.AddWithValue("aggregate_id_type", id.IdType);
command.Parameters.AddWithValue("aggregate_id", id);
command.Parameters.AddWithValue("aggregate_id_type", id.GetType().Name);

await using var reader = await command.ExecuteReaderAsync();

var result = new List<SaucyEvent<TAggregateId>>();

TAggregateId? aggregate = null;
TAggregateId? aggregate = default;

var eventTypeIndex = reader.GetOrdinal("EventType");
var eventDataIndex = reader.GetOrdinal("EventData");
var eventIdIndex = reader.GetOrdinal("EventId");
var createdIndex = reader.GetOrdinal("Created");
var aggregateVersionIndex = reader.GetOrdinal("AggregateVersion");
var aggregateIdIndex = reader.GetOrdinal("AggregateId");
var aggregateIdTypeNameIndex = reader.GetOrdinal("AggregateIdType");

while (reader.Read())
{
var eventTypeName = reader.GetSaucyString(eventTypeIndex);
var eventData = reader.GetSaucyString(eventDataIndex);
var eventId = reader.GetSaucyGuid(eventIdIndex);
var created = reader.GetSaucyDate(createdIndex);
var aggregateVersion = reader.GetSaucyLong(aggregateVersionIndex);

var eventType = _eventTypes[eventTypeName];

if (aggregate == null)
if (eventType.IsGenericType)
{
var aggregateId = reader.GetSaucyGuid(aggregateIdIndex);
var aggregateIdTypeName = reader.GetSaucyString(aggregateIdTypeNameIndex);
var aggregateIdType = _aggregateTypes[aggregateIdTypeName];
aggregate = (TAggregateId) Activator.CreateInstance(aggregateIdType, aggregateId)!;
eventType = eventType.MakeGenericType(typeof(TAggregateId));
}

var sourceEvent = (SaucyEvent<TAggregateId>) JsonSerializer.Deserialize(eventData, eventType, _options)!;
var sourceEvent = (SaucyEvent<TAggregateId>)JsonSerializer.Deserialize(eventData, eventType, _options)!;

result.Add(sourceEvent with
{
AggregateVersion = aggregateVersion,
AggregateId = aggregate,
Id = eventId,
AggregateId = id,
Created = created
});
}
Expand All @@ -93,7 +82,7 @@ public async Task<IEnumerable<SaucyEvent<TAggregateId>>> ReadEvents<TAggregateId
}
}

public async Task AppendEvent<TAggregateId>(SaucyEvent<TAggregateId> sourceEvent, SaucyAggregateId? performedBy) where TAggregateId : SaucyAggregateId
public async Task AppendEvent<TAggregateId>(SaucyEvent<TAggregateId> sourceEvent, object? performedBy)
{
try
{
Expand All @@ -105,14 +94,14 @@ public async Task AppendEvent<TAggregateId>(SaucyEvent<TAggregateId> sourceEvent

await using var command = new NpgsqlCommand(sql, _connection);

command.Parameters.AddWithValue("aggregate_id", sourceEvent.AggregateId?.Id ?? throw new ArgumentNullException(nameof(sourceEvent.AggregateId)));
command.Parameters.AddWithValue("aggregate_id_type", sourceEvent.AggregateId?.IdType ?? throw new ArgumentNullException(nameof(sourceEvent.AggregateId)));
command.Parameters.AddWithValue("aggregate_id", sourceEvent.AggregateId);
command.Parameters.AddWithValue("aggregate_id_type", typeof(TAggregateId).Name);
command.Parameters.AddWithValue("aggregate_version", sourceEvent.AggregateVersion);
command.Parameters.AddWithValue("created", sourceEvent.Created);
command.Parameters.AddWithValue("event_id", sourceEvent.Id);
command.Parameters.AddWithValue("event_id", Guid.NewGuid());
command.Parameters.AddWithValue("event_type", eventType);
command.Parameters.AddWithValue("event_data", NpgsqlDbType.Jsonb, eventData);
command.Parameters.AddWithValue("performed_by", performedBy?.Id ?? Guid.Empty);
command.Parameters.AddWithValue("performed_by", performedBy ?? Guid.Empty);

await command.ExecuteNonQueryAsync();
}
Expand Down
8 changes: 1 addition & 7 deletions EventSauce.Postgre/PostgreSauceStoreFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ public class PostgreSauceStoreFactory
private readonly string _tableName;

private readonly Dictionary<string, Type> _eventTypes = new();
private readonly Dictionary<string, Type> _aggregateTypes = new();

public PostgreSauceStoreFactory(
Assembly[] assemblies,
Expand Down Expand Up @@ -70,11 +69,6 @@ private void FindEvents()
{
_eventTypes.Add(type.Name, type);
}

if (type.IsSubclassOf(typeof(SaucyAggregateId)))
{
_aggregateTypes.Add(type.Name, type);
}
}
}
}
Expand All @@ -90,7 +84,7 @@ public ISauceStore Create()
{
var connection = CreateConnection();

return new PostgreSauceStore(connection, _tableName, _eventTypes, _aggregateTypes, _options);
return new PostgreSauceStore(connection, _tableName, _eventTypes, _options);
}
catch (Exception ex)
{
Expand Down
Loading

0 comments on commit 9663630

Please sign in to comment.