Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reuse filewatcher #4475

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions internal/filewatcher/filewatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package filewatcher
import (
"errors"
"fmt"
"os"
"path/filepath"
"sync"

Expand Down Expand Up @@ -90,7 +91,6 @@ func (fw *fileWatcher) Add(path string) error {
return err
}

// Stop watching a path
func (fw *fileWatcher) Remove(path string) error {
fw.mu.Lock()
defer fw.mu.Unlock()
Expand Down Expand Up @@ -142,9 +142,7 @@ func (fw *fileWatcher) getWorker(path string) (*workerState, string, string, err
return nil, "", "", errors.New("using a closed watcher")
}

cleanedPath := filepath.Clean(path)
parentPath, _ := filepath.Split(cleanedPath)

cleanedPath, parentPath := getPath(path)
ws, workerExists := fw.workers[parentPath]
if !workerExists {
wk, err := newWorker(parentPath, fw.funcs)
Expand All @@ -167,8 +165,7 @@ func (fw *fileWatcher) findWorker(path string) (*workerState, string, error) {
return nil, "", errors.New("using a closed watcher")
}

cleanedPath := filepath.Clean(path)
parentPath, _ := filepath.Split(cleanedPath)
cleanedPath, parentPath := getPath(path)

ws, workerExists := fw.workers[parentPath]
if !workerExists {
Expand All @@ -177,3 +174,13 @@ func (fw *fileWatcher) findWorker(path string) (*workerState, string, error) {

return ws, cleanedPath, nil
}

func getPath(path string) (cleanedPath, parentPath string) {
cleanedPath = filepath.Clean(path)
parentPath, _ = filepath.Split(cleanedPath)
if f, err := os.Lstat(cleanedPath); err == nil && f.IsDir() {
parentPath = cleanedPath
}

return
}
65 changes: 50 additions & 15 deletions internal/filewatcher/filewatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"runtime"
"sync"
"testing"
"time"

"github.com/fsnotify/fsnotify"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -173,6 +174,44 @@ func TestWatchFile(t *testing.T) {
})
}

func TestWatchDir(t *testing.T) {
// Given a file being watched
watchFile := newWatchFile(t)
_, err := os.Stat(watchFile)
require.NoError(t, err)

w := NewWatcher()
defer func() {
_ = w.Close()
}()
d := path.Dir(watchFile)
require.NoError(t, w.Add(d))

timeout := time.After(5 * time.Second)

wg := sync.WaitGroup{}
var timeoutErr error
wg.Add(1)
go func() {
select {
case <-w.Events(d):

case <-w.Events(watchFile):

case <-timeout:
timeoutErr = errors.New("timeout")
}
wg.Done()
}()

// Overwriting the file and waiting its event to be received.
err = os.WriteFile(watchFile, []byte("foo: baz\n"), 0o600)
require.NoError(t, err)
wg.Wait()

require.NoErrorf(t, timeoutErr, "timeout waiting for event")
}

func TestWatcherLifecycle(t *testing.T) {
watchFile1, watchFile2 := newTwoWatchFile(t)

Expand Down Expand Up @@ -295,27 +334,23 @@ func TestBadAddWatcher(t *testing.T) {

func TestDuplicateAdd(t *testing.T) {
w := NewWatcher()

name := newWatchFile(t)
defer func() {
_ = w.Close()
_ = os.Remove(name)
}()

if err := w.Add(name); err != nil {
t.Errorf("Expecting nil, got %v", err)
}

if err := w.Add(name); err == nil {
t.Errorf("Expecting error, got nil")
}

_ = w.Close()
require.NoError(t, w.Add(name))
require.Error(t, w.Add(name))
}

func TestBogusRemove(t *testing.T) {
w := NewWatcher()

name := newWatchFile(t)
if err := w.Remove(name); err == nil {
t.Errorf("Expecting error, got nil")
}
defer func() {
_ = w.Close()
_ = os.Remove(name)
}()

_ = w.Close()
require.Error(t, w.Remove(name))
}
44 changes: 26 additions & 18 deletions internal/filewatcher/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
type worker struct {
mu sync.RWMutex

// watcher is an fsnotify watcher that watches the parent
// watcher is a fsnotify watcher that watches the parent
// dir of watchedFiles.
dirWatcher *fsnotify.Watcher

Expand Down Expand Up @@ -96,10 +96,10 @@
continue
}

sum := getHashSum(path)
if !bytes.Equal(sum, ft.hash) {
sum, isDir := getHashSum(path)
fmt.Printf("isDir: %v sum: %v, ft.hash: %v\n", isDir, sum, ft.hash)
if isDir || !bytes.Equal(sum, ft.hash) {
ft.hash = sum

select {
case ft.events <- event:
// nothing to do
Expand Down Expand Up @@ -141,7 +141,7 @@
}
}

// used only by the worker goroutine
// drainRetiringTrackers used only by the worker goroutine
func (wk *worker) drainRetiringTrackers() {
// cleanup any trackers that were in the process
// of being retired, but didn't get processed due
Expand All @@ -156,7 +156,7 @@
}
}

// make a local copy of the set of trackers to avoid contention with callers
// getTrackers make a local copy of the set of trackers to avoid contention with callers
// used only by the worker goroutine
func (wk *worker) getTrackers() map[string]*fileTracker {
wk.mu.RLock()
Expand Down Expand Up @@ -184,36 +184,34 @@

func (wk *worker) addPath(path string) error {
wk.mu.Lock()
defer wk.mu.Unlock()

ft := wk.watchedFiles[path]
if ft != nil {
wk.mu.Unlock()
return fmt.Errorf("path %s is already being watched", path)
}

h, _ := getHashSum(path)
ft = &fileTracker{
events: make(chan fsnotify.Event),
errors: make(chan error),
hash: getHashSum(path),
hash: h,
}

wk.watchedFiles[path] = ft
wk.mu.Unlock()

return nil
}

func (wk *worker) removePath(path string) error {
wk.mu.Lock()
defer wk.mu.Unlock()

ft := wk.watchedFiles[path]
if ft == nil {
wk.mu.Unlock()
return fmt.Errorf("path %s not found", path)
}

delete(wk.watchedFiles, path)
wk.mu.Unlock()

wk.retireTrackerCh <- ft
return nil
Expand Down Expand Up @@ -241,16 +239,26 @@
return nil
}

// gets the hash of the given file, or nil if there's a problem
func getHashSum(file string) []byte {
// getHashSum return the hash of the given file, or nil if there's a problem, or it's a directory.
func getHashSum(file string) ([]byte, bool) {
f, err := os.Open(file)
if err != nil {
return nil
return nil, false
}
defer f.Close()
r := bufio.NewReader(f)
defer func() {
_ = f.Close()
}()

fi, err := f.Stat()
if err != nil {
return nil, false
}

Check warning on line 255 in internal/filewatcher/worker.go

View check run for this annotation

Codecov / codecov/patch

internal/filewatcher/worker.go#L254-L255

Added lines #L254 - L255 were not covered by tests
if fi.IsDir() {
return nil, true
}

r := bufio.NewReader(f)
h := sha256.New()
_, _ = io.Copy(h, r)
return h.Sum(nil)
return h.Sum(nil), false
}
49 changes: 33 additions & 16 deletions internal/provider/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,34 @@

"github.com/fsnotify/fsnotify"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/healthz"

egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1"
"github.com/envoyproxy/gateway/internal/envoygateway/config"
"github.com/envoyproxy/gateway/internal/filewatcher"
"github.com/envoyproxy/gateway/internal/message"
"github.com/envoyproxy/gateway/internal/utils/path"
)

type Provider struct {
paths []string
logger logr.Logger
notifier *Notifier
watcher filewatcher.FileWatcher
resourcesStore *resourcesStore
}

func New(svr *config.Server, resources *message.ProviderResources) (*Provider, error) {
logger := svr.Logger.Logger

notifier, err := NewNotifier(logger)
if err != nil {
return nil, err
paths := sets.New[string]()
if svr.EnvoyGateway.Provider.Custom.Resource.File != nil {
paths.Insert(svr.EnvoyGateway.Provider.Custom.Resource.File.Paths...)

Check warning on line 37 in internal/provider/file/file.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/file/file.go#L35-L37

Added lines #L35 - L37 were not covered by tests
}

return &Provider{
paths: svr.EnvoyGateway.Provider.Custom.Resource.File.Paths,
paths: paths.UnsortedList(),

Check warning on line 41 in internal/provider/file/file.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/file/file.go#L41

Added line #L41 was not covered by tests
logger: logger,
notifier: notifier,
watcher: filewatcher.NewWatcher(),

Check warning on line 43 in internal/provider/file/file.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/file/file.go#L43

Added line #L43 was not covered by tests
resourcesStore: newResourcesStore(svr.EnvoyGateway.Gateway.ControllerName, resources, logger),
}, nil
}
Expand All @@ -48,35 +50,50 @@
}

func (p *Provider) Start(ctx context.Context) error {
dirs, files, err := getDirsAndFilesForWatcher(p.paths)
if err != nil {
return fmt.Errorf("failed to get directories and files for the watcher: %w", err)
}
defer func() {
_ = p.watcher.Close()
}()

Check warning on line 55 in internal/provider/file/file.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/file/file.go#L53-L55

Added lines #L53 - L55 were not covered by tests

// Start runnable servers.
go p.startHealthProbeServer(ctx)

dirs, files := path.ListDirsAndFiles(p.paths)

Check warning on line 60 in internal/provider/file/file.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/file/file.go#L60

Added line #L60 was not covered by tests
// Initially load resources from paths on host.
if err = p.resourcesStore.LoadAndStore(files.UnsortedList(), dirs.UnsortedList()); err != nil {
if err := p.resourcesStore.LoadAndStore(files.UnsortedList(), dirs.UnsortedList()); err != nil {

Check warning on line 62 in internal/provider/file/file.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/file/file.go#L62

Added line #L62 was not covered by tests
return fmt.Errorf("failed to load resources into store: %w", err)
}

// Start watchers in notifier.
p.notifier.Watch(ctx, dirs, files)
defer p.notifier.Close()
// aggregate all path channel into one
aggCh := make(chan fsnotify.Event)
for _, path := range p.paths {
if err := p.watcher.Add(path); err != nil {
p.logger.Error(err, "failed to add watch", "path", path)
}
p.logger.Info("Watching file changed", "path", path)
ch := p.watcher.Events(path)
go func(c chan fsnotify.Event) {
for msg := range c {
aggCh <- msg
}

Check warning on line 77 in internal/provider/file/file.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/file/file.go#L67-L77

Added lines #L67 - L77 were not covered by tests
}(ch)
}

for {
select {
case <-ctx.Done():
return nil
case event := <-p.notifier.Events:
case event := <-aggCh:
p.logger.Info("file changed", "op", event.Op, "name", event.Name)

Check warning on line 86 in internal/provider/file/file.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/file/file.go#L85-L86

Added lines #L85 - L86 were not covered by tests
switch event.Op {
case fsnotify.Create:
dirs.Insert(event.Name)
files.Insert(event.Name)
case fsnotify.Remove:
dirs.Delete(event.Name)
files.Delete(event.Name)
default:
// do nothing
continue

Check warning on line 96 in internal/provider/file/file.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/file/file.go#L94-L96

Added lines #L94 - L96 were not covered by tests
}

p.resourcesStore.HandleEvent(event, files.UnsortedList(), dirs.UnsortedList())
Expand Down
Loading
Loading