// Copyright 2020 Joshua J Baker. All rights reserved.
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
package uhaha
import (
"bufio"
"compress/gzip"
"crypto/rand"
"crypto/sha1"
"crypto/tls"
"crypto/x509"
"encoding/binary"
"encoding/hex"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"net"
"os"
"path/filepath"
"reflect"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/golang/snappy"
"github.com/gomodule/redigo/redis"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/raft"
"github.com/tidwall/match"
"github.com/tidwall/redcon"
"github.com/tidwall/redlog/v2"
"github.com/tidwall/rtime"
raftboltdb "github.com/hashicorp/raft-boltdb/v2"
raftleveldb "github.com/tidwall/raft-leveldb"
)
// Main is the entrypoint for the cluster node. It must be called once and
// only once, and as the last call in the Go main() function. There are no
// return values, as control of all application operations, logging, and
// I/O is permanently transferred to the framework.
func Main(conf Config) {
confInit(&conf)
conf.AddService(redisService())
hclogger, log := logInit(conf)
tm := remoteTimeInit(conf, log)
dir, data := dataDirInit(conf, log)
m := machineInit(conf, dir, data, log)
tlscfg := tlsInit(conf, log)
svr, addr := serverInit(conf, tlscfg, log)
trans := transportInit(conf, tlscfg, svr, hclogger, log)
lstore, sstore := storeInit(conf, dir, log)
snaps := snapshotInit(conf, dir, m, hclogger, log)
ra := raftInit(conf, hclogger, m, lstore, sstore, snaps, trans, log)
joinClusterIfNeeded(conf, ra, addr, tlscfg, log)
startUserServices(conf, svr, m, ra, log)
go runMaintainServers(ra)
go runWriteApplier(conf, m, ra)
go runLogLoadedPoller(conf, m, ra, tlscfg, log)
go runTicker(conf, tm, m, ra, log)
log.Fatal(svr.serve())
}
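
// A minimal sketch of wiring an application together, assuming a
// hypothetical "kvapp" with app-defined cmdSET/cmdGET handlers
// (illustrative only, not part of this package):
//
//	func main() {
//		var conf uhaha.Config
//		conf.Name = "kvapp"
//		conf.Version = "0.1.0"
//		conf.InitialData = &map[string]string{} // a pointer, as UseJSONSnapshots requires
//		conf.UseJSONSnapshots = true
//		conf.AddWriteCommand("set", cmdSET)
//		conf.AddReadCommand("get", cmdGET)
//		uhaha.Main(conf)
//	}
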
const usage = `{{NAME}} version: {{VERSION}} ({{GITSHA}})
Usage: {{NAME}} [-n id] [-a addr] [options]
Basic options:
-h : display help, this screen
-a addr : bind to address (default: 127.0.0.1:11001)
-n id : node ID (default: 1)
-d dir : data directory (default: data)
-j addr : leader address of a cluster to join
-l level : log level (default: info) [debug,verb,info,warn,silent]
Security options:
--tls-cert path : path to TLS certificate
--tls-key path : path to TLS private key
--auth auth : cluster authorization, shared by all servers and clients
Networking options:
--advertise addr : advertise address (default: network bound address)
Advanced options:
--nosync : turn off syncing data to disk after every write. This leads
to faster write operations but opens up the chance for data
loss due to catastrophic events such as power failure.
--openreads : allow followers to process read commands, but with the
possibility of returning stale data.
--localtime : have the raft machine time synchronized with the local
server rather than the public internet. This will run the
risk of time shifts when the local server time is
drastically changed during live operation.
--restore path : restore a raft machine from a snapshot file. This will
start a brand new single-node cluster using the snapshot as
initial data. The other nodes must be re-joined. This
operation is ignored when a data directory already exists.
Cannot be used with -j flag.
--init-run-quit : initialize a bootstrap operation and then quit.
`
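
// For illustration, a three-node cluster could be started with the flags
// above, with nodes 2 and 3 joining node 1 (addresses hypothetical):
//
//	./app -n 1 -a :11001
//	./app -n 2 -a :11002 -j :11001
//	./app -n 3 -a :11003 -j :11001
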
// Config is the configuration for managing the behavior of the application.
// This must be filled out prior to being passed to the uhaha.Main() function.
type Config struct {
cmds map[string]command // appended by AddCommand
catchall command // set by AddCatchallCommand
services []serviceEntry // appended by AddService
jsonType reflect.Type // used by UseJSONSnapshots
jsonSnaps bool // used by UseJSONSnapshots
// Name gives the server application a name. Default "uhaha-app"
Name string
// Version of the application. Default "0.0.0"
Version string
// GitSHA of the application.
GitSHA string
// Flag is used to manage the application startup flags.
Flag struct {
// Custom tells Main to not automatically parse the application startup
// flags. When set it is up to the user to parse the os.Args manually
// or with a different library.
Custom bool
// Usage is an optional function that allows for altering the usage
// message.
Usage func(usage string) string
// PreParse is an optional function that allows for adding command line
// flags before the user flags are parsed.
PreParse func()
// PostParse is an optional function that fires after user flags are
// parsed.
PostParse func()
}
// Snapshot fires when a snapshot of the machine data is needed. The
// returned Snapshot is then persisted by the Raft system.
Snapshot func(data interface{}) (Snapshot, error)
// Restore returns a data object that is fully restored from the previous
// snapshot using the input Reader. A restore operation happens at most
// once, if needed, at the start of the application.
Restore func(rd io.Reader) (data interface{}, err error)
// UseJSONSnapshots is a convenience field that tells the machine to use
// JSON as the format for all snapshots and restores. This may be good for
// small and simple data models whose types can be fully marshalled into
// JSON, i.e. all important data fields need to be exportable (capitalized).
// For more complicated or specialized data, it's probably best to assign
// custom functions to the Config.Snapshot and Config.Restore fields.
// It's invalid to set this field while also setting Snapshot and/or
// Restore. Default false
UseJSONSnapshots bool
// Tick fires at regular intervals as specified by TickDelay. This function
// can be used to make updates to the database.
Tick func(m Machine)
// DataDirReady is an optional callback function that fires with the path
// to the directory where all the logs and snapshots are stored.
DataDirReady func(dir string)
// LogReady is an optional callback function that fires when the logger has
// been initialized. The logger can be safely used concurrently.
LogReady func(log Logger)
// ServerReady is an optional callback function that fires when the server
// socket is listening and is ready to accept incoming connections. The
// network address, auth, and tls-config are provided to allow for
// background connections to be made to self, if desired.
ServerReady func(addr, auth string, tlscfg *tls.Config)
// ConnOpened is an optional callback function that fires when a new
// network connection is opened on this machine. You can accept or deny
// the connection, and optionally provide a client-specific context that
// sticks around until the connection is closed with ConnClosed.
ConnOpened func(addr string) (context interface{}, accept bool)
// ConnClosed is an optional callback function that fires when a network
// connection has been closed on this machine.
ConnClosed func(context interface{}, addr string)
// ResponseFilter is an optional function used to filter every response
// prior to sending it to a client connection.
ResponseFilter ResponseFilter
// StateChange is an optional callback function that fires when the raft
// state has changed.
StateChange func(state State)
// LocalConnector is an optional callback function that returns a new
// connector that allows for establishing "local" connections through
// the Redis protocol. A local connection bypasses the network and
// communicates directly with this server, within the same process.
LocalConnector func(lconn LocalConnector)
LocalTime bool // default false
TickDelay time.Duration // default 200ms
BackupPath string // default ""
InitialData interface{} // default nil
NodeID string // default "1"
Addr string // default "127.0.0.1:11001"
DataDir string // default "data"
LogOutput io.Writer // default os.Stderr
LogLevel string // default "info"
JoinAddr string // default ""
Backend Backend // default LevelDB
NoSync bool // default false
OpenReads bool // default false
MaxPool int // default 8
TLSCertPath string // default ""
TLSKeyPath string // default ""
Auth string // default ""
Advertise string // default ""
TryErrors bool // default false (return TRY instead of MOVED)
InitRunQuit bool // default false
}
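
// A sketch of custom snapshot/restore wiring, assuming an app-defined
// *Store type with its own encoding (all names illustrative; storeSnap is
// assumed to implement this package's Snapshot interface):
//
//	conf.Snapshot = func(data interface{}) (uhaha.Snapshot, error) {
//		return &storeSnap{store: data.(*Store)}, nil
//	}
//	conf.Restore = func(rd io.Reader) (interface{}, error) {
//		return LoadStore(rd) // returns (*Store, error)
//	}
//	conf.StateChange = func(s uhaha.State) {
//		println("raft state:", s.String())
//	}
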
// State captures the state of a Raft node: Follower, Candidate, Leader,
// or Shutdown.
type State byte
const (
// Follower is the initial state of a Raft node.
Follower State = iota
// Candidate is one of the valid states of a Raft node.
Candidate
// Leader is one of the valid states of a Raft node.
Leader
// Shutdown is the terminal state of a Raft node.
Shutdown
)
func (state State) String() string {
switch state {
case Follower:
return "Follower"
case Candidate:
return "Candidate"
case Leader:
return "Leader"
case Shutdown:
return "Shutdown"
default:
return "Unknown"
}
}
// Backend is the database format used for storing Raft logs and metadata.
type Backend int
const (
// LevelDB is an on-disk LSM (log-structured merge-tree) database. This
// format is optimized for fast sequential writes, which is ideal for most
// Raft implementations. This is the default format used by Uhaha.
LevelDB Backend = iota
// Bolt is an on-disk single-file b+tree database. This format has been a
// popular choice for Go-based Raft implementations for years.
Bolt
)
func (conf *Config) def() {
if conf.Addr == "" {
conf.Addr = "127.0.0.1:11001"
}
if conf.Version == "" {
conf.Version = "0.0.0"
}
if conf.Name == "" {
conf.Name = "uhaha-app"
}
if conf.NodeID == "" {
conf.NodeID = "1"
}
if conf.DataDir == "" {
conf.DataDir = "data"
}
if conf.LogLevel == "" {
conf.LogLevel = "info"
}
if conf.LogOutput == nil {
conf.LogOutput = os.Stderr
}
if conf.TickDelay == 0 {
conf.TickDelay = time.Millisecond * 200
}
if conf.MaxPool == 0 {
conf.MaxPool = 8
}
}
func confInit(conf *Config) {
conf.def()
if conf.Flag.Custom {
return
}
flag.Usage = func() {
w := os.Stderr
for _, arg := range os.Args {
if arg == "-h" || arg == "--help" {
w = os.Stdout
break
}
}
s := usage
s = strings.Replace(s, "{{VERSION}}", conf.Version, -1)
if conf.GitSHA == "" {
s = strings.Replace(s, " ({{GITSHA}})", "", -1)
s = strings.Replace(s, "{{GITSHA}}", "", -1)
} else {
s = strings.Replace(s, "{{GITSHA}}", conf.GitSHA, -1)
}
s = strings.Replace(s, "{{NAME}}", conf.Name, -1)
if conf.Flag.Usage != nil {
s = conf.Flag.Usage(s)
}
s = strings.Replace(s, "{{USAGE}}", "", -1)
w.Write([]byte(s))
if w == os.Stdout {
os.Exit(0)
}
}
var backend string
var testNode string
// var vers bool
// flag.BoolVar(&vers, "v", false, "")
flag.StringVar(&conf.Addr, "a", conf.Addr, "")
flag.StringVar(&conf.NodeID, "n", conf.NodeID, "")
flag.StringVar(&conf.DataDir, "d", conf.DataDir, "")
flag.StringVar(&conf.JoinAddr, "j", conf.JoinAddr, "")
flag.StringVar(&conf.LogLevel, "l", conf.LogLevel, "")
flag.StringVar(&backend, "backend", "leveldb", "")
flag.StringVar(&conf.TLSCertPath, "tls-cert", conf.TLSCertPath, "")
flag.StringVar(&conf.TLSKeyPath, "tls-key", conf.TLSKeyPath, "")
flag.BoolVar(&conf.NoSync, "nosync", conf.NoSync, "")
flag.BoolVar(&conf.OpenReads, "openreads", conf.OpenReads, "")
flag.StringVar(&conf.BackupPath, "restore", conf.BackupPath, "")
flag.BoolVar(&conf.LocalTime, "localtime", conf.LocalTime, "")
flag.StringVar(&conf.Auth, "auth", conf.Auth, "")
flag.StringVar(&conf.Advertise, "advertise", conf.Advertise, "")
flag.StringVar(&testNode, "t", "", "")
flag.BoolVar(&conf.TryErrors, "try-errors", conf.TryErrors, "")
flag.BoolVar(&conf.InitRunQuit, "init-run-quit", conf.InitRunQuit, "")
if conf.Flag.PreParse != nil {
conf.Flag.PreParse()
}
flag.Parse()
// if vers {
// fmt.Printf("%s\n", versline(*conf))
// os.Exit(0)
// }
switch backend {
case "leveldb":
conf.Backend = LevelDB
case "bolt":
conf.Backend = Bolt
default:
fmt.Fprintf(os.Stderr, "invalid --backend: '%s'\n", backend)
os.Exit(1)
}
switch testNode {
case "1", "2", "3", "4", "5", "6", "7", "8", "9":
if conf.Addr == "" {
conf.Addr = ":1100" + testNode
} else {
conf.Addr = conf.Addr[:len(conf.Addr)-1] + testNode
}
conf.NodeID = testNode
if testNode != "1" {
conf.JoinAddr = conf.Addr[:len(conf.Addr)-1] + "1"
}
case "":
default:
fmt.Fprintf(os.Stderr, "invalid usage of test flag -t\n")
os.Exit(1)
}
if conf.TLSCertPath != "" && conf.TLSKeyPath == "" {
fmt.Fprintf(os.Stderr,
"flag --tls-key cannot be empty when --tls-cert is provided\n")
os.Exit(1)
} else if conf.TLSCertPath == "" && conf.TLSKeyPath != "" {
fmt.Fprintf(os.Stderr,
"flag --tls-cert cannot be empty when --tls-key is provided\n")
os.Exit(1)
}
if conf.Advertise != "" {
colon := strings.IndexByte(conf.Advertise, ':')
if colon == -1 {
fmt.Fprintf(os.Stderr, "flag --advertise is missing port number\n")
os.Exit(1)
}
_, err := strconv.ParseUint(conf.Advertise[colon+1:], 10, 16)
if err != nil {
fmt.Fprintf(os.Stderr, "flat --advertise port number invalid\n")
os.Exit(1)
}
}
if conf.Flag.PostParse != nil {
conf.Flag.PostParse()
}
if conf.UseJSONSnapshots {
if conf.Restore != nil || conf.Snapshot != nil {
fmt.Fprintf(os.Stderr,
"UseJSONSnapshots: Restore or Snapshot are set\n")
os.Exit(1)
}
if conf.InitialData != nil {
t := reflect.TypeOf(conf.InitialData)
if t.Kind() != reflect.Ptr {
fmt.Fprintf(os.Stderr,
"UseJSONSnapshots: InitialData is not a pointer\n")
os.Exit(1)
}
conf.jsonType = t.Elem()
}
conf.jsonSnaps = true
}
}
func (conf *Config) addCommand(kind byte, name string,
fn func(m Machine, args []string) (interface{}, error),
) {
name = strings.ToLower(name)
if conf.cmds == nil {
conf.cmds = make(map[string]command)
}
conf.cmds[name] = command{kind, func(m Machine, ra *raftWrap,
args []string) (interface{}, error) {
return fn(m, args)
}}
}
// AddCatchallCommand adds an intermediate command that will execute for any
// input that was not previously defined with AddIntermediateCommand,
// AddWriteCommand, or AddReadCommand.
func (conf *Config) AddCatchallCommand(
fn func(m Machine, args []string) (interface{}, error),
) {
conf.catchall = command{'s', func(m Machine, ra *raftWrap,
args []string) (interface{}, error) {
return fn(m, args)
}}
}
// AddIntermediateCommand adds a command for performing client- and
// system-specific operations. It *is not* intended for working with the
// machine data, and doing so will risk data corruption.
func (conf *Config) AddIntermediateCommand(name string,
fn func(m Machine, args []string) (interface{}, error),
) {
conf.addCommand('s', name, fn)
}
// AddReadCommand adds a command for reading machine data.
func (conf *Config) AddReadCommand(name string,
fn func(m Machine, args []string) (interface{}, error),
) {
conf.addCommand('r', name, fn)
}
// AddWriteCommand adds a command for reading or altering machine data.
func (conf *Config) AddWriteCommand(name string,
fn func(m Machine, args []string) (interface{}, error),
) {
conf.addCommand('w', name, fn)
}
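
// A sketch of a write command handler against a hypothetical
// *map[string]string data model; Machine.Data is assumed from this
// package's Machine interface, and the argument handling is illustrative:
//
//	conf.AddWriteCommand("set",
//		func(m uhaha.Machine, args []string) (interface{}, error) {
//			if len(args) != 3 {
//				return nil, errors.New("wrong number of arguments")
//			}
//			db := m.Data().(*map[string]string)
//			(*db)[args[1]] = args[2]
//			return "OK", nil
//		})
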
// AddService adds a custom client network service, such as HTTP or gRPC.
// By default, a Redis compatible service is already included.
func (conf *Config) AddService(
name string,
sniff func(rd io.Reader) bool,
acceptor func(s Service, ln net.Listener),
) {
conf.services = append(conf.services, serviceEntry{name, sniff, acceptor})
}
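
// A sketch of registering an additional HTTP service alongside the default
// Redis service; the sniffer peeks at the first bytes of an incoming
// connection to claim ownership (appHandler is a hypothetical app-defined
// http.Handler):
//
//	conf.AddService("http", func(rd io.Reader) bool {
//		buf := make([]byte, 4)
//		n, _ := io.ReadFull(rd, buf)
//		return string(buf[:n]) == "GET "
//	}, func(s uhaha.Service, ln net.Listener) {
//		log.Fatal(http.Serve(ln, appHandler))
//	})
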
type jsonSnapshotType struct{ jsdata []byte }
func (s *jsonSnapshotType) Done(path string) {}
func (s *jsonSnapshotType) Persist(wr io.Writer) error {
_, err := wr.Write(s.jsdata)
return err
}
func jsonSnapshot(data interface{}) (Snapshot, error) {
if data == nil {
return &jsonSnapshotType{}, nil
}
jsdata, err := json.Marshal(data)
if err != nil {
return nil, err
}
return &jsonSnapshotType{jsdata: jsdata}, nil
}
func jsonRestore(rd io.Reader, typ reflect.Type) (interface{}, error) {
jsdata, err := io.ReadAll(rd)
if err != nil {
return nil, err
}
if typ == nil {
return nil, nil
}
data := reflect.New(typ).Interface()
if err = json.Unmarshal(jsdata, data); err != nil {
return nil, err
}
return data, err
}
func versline(conf Config) string {
sha := ""
if conf.GitSHA != "" {
sha = " (" + conf.GitSHA + ")"
}
return fmt.Sprintf("%s version %s%s", conf.Name, conf.Version, sha)
}
func logInit(conf Config) (hclog.Logger, *redlog.Logger) {
var log *redlog.Logger
logLevel := conf.LogLevel
wr := conf.LogOutput
lopts := *redlog.DefaultOptions
lopts.Filter =
func(line string, tty bool) (msg string, app byte, level int) {
line = stateChangeFilter(line, log)
return redlog.HashicorpRaftFilter(line, tty)
}
lopts.App = 'S'
switch logLevel {
case "debug":
lopts.Level = 0
case "verbose", "verb":
lopts.Level = 1
case "notice", "info":
lopts.Level = 2
case "warning", "warn":
lopts.Level = 3
case "quiet", "silent":
lopts.Level = 3
wr = io.Discard
default:
fmt.Fprintf(os.Stderr, "invalid -loglevel: %s\n", logLevel)
os.Exit(1)
}
log = redlog.New(wr, &lopts)
hclopts := *hclog.DefaultOptions
hclopts.Color = hclog.ColorOff
hclopts.Output = log
if conf.LogReady != nil {
conf.LogReady(log)
}
log.Warningf("starting %s", versline(conf))
return hclog.New(&hclopts), log
}
func stateChangeFilter(line string, log *redlog.Logger) string {
if strings.Contains(line, "entering ") {
if strings.Contains(line, "entering candidate state") {
log.SetApp('C')
} else if strings.Contains(line, "entering follower state") {
log.SetApp('F')
} else if strings.Contains(line, "entering leader state") {
log.SetApp('L')
}
}
return line
}
type restoreData struct {
data interface{}
ts int64
seed int64
start int64
}
func dataDirInit(conf Config, log *redlog.Logger) (string, *restoreData) {
var rdata *restoreData
dir := filepath.Join(conf.DataDir, conf.Name, conf.NodeID)
if conf.BackupPath != "" {
_, err := os.Stat(dir)
if err == nil {
log.Warningf("backup restore ignored: "+
"data directory already exists: path=%s", dir)
return dir, nil
}
log.Printf("restoring backup: path=%s", conf.BackupPath)
if !os.IsNotExist(err) {
log.Fatal(err)
}
rdata, err = dataDirRestoreBackup(conf, dir, log)
if err != nil {
log.Fatal(err)
}
log.Printf("recovery successful")
} else {
if err := os.MkdirAll(dir, 0777); err != nil {
log.Fatal(err)
}
}
if conf.DataDirReady != nil {
conf.DataDirReady(dir)
}
return dir, rdata
}
func dataDirRestoreBackup(conf Config, dir string, log *redlog.Logger,
) (rdata *restoreData, err error) {
rdata = new(restoreData)
f, err := os.Open(conf.BackupPath)
if err != nil {
return nil, err
}
defer f.Close()
gr, err := gzip.NewReader(f)
if err != nil {
return nil, err
}
rdata.start, rdata.ts, rdata.seed, err = readSnapHead(gr)
if err != nil {
return nil, err
}
if conf.Restore != nil {
rdata.data, err = conf.Restore(gr)
if err != nil {
return nil, err
}
} else if conf.jsonSnaps {
rdata.data, err = func(rd io.Reader) (data interface{}, err error) {
return jsonRestore(rd, conf.jsonType)
}(gr)
if err != nil {
return nil, err
}
} else {
rdata.data = conf.InitialData
}
return rdata, nil
}
func storeInit(conf Config, dir string, log *redlog.Logger,
) (raft.LogStore, raft.StableStore) {
switch conf.Backend {
case Bolt:
store, err := raftboltdb.NewBoltStore(filepath.Join(dir, "store"))
if err != nil {
log.Fatalf("bolt store open: %s", err)
}
return store, store
case LevelDB:
dur := raftleveldb.High
if conf.NoSync {
dur = raftleveldb.Medium
}
store, err := raftleveldb.NewLevelDBStore(
filepath.Join(dir, "store"), dur)
if err != nil {
log.Fatalf("leveldb store open: %s", err)
}
return store, store
default:
log.Fatalf("invalid backend")
}
return nil, nil
}
func snapshotInit(conf Config, dir string, m *machine, hclogger hclog.Logger,
log *redlog.Logger,
) raft.SnapshotStore {
snaps, err := raft.NewFileSnapshotStoreWithLogger(dir, 3, hclogger)
if err != nil {
log.Fatal(err)
}
m.snaps = snaps
return snaps
}
func machineInit(conf Config, dir string, rdata *restoreData,
log *redlog.Logger,
) *machine {
m := new(machine)
m.dir = dir
m.vers = versline(conf)
m.tickedSig = sync.NewCond(&m.mu)
m.created = time.Now().UnixNano()
m.wrC = make(chan *writeRequestFuture, 1024)
m.tickDelay = conf.TickDelay
m.openReads = conf.OpenReads
if rdata != nil {
m.data = rdata.data
m.start = rdata.start
m.seed = rdata.seed
m.ts = rdata.ts
} else {
m.data = conf.InitialData
}
m.log = log
m.connClosed = conf.ConnClosed
m.connOpened = conf.ConnOpened
m.snapshot = conf.Snapshot
m.restore = conf.Restore
m.jsonSnaps = conf.jsonSnaps
m.jsonType = conf.jsonType
m.tick = conf.Tick
m.commands = map[string]command{
"tick": {'w', cmdTICK},
"barrier": {'w', cmdBARRIER},
"raft": {'s', cmdRAFT},
"cluster": {'s', cmdCLUSTER},
"machine": {'r', cmdMACHINE},
"version": {'s', cmdVERSION},
}
if conf.TryErrors {
delete(m.commands, "cluster")
}
for k, v := range conf.cmds {
if _, ok := m.commands[k]; !ok {
m.commands[k] = v
}
}
m.catchall = conf.catchall
return m
}
type remoteTime struct {
remote bool // use remote
mu sync.Mutex // lock times
rtime time.Time // remote time
ltime time.Time // local time
ctime time.Time // calcd time
}
func (rt *remoteTime) Now() time.Time {
if !rt.remote {
return time.Now()
}
rt.mu.Lock()
ctime := rt.rtime.Add(time.Since(rt.ltime))
if !ctime.After(rt.ctime) {
// ensure time is monotonic and increasing
ctime = rt.ctime.Add(1)
}
rt.ctime = ctime
rt.mu.Unlock()
return ctime
}
// remoteTimeInit initializes the remote time fetching service, and
// continuously runs it in the background to keep synchronized.
func remoteTimeInit(conf Config, log *redlog.Logger) *remoteTime {
rt := new(remoteTime)
if conf.LocalTime {
log.Warning("using local time")
return rt
}
var wg sync.WaitGroup
var once int32
wg.Add(1)
go func() {
for {
tm := rtime.Now()
if tm.IsZero() {
time.Sleep(time.Second)
continue
}
rt.mu.Lock()
if tm.After(rt.rtime) {
rt.ltime = time.Now()
rt.rtime = tm
log.Debugf("synchronized time: %s", rt.rtime)
if atomic.LoadInt32(&once) == 0 {
atomic.StoreInt32(&once, 1)
wg.Done()
}
}
rt.mu.Unlock()
time.Sleep(time.Second * 30)
}
}()
go func() {
time.Sleep(time.Second * 2)
if atomic.LoadInt32(&once) != 0 {
return
}
for {
log.Warning("synchronized time: waiting for internet connection")
if atomic.LoadInt32(&once) != 0 {
break
}
time.Sleep(time.Second * 5)
}
}()
wg.Wait()
log.Printf("synchronized time")
return rt
}
func raftInit(conf Config, hclogger hclog.Logger, fsm raft.FSM,
logStore raft.LogStore, stableStore raft.StableStore,
snaps raft.SnapshotStore, trans raft.Transport, log *redlog.Logger,
) *raftWrap {
rconf := raft.DefaultConfig()
rconf.Logger = hclogger
rconf.LocalID = raft.ServerID(conf.NodeID)
ra, err := raft.NewRaft(rconf, fsm, logStore, stableStore, snaps, trans)
if err != nil {
log.Fatal(err)
}
if conf.StateChange != nil {
// monitor the state changes.
lstate := raft.Shutdown
conf.StateChange(Shutdown)
go func() {
for {
state := ra.State()
if state != lstate {
lstate = state
switch state {
case raft.Candidate:
conf.StateChange(Candidate)
case raft.Follower:
conf.StateChange(Follower)
case raft.Leader:
conf.StateChange(Leader)
case raft.Shutdown:
conf.StateChange(Shutdown)
}
}
time.Sleep(time.Second / 4)
}
}()
}
return &raftWrap{
Raft: ra,
conf: conf,
advertise: conf.Advertise,
}
}
// joinClusterIfNeeded attempts to make this server join a Raft cluster. If
// the server already belongs to a cluster or if the server is bootstrapping
// then this operation is ignored.
func joinClusterIfNeeded(conf Config, ra *raftWrap, addr net.Addr,
tlscfg *tls.Config, log *redlog.Logger,
) {
// Get the current Raft cluster configuration for determining whether this
// server needs to bootstrap a new cluster, or join/re-join an existing
// cluster.
f := ra.GetConfiguration()
if err := f.Error(); err != nil {
log.Fatalf("could not get Raft configuration: %v", err)
}
var addrStr string
if ra.advertise != "" {
addrStr = conf.Advertise
} else {
addrStr = addr.String()
}
cfg := f.Configuration()
servers := cfg.Servers
if len(servers) == 0 {
// Empty configuration. Either bootstrap or join an existing cluster.
if conf.JoinAddr == "" {
// No '-join' flag provided.
// Bootstrap new cluster.
log.Noticef("bootstrapping new cluster")
var configuration raft.Configuration
configuration.Servers = []raft.Server{
{
ID: raft.ServerID(conf.NodeID),
Address: raft.ServerAddress(addrStr),
},
}
err := ra.BootstrapCluster(configuration).Error()
if err != nil && err != raft.ErrCantBootstrap {
log.Fatalf("bootstrap: %s", err)
}
} else {
// Joining an existing cluster
joinAddr := conf.JoinAddr
log.Noticef("joining existing cluster at %v", joinAddr)
err := func() error {
for {
conn, err := RedisDial(joinAddr, conf.Auth, tlscfg)
if err != nil {
return err
}
defer conn.Close()
res, err := redis.String(conn.Do("raft", "server", "add",
conf.NodeID, addrStr))
if err != nil {
if strings.HasPrefix(err.Error(), "MOVED ") {
parts := strings.Split(err.Error(), " ")
if len(parts) == 3 {
joinAddr = parts[2]
time.Sleep(time.Millisecond * 100)
continue
}
}
return err
}
if res != "1" {
return fmt.Errorf("'1', got '%s'", res)
}
return nil
}
}()
if err != nil {
log.Fatalf("raft server add: %v", err)
}
}
} else {
if conf.JoinAddr != "" {
log.Warningf("ignoring join request because server already " +
"belongs to a cluster")
}
if ra.advertise != "" {
// Check that the address is the same as before
found := false
same := true
before := ra.advertise
for _, s := range servers {
if string(s.ID) == conf.NodeID {
found = true
if string(s.Address) != ra.advertise {
same = false
before = string(s.Address)
break
}
}
}
if !found {
log.Fatalf("advertise address changed but node not found\n")
} else if !same {
log.Fatalf("advertise address change from \"%s\" to \"%s\" ",
before, ra.advertise)
}
}
}
}
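
// The join handshake above is equivalent to manually issuing the same
// command against the current leader, e.g. with redis-cli (addresses
// illustrative):
//
//	redis-cli -p 11001 RAFT SERVER ADD 2 127.0.0.1:11002
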
// RedisDial is a helper function that dials out to another Uhaha server
// using the Redis protocol and the provided TLS config and auth token.
// The TLS/auth must be correct in order to establish a connection.
func RedisDial(addr, auth string, tlscfg *tls.Config) (redis.Conn, error) {
var conn redis.Conn
var err error
if tlscfg != nil {
conn, err = redis.Dial("tcp", addr,
redis.DialUseTLS(true), redis.DialTLSConfig(tlscfg))
} else {
conn, err = redis.Dial("tcp", addr)
}
if err != nil {
return nil, err
}
if auth != "" {
res, err := redis.String(conn.Do("auth", auth))
if err != nil {
conn.Close()
return nil, err
}
if res != "OK" {
conn.Close()
return nil, fmt.Errorf("expected 'OK', got '%s'", res)
}
}
return conn, nil
}
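
// A sketch of using RedisDial to query a peer (address and token
// illustrative):
//
//	conn, err := uhaha.RedisDial("10.0.0.2:11001", "my-auth-token", tlscfg)
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer conn.Close()
//	res, err := redis.String(conn.Do("version"))
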
func startUserServices(conf Config, svr *splitServer, m *machine, ra *raftWrap,
log *redlog.Logger,
) {
// rearrange so that services with nil sniffers are last
var nilServices []serviceEntry
var services []serviceEntry
for i := 0; i < len(conf.services); i++ {
if conf.services[i].sniff == nil {
nilServices = append(nilServices, conf.services[i])
} else {
services = append(services, conf.services[i])
}
}
conf.services = append(services, nilServices...)
for _, s := range conf.services {
ln := svr.split(func(rd io.Reader) (n int, ok bool) {
if s.sniff == nil {