forked from ChainSafe/chainbridge-core
-
Notifications
You must be signed in to change notification settings - Fork 0
/
relayer.go
89 lines (73 loc) · 2.35 KB
/
relayer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
// Copyright 2021 ChainSafe Systems
// SPDX-License-Identifier: LGPL-3.0-only
package relayer
import (
"fmt"
"github.com/ChainSafe/chainbridge-core/relayer/messageprocessors"
"github.com/ChainSafe/chainbridge-core/relayer/message"
"github.com/rs/zerolog/log"
)
type Metrics interface {
TrackDepositMessage(m *message.Message)
}
type RelayedChain interface {
PollEvents(stop <-chan struct{}, sysErr chan<- error, eventsChan chan *message.Message)
Write(message *message.Message) error
DomainID() uint8
}
func NewRelayer(chains []RelayedChain, metrics Metrics, messageProcessors ...messageprocessors.MessageProcessor) *Relayer {
return &Relayer{relayedChains: chains, messageProcessors: messageProcessors, metrics: metrics}
}
type Relayer struct {
metrics Metrics
relayedChains []RelayedChain
registry map[uint8]RelayedChain
messageProcessors []messageprocessors.MessageProcessor
}
// Start function starts the relayer. Relayer routine is starting all the chains
// and passing them with a channel that accepts unified cross chain message format
func (r *Relayer) Start(stop <-chan struct{}, sysErr chan error) {
log.Debug().Msgf("Starting relayer")
messagesChannel := make(chan *message.Message)
for _, c := range r.relayedChains {
log.Debug().Msgf("Starting chain %v", c.DomainID())
r.addRelayedChain(c)
go c.PollEvents(stop, sysErr, messagesChannel)
}
for {
select {
case m := <-messagesChannel:
go r.route(m)
continue
case <-stop:
return
}
}
}
// Route function winds destination writer by mapping DestinationID from message to registered writer.
func (r *Relayer) route(m *message.Message) {
r.metrics.TrackDepositMessage(m)
destChain, ok := r.registry[m.Destination]
if !ok {
log.Error().Msgf("no resolver for destID %v to send message registered", m.Destination)
return
}
for _, mp := range r.messageProcessors {
if err := mp(m); err != nil {
log.Error().Err(fmt.Errorf("error %w processing mesage %v", err, m))
return
}
}
log.Debug().Msgf("Sending message %v to destination %v", m.String(), m.Destination)
if err := destChain.Write(m); err != nil {
log.Error().Err(err).Msgf("writing message %v", m.String())
return
}
}
func (r *Relayer) addRelayedChain(c RelayedChain) {
if r.registry == nil {
r.registry = make(map[uint8]RelayedChain)
}
domainID := c.DomainID()
r.registry[domainID] = c
}