-
Notifications
You must be signed in to change notification settings - Fork 11
/
main.go
151 lines (129 loc) · 2.73 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package main
import (
"log"
"os"
"sync"
"time"
"github.com/codegangsta/cli"
"github.com/garyburd/redigo/redis"
)
type Task struct {
list []string
}
type Worker func(queue chan Task, wg *sync.WaitGroup)
type Config struct {
Dest string
Source string
Workers int
Batch int
Prefix string
ClearDest bool
DryRun bool
}
var config Config
func main() {
app := cli.NewApp()
app.Name = "migr8"
app.Usage = "It's time to move some redis"
app.Commands = []cli.Command{
{
Name: "migrate",
Usage: "Migrate one redis to a new redis",
Action: Migrate,
},
{
Name: "delete",
Usage: "Delete all keys with the given prefix",
Action: Delete,
},
}
app.Flags = []cli.Flag{
cli.BoolFlag{
Name: "dry-run, n",
Usage: "Run in dry-run mode",
},
cli.StringFlag{
Name: "source, s",
Usage: "The redis server to pull data from",
Value: "127.0.0.1:6379",
},
cli.StringFlag{
Name: "dest, d",
Usage: "The destination redis server",
Value: "127.0.0.1:6379",
},
cli.IntFlag{
Name: "workers, w",
Usage: "The count of workers to spin up",
Value: 2,
},
cli.IntFlag{
Name: "batch, b",
Usage: "The batch size",
Value: 10,
},
cli.StringFlag{
Name: "prefix, p",
Usage: "The key prefix to act on",
},
cli.BoolFlag{
Name: "clear-dest, c",
Usage: "Clear the destination of all it's keys and values",
},
}
app.Run(os.Args)
}
func ParseConfig(c *cli.Context) {
config = Config{
Source: c.GlobalString("source"),
Dest: c.GlobalString("dest"),
Workers: c.GlobalInt("workers"),
Batch: c.GlobalInt("batch"),
Prefix: c.GlobalString("prefix"),
ClearDest: c.GlobalBool("clear-dest"),
DryRun: c.GlobalBool("dry-run"),
}
}
func sourceConnection(source string) redis.Conn {
// attempt to connect to source server
sourceConn, err := redis.Dial("tcp", source)
if err != nil {
panic(err)
}
return sourceConn
}
func destConnection(dest string) redis.Conn {
// attempt to connect to source server
destConn, err := redis.Dial("tcp", dest)
if err != nil {
panic(err)
}
return destConn
}
func RunAction(action Worker) {
wg := &sync.WaitGroup{}
workQueue := make(chan Task, config.Workers)
startedAt = time.Now()
wg.Add(1)
go scanKeys(workQueue, wg)
for i := 0; i <= config.Workers; i++ {
wg.Add(1)
go action(workQueue, wg)
}
wg.Wait()
}
func Migrate(c *cli.Context) {
ParseConfig(c)
log.Printf("Running migrate with config: %+v\n", config)
log.SetPrefix("migrate - ")
if config.ClearDest {
clearDestination(c.String("dest"))
}
RunAction(migrateKeys)
}
func Delete(c *cli.Context) {
ParseConfig(c)
log.Printf("Running delete with config: %+v\n", config)
log.SetPrefix("delete - ")
RunAction(deleteKeys)
}