Skip to content

Commit

Permalink
Matrix-message types and cheque signing (#28)
Browse files Browse the repository at this point in the history
  • Loading branch information
evlekht committed Sep 29, 2024
1 parent f882b3e commit 720f34c
Show file tree
Hide file tree
Showing 10 changed files with 405 additions and 58 deletions.
25 changes: 13 additions & 12 deletions internal/matrix/matrix_compressor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ import (
"github.com/chain4travel/camino-messenger-bot/internal/compression"
"github.com/chain4travel/camino-messenger-bot/internal/messaging"
"github.com/chain4travel/camino-messenger-bot/internal/metadata"
"github.com/chain4travel/camino-messenger-bot/pkg/matrix"
"maunium.net/go/mautrix/event"
)

var (
_ compression.Compressor[messaging.Message, []CaminoMatrixMessage] = (*ChunkingCompressor)(nil)
ErrCompressionProducedNoChunks = errors.New("compression produced no chunks")
ErrEncodingMsg = errors.New("error while encoding msg for compression")
_ compression.Compressor[messaging.Message, []matrix.CaminoMatrixMessage] = (*ChunkingCompressor)(nil)
ErrCompressionProducedNoChunks = errors.New("compression produced no chunks")
ErrEncodingMsg = errors.New("error while encoding msg for compression")
)

// ChunkingCompressor is a concrete implementation of Compressor with chunking functionality
Expand All @@ -27,8 +28,8 @@ type ChunkingCompressor struct {
}

// Compress implements the Compressor interface for ChunkingCompressor
func (c *ChunkingCompressor) Compress(msg messaging.Message) ([]CaminoMatrixMessage, error) {
var matrixMessages []CaminoMatrixMessage
func (c *ChunkingCompressor) Compress(msg messaging.Message) ([]matrix.CaminoMatrixMessage, error) {
var matrixMessages []matrix.CaminoMatrixMessage

// 1. CompressBytes the message
compressedContent, err := compress(msg)
Expand All @@ -42,7 +43,7 @@ func (c *ChunkingCompressor) Compress(msg messaging.Message) ([]CaminoMatrixMess
return matrixMessages, err
}

// 3. Create CaminoMatrixMessage objects for each chunk
// 3. Create matrix.CaminoMatrixMessage objects for each chunk
return splitCaminoMatrixMsg(msg, splitCompressedContent)
}

Expand All @@ -67,26 +68,26 @@ func compress(msg messaging.Message) ([]byte, error) {
return compression.CompressBytes(bytes), nil
}

func splitCaminoMatrixMsg(msg messaging.Message, splitCompressedContent [][]byte) ([]CaminoMatrixMessage, error) {
messages := make([]CaminoMatrixMessage, 0, len(splitCompressedContent))
func splitCaminoMatrixMsg(msg messaging.Message, splitCompressedContent [][]byte) ([]matrix.CaminoMatrixMessage, error) {
messages := make([]matrix.CaminoMatrixMessage, 0, len(splitCompressedContent))

// add first chunk to messages slice
{
caminoMatrixMsg := CaminoMatrixMessage{
caminoMatrixMsg := matrix.CaminoMatrixMessage{
MessageEventContent: event.MessageEventContent{MsgType: event.MessageType(msg.Type)},
Metadata: msg.Metadata,
}
caminoMatrixMsg.Metadata.NumberOfChunks = uint(len(splitCompressedContent))
caminoMatrixMsg.Metadata.NumberOfChunks = uint64(len(splitCompressedContent))
caminoMatrixMsg.Metadata.ChunkIndex = 0
caminoMatrixMsg.CompressedContent = splitCompressedContent[0]
messages = append(messages, caminoMatrixMsg)
}

// if multiple chunks were produced upon compression, add them to messages slice
for i, chunk := range splitCompressedContent[1:] {
messages = append(messages, CaminoMatrixMessage{
messages = append(messages, matrix.CaminoMatrixMessage{
MessageEventContent: event.MessageEventContent{MsgType: event.MessageType(msg.Type)},
Metadata: metadata.Metadata{RequestID: msg.Metadata.RequestID, NumberOfChunks: uint(len(splitCompressedContent)), ChunkIndex: uint(i + 1)},
Metadata: metadata.Metadata{RequestID: msg.Metadata.RequestID, NumberOfChunks: uint64(len(splitCompressedContent)), ChunkIndex: uint64(i + 1)},
CompressedContent: chunk,
})
}
Expand Down
9 changes: 5 additions & 4 deletions internal/matrix/matrix_compressor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
activityv1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/services/activity/v1"
"github.com/chain4travel/camino-messenger-bot/internal/messaging"
"github.com/chain4travel/camino-messenger-bot/internal/metadata"
"github.com/chain4travel/camino-messenger-bot/pkg/matrix"
"github.com/stretchr/testify/require"
"maunium.net/go/mautrix/event"
)
Expand All @@ -27,7 +28,7 @@ func TestChunkingCompressorCompress(t *testing.T) {
}
tests := map[string]struct {
args args
want []CaminoMatrixMessage
want []matrix.CaminoMatrixMessage
err error
}{
"err: unknown message type": {
Expand All @@ -54,7 +55,7 @@ func TestChunkingCompressorCompress(t *testing.T) {
},
maxSize: 100,
},
want: []CaminoMatrixMessage{
want: []matrix.CaminoMatrixMessage{
{
MessageEventContent: event.MessageEventContent{
MsgType: event.MessageType(messaging.ActivitySearchResponse),
Expand Down Expand Up @@ -95,7 +96,7 @@ func TestChunkingCompressorCompress(t *testing.T) {
},
maxSize: 23, // compressed size of msgType=ActivitySearchResponse and serviceCode="test"
},
want: []CaminoMatrixMessage{
want: []matrix.CaminoMatrixMessage{
{
MessageEventContent: event.MessageEventContent{
MsgType: event.MessageType(messaging.ActivitySearchResponse),
Expand Down Expand Up @@ -136,7 +137,7 @@ func TestChunkingCompressorCompress(t *testing.T) {
},
maxSize: 22, // < 23 = compressed size of msgType=ActivitySearchResponse and serviceCode="test"
},
want: []CaminoMatrixMessage{
want: []matrix.CaminoMatrixMessage{
{
MessageEventContent: event.MessageEventContent{
MsgType: event.MessageType(messaging.ActivitySearchResponse),
Expand Down
15 changes: 6 additions & 9 deletions internal/matrix/matrix_messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"reflect"
"sync"
"time"

Expand All @@ -13,6 +12,7 @@ import (
"github.com/chain4travel/camino-messenger-bot/config"
"github.com/chain4travel/camino-messenger-bot/internal/compression"
"github.com/chain4travel/camino-messenger-bot/internal/messaging"
"github.com/chain4travel/camino-messenger-bot/pkg/matrix"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
Expand All @@ -27,8 +27,6 @@ import (

var _ messaging.Messenger = (*messenger)(nil)

var C4TMessage = event.Type{Type: "m.room.c4t-msg", Class: event.MessageEventType}

type client struct {
*mautrix.Client
ctx context.Context
Expand All @@ -46,7 +44,7 @@ type messenger struct {
client client
roomHandler RoomHandler
msgAssembler MessageAssembler
compressor compression.Compressor[messaging.Message, []CaminoMatrixMessage]
compressor compression.Compressor[messaging.Message, []matrix.CaminoMatrixMessage]
}

func NewMessenger(cfg *config.MatrixConfig, logger *zap.SugaredLogger) messaging.Messenger {
Expand All @@ -72,10 +70,9 @@ func (m *messenger) Checkpoint() string {

func (m *messenger) StartReceiver() (string, error) {
syncer := m.client.Syncer.(*mautrix.DefaultSyncer)
event.TypeMap[C4TMessage] = reflect.TypeOf(CaminoMatrixMessage{}) // custom message event types have to be registered properly

syncer.OnEventType(C4TMessage, func(ctx context.Context, evt *event.Event) {
msg := evt.Content.Parsed.(*CaminoMatrixMessage)
syncer.OnEventType(matrix.EventTypeC4TMessage, func(ctx context.Context, evt *event.Event) {
msg := evt.Content.Parsed.(*matrix.CaminoMatrixMessage)
traceID, err := trace.TraceIDFromHex(msg.Metadata.RequestID)
if err != nil {
m.logger.Warnf("failed to parse traceID from hex [requestID:%s]: %v", msg.Metadata.RequestID, err)
Expand Down Expand Up @@ -189,10 +186,10 @@ func (m *messenger) SendAsync(ctx context.Context, msg messaging.Message) error
}
compressSpan.End()

return m.sendMessageEvents(ctx, roomID, C4TMessage, messages)
return m.sendMessageEvents(ctx, roomID, matrix.EventTypeC4TMessage, messages)
}

func (m *messenger) sendMessageEvents(ctx context.Context, roomID id.RoomID, eventType event.Type, messages []CaminoMatrixMessage) error {
func (m *messenger) sendMessageEvents(ctx context.Context, roomID id.RoomID, eventType event.Type, messages []matrix.CaminoMatrixMessage) error {
// TODO add retry logic?
for _, msg := range messages {
_, err := m.client.SendMessageEvent(ctx, roomID, eventType, msg)
Expand Down
21 changes: 10 additions & 11 deletions internal/matrix/msg_assembler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"sync"

"github.com/chain4travel/camino-messenger-bot/internal/compression"
"github.com/chain4travel/camino-messenger-bot/pkg/matrix"
)

var (
Expand All @@ -20,34 +21,32 @@ var (
)

type MessageAssembler interface {
AssembleMessage(msg *CaminoMatrixMessage) (assembledMsg *CaminoMatrixMessage, complete bool, err error) // returns assembled message and true if message is complete. Otherwise, it returns an empty message and false
AssembleMessage(msg *matrix.CaminoMatrixMessage) (assembledMsg *matrix.CaminoMatrixMessage, complete bool, err error) // returns assembled message and true if message is complete. Otherwise, it returns an empty message and false
}

type messageAssembler struct {
partialMessages map[string][]*CaminoMatrixMessage
partialMessages map[string][]*matrix.CaminoMatrixMessage
decompressor compression.Decompressor
mu sync.RWMutex
}

func NewMessageAssembler() MessageAssembler {
return &messageAssembler{decompressor: &compression.ZSTDDecompressor{}, partialMessages: make(map[string][]*CaminoMatrixMessage)}
return &messageAssembler{decompressor: &compression.ZSTDDecompressor{}, partialMessages: make(map[string][]*matrix.CaminoMatrixMessage)}
}

func (a *messageAssembler) AssembleMessage(msg *CaminoMatrixMessage) (*CaminoMatrixMessage, bool, error) {
func (a *messageAssembler) AssembleMessage(msg *matrix.CaminoMatrixMessage) (*matrix.CaminoMatrixMessage, bool, error) {
if msg.Metadata.NumberOfChunks == 1 {
decompressedCaminoMsg, err := a.assembleAndDecompressCaminoMatrixMessages([]*CaminoMatrixMessage{msg})
decompressedCaminoMsg, err := a.assembleAndDecompressCaminoMatrixMessages([]*matrix.CaminoMatrixMessage{msg})
return decompressedCaminoMsg, err == nil, err
}
a.mu.Lock()
defer a.mu.Unlock()
id := msg.Metadata.RequestID
if _, ok := a.partialMessages[id]; !ok {
a.partialMessages[id] = []*CaminoMatrixMessage{}
a.partialMessages[id] = []*matrix.CaminoMatrixMessage{}
}

a.partialMessages[id] = append(a.partialMessages[id], msg)
// TODO: I believe it's safe to assume the number of chunks will not overflow
// #nosec G115
if len(a.partialMessages[id]) == int(msg.Metadata.NumberOfChunks) {
decompressedCaminoMsg, err := a.assembleAndDecompressCaminoMatrixMessages(a.partialMessages[id])
delete(a.partialMessages, id)
Expand All @@ -56,11 +55,11 @@ func (a *messageAssembler) AssembleMessage(msg *CaminoMatrixMessage) (*CaminoMat
return nil, false, nil
}

func (a *messageAssembler) assembleAndDecompressCaminoMatrixMessages(messages []*CaminoMatrixMessage) (*CaminoMatrixMessage, error) {
func (a *messageAssembler) assembleAndDecompressCaminoMatrixMessages(messages []*matrix.CaminoMatrixMessage) (*matrix.CaminoMatrixMessage, error) {
compressedPayloads := make([][]byte, 0, len(messages))

// chunks have to be sorted
sort.Sort(ByChunkIndex(messages))
sort.Sort(matrix.ByChunkIndex(messages))
for _, msg := range messages {
compressedPayloads = append(compressedPayloads, msg.CompressedContent)
}
Expand All @@ -71,7 +70,7 @@ func (a *messageAssembler) assembleAndDecompressCaminoMatrixMessages(messages []
return nil, fmt.Errorf("%w: %w", ErrDecompressFailed, err)
}

msg := CaminoMatrixMessage{
msg := matrix.CaminoMatrixMessage{
MessageEventContent: messages[0].MessageEventContent,
Metadata: messages[0].Metadata,
}
Expand Down
31 changes: 16 additions & 15 deletions internal/matrix/msg_assembler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/chain4travel/camino-messenger-bot/internal/compression"
"github.com/chain4travel/camino-messenger-bot/internal/messaging"
"github.com/chain4travel/camino-messenger-bot/internal/metadata"
"github.com/chain4travel/camino-messenger-bot/pkg/matrix"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
)
Expand All @@ -32,11 +33,11 @@ func TestAssembleMessage(t *testing.T) {
},
}
type fields struct {
partialMessages map[string][]*CaminoMatrixMessage
partialMessages map[string][]*matrix.CaminoMatrixMessage
}

type args struct {
msg *CaminoMatrixMessage
msg *matrix.CaminoMatrixMessage
}

// mocks
Expand All @@ -47,13 +48,13 @@ func TestAssembleMessage(t *testing.T) {
fields fields
args args
prepare func()
want *CaminoMatrixMessage
want *matrix.CaminoMatrixMessage
isComplete bool
err error
}{
"err: decoder failed to decompress": {
args: args{
msg: &CaminoMatrixMessage{
msg: &matrix.CaminoMatrixMessage{
Metadata: metadata.Metadata{
RequestID: "test",
NumberOfChunks: 1,
Expand All @@ -68,7 +69,7 @@ func TestAssembleMessage(t *testing.T) {
},
"err: unknown message type": {
args: args{
msg: &CaminoMatrixMessage{
msg: &matrix.CaminoMatrixMessage{
Metadata: metadata.Metadata{
RequestID: "test",
NumberOfChunks: 1,
Expand All @@ -83,20 +84,20 @@ func TestAssembleMessage(t *testing.T) {
},
"empty input": {
fields: fields{
partialMessages: map[string][]*CaminoMatrixMessage{},
partialMessages: map[string][]*matrix.CaminoMatrixMessage{},
},
args: args{
msg: &CaminoMatrixMessage{},
msg: &matrix.CaminoMatrixMessage{},
},
isComplete: false,
err: nil,
},
"partial message delivery [metadata number fo chunks do not match provided messages]": {
fields: fields{
partialMessages: map[string][]*CaminoMatrixMessage{},
partialMessages: map[string][]*matrix.CaminoMatrixMessage{},
},
args: args{
msg: &CaminoMatrixMessage{
msg: &matrix.CaminoMatrixMessage{
Metadata: metadata.Metadata{
RequestID: "test",
NumberOfChunks: 2,
Expand All @@ -108,10 +109,10 @@ func TestAssembleMessage(t *testing.T) {
},
"success: single chunk message": {
fields: fields{
partialMessages: map[string][]*CaminoMatrixMessage{},
partialMessages: map[string][]*matrix.CaminoMatrixMessage{},
},
args: args{
msg: &CaminoMatrixMessage{
msg: &matrix.CaminoMatrixMessage{
MessageEventContent: event.MessageEventContent{
MsgType: event.MessageType(messaging.ActivitySearchResponse),
},
Expand All @@ -127,7 +128,7 @@ func TestAssembleMessage(t *testing.T) {
require.NoError(t, err)
mockedDecompressor.EXPECT().Decompress(gomock.Any()).Times(1).Return(msgBytes, nil)
},
want: &CaminoMatrixMessage{
want: &matrix.CaminoMatrixMessage{
Metadata: metadata.Metadata{
RequestID: "id",
NumberOfChunks: 1,
Expand All @@ -142,14 +143,14 @@ func TestAssembleMessage(t *testing.T) {
},
"success: multi-chunk message": {
fields: fields{
partialMessages: map[string][]*CaminoMatrixMessage{"id": {
partialMessages: map[string][]*matrix.CaminoMatrixMessage{"id": {
// only 2 chunks because the last one is passed as the last argument triggering the call of AssembleMessage
// msgType is necessary only for 1st chunk
{MessageEventContent: event.MessageEventContent{MsgType: event.MessageType(messaging.ActivitySearchResponse)}}, {},
}},
},
args: args{
msg: &CaminoMatrixMessage{
msg: &matrix.CaminoMatrixMessage{
Metadata: metadata.Metadata{
RequestID: "id",
NumberOfChunks: 3,
Expand All @@ -162,7 +163,7 @@ func TestAssembleMessage(t *testing.T) {
require.NoError(t, err)
mockedDecompressor.EXPECT().Decompress(gomock.Any()).Times(1).Return(msgBytes, nil)
},
want: &CaminoMatrixMessage{
want: &matrix.CaminoMatrixMessage{
MessageEventContent: event.MessageEventContent{
MsgType: event.MessageType(messaging.ActivitySearchResponse),
},
Expand Down
15 changes: 8 additions & 7 deletions internal/metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,18 @@ import (
"strings"
"time"

"github.com/chain4travel/camino-messenger-bot/pkg/cheques"
"google.golang.org/grpc/metadata"
)

type Metadata struct {
RequestID string `json:"request_id"`
Sender string `json:"sender"`
Recipient string `json:"recipient"`
Cheques []map[string]interface{} `json:"cheques"`
Timestamps map[string]int64 `json:"timestamps"` // map of checkpoints to timestamps in unix milliseconds
NumberOfChunks uint `json:"number_of_chunks"`
ChunkIndex uint `json:"chunk_index"`
RequestID string `json:"request_id"`
Sender string `json:"sender"`
Recipient string `json:"recipient"`
Cheques []cheques.SignedCheque `json:"cheques"`
Timestamps map[string]int64 `json:"timestamps"` // map of checkpoints to timestamps in unix milliseconds
NumberOfChunks uint64 `json:"number_of_chunks"`
ChunkIndex uint64 `json:"chunk_index"`

// Deprecated: this metadata serves only as a temp solution and should be removed and addressed on the protocol level
ProviderOperator string `json:"provider_operator"`
Expand Down
Loading

0 comments on commit 720f34c

Please sign in to comment.