Skip to content

Commit

Permalink
Merge pull request #325 from perun-network/explicit-encoding
Browse files Browse the repository at this point in the history
Initialize wire encoding explicitly
  • Loading branch information
matthiasgeihs authored Feb 18, 2022
2 parents d698d08 + 0dda64c commit 8e60c9f
Show file tree
Hide file tree
Showing 30 changed files with 148 additions and 108 deletions.
1 change: 0 additions & 1 deletion client/proposalmsgs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"perun.network/go-perun/channel/test"
"perun.network/go-perun/client"
clienttest "perun.network/go-perun/client/test"
_ "perun.network/go-perun/wire/perunio/serializer" // wire serialzer init
peruniotest "perun.network/go-perun/wire/perunio/test"
pkgtest "polycry.pt/poly-go/test"
)
Expand Down
1 change: 0 additions & 1 deletion wire/controlmsgs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package wire_test
import (
"testing"

_ "perun.network/go-perun/wire/perunio/serializer" // wire serialzer init
peruniotest "perun.network/go-perun/wire/perunio/test"
wiretest "perun.network/go-perun/wire/test"
)
Expand Down
23 changes: 0 additions & 23 deletions wire/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,6 @@ import (
"perun.network/go-perun/wire/perunio"
)

var envelopeSerializer EnvelopeSerializer

// SetEnvelopeSerializer sets the global envelope serializer instance. Must not
// be called directly but through importing the needed backend.
func SetEnvelopeSerializer(e EnvelopeSerializer) {
if envelopeSerializer != nil {
panic("envelope serializer already set")
}
envelopeSerializer = e
}

type (
// Msg is the top-level abstraction for all messages sent between Perun
// nodes.
Expand All @@ -59,18 +48,6 @@ type (
}
)

// EncodeEnvelope serializes the envelope into the writer, using the global
// envelope serialzer instance.
func EncodeEnvelope(w io.Writer, env *Envelope) error {
return envelopeSerializer.Encode(w, env)
}

// DecodeEnvelope deserializes an envelope from the reader, using the global
// envelope serialzer instance.
func DecodeEnvelope(r io.Reader) (*Envelope, error) {
return envelopeSerializer.Decode(r)
}

// EncodeMsg encodes a message into an io.Writer. It also encodes the message
// type whereas the Msg.Encode implementation is assumed not to write the type.
func EncodeMsg(msg Msg, w io.Writer) error {
Expand Down
4 changes: 2 additions & 2 deletions wire/net/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ const (

// NewBus creates a new network bus. The dialer and listener are used to
// establish new connections internally, while id is this node's identity.
func NewBus(id wire.Account, d Dialer) *Bus {
func NewBus(id wire.Account, d Dialer, s wire.EnvelopeSerializer) *Bus {
b := &Bus{
mainRecv: wire.NewReceiver(),
recvs: make(map[wallet.AddrKey]wire.Consumer),
}

onNewEndpoint := func(wire.Address) wire.Consumer { return b.mainRecv }
b.reg = NewEndpointRegistry(id, onNewEndpoint, d)
b.reg = NewEndpointRegistry(id, onNewEndpoint, d, s)
go b.dispatchMsgs()

return b
Expand Down
3 changes: 2 additions & 1 deletion wire/net/bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"perun.network/go-perun/wire"
"perun.network/go-perun/wire/net"
nettest "perun.network/go-perun/wire/net/test"
perunio "perun.network/go-perun/wire/perunio/serializer"
wiretest "perun.network/go-perun/wire/test"
)

Expand All @@ -32,7 +33,7 @@ func TestBus(t *testing.T) {
var hub nettest.ConnHub

wiretest.GenericBusTest(t, func(acc wire.Account) wire.Bus {
bus := net.NewBus(acc, hub.NewNetDialer())
bus := net.NewBus(acc, hub.NewNetDialer(), perunio.Serializer())
hub.OnClose(func() { bus.Close() })
go bus.Listen(hub.NewNetListener(acc.Address()))
return bus
Expand Down
10 changes: 6 additions & 4 deletions wire/net/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ import (
// Dialer is an interface that allows creating a connection to a peer via its
// Perun address. The established connections are not authenticated yet.
type Dialer interface {
// Dial creates a connection to a peer.
// The passed context is used to abort the dialing process. The returned
// connection might not belong to the requested address.
// Dial creates a connection to a peer. The passed context is used to abort
// the dialing process. The returned connection might not belong to the
// requested address.
//
// `ser` is used for message serialization.
//
// Dial needs to be reentrant, and concurrent calls to Close() must abort
// any ongoing Dial() calls.
Dial(ctx context.Context, addr wire.Address) (Conn, error)
Dial(ctx context.Context, addr wire.Address, ser wire.EnvelopeSerializer) (Conn, error)
// Close aborts any ongoing calls to Dial().
//
// Close() needs to be reentrant, and repeated calls to Close() need to
Expand Down
5 changes: 3 additions & 2 deletions wire/net/endpoint_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
_ "perun.network/go-perun/backend/sim" // backend init
wallettest "perun.network/go-perun/wallet/test"
"perun.network/go-perun/wire"
perunio "perun.network/go-perun/wire/perunio/serializer"
wiretest "perun.network/go-perun/wire/test"
"polycry.pt/poly-go/test"
)
Expand All @@ -53,7 +54,7 @@ func makeSetup(rng *rand.Rand) *setup {
}

// Dial simulates creating a connection to a.
func (s *setup) Dial(ctx context.Context, addr wire.Address) (Conn, error) {
func (s *setup) Dial(ctx context.Context, addr wire.Address, _ wire.EnvelopeSerializer) (Conn, error) {
s.mutex.RLock()
defer s.mutex.RUnlock()

Expand Down Expand Up @@ -99,7 +100,7 @@ func makeClient(conn Conn, rng *rand.Rand, dialer Dialer) *client {
receiver := wire.NewReceiver()
registry := NewEndpointRegistry(wallettest.NewRandomAccount(rng), func(wire.Address) wire.Consumer {
return receiver
}, dialer)
}, dialer, perunio.Serializer())

return &client{
endpoint: registry.addEndpoint(wallettest.NewRandomAddress(rng), conn, true),
Expand Down
13 changes: 10 additions & 3 deletions wire/net/endpoint_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type EndpointRegistry struct {
id wire.Account // The identity of the node.
dialer Dialer // Used for dialing peers.
onNewEndpoint func(wire.Address) wire.Consumer // Selects Consumer for new Endpoints' receive loop.
ser wire.EnvelopeSerializer

endpoints map[wallet.AddrKey]*fullEndpoint // The list of all of all established Endpoints.
dialing map[wallet.AddrKey]*dialingEndpoint
Expand All @@ -81,11 +82,17 @@ const exchangeAddrsTimeout = 10 * time.Second
// NewEndpointRegistry creates a new registry.
// The provided callback is used to set up new peer's subscriptions and it is
// called before the peer starts receiving messages.
func NewEndpointRegistry(id wire.Account, onNewEndpoint func(wire.Address) wire.Consumer, dialer Dialer) *EndpointRegistry {
func NewEndpointRegistry(
id wire.Account,
onNewEndpoint func(wire.Address) wire.Consumer,
dialer Dialer,
ser wire.EnvelopeSerializer,
) *EndpointRegistry {
return &EndpointRegistry{
id: id,
onNewEndpoint: onNewEndpoint,
dialer: dialer,
ser: ser,

endpoints: make(map[wallet.AddrKey]*fullEndpoint),
dialing: make(map[wallet.AddrKey]*dialingEndpoint),
Expand Down Expand Up @@ -138,7 +145,7 @@ func (r *EndpointRegistry) Listen(listener Listener) {
// Start listener and accept all incoming peer connections, writing them to
// the registry.
for {
conn, err := listener.Accept()
conn, err := listener.Accept(r.ser)
if err != nil {
r.Log().Debugf("EndpointRegistry.Listen: Accept() loop: %v", err)
return
Expand Down Expand Up @@ -240,7 +247,7 @@ func (r *EndpointRegistry) authenticatedDial(
close(de.created)
}()

conn, err := r.dialer.Dial(ctx, addr)
conn, err := r.dialer.Dial(ctx, addr, r.ser)
if err != nil {
return nil, errors.WithMessage(err, "failed to dial")
}
Expand Down
9 changes: 5 additions & 4 deletions wire/net/endpoint_registry_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"perun.network/go-perun/wire"
"perun.network/go-perun/wire/net"
nettest "perun.network/go-perun/wire/net/test"
perunio "perun.network/go-perun/wire/perunio/serializer"
ctxtest "polycry.pt/poly-go/context/test"
"polycry.pt/poly-go/sync"
"polycry.pt/poly-go/test"
Expand All @@ -43,8 +44,8 @@ func TestEndpointRegistry_Get_Pair(t *testing.T) {
var hub nettest.ConnHub
dialerID := wallettest.NewRandomAccount(rng)
listenerID := wallettest.NewRandomAccount(rng)
dialerReg := net.NewEndpointRegistry(dialerID, nilConsumer, hub.NewNetDialer())
listenerReg := net.NewEndpointRegistry(listenerID, nilConsumer, nil)
dialerReg := net.NewEndpointRegistry(dialerID, nilConsumer, hub.NewNetDialer(), perunio.Serializer())
listenerReg := net.NewEndpointRegistry(listenerID, nilConsumer, nil, perunio.Serializer())
listener := hub.NewNetListener(listenerID.Address())

done := make(chan struct{})
Expand Down Expand Up @@ -88,8 +89,8 @@ func TestEndpointRegistry_Get_Multiple(t *testing.T) {
t.Logf("subscribing %s\n", addr)
return nil
}
dialerReg := net.NewEndpointRegistry(dialerID, logPeer, dialer)
listenerReg := net.NewEndpointRegistry(listenerID, logPeer, nil)
dialerReg := net.NewEndpointRegistry(dialerID, logPeer, dialer, perunio.Serializer())
listenerReg := net.NewEndpointRegistry(listenerID, logPeer, nil, perunio.Serializer())
listener := hub.NewNetListener(listenerID.Address())

done := make(chan struct{})
Expand Down
47 changes: 32 additions & 15 deletions wire/net/endpoint_registry_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"perun.network/go-perun/wallet"
wallettest "perun.network/go-perun/wallet/test"
"perun.network/go-perun/wire"
perunio "perun.network/go-perun/wire/perunio/serializer"
wiretest "perun.network/go-perun/wire/test"
ctxtest "polycry.pt/poly-go/context/test"
"polycry.pt/poly-go/sync/atomic"
Expand All @@ -52,7 +53,7 @@ func (d *mockDialer) Close() error {
return nil
}

func (d *mockDialer) Dial(ctx context.Context, addr wire.Address) (Conn, error) {
func (d *mockDialer) Dial(ctx context.Context, addr wire.Address, _ wire.EnvelopeSerializer) (Conn, error) {
d.mutex.Lock()
defer d.mutex.Unlock()

Expand Down Expand Up @@ -85,8 +86,8 @@ type mockListener struct {
dialer mockDialer
}

func (l *mockListener) Accept() (Conn, error) {
return l.dialer.Dial(context.Background(), nil)
func (l *mockListener) Accept(ser wire.EnvelopeSerializer) (Conn, error) {
return l.dialer.Dial(context.Background(), nil, ser)
}

func (l *mockListener) Close() error {
Expand Down Expand Up @@ -123,7 +124,7 @@ func TestRegistry_Get(t *testing.T) {
t.Parallel()

dialer := newMockDialer()
r := NewEndpointRegistry(id, nilConsumer, dialer)
r := NewEndpointRegistry(id, nilConsumer, dialer, perunio.Serializer())
existing := newEndpoint(peerAddr, newMockConn())

r.endpoints[wallet.Key(peerAddr)] = newFullEndpoint(existing)
Expand All @@ -138,7 +139,7 @@ func TestRegistry_Get(t *testing.T) {
t.Parallel()

dialer := newMockDialer()
r := NewEndpointRegistry(id, nilConsumer, dialer)
r := NewEndpointRegistry(id, nilConsumer, dialer, perunio.Serializer())

dialer.Close()
ctxtest.AssertTerminates(t, timeout, func() {
Expand All @@ -154,7 +155,7 @@ func TestRegistry_Get(t *testing.T) {
t.Parallel()

dialer := newMockDialer()
r := NewEndpointRegistry(id, nilConsumer, dialer)
r := NewEndpointRegistry(id, nilConsumer, dialer, perunio.Serializer())

ct := test.NewConcurrent(t)
a, b := newPipeConnPair()
Expand Down Expand Up @@ -182,7 +183,7 @@ func TestRegistry_authenticatedDial(t *testing.T) {
rng := test.Prng(t)
id := wallettest.NewRandomAccount(rng)
d := &mockDialer{dial: make(chan Conn)}
r := NewEndpointRegistry(id, nilConsumer, d)
r := NewEndpointRegistry(id, nilConsumer, d, perunio.Serializer())

remoteID := wallettest.NewRandomAccount(rng)
remoteAddr := remoteID.Address()
Expand Down Expand Up @@ -267,7 +268,7 @@ func TestRegistry_setupConn(t *testing.T) {

t.Run("ExchangeAddrs fail", func(t *testing.T) {
d := &mockDialer{dial: make(chan Conn)}
r := NewEndpointRegistry(id, nilConsumer, d)
r := NewEndpointRegistry(id, nilConsumer, d, perunio.Serializer())
a, b := newPipeConnPair()
go func() {
err := b.Send(&wire.Envelope{
Expand All @@ -286,7 +287,7 @@ func TestRegistry_setupConn(t *testing.T) {

t.Run("ExchangeAddrs success (peer already exists)", func(t *testing.T) {
d := &mockDialer{dial: make(chan Conn)}
r := NewEndpointRegistry(id, nilConsumer, d)
r := NewEndpointRegistry(id, nilConsumer, d, perunio.Serializer())
a, b := newPipeConnPair()
go func() {
err := ExchangeAddrsActive(context.Background(), remoteID, id.Address(), b)
Expand All @@ -303,7 +304,7 @@ func TestRegistry_setupConn(t *testing.T) {

t.Run("ExchangeAddrs success (peer did not exist)", func(t *testing.T) {
d := &mockDialer{dial: make(chan Conn)}
r := NewEndpointRegistry(id, nilConsumer, d)
r := NewEndpointRegistry(id, nilConsumer, d, perunio.Serializer())
a, b := newPipeConnPair()
go func() {
err := ExchangeAddrsActive(context.Background(), remoteID, id.Address(), b)
Expand Down Expand Up @@ -331,7 +332,7 @@ func TestRegistry_Listen(t *testing.T) {

d := newMockDialer()
l := newMockListener()
r := NewEndpointRegistry(id, nilConsumer, d)
r := NewEndpointRegistry(id, nilConsumer, d, perunio.Serializer())

go func() {
// Listen() will only terminate if the listener is closed.
Expand Down Expand Up @@ -365,7 +366,12 @@ func TestRegistry_addEndpoint_Subscribe(t *testing.T) {
t.Parallel()
rng := test.Prng(t)
called := false
r := NewEndpointRegistry(wallettest.NewRandomAccount(rng), func(wire.Address) wire.Consumer { called = true; return nil }, nil)
r := NewEndpointRegistry(
wallettest.NewRandomAccount(rng),
func(wire.Address) wire.Consumer { called = true; return nil },
nil,
perunio.Serializer(),
)

assert.False(t, called, "onNewEndpoint must not have been called yet")
r.addEndpoint(wallettest.NewRandomAddress(rng), newMockConn(), false)
Expand All @@ -378,15 +384,25 @@ func TestRegistry_Close(t *testing.T) {
rng := test.Prng(t)

t.Run("double close error", func(t *testing.T) {
r := NewEndpointRegistry(wallettest.NewRandomAccount(rng), nilConsumer, nil)
r := NewEndpointRegistry(
wallettest.NewRandomAccount(rng),
nilConsumer,
nil,
perunio.Serializer(),
)
r.Close()
assert.Error(t, r.Close())
})

t.Run("dialer close error", func(t *testing.T) {
d := &mockDialer{dial: make(chan Conn)}
d.Close()
r := NewEndpointRegistry(wallettest.NewRandomAccount(rng), nilConsumer, d)
r := NewEndpointRegistry(
wallettest.NewRandomAccount(rng),
nilConsumer,
d,
perunio.Serializer(),
)

assert.Error(t, r.Close())
})
Expand All @@ -395,5 +411,6 @@ func TestRegistry_Close(t *testing.T) {
// newPipeConnPair creates endpoints that are connected via pipes.
func newPipeConnPair() (a Conn, b Conn) {
c0, c1 := net.Pipe()
return NewIoConn(c0), NewIoConn(c1)
ser := perunio.Serializer()
return NewIoConn(c0, ser), NewIoConn(c1, ser)
}
1 change: 0 additions & 1 deletion wire/net/exchange_addr_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

wallettest "perun.network/go-perun/wallet/test"
"perun.network/go-perun/wire"
_ "perun.network/go-perun/wire/protobuf" // wire serialzer init
wiretest "perun.network/go-perun/wire/test"
ctxtest "polycry.pt/poly-go/context/test"
"polycry.pt/poly-go/test"
Expand Down
Loading

0 comments on commit 8e60c9f

Please sign in to comment.