-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
173 lines (142 loc) · 5.11 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
// By Tyler Montgomery, 2015
package main
import (
//"errors"
"fmt"
"github.com/jessevdk/go-flags"
"os"
"github.com/op/go-logging"
"gopkg.in/mcuadros/go-syslog.v2"
"gopkg.in/redis.v3"
"encoding/json"
//"strconv"
//"strings"
"runtime"
)
const APP_NAME = "gologq"
const APP_VERSION = "0.0.2"
var log = logging.MustGetLogger(APP_NAME)
var format = logging.MustStringFormatter(
`%{color}%{level:-7s}: %{time} %{shortfile} %{longfunc} %{id:03x}%{color:reset} %{message}`,
)
var opts struct {
Verbose bool `short:"v" long:"verbose" env:"GOLOGQ_VERBOSE" description:"Enable DEBUG logging"`
DoVersion bool `short:"V" long:"version" description:"Print version and exit"`
// Syslog specific options
ListenAddress string `long:"listen" env:"GOLOGQ_LISTEN_ADDR" description:"Syslog receiver host" default:"0.0.0.0"`
ListenPort int `long:"port" env:"GOLOGQ_LISTEN_PORT" description:"Syslog receiver port" default:"514"`
// Redis specific options
RedisHost string `long:"redis_host" env:"GOLOGQ_REDIS_HOST" description:"Redis host" default:"localhost"`
RedisPort int `long:"redis_port" env:"GOLOGQ_REDIS_PORT" description:"Redis port" default:"6379"`
RedisKey string `long:"redis_key" env:"GOLOGQ_REDIS_KEY" description:"Redis list key" default:"gologq"`
RedisPassword string `long:"redis_password" env:"GOLOGQ_REDIS_PASSWORD" description:"Redis password"`
RedisDB int64 `long:"redis_db" env:"GOLOGQ_REDIS_DB" description:"Redis DB index"`
// Worker specific options
NumWorkers int `long:"workers" env:"GOLOGQ_NUM_WORKERS" description:"Number of worker threads to spawn (Default: CPUs * 3)"`
}
// Start a Redis-backed Syslog server
func main() {
// Parse arguments
_, err := flags.Parse(&opts)
// From https://www.snip2code.com/Snippet/605806/go-flags-suggested--h-documentation
if err != nil {
typ := err.(*flags.Error).Type
if typ == flags.ErrHelp {
os.Exit(0)
} else {
fmt.Println(err)
os.Exit(1)
}
}
// Configure logger
log_backend := logging.NewLogBackend(os.Stderr, "", 0)
backend_formatter := logging.NewBackendFormatter(log_backend, format)
logging.SetBackend(backend_formatter)
// Print version number if requested from command line
if opts.DoVersion == true {
fmt.Printf("%s %s at your service.\n", APP_NAME, APP_VERSION)
os.Exit(10)
}
// Enable debug logging
if opts.Verbose == true {
logging.SetLevel(logging.DEBUG, "")
} else {
logging.SetLevel(logging.INFO, "")
}
// Cap number of workers spawned by command line args to 1024
// this prevents someone from overwhelming the number of automatically generated Redis threads
num_workers := opts.NumWorkers
if num_workers != 0 {
if num_workers > 1024 {
log.Fatalf("Can't spawn more than 1024 worker threads. (You requested %d)", num_workers)
os.Exit(1)
}
} else {
// If we happen to have more than 1024 threads by autodetection, it should be fine.
num_workers = runtime.NumCPU() * 3
}
hostname, _ := os.Hostname()
log.Infof("Starting %s version: %s on host %s", APP_NAME, APP_VERSION, hostname)
// Let's get moving.
log.Debugf("Commandline options: %+v", opts)
redis_client := setupRedisClient()
startServer(redis_client, num_workers)
log.Info("Server finished. Exiting.")
}
// Initiate a Redis client
func setupRedisClient() *redis.Client {
client := redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", opts.RedisHost, opts.RedisPort),
Password: opts.RedisPassword,
DB: opts.RedisDB,
})
// Are we able to ping the Redis server and receive a successful result?
pong, err := client.Ping().Result()
if err != nil {
log.Fatalf("Unable to contact redis server: %s", err)
} else {
log.Debugf("Ping response from redis server: %s", pong)
}
// Hand back a redis.Client object
return client
}
// Worker thread for incoming logs
// A channel, redis client, and worker ID are required.
func handleIncomingLogs(channel syslog.LogPartsChannel, redis_client *redis.Client, worker_id int) {
log.Debug("Started log worker #%d", worker_id)
for logParts := range channel {
json_data, err := json.Marshal(logParts)
if err != nil {
log.Errorf("Worker %d JSON error:", worker_id, err)
}
log.Debugf("Worker #%d RECV: %s", worker_id, json_data)
// Push to redis list
err = redis_client.LPush(opts.RedisKey, fmt.Sprintf("%s", json_data)).Err()
if err != nil {
log.Errorf("Worker #%d Redis error: %s", worker_id, err)
}
}
}
// Start the server and launch the workers
func startServer(redis_client *redis.Client, num_workers int) {
log.Debug("Entered startServer")
// Start up the syslog server
channel := make(syslog.LogPartsChannel)
handler := syslog.NewChannelHandler(channel)
server := syslog.NewServer()
server.SetFormat(syslog.Automatic)
server.SetHandler(handler)
err := server.ListenTCP(fmt.Sprintf("%s:%d", opts.ListenAddress, opts.ListenPort))
// Were we able to start the tcp server?
if err != nil {
log.Fatalf("Can't listen to TCP socket. Failing. %+v", err)
}
server.Boot()
// Spawn worker threads
log.Infof("Spawning %d worker threads...", num_workers)
for w := 1; w <= num_workers; w++ {
go handleIncomingLogs(channel, redis_client, w)
}
log.Info("Listening for connections")
server.Wait()
}