Skip to content

Commit

Permalink
feat: remove id from SetRecordRequest
Browse files Browse the repository at this point in the history
  • Loading branch information
hydra-yse committed Oct 21, 2024
1 parent 5c470c2 commit 5630459
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 375 deletions.
2 changes: 1 addition & 1 deletion middleware/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func UnaryAuth(config *config.Config) grpc.UnaryServerInterceptor {
var signature string
setRecordReq, ok := req.(*proto.SetRecordRequest)
if ok {
toVerify = fmt.Sprintf("%v-%v-%x-%v", setRecordReq.Record.Id, setRecordReq.Record.Version, setRecordReq.Record.Data, setRecordReq.RequestTime)
toVerify = fmt.Sprintf("%x-%v-%v", setRecordReq.RecordData, setRecordReq.RecordVersion, setRecordReq.RequestTime)
signature = setRecordReq.Signature
}

Expand Down
345 changes: 80 additions & 265 deletions proto/sync.pb.go

Large diffs are not rendered by default.

12 changes: 4 additions & 8 deletions proto/sync.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,12 @@ message Record {
}

message SetRecordRequest {
Record record = 1;
uint32 request_time = 2;
string signature = 3;
}
enum SetRecordStatus {
SUCCESS = 0;
CONFLICT = 1;
bytes record_data = 1;
float record_version = 2;
uint32 request_time = 3;
string signature = 4;
}
message SetRecordReply {
SetRecordStatus status = 1;
int64 new_id = 2;
}

Expand Down
23 changes: 2 additions & 21 deletions store/sync_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package store
import (
"context"
"database/sql"
"errors"
"fmt"
"os"
"path"
Expand All @@ -17,15 +16,13 @@ import (
_ "github.com/mattn/go-sqlite3"
)

var ErrSetConflict = errors.New("set conflict")

type StoredRecord struct {
Id int64
Version float32
Data []byte
}
type SyncStorage interface {
SetRecord(ctx context.Context, userRecordId int64, version float32, data []byte) (int64, error)
SetRecord(ctx context.Context, version float32, data []byte) (int64, error)
ListChanges(ctx context.Context, fromId int64) ([]StoredRecord, error)
}

Expand Down Expand Up @@ -61,23 +58,7 @@ func Connect(file string) (*SQLiteSyncStorage, error) {
return &SQLiteSyncStorage{db: db}, nil
}

func (s *SQLiteSyncStorage) SetRecord(ctx context.Context, userRecordId int64, version float32, data []byte) (int64, error) {
var latestRecordId int64
err := s.db.QueryRow(`
SELECT id
FROM RECORDS
ORDER BY id DESC
LIMIT 1
`).Scan(&latestRecordId)
if err != sql.ErrNoRows {
if err != nil {
return 0, fmt.Errorf("failed to currenet version %w", err)
}
if latestRecordId+1 != userRecordId {
return 0, ErrSetConflict
}
}

func (s *SQLiteSyncStorage) SetRecord(ctx context.Context, version float32, data []byte) (int64, error) {
res, err := s.db.Exec("INSERT INTO RECORDS (version, data) VALUES (?, ?)", version, data)
if err != nil {
return 0, fmt.Errorf("failed to insert record: %w", err)
Expand Down
27 changes: 5 additions & 22 deletions store/sync_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,36 +12,19 @@ func TestAddRecords(t *testing.T) {
require.NoError(t, err, "failed to connect")

records := []StoredRecord{
{Id: 0, Version: 0.1, Data: []byte("{}")},
{Id: 1, Version: 0.1, Data: []byte("{}")},
{Id: 2, Version: 0.1, Data: []byte("{}")},
}

newId, err := storage.SetRecord(context.Background(), records[0].Id, records[0].Version, records[0].Data)
newId, err := storage.SetRecord(context.Background(), records[0].Version, records[0].Data)
require.NoError(t, err, "failed to call SetRecord 0")
require.Equal(t, newId, int64(0))
require.Equal(t, newId, records[0].Id)

newId, err = storage.SetRecord(context.Background(), records[1].Id, records[1].Version, records[1].Data)
newId, err = storage.SetRecord(context.Background(), records[1].Version, records[1].Data)
require.NoError(t, err, "failed to call SetRecord 1")
require.Equal(t, newId, int64(1))
require.Equal(t, newId, records[1].Id)

fetchedRecords, err := storage.ListChanges(context.Background(), 0)
require.NoError(t, err, "failed to call list changes")
require.Equal(t, fetchedRecords, records)
}

func TestConflict(t *testing.T) {
storage, err := Connect("file:testconflict?mode=memory&cache=shared")
require.NoError(t, err, "failed to connect")

record := StoredRecord{
Id: 0, Version: 0.1, Data: []byte("{}"),
}

newId, err := storage.SetRecord(context.Background(), record.Id, record.Version, record.Data)
require.NoError(t, err, "failed to call SetRecord 0")
require.Equal(t, newId, int64(0))

newId, err = storage.SetRecord(context.Background(), record.Id, record.Version, record.Data)
require.Error(t, err, "Second call of SetRecord 0 is supposed to fail due to set conflict")
require.Equal(t, err, ErrSetConflict)
}
56 changes: 9 additions & 47 deletions sync_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,30 +57,15 @@ func TestSyncService(t *testing.T) {
if setRecordRequest, ok := testCase.request.(*proto.SetRecordRequest); ok {
testSetRecord(t, privateKey, client1, setRecordRequest, testCase)

if testCase.reply.(*proto.SetRecordReply).Status != proto.SetRecordStatus_SUCCESS {
continue
}

// Test that the expected value matches the one received from the stream
record1, err := changes_stream1.Recv()
require.NoError(t, err, "failed to receive changes")

received_json, err := json.Marshal(record1)
require.NoError(t, err, "failed to serialize received record")

expected_json, err := json.Marshal(testCase.request.(*proto.SetRecordRequest).Record)
require.NoError(t, err, "failed to serialize expected record")

require.Equal(t, received_json, expected_json)
require.Equal(t, record1.Data, testCase.request.(*proto.SetRecordRequest).RecordData)

// Test that the second client also received a valid value
record2, err := changes_stream2.Recv()
require.NoError(t, err, "failed to receive changes")

received_json, err = json.Marshal(record2)
require.NoError(t, err, "failed to serialize received record")

require.Equal(t, received_json, expected_json)
require.Equal(t, record2.Data, testCase.request.(*proto.SetRecordRequest).RecordData)
}
if listChangesRequest, ok := testCase.request.(*proto.ListChangesRequest); ok {
testListChanges(t, privateKey, client1, listChangesRequest, testCase)
Expand All @@ -106,46 +91,23 @@ func testCases() []testCase {
{
name: "first record insert",
request: &proto.SetRecordRequest{
Record: &proto.Record{
Id: 1,
Version: 0.1,
Data: []byte("{}"),
},
RecordData: []byte("{}"),
RecordVersion: 0.1,
},
reply: &proto.SetRecordReply{
Status: proto.SetRecordStatus_SUCCESS,
NewId: 1,
NewId: 1,
},
},

// set second record
{
name: "second record insert",
request: &proto.SetRecordRequest{
Record: &proto.Record{
Id: 2,
Version: 0.1,
Data: []byte("{}"),
},
},
reply: &proto.SetRecordReply{
Status: proto.SetRecordStatus_SUCCESS,
NewId: 2,
},
},

// test conflict
{
name: "test conflict",
request: &proto.SetRecordRequest{
Record: &proto.Record{
Id: 1,
Version: 0.1,
Data: []byte("{}"),
},
RecordData: []byte("{}"),
RecordVersion: 0.1,
},
reply: &proto.SetRecordReply{
Status: proto.SetRecordStatus_CONFLICT,
NewId: 2,
},
},

Expand Down Expand Up @@ -184,7 +146,7 @@ func listenChanges(t *testing.T, privateKey *btcec.PrivateKey, client proto.Sync

func testSetRecord(t *testing.T, privateKey *btcec.PrivateKey, client proto.SyncerClient, request *proto.SetRecordRequest, test testCase) {
requestTime := time.Now().Unix()
toSign := fmt.Sprintf("%v-%v-%x-%v", request.Record.Id, request.Record.Version, request.Record.Data, requestTime)
toSign := fmt.Sprintf("%x-%v-%v", request.RecordData, request.RecordVersion, requestTime)
signature, err := middleware.SignMessage(privateKey, []byte(toSign))
require.NoError(t, err, "failed to sign message")
request.RequestTime = uint32(requestTime)
Expand Down
19 changes: 8 additions & 11 deletions syncer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,10 @@ type PersistentSyncerServer struct {
func (s *PersistentSyncerServer) SetRecord(c context.Context, msg *proto.SetRecordRequest) (*proto.SetRecordReply, error) {
newId, err := c.Value(middleware.USER_DB_CONTEXT_KEY).(*store.SQLiteSyncStorage).SetRecord(
c,
msg.Record.Id,
msg.Record.Version,
msg.Record.Data,
msg.RecordVersion,
msg.RecordData,
)
if err != nil {
if err == store.ErrSetConflict {
return &proto.SetRecordReply{
Status: proto.SetRecordStatus_CONFLICT,
}, nil
}
return nil, err
}

Expand All @@ -48,11 +42,14 @@ func (s *PersistentSyncerServer) SetRecord(c context.Context, msg *proto.SetReco
if _, exists := s.users[pubkey]; !exists {
addUser(s, pubkey)
}
s.users[pubkey].records_channel <- msg.Record
s.users[pubkey].records_channel <- &proto.Record{
Id: newId,
Data: msg.RecordData,
Version: msg.RecordVersion,
}

return &proto.SetRecordReply{
Status: proto.SetRecordStatus_SUCCESS,
NewId: newId,
NewId: newId,
}, nil
}

Expand Down

0 comments on commit 5630459

Please sign in to comment.