Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cheque and matrix-message types and signing #28

Merged
merged 8 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions internal/matrix/matrix_compressor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ import (
"github.com/chain4travel/camino-messenger-bot/internal/compression"
"github.com/chain4travel/camino-messenger-bot/internal/messaging"
"github.com/chain4travel/camino-messenger-bot/internal/metadata"
"github.com/chain4travel/camino-messenger-bot/pkg/matrix"
"maunium.net/go/mautrix/event"
)

var (
_ compression.Compressor[messaging.Message, []CaminoMatrixMessage] = (*ChunkingCompressor)(nil)
ErrCompressionProducedNoChunks = errors.New("compression produced no chunks")
ErrEncodingMsg = errors.New("error while encoding msg for compression")
_ compression.Compressor[messaging.Message, []matrix.CaminoMatrixMessage] = (*ChunkingCompressor)(nil)
ErrCompressionProducedNoChunks = errors.New("compression produced no chunks")
ErrEncodingMsg = errors.New("error while encoding msg for compression")
)

// ChunkingCompressor is a concrete implementation of Compressor with chunking functionality
Expand All @@ -27,8 +28,8 @@ type ChunkingCompressor struct {
}

// Compress implements the Compressor interface for ChunkingCompressor
func (c *ChunkingCompressor) Compress(msg messaging.Message) ([]CaminoMatrixMessage, error) {
var matrixMessages []CaminoMatrixMessage
func (c *ChunkingCompressor) Compress(msg messaging.Message) ([]matrix.CaminoMatrixMessage, error) {
var matrixMessages []matrix.CaminoMatrixMessage

// 1. CompressBytes the message
compressedContent, err := compress(msg)
Expand All @@ -42,7 +43,7 @@ func (c *ChunkingCompressor) Compress(msg messaging.Message) ([]CaminoMatrixMess
return matrixMessages, err
}

// 3. Create CaminoMatrixMessage objects for each chunk
// 3. Create matrix.CaminoMatrixMessage objects for each chunk
return splitCaminoMatrixMsg(msg, splitCompressedContent)
}

Expand All @@ -67,26 +68,26 @@ func compress(msg messaging.Message) ([]byte, error) {
return compression.CompressBytes(bytes), nil
}

func splitCaminoMatrixMsg(msg messaging.Message, splitCompressedContent [][]byte) ([]CaminoMatrixMessage, error) {
messages := make([]CaminoMatrixMessage, 0, len(splitCompressedContent))
func splitCaminoMatrixMsg(msg messaging.Message, splitCompressedContent [][]byte) ([]matrix.CaminoMatrixMessage, error) {
messages := make([]matrix.CaminoMatrixMessage, 0, len(splitCompressedContent))

// add first chunk to messages slice
{
caminoMatrixMsg := CaminoMatrixMessage{
caminoMatrixMsg := matrix.CaminoMatrixMessage{
MessageEventContent: event.MessageEventContent{MsgType: event.MessageType(msg.Type)},
Metadata: msg.Metadata,
}
caminoMatrixMsg.Metadata.NumberOfChunks = uint(len(splitCompressedContent))
caminoMatrixMsg.Metadata.NumberOfChunks = uint64(len(splitCompressedContent))
caminoMatrixMsg.Metadata.ChunkIndex = 0
caminoMatrixMsg.CompressedContent = splitCompressedContent[0]
messages = append(messages, caminoMatrixMsg)
}

// if multiple chunks were produced upon compression, add them to messages slice
for i, chunk := range splitCompressedContent[1:] {
messages = append(messages, CaminoMatrixMessage{
messages = append(messages, matrix.CaminoMatrixMessage{
MessageEventContent: event.MessageEventContent{MsgType: event.MessageType(msg.Type)},
Metadata: metadata.Metadata{RequestID: msg.Metadata.RequestID, NumberOfChunks: uint(len(splitCompressedContent)), ChunkIndex: uint(i + 1)},
Metadata: metadata.Metadata{RequestID: msg.Metadata.RequestID, NumberOfChunks: uint64(len(splitCompressedContent)), ChunkIndex: uint64(i + 1)},
CompressedContent: chunk,
})
}
Expand Down
9 changes: 5 additions & 4 deletions internal/matrix/matrix_compressor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
activityv1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/services/activity/v1"
"github.com/chain4travel/camino-messenger-bot/internal/messaging"
"github.com/chain4travel/camino-messenger-bot/internal/metadata"
"github.com/chain4travel/camino-messenger-bot/pkg/matrix"
"github.com/stretchr/testify/require"
"maunium.net/go/mautrix/event"
)
Expand All @@ -27,7 +28,7 @@ func TestChunkingCompressorCompress(t *testing.T) {
}
tests := map[string]struct {
args args
want []CaminoMatrixMessage
want []matrix.CaminoMatrixMessage
err error
}{
"err: unknown message type": {
Expand All @@ -54,7 +55,7 @@ func TestChunkingCompressorCompress(t *testing.T) {
},
maxSize: 100,
},
want: []CaminoMatrixMessage{
want: []matrix.CaminoMatrixMessage{
{
MessageEventContent: event.MessageEventContent{
MsgType: event.MessageType(messaging.ActivitySearchResponse),
Expand Down Expand Up @@ -95,7 +96,7 @@ func TestChunkingCompressorCompress(t *testing.T) {
},
maxSize: 23, // compressed size of msgType=ActivitySearchResponse and serviceCode="test"
},
want: []CaminoMatrixMessage{
want: []matrix.CaminoMatrixMessage{
{
MessageEventContent: event.MessageEventContent{
MsgType: event.MessageType(messaging.ActivitySearchResponse),
Expand Down Expand Up @@ -136,7 +137,7 @@ func TestChunkingCompressorCompress(t *testing.T) {
},
maxSize: 22, // < 23 = compressed size of msgType=ActivitySearchResponse and serviceCode="test"
},
want: []CaminoMatrixMessage{
want: []matrix.CaminoMatrixMessage{
{
MessageEventContent: event.MessageEventContent{
MsgType: event.MessageType(messaging.ActivitySearchResponse),
Expand Down
15 changes: 6 additions & 9 deletions internal/matrix/matrix_messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"reflect"
"sync"
"time"

Expand All @@ -13,6 +12,7 @@ import (
"github.com/chain4travel/camino-messenger-bot/config"
"github.com/chain4travel/camino-messenger-bot/internal/compression"
"github.com/chain4travel/camino-messenger-bot/internal/messaging"
"github.com/chain4travel/camino-messenger-bot/pkg/matrix"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
Expand All @@ -27,8 +27,6 @@ import (

var _ messaging.Messenger = (*messenger)(nil)

var C4TMessage = event.Type{Type: "m.room.c4t-msg", Class: event.MessageEventType}

type client struct {
*mautrix.Client
ctx context.Context
Expand All @@ -46,7 +44,7 @@ type messenger struct {
client client
roomHandler RoomHandler
msgAssembler MessageAssembler
compressor compression.Compressor[messaging.Message, []CaminoMatrixMessage]
compressor compression.Compressor[messaging.Message, []matrix.CaminoMatrixMessage]
}

func NewMessenger(cfg *config.MatrixConfig, logger *zap.SugaredLogger) messaging.Messenger {
Expand All @@ -72,10 +70,9 @@ func (m *messenger) Checkpoint() string {

func (m *messenger) StartReceiver() (string, error) {
syncer := m.client.Syncer.(*mautrix.DefaultSyncer)
event.TypeMap[C4TMessage] = reflect.TypeOf(CaminoMatrixMessage{}) // custom message event types have to be registered properly

syncer.OnEventType(C4TMessage, func(ctx context.Context, evt *event.Event) {
msg := evt.Content.Parsed.(*CaminoMatrixMessage)
syncer.OnEventType(matrix.EventTypeC4TMessage, func(ctx context.Context, evt *event.Event) {
msg := evt.Content.Parsed.(*matrix.CaminoMatrixMessage)
traceID, err := trace.TraceIDFromHex(msg.Metadata.RequestID)
if err != nil {
m.logger.Warnf("failed to parse traceID from hex [requestID:%s]: %v", msg.Metadata.RequestID, err)
Expand Down Expand Up @@ -189,10 +186,10 @@ func (m *messenger) SendAsync(ctx context.Context, msg messaging.Message) error
}
compressSpan.End()

return m.sendMessageEvents(ctx, roomID, C4TMessage, messages)
return m.sendMessageEvents(ctx, roomID, matrix.EventTypeC4TMessage, messages)
}

func (m *messenger) sendMessageEvents(ctx context.Context, roomID id.RoomID, eventType event.Type, messages []CaminoMatrixMessage) error {
func (m *messenger) sendMessageEvents(ctx context.Context, roomID id.RoomID, eventType event.Type, messages []matrix.CaminoMatrixMessage) error {
// TODO add retry logic?
for _, msg := range messages {
_, err := m.client.SendMessageEvent(ctx, roomID, eventType, msg)
Expand Down
21 changes: 10 additions & 11 deletions internal/matrix/msg_assembler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"sync"

"github.com/chain4travel/camino-messenger-bot/internal/compression"
"github.com/chain4travel/camino-messenger-bot/pkg/matrix"
)

var (
Expand All @@ -20,34 +21,32 @@ var (
)

type MessageAssembler interface {
AssembleMessage(msg *CaminoMatrixMessage) (assembledMsg *CaminoMatrixMessage, complete bool, err error) // returns assembled message and true if message is complete. Otherwise, it returns an empty message and false
AssembleMessage(msg *matrix.CaminoMatrixMessage) (assembledMsg *matrix.CaminoMatrixMessage, complete bool, err error) // returns assembled message and true if message is complete. Otherwise, it returns an empty message and false
}

type messageAssembler struct {
partialMessages map[string][]*CaminoMatrixMessage
partialMessages map[string][]*matrix.CaminoMatrixMessage
decompressor compression.Decompressor
mu sync.RWMutex
}

func NewMessageAssembler() MessageAssembler {
return &messageAssembler{decompressor: &compression.ZSTDDecompressor{}, partialMessages: make(map[string][]*CaminoMatrixMessage)}
return &messageAssembler{decompressor: &compression.ZSTDDecompressor{}, partialMessages: make(map[string][]*matrix.CaminoMatrixMessage)}
}

func (a *messageAssembler) AssembleMessage(msg *CaminoMatrixMessage) (*CaminoMatrixMessage, bool, error) {
func (a *messageAssembler) AssembleMessage(msg *matrix.CaminoMatrixMessage) (*matrix.CaminoMatrixMessage, bool, error) {
if msg.Metadata.NumberOfChunks == 1 {
decompressedCaminoMsg, err := a.assembleAndDecompressCaminoMatrixMessages([]*CaminoMatrixMessage{msg})
decompressedCaminoMsg, err := a.assembleAndDecompressCaminoMatrixMessages([]*matrix.CaminoMatrixMessage{msg})
return decompressedCaminoMsg, err == nil, err
}
a.mu.Lock()
defer a.mu.Unlock()
id := msg.Metadata.RequestID
if _, ok := a.partialMessages[id]; !ok {
a.partialMessages[id] = []*CaminoMatrixMessage{}
a.partialMessages[id] = []*matrix.CaminoMatrixMessage{}
}

a.partialMessages[id] = append(a.partialMessages[id], msg)
// TODO: I believe it's safe to assume the number of chunks will not overflow
// #nosec G115
if len(a.partialMessages[id]) == int(msg.Metadata.NumberOfChunks) {
decompressedCaminoMsg, err := a.assembleAndDecompressCaminoMatrixMessages(a.partialMessages[id])
delete(a.partialMessages, id)
Expand All @@ -56,11 +55,11 @@ func (a *messageAssembler) AssembleMessage(msg *CaminoMatrixMessage) (*CaminoMat
return nil, false, nil
}

func (a *messageAssembler) assembleAndDecompressCaminoMatrixMessages(messages []*CaminoMatrixMessage) (*CaminoMatrixMessage, error) {
func (a *messageAssembler) assembleAndDecompressCaminoMatrixMessages(messages []*matrix.CaminoMatrixMessage) (*matrix.CaminoMatrixMessage, error) {
compressedPayloads := make([][]byte, 0, len(messages))

// chunks have to be sorted
sort.Sort(ByChunkIndex(messages))
sort.Sort(matrix.ByChunkIndex(messages))
for _, msg := range messages {
compressedPayloads = append(compressedPayloads, msg.CompressedContent)
}
Expand All @@ -71,7 +70,7 @@ func (a *messageAssembler) assembleAndDecompressCaminoMatrixMessages(messages []
return nil, fmt.Errorf("%w: %w", ErrDecompressFailed, err)
}

msg := CaminoMatrixMessage{
msg := matrix.CaminoMatrixMessage{
MessageEventContent: messages[0].MessageEventContent,
Metadata: messages[0].Metadata,
}
Expand Down
31 changes: 16 additions & 15 deletions internal/matrix/msg_assembler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/chain4travel/camino-messenger-bot/internal/compression"
"github.com/chain4travel/camino-messenger-bot/internal/messaging"
"github.com/chain4travel/camino-messenger-bot/internal/metadata"
"github.com/chain4travel/camino-messenger-bot/pkg/matrix"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
)
Expand All @@ -32,11 +33,11 @@ func TestAssembleMessage(t *testing.T) {
},
}
type fields struct {
partialMessages map[string][]*CaminoMatrixMessage
partialMessages map[string][]*matrix.CaminoMatrixMessage
}

type args struct {
msg *CaminoMatrixMessage
msg *matrix.CaminoMatrixMessage
}

// mocks
Expand All @@ -47,13 +48,13 @@ func TestAssembleMessage(t *testing.T) {
fields fields
args args
prepare func()
want *CaminoMatrixMessage
want *matrix.CaminoMatrixMessage
isComplete bool
err error
}{
"err: decoder failed to decompress": {
args: args{
msg: &CaminoMatrixMessage{
msg: &matrix.CaminoMatrixMessage{
Metadata: metadata.Metadata{
RequestID: "test",
NumberOfChunks: 1,
Expand All @@ -68,7 +69,7 @@ func TestAssembleMessage(t *testing.T) {
},
"err: unknown message type": {
args: args{
msg: &CaminoMatrixMessage{
msg: &matrix.CaminoMatrixMessage{
Metadata: metadata.Metadata{
RequestID: "test",
NumberOfChunks: 1,
Expand All @@ -83,20 +84,20 @@ func TestAssembleMessage(t *testing.T) {
},
"empty input": {
fields: fields{
partialMessages: map[string][]*CaminoMatrixMessage{},
partialMessages: map[string][]*matrix.CaminoMatrixMessage{},
},
args: args{
msg: &CaminoMatrixMessage{},
msg: &matrix.CaminoMatrixMessage{},
},
isComplete: false,
err: nil,
},
"partial message delivery [metadata number fo chunks do not match provided messages]": {
fields: fields{
partialMessages: map[string][]*CaminoMatrixMessage{},
partialMessages: map[string][]*matrix.CaminoMatrixMessage{},
},
args: args{
msg: &CaminoMatrixMessage{
msg: &matrix.CaminoMatrixMessage{
Metadata: metadata.Metadata{
RequestID: "test",
NumberOfChunks: 2,
Expand All @@ -108,10 +109,10 @@ func TestAssembleMessage(t *testing.T) {
},
"success: single chunk message": {
fields: fields{
partialMessages: map[string][]*CaminoMatrixMessage{},
partialMessages: map[string][]*matrix.CaminoMatrixMessage{},
},
args: args{
msg: &CaminoMatrixMessage{
msg: &matrix.CaminoMatrixMessage{
MessageEventContent: event.MessageEventContent{
MsgType: event.MessageType(messaging.ActivitySearchResponse),
},
Expand All @@ -127,7 +128,7 @@ func TestAssembleMessage(t *testing.T) {
require.NoError(t, err)
mockedDecompressor.EXPECT().Decompress(gomock.Any()).Times(1).Return(msgBytes, nil)
},
want: &CaminoMatrixMessage{
want: &matrix.CaminoMatrixMessage{
Metadata: metadata.Metadata{
RequestID: "id",
NumberOfChunks: 1,
Expand All @@ -142,14 +143,14 @@ func TestAssembleMessage(t *testing.T) {
},
"success: multi-chunk message": {
fields: fields{
partialMessages: map[string][]*CaminoMatrixMessage{"id": {
partialMessages: map[string][]*matrix.CaminoMatrixMessage{"id": {
// only 2 chunks because the last one is passed as the last argument triggering the call of AssembleMessage
// msgType is necessary only for 1st chunk
{MessageEventContent: event.MessageEventContent{MsgType: event.MessageType(messaging.ActivitySearchResponse)}}, {},
}},
},
args: args{
msg: &CaminoMatrixMessage{
msg: &matrix.CaminoMatrixMessage{
Metadata: metadata.Metadata{
RequestID: "id",
NumberOfChunks: 3,
Expand All @@ -162,7 +163,7 @@ func TestAssembleMessage(t *testing.T) {
require.NoError(t, err)
mockedDecompressor.EXPECT().Decompress(gomock.Any()).Times(1).Return(msgBytes, nil)
},
want: &CaminoMatrixMessage{
want: &matrix.CaminoMatrixMessage{
MessageEventContent: event.MessageEventContent{
MsgType: event.MessageType(messaging.ActivitySearchResponse),
},
Expand Down
15 changes: 8 additions & 7 deletions internal/metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,18 @@ import (
"strings"
"time"

"github.com/chain4travel/camino-messenger-bot/pkg/cheques"
"google.golang.org/grpc/metadata"
)

type Metadata struct {
RequestID string `json:"request_id"`
Sender string `json:"sender"`
Recipient string `json:"recipient"`
Cheques []map[string]interface{} `json:"cheques"`
Timestamps map[string]int64 `json:"timestamps"` // map of checkpoints to timestamps in unix milliseconds
NumberOfChunks uint `json:"number_of_chunks"`
ChunkIndex uint `json:"chunk_index"`
RequestID string `json:"request_id"`
Sender string `json:"sender"`
Recipient string `json:"recipient"`
Cheques []cheques.SignedCheque `json:"cheques"`
Timestamps map[string]int64 `json:"timestamps"` // map of checkpoints to timestamps in unix milliseconds
NumberOfChunks uint64 `json:"number_of_chunks"`
ChunkIndex uint64 `json:"chunk_index"`

// Deprecated: this metadata serves only as a temp solution and should be removed and addressed on the protocol level
ProviderOperator string `json:"provider_operator"`
Expand Down
Loading