-
Notifications
You must be signed in to change notification settings - Fork 12
/
server_fork.go
146 lines (121 loc) · 3.31 KB
/
server_fork.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package fastdns
import (
"errors"
"log/slog"
"os"
"os/exec"
"runtime"
"strconv"
)
// ForkServer implements a prefork DNS server.
type ForkServer struct {
// handler to invoke
Handler Handler
// stats to invoke
Stats Stats
// ErrorLog specifies an optional logger for errors accepting
// connections, unexpected behavior from handlers, and
// underlying FileSystem errors.
// If nil, logging is disabled.
ErrorLog *slog.Logger
// The maximum number of procs the server may spawn. use runtime.NumCPU() if empty
MaxProcs int
// SetAffinity sets the CPU affinity mask of current process.
SetAffinity bool
// The maximum number of concurrent clients the server may serve.
Concurrency int
}
// ListenAndServe serves DNS requests from the given UDP addr.
func (s *ForkServer) ListenAndServe(addr string) error {
if s.Index() == 0 {
return s.fork(addr, s.MaxProcs)
}
if s.SetAffinity {
// set cpu affinity for performance
err := taskset((s.Index() - 1) % runtime.NumCPU())
if err != nil {
if s.ErrorLog != nil {
s.ErrorLog.Error("forkserver set cpu_affinity failed", "error", err, "index", s.Index(), "cpu_affinity", s.Index()-1)
}
}
}
// so_reuseport listen for performance
conn, err := listen("udp", addr)
if err != nil {
if s.ErrorLog != nil {
s.ErrorLog.Error("forkserver set listen on addr failed", "error", err, "index", s.Index(), "addr", addr)
}
return err
}
// s.ErrorLog.Printf("forkserver-%d pid-%d serving dns on %s", s.Index(), os.Getpid(), conn.LocalAddr())
return serve(conn, s.Handler, s.Stats, s.ErrorLog, s.Concurrency)
}
// Index indicates the index of Server instances.
func (s *ForkServer) Index() (index int) {
index, _ = strconv.Atoi(os.Getenv("FASTDNS_CHILD_INDEX"))
return
}
func fork(index int) (*exec.Cmd, error) {
/* #nosec G204 */
cmd := exec.Command(os.Args[0], os.Args[1:]...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.Env = append([]string{"FASTDNS_CHILD_INDEX=" + strconv.Itoa(index)}, os.Environ()...)
return cmd, cmd.Start()
}
func (s *ForkServer) fork(addr string, maxProcs int) (err error) {
type racer struct {
index int
pid int
err error
}
if maxProcs == 0 {
maxProcs = runtime.NumCPU()
}
if runtime.GOOS != "linux" {
maxProcs = 1
}
ch := make(chan racer, maxProcs)
childs := make(map[int]*exec.Cmd)
defer func() {
for _, proc := range childs {
_ = proc.Process.Kill()
}
}()
for i := 1; i <= maxProcs; i++ {
var cmd *exec.Cmd
if cmd, err = fork(i); err != nil {
if s.ErrorLog != nil {
s.ErrorLog.Error("forkserver failed to start a child process", "error", err)
}
return
}
childs[cmd.Process.Pid] = cmd
go func(index int) {
ch <- racer{index, cmd.Process.Pid, cmd.Wait()}
}(i)
}
var exited int
for sig := range ch {
delete(childs, sig.pid)
if s.ErrorLog != nil {
s.ErrorLog.Error("forkserver one of the child processes exited", "error", err)
}
if exited++; exited > 200 {
if s.ErrorLog != nil {
s.ErrorLog.Error("forkserver child workers exit too many times", "count", exited)
}
err = errors.New("forkserver child workers exit too many times")
break
}
var cmd *exec.Cmd
if cmd, err = fork(sig.index); err != nil {
break
}
childs[cmd.Process.Pid] = cmd
go func(index int) {
ch <- racer{index, cmd.Process.Pid, cmd.Wait()}
}(sig.index)
}
return
}