Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/unixsock #309

Merged
merged 1 commit into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions pkg/exporter/cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@ import (
)

type InspServerConfig struct {
DebugMode bool `yaml:"debugMode" mapstructure:"debugMode" json:"debugMode"`
Port uint16 `yaml:"port" mapstructure:"port" json:"port"`
DebugMode bool `yaml:"debugMode" mapstructure:"debugMode" json:"debugMode"`
// A better way to set listen port is a `address` field that can be used for bind interface ip, or even unix domain socket
// Deprecated: use address instead
Port uint16 `yaml:"port" mapstructure:"port" json:"port"`

Address string `yaml:"address" mapstructure:"address" json:"address"`
EnableController bool `yaml:"enableController" mapstructure:"enableController" json:"enableController"`
MetricsConfig MetricsConfig `yaml:"metrics" mapstructure:"metrics" json:"metrics"`
EventConfig EventConfig `yaml:"event" mapstructure:"event" json:"event"`
Expand Down
79 changes: 72 additions & 7 deletions pkg/exporter/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,16 @@ package cmd
import (
"context"
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
"net/url"
"os"
"os/signal"
"path"
"reflect"
"strings"
"sync"
"sync/atomic"
"syscall"
Expand Down Expand Up @@ -92,7 +97,6 @@ type ProbeManager[T probe.Probe] interface {
StartProbe(ctx context.Context, probe T) error
StopProbe(ctx context.Context, probe T) error
}

type DynamicProbeServer[T probe.Probe] struct {
lock sync.Mutex
probeManager ProbeManager[T]
Expand Down Expand Up @@ -280,7 +284,57 @@ func (i *inspServer) reload() error {
return nil
}

func (i *inspServer) newHTTPServer(cfg *InspServerConfig) *http.Server {
func (i *inspServer) createListener(cfg *InspServerConfig) (net.Listener, error) {
if cfg.Address == "" {
if cfg.Port != 0 {
cfg.Address = fmt.Sprintf(":%d", cfg.Port)
log.Warningf("port is derepcated, use address instead")
} else {
return nil, fmt.Errorf("listen address is empty")
}
}

rawAddr := cfg.Address
if !strings.Contains(rawAddr, "//") {
log.Infof("address contains no protocol part, use tcp:// by default")
rawAddr = "tcp://" + rawAddr
}

protocol := ""
addr := ""

u, err := url.Parse(rawAddr)
if err != nil {
return nil, fmt.Errorf("invalid address %s, valid format is [protocol://]addr[:port]", cfg.Address)
}
protocol = u.Scheme
switch protocol {
case "unix":
// For Unix domain sockets, the path is in the opaque part
addr = strings.TrimPrefix(cfg.Address, "unix://")
if _, err = os.Stat(addr); err != nil {
if !errors.Is(err, os.ErrNotExist) {
return nil, fmt.Errorf("failed stat sock file %s: %w", addr, err)
}
parent := path.Dir(addr)
if err = os.MkdirAll(parent, 0o755); err != nil {
return nil, fmt.Errorf("failed create director %s: %w", parent, err)
}
} else {
//socket file exists, just remove it
_ = os.Remove(addr)
}
case "tcp":
host, port, _ := net.SplitHostPort(cfg.Address)
addr = fmt.Sprintf("%s:%s", host, port)
default:
return nil, fmt.Errorf("unsupported protocol %s, only `tcp` and 'unix' are supported", protocol)
}

return net.Listen(protocol, addr)
}

func (i *inspServer) newHTTPServer(cfg *InspServerConfig) (*http.Server, net.Listener, error) {
http.Handle("/metrics", i.metricsServer)
http.Handle("/", http.HandlerFunc(defaultPage))
http.Handle("/status", http.HandlerFunc(i.statusPage))
Expand All @@ -293,9 +347,14 @@ func (i *inspServer) newHTTPServer(cfg *InspServerConfig) *http.Server {
)
http.Handle("/internal", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}))
}
listenAddr := fmt.Sprintf(":%d", cfg.Port)
log.Infof("inspector start metric server, listenAddr: %s", listenAddr)
return &http.Server{Addr: listenAddr}

listener, err := i.createListener(cfg)
if err != nil {
return nil, nil, fmt.Errorf("failed create listener: %w", err)
}

log.Infof("inspector start metric server, listenAddr: %s", listener.Addr())
return &http.Server{}, listener, nil
}

func (i *inspServer) start(cfg *InspServerConfig) error {
Expand Down Expand Up @@ -350,11 +409,14 @@ func (i *inspServer) start(cfg *InspServerConfig) error {
log.Errorf("failed watch config, dynamic load would not work: %v", err)
}

srv := i.newHTTPServer(cfg)
srv, listener, err := i.newHTTPServer(cfg)
if err != nil {
return fmt.Errorf("failed start http server: %w", err)
}
serverClosedChan := make(chan struct{})
serverClosed := false
go func() {
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
if err := srv.Serve(listener); err != nil && err != http.ErrServerClosed {
log.Errorf("server error: %v", err)
}

Expand All @@ -364,9 +426,12 @@ func (i *inspServer) start(cfg *InspServerConfig) error {

WaitSignals(serverClosedChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
close(done)

if !serverClosed {
_ = srv.Shutdown(ctx)
}
_ = listener.Close()

return nil
}

Expand Down
Loading