forked from RedisLabs/sentinel_tunnel
-
Notifications
You must be signed in to change notification settings - Fork 0
/
sentinel_tunnelling_client.go
125 lines (107 loc) · 3.37 KB
/
sentinel_tunnelling_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package main
import (
// "bufio"
"encoding/json"
"fmt"
"github.com/RedisLabs/sentinel_tunnel/st_logger"
"github.com/RedisLabs/sentinel_tunnel/st_sentinel_connection"
"io"
"io/ioutil"
"net"
"os"
"time"
)
// SentinelTunnellingDbConfig is one entry of the "Databases" list in the JSON
// configuration file.
type SentinelTunnellingDbConfig struct {
	// Name is the database name handed to the address lookup function.
	Name string `json:"Name"`
	// Local_port is the TCP port on which connections for this database are
	// accepted.
	Local_port string `json:"Local_port"`
}
// SentinelTunnellingConfiguration is the top-level shape of the JSON
// configuration file decoded by NewSentinelTunnellingClient.
type SentinelTunnellingConfiguration struct {
	// Sentinels_addresses_list is passed as-is to
	// st_sentinel_connection.NewSentinelConnection.
	Sentinels_addresses_list []string `json:"Sentinels_addresses_list"`
	// Databases lists the databases for which Start opens a local listener.
	Databases []SentinelTunnellingDbConfig `json:"Databases"`
}
// SentinelTunnellingClient bundles the decoded configuration with the sentinel
// connection whose GetAddressByDbName method Start hands to the per-database
// listeners.
type SentinelTunnellingClient struct {
	// configuration is decoded from the JSON configuration file by
	// NewSentinelTunnellingClient.
	configuration SentinelTunnellingConfiguration
	// sentinel_connection is created by NewSentinelTunnellingClient from
	// configuration.Sentinels_addresses_list.
	sentinel_connection *st_sentinel_connection.Sentinel_connection
}
// get_db_address_by_name_function resolves the database called db_name to an
// address string; handleConnection passes that string to net.Dial("tcp", ...).
type get_db_address_by_name_function func(db_name string) (string, error)
// NewSentinelTunnellingClient builds a client from the JSON configuration file
// at config_file_location and creates its sentinel connection from the
// configured sentinel addresses.
//
// A failure in any step (reading the file, decoding the JSON, creating the
// sentinel connection) is reported through st_logger at FATAL level; this
// function itself does not return early after such a report.
// NOTE(review): presumably a FATAL message terminates the process - confirm in
// st_logger; otherwise a partially initialized client is returned.
func NewSentinelTunnellingClient(config_file_location string) *SentinelTunnellingClient {
	raw, readErr := ioutil.ReadFile(config_file_location)
	if readErr != nil {
		st_logger.WriteLogMessage(st_logger.FATAL, "an error has occur during configuration read",
			readErr.Error())
	}

	client := &SentinelTunnellingClient{}
	if parseErr := json.Unmarshal(raw, &client.configuration); parseErr != nil {
		st_logger.WriteLogMessage(st_logger.FATAL, "an error has occur during configuration read,",
			parseErr.Error())
	}

	var connErr error
	sentinels := client.configuration.Sentinels_addresses_list
	client.sentinel_connection, connErr = st_sentinel_connection.NewSentinelConnection(sentinels)
	if connErr != nil {
		st_logger.WriteLogMessage(st_logger.FATAL, "an error has occur, ",
			connErr.Error())
	}

	st_logger.WriteLogMessage(st_logger.INFO, "done initializing Tunnelling")
	return client
}
// createTunnelling streams bytes read from conn2 into conn1 until io.Copy
// returns, then closes conn1 followed by conn2. It covers a single direction;
// handleConnection starts it once per direction.
func createTunnelling(conn1 net.Conn, conn2 net.Conn) {
	// Deferred calls run last-in-first-out, so conn1 is closed before conn2.
	defer conn2.Close()
	defer conn1.Close()

	// The byte count and error reported by io.Copy are not inspected.
	_, _ = io.Copy(conn1, conn2)
}
// handleConnection wires the accepted client connection c to the database
// called db_name. The database address is obtained from get_db_address_by_name
// and dialled over TCP; if either step fails, the error is logged at ERROR
// level and c is closed. On success two goroutines are started, one per copy
// direction, and the function returns without waiting for them.
func handleConnection(c net.Conn, db_name string,
	get_db_address_by_name get_db_address_by_name_function) {
	// reject logs why this client cannot be served and drops its connection.
	reject := func(what string, cause error) {
		st_logger.WriteLogMessage(st_logger.ERROR, what, db_name,
			",", cause.Error())
		c.Close()
	}

	target, lookupErr := get_db_address_by_name(db_name)
	if lookupErr != nil {
		reject("cannot get db address for ", lookupErr)
		return
	}

	upstream, dialErr := net.Dial("tcp", target)
	if dialErr != nil {
		reject("cannot connect to db ", dialErr)
		return
	}

	// Each goroutine copies one direction and closes both connections when its
	// copy ends.
	go createTunnelling(c, upstream)
	go createTunnelling(upstream, c)
}
// handleSigleDbConnections listens for TCP connections on listening_port at the
// unspecified address 0.0.0.0 and starts a handleConnection goroutine for every
// accepted connection, so that it is tunnelled to the database called db_name.
//
// get_db_address_by_name is forwarded unchanged to handleConnection.
//
// A failure to listen or to accept is reported at FATAL level, after which the
// function returns explicitly so that a nil listener or nil connection is never
// used. The listener is closed when the function returns.
// NOTE(review): st_logger may already terminate the process on FATAL - confirm;
// if it does, the explicit returns are unreachable and harmless.
func handleSigleDbConnections(listening_port string, db_name string,
	get_db_address_by_name get_db_address_by_name_function) {
	listener, err := net.Listen("tcp", net.JoinHostPort("0.0.0.0", listening_port))
	if err != nil {
		st_logger.WriteLogMessage(st_logger.FATAL, "cannot listen to port ",
			listening_port, err.Error())
		return
	}
	defer listener.Close()

	st_logger.WriteLogMessage(st_logger.INFO, "listening on port ", listening_port,
		" for connections to database: ", db_name)
	for {
		conn, err := listener.Accept()
		if err != nil {
			st_logger.WriteLogMessage(st_logger.FATAL, "cannot accept connections on port ",
				listening_port, err.Error())
			// Do not hand a nil connection to handleConnection.
			return
		}
		go handleConnection(conn, db_name, get_db_address_by_name)
	}
}
// Start launches one handleSigleDbConnections goroutine per configured
// database, giving each the database's local port, its name and the sentinel
// connection's GetAddressByDbName method as the address lookup. It returns
// right after launching them and does not wait for any of them.
func (st_client *SentinelTunnellingClient) Start() {
	databases := st_client.configuration.Databases
	for i := range databases {
		db := databases[i]
		go handleSigleDbConnections(db.Local_port, db.Name,
			st_client.sentinel_connection.GetAddressByDbName)
	}
}
// main expects two command-line arguments: the configuration file path and the
// log file path. With fewer arguments it prints the usage line and returns.
// Otherwise it initializes the logger, builds and starts the tunnelling client,
// and then sleeps in an endless loop so the process keeps running while the
// goroutines started by Start do the work.
func main() {
	const usage = "usage : sentinel_tunnel <config_file_path> <log_file_path>"

	if len(os.Args) < 3 {
		fmt.Println(usage)
		return
	}
	configPath, logPath := os.Args[1], os.Args[2]

	st_logger.InitializeLogger(logPath)
	NewSentinelTunnellingClient(configPath).Start()

	// Keep the main goroutine parked, waking once per second.
	for {
		time.Sleep(time.Second)
	}
}