Skip to content

Commit

Permalink
Add compressor tests + minor refactorings
Browse files Browse the repository at this point in the history
  • Loading branch information
Kleonikos Kyriakis authored and knikos committed Apr 23, 2024
1 parent c546a7e commit 0407b60
Show file tree
Hide file tree
Showing 9 changed files with 215 additions and 38 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/rs/zerolog v1.31.0
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.16.0
github.com/stretchr/testify v1.8.4
go.uber.org/zap v1.26.0
golang.org/x/sync v0.5.0
google.golang.org/grpc v1.59.0
Expand Down Expand Up @@ -68,7 +69,6 @@ require (
github.com/rogpeppe/go-internal v1.10.0 // indirect
github.com/spf13/cobra v1.7.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/stretchr/testify v1.8.4 // indirect
github.com/supranational/blst v0.3.11 // indirect
github.com/tidwall/gjson v1.17.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
Expand Down
6 changes: 6 additions & 0 deletions internal/compression/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ const (

var encoder, _ = zstd.NewWriter(nil)

// Compressor interface defines basic compression functionality
type Compressor[T any, R any] interface {
// Compress takes a byte array as input and returns the compressed data as a byte array
Compress(data T) (R, error)
}

func Compress(src []byte) []byte {
return encoder.EncodeAll(src, make([]byte, 0, len(src)))
}
65 changes: 46 additions & 19 deletions internal/matrix/matrix_compressor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package matrix

import (
"errors"
"fmt"

"github.com/chain4travel/camino-messenger-bot/internal/compression"
Expand All @@ -14,29 +15,59 @@ import (
"maunium.net/go/mautrix/event"
)

func compressAndSplitCaminoMatrixMsg(msg messaging.Message) ([]CaminoMatrixMessage, error) {
var messages []CaminoMatrixMessage
var (
_ compression.Compressor[messaging.Message, []CaminoMatrixMessage] = (*MatrixChunkingCompressor)(nil)
ErrCompressionProducedNoChunks = errors.New("compression produced no chunks")
ErrEncodingMsg = errors.New("error while encoding msg for compression")
)

var (
bytes []byte
err error
)
switch msg.Type.Category() {
case messaging.Request,
messaging.Response:
bytes, err = msg.MarshalContent()
default:
return nil, fmt.Errorf("could not categorize unknown message type: %v", msg.Type)
// MatrixChunkingCompressor is a concrete implementation of Compressor with chunking functionality
type MatrixChunkingCompressor struct {
maxChunkSize int
}

// Compress implements the Compressor interface for MatrixChunkingCompressor
func (c *MatrixChunkingCompressor) Compress(msg messaging.Message) ([]CaminoMatrixMessage, error) {
var matrixMessages []CaminoMatrixMessage

// 1. Compress the message
compressedContent, err := compress(msg)
if err != nil {
return matrixMessages, err
}

// 2. Split the compressed content into chunks
splitCompressedContent, err := c.split(compressedContent)
if err != nil {
return nil, fmt.Errorf("error while encoding msg for compression: %v", err)
return matrixMessages, err
}

splitCompressedContent := compressAndSplit(bytes)
// 3. Create CaminoMatrixMessage objects for each chunk
return splitCaminoMatrixMsg(msg, splitCompressedContent)
}
func (c *MatrixChunkingCompressor) split(bytes []byte) ([][]byte, error) {
splitCompressedContent := splitByteArray(bytes, c.maxChunkSize)

if len(splitCompressedContent) == 0 {
return nil, fmt.Errorf("compression produced no chunks") // should never happen
return nil, ErrCompressionProducedNoChunks // should never happen
}
return splitCompressedContent, nil
}

func compress(msg messaging.Message) ([]byte, error) {
var (
bytes []byte
err error
)
bytes, err = msg.MarshalContent()
if err != nil {
return nil, fmt.Errorf("%w: %w", ErrEncodingMsg, err)
}
return compression.Compress(bytes), nil
}
func splitCaminoMatrixMsg(msg messaging.Message, splitCompressedContent [][]byte) ([]CaminoMatrixMessage, error) {
var messages []CaminoMatrixMessage

// add first chunk to messages slice
{
caminoMatrixMsg := CaminoMatrixMessage{
Expand All @@ -61,10 +92,6 @@ func compressAndSplitCaminoMatrixMsg(msg messaging.Message) ([]CaminoMatrixMessa
return messages, nil
}

func compressAndSplit(bytes []byte) [][]byte {
return splitByteArray(compression.Compress(bytes), compression.MaxChunkSize)
}

func splitByteArray(src []byte, maxSize int) [][]byte {
numChunks := (len(src) + maxSize - 1) / maxSize
result := make([][]byte, numChunks)
Expand Down
143 changes: 143 additions & 0 deletions internal/matrix/matrix_compressor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright (C) 2022-2023, Chain4Travel AG. All rights reserved.
* See the file LICENSE for licensing terms.
*/

package matrix

import (
activityv1alpha "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/services/activity/v1alpha"
"github.com/chain4travel/camino-messenger-bot/internal/messaging"
"github.com/chain4travel/camino-messenger-bot/internal/metadata"
"github.com/stretchr/testify/require"
"maunium.net/go/mautrix/event"
"testing"
)

func TestMatrixChunkingCompressorCompress(t *testing.T) {
type args struct {
msg messaging.Message
maxSize int
}
tests := map[string]struct {
args args
want []CaminoMatrixMessage
err error
}{
"err: unknown message type": {
args: args{msg: messaging.Message{Type: "Unknown"}, maxSize: 5},
err: messaging.ErrUnknownMessageType,
},
"err: empty message": {
args: args{msg: messaging.Message{Type: messaging.ActivitySearchResponse}, maxSize: 5},
err: ErrCompressionProducedNoChunks,
},
"success: small message compressed without chunking (input<maxSize)": {
args: args{
msg: messaging.Message{
Type: messaging.ActivitySearchResponse,
Content: messaging.MessageContent{
ResponseContent: messaging.ResponseContent{
ActivitySearchResponse: activityv1alpha.ActivitySearchResponse{
Results: []*activityv1alpha.ActivitySearchResult{
{Info: &activityv1alpha.Activity{ServiceCode: "test"}},
},
},
},
},
},
maxSize: 100,
},
want: []CaminoMatrixMessage{
{
MessageEventContent: event.MessageEventContent{
MsgType: event.MessageType(messaging.ActivitySearchResponse),
},
Metadata: metadata.Metadata{
NumberOfChunks: 1,
ChunkIndex: 0,
},
},
},
},
"success: small message compressed without chunking (input=maxSize)": {
args: args{
msg: messaging.Message{
Type: messaging.ActivitySearchResponse,
Content: messaging.MessageContent{
ResponseContent: messaging.ResponseContent{
ActivitySearchResponse: activityv1alpha.ActivitySearchResponse{
Results: []*activityv1alpha.ActivitySearchResult{
{Info: &activityv1alpha.Activity{ServiceCode: "test"}},
},
},
},
},
},
maxSize: 23, // compressed size of msgType=ActivitySearchResponse and serviceCode="test"
},
want: []CaminoMatrixMessage{
{
MessageEventContent: event.MessageEventContent{
MsgType: event.MessageType(messaging.ActivitySearchResponse),
},
Metadata: metadata.Metadata{
NumberOfChunks: 1,
ChunkIndex: 0,
},
},
},
},
"success: large message compressed with chunking (input>maxSize)": {
args: args{
msg: messaging.Message{
Type: messaging.ActivitySearchResponse,
Content: messaging.MessageContent{
ResponseContent: messaging.ResponseContent{
ActivitySearchResponse: activityv1alpha.ActivitySearchResponse{
Results: []*activityv1alpha.ActivitySearchResult{
{Info: &activityv1alpha.Activity{ServiceCode: "test"}},
},
},
},
},
},
maxSize: 22, // < 23 = compressed size of msgType=ActivitySearchResponse and serviceCode="test"
},
want: []CaminoMatrixMessage{
{
MessageEventContent: event.MessageEventContent{
MsgType: event.MessageType(messaging.ActivitySearchResponse),
},
Metadata: metadata.Metadata{
NumberOfChunks: 2,
ChunkIndex: 0,
},
},
{
MessageEventContent: event.MessageEventContent{
MsgType: event.MessageType(messaging.ActivitySearchResponse),
},
Metadata: metadata.Metadata{
NumberOfChunks: 2,
ChunkIndex: 1,
},
},
},
},
}
for tc, tt := range tests {
t.Run(tc, func(t *testing.T) {
c := &MatrixChunkingCompressor{tt.args.maxSize}
got, err := c.Compress(tt.args.msg)
require.ErrorIs(t, err, tt.err)
require.Equal(t, len(got), len(tt.want))
for i, msg := range got {
require.Equal(t, msg.MessageEventContent.MsgType, tt.want[i].MsgType)
require.Equal(t, msg.Metadata.NumberOfChunks, tt.want[i].Metadata.NumberOfChunks)
require.Equal(t, msg.Metadata.ChunkIndex, tt.want[i].Metadata.ChunkIndex)
require.NotNil(t, msg.CompressedContent)
}
})
}
}
5 changes: 4 additions & 1 deletion internal/matrix/matrix_messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/chain4travel/camino-messenger-bot/internal/compression"
"reflect"
"sync"
"time"
Expand Down Expand Up @@ -39,6 +40,7 @@ type messenger struct {
client client
roomHandler RoomHandler
msgAssembler MessageAssembler
compressor compression.Compressor[messaging.Message, []CaminoMatrixMessage]
}

func NewMessenger(cfg *config.MatrixConfig, logger *zap.SugaredLogger) *messenger {
Expand All @@ -53,6 +55,7 @@ func NewMessenger(cfg *config.MatrixConfig, logger *zap.SugaredLogger) *messenge
client: client{Client: c},
roomHandler: NewRoomHandler(c, logger),
msgAssembler: NewMessageAssembler(logger),
compressor: &MatrixChunkingCompressor{maxChunkSize: compression.MaxChunkSize},
}
}
func (m *messenger) Checkpoint() string {
Expand Down Expand Up @@ -159,7 +162,7 @@ func (m *messenger) SendAsync(_ context.Context, msg messaging.Message) error {
return err
}

messages, err := compressAndSplitCaminoMatrixMsg(msg)
messages, err := m.compressor.Compress(msg)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/matrix/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,6 @@ func (m *CaminoMatrixMessage) UnmarshalContent(src []byte) error {
case messaging.TransportSearchResponse:
return proto.Unmarshal(src, &m.Content.ResponseContent.TransportSearchResponse)
default:
return messaging.ErrInvalidMessageType
return messaging.ErrUnknownMessageType
}
}
Loading

0 comments on commit 0407b60

Please sign in to comment.