From 672de4f95564d0bbada2ca753cfe75cd0dd69450 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Benjamin=20Egelund-M=C3=BCller?= Date: Fri, 8 Dec 2023 18:06:28 +0100 Subject: [PATCH 01/12] Runtime: Refactor conn cache to contain and detect hanging opens/closes --- runtime/connection_cache.go | 459 +++++++++++++++++++------------ runtime/connection_cache_test.go | 50 ++-- runtime/connections.go | 4 +- runtime/drivers/duckdb/duckdb.go | 2 +- runtime/registry.go | 2 +- runtime/registry_test.go | 2 +- 6 files changed, 309 insertions(+), 210 deletions(-) diff --git a/runtime/connection_cache.go b/runtime/connection_cache.go index 882150d4ae3..9263f94890e 100644 --- a/runtime/connection_cache.go +++ b/runtime/connection_cache.go @@ -12,120 +12,117 @@ import ( "github.com/hashicorp/golang-lru/simplelru" "github.com/rilldata/rill/runtime/drivers" "github.com/rilldata/rill/runtime/pkg/activity" - "github.com/rilldata/rill/runtime/pkg/observability" "go.uber.org/zap" "golang.org/x/exp/maps" ) var errConnectionCacheClosed = errors.New("connectionCache: closed") +var errConnectionClosed = errors.New("connectionCache: connection closed") + const migrateTimeout = 2 * time.Minute +const hangingTimeout = 5 * time.Minute + // connectionCache is a thread-safe cache for open connections. // Connections should preferably be opened only via the connection cache. -// -// TODO: It opens connections async, but it will close them sync when evicted. If a handle's close hangs, this can block the cache. -// We should move the closing to the background. However, it must then handle the case of trying to re-open a connection that's currently closing in the background. type connectionCache struct { - size int - runtime *Runtime - logger *zap.Logger - activity activity.Client - closed bool - migrateCtx context.Context // ctx used for connection migrations - migrateCtxCancel context.CancelFunc // cancel all running migrations - lock sync.Mutex - acquired map[string]*connWithRef // items with non-zero references (in use) which should not be evicted - lru *simplelru.LRU // items with no references (opened, but not in use) ready for eviction + size int + runtime *Runtime + logger *zap.Logger + activity activity.Client + closed bool + ctx context.Context // ctx used for background tasks + ctxCancel context.CancelFunc // cancel background ctx + lock sync.Mutex + entries map[string]*connectionCacheEntry + lru *simplelru.LRU // entries with no references (opened, but not in use) ready for eviction } -type connWithRef struct { - instanceID string - handle drivers.Handle - err error - refs int - ready chan struct{} +type connectionCacheEntry struct { + instanceID string + refs int + working bool + workingCh chan struct{} + workingSince time.Time + handle drivers.Handle + err error + closed bool } func newConnectionCache(size int, logger *zap.Logger, rt *Runtime, ac activity.Client) *connectionCache { - // LRU cache that closes evicted connections - lru, err := simplelru.NewLRU(size, func(key interface{}, value interface{}) { - // Skip if the conn has refs, since the callback also gets called when transferring to acquired cache - conn := value.(*connWithRef) - if conn.refs != 0 { - return - } - if conn.handle != nil { - if err := conn.handle.Close(); err != nil { - logger.Error("failed closing cached connection", zap.String("key", key.(string)), zap.Error(err)) - } - } - }) + ctx, cancel := context.WithCancel(context.Background()) + c := &connectionCache{ + size: size, + runtime: rt, + logger: logger, + activity: ac, + ctx: ctx, + 
ctxCancel: cancel, + entries: make(map[string]*connectionCacheEntry), + } + + var err error + c.lru, err = simplelru.NewLRU(size, c.lruEvictionHandler) if err != nil { panic(err) } - ctx, cancel := context.WithCancel(context.Background()) - return &connectionCache{ - size: size, - runtime: rt, - logger: logger, - activity: ac, - migrateCtx: ctx, - migrateCtxCancel: cancel, - acquired: make(map[string]*connWithRef), - lru: lru, - } + go c.periodicallyCheckHangingConnections() + + return c } +// Close closes all connections in the cache. +// While not strictly necessary, it's probably best to call this after all connections have been released. func (c *connectionCache) Close() error { c.lock.Lock() - defer c.lock.Unlock() + // Set closed if c.closed { + c.lock.Unlock() return errConnectionCacheClosed } c.closed = true // Cancel currently running migrations - c.migrateCtxCancel() + c.ctxCancel() - var firstErr error - for _, key := range c.lru.Keys() { - val, ok := c.lru.Get(key) - if !ok { - continue - } - conn := val.(*connWithRef) - if conn.handle == nil { - continue - } - err := conn.handle.Close() - if err != nil { - c.logger.Error("failed closing cached connection", zap.Error(err)) - if firstErr == nil { - firstErr = err - } - } + // Start closing all connections (will close in the background) + for key, entry := range c.entries { + c.closeEntry(key, entry) } - for _, value := range c.acquired { - if value.handle == nil { - continue + // Clear the LRU - might not be needed, but just to be sure + c.lru.Purge() + + // Unlock to allow entries to close and remove themselves from c.entries in the background + c.lock.Unlock() + + // Wait for c.entries to become empty + for { + c.lock.Lock() + var anyEntry *connectionCacheEntry + for _, e := range c.entries { + anyEntry = e + break } - err := value.handle.Close() - if err != nil { - c.logger.Error("failed closing cached connection", zap.Error(err)) - if firstErr == nil { - firstErr = err - } + c.lock.Unlock() + + if anyEntry == nil { + // c.entries is empty, we can return + break } + + <-anyEntry.workingCh } - return firstErr + return nil } -func (c *connectionCache) get(ctx context.Context, instanceID, driver string, config map[string]any, shared bool) (drivers.Handle, func(), error) { +// Get opens and caches a connection. +// The caller should call the returned release function when done with the connection. 
+func (c *connectionCache) Get(ctx context.Context, instanceID, driver string, config map[string]any, shared bool) (drivers.Handle, func(), error) { var key string if shared { // not using instanceID to ensure all instances share the same handle @@ -140,100 +137,231 @@ func (c *connectionCache) get(ctx context.Context, instanceID, driver string, co return nil, nil, errConnectionCacheClosed } - // Get conn from caches - conn, ok := c.acquired[key] - if ok { - conn.refs++ - } else { - var val any - val, ok = c.lru.Get(key) - if ok { - // Conn was found in LRU - move to acquired cache - conn = val.(*connWithRef) - conn.refs++ // NOTE: Must increment before call to c.lru.remove to avoid closing the conn - c.lru.Remove(key) - c.acquired[key] = conn - } - } - - // Cached conn not found, open a new one + // Get or create conn + entry, ok := c.entries[key] if !ok { - conn = &connWithRef{ - instanceID: instanceID, - refs: 1, // Since refs is assumed to already have been incremented when checking conn.ready - ready: make(chan struct{}), - } - c.acquired[key] = conn + // Cached conn not found, open a new one + entry = &connectionCacheEntry{instanceID: instanceID} + c.entries[key] = entry - if len(c.acquired)+c.lru.Len() > c.size { - c.logger.Warn("number of connections acquired and in LRU exceed total configured size", zap.Int("acquired", len(c.acquired)), zap.Int("lru", c.lru.Len())) - } + c.openEntry(key, entry, driver, shared, config) - // Open and migrate the connection in a separate goroutine (outside lock). - // Incrementing ref and releasing the conn for this operation separately to cover the case where all waiting goroutines are cancelled before the migration completes. - conn.refs++ - go func() { - handle, err := c.openAndMigrate(c.migrateCtx, instanceID, driver, shared, config) - c.lock.Lock() - conn.handle = handle - conn.err = err - c.releaseConn(key, conn) - wasClosed := c.closed - c.lock.Unlock() - close(conn.ready) - - // The cache might have been closed while the connection was being opened. - // Since we acquired the lock, the close will have already been completed, so we need to close the connection here. - if wasClosed && handle != nil { - _ = handle.Close() - } - }() + if len(c.entries) >= 2*c.size { + c.logger.Warn("connection cache: the number of open connections exceeds the cache size by more than 2x", zap.Int("entries", len(c.entries))) + } } + // Acquire the entry + c.acquireEntry(key, entry) + // We can now release the lock and wait for the connection to be ready (it might already be) c.lock.Unlock() // Wait for connection to be ready or context to be cancelled var err error - select { - case <-conn.ready: - case <-ctx.Done(): - err = ctx.Err() // Will always be non-nil, ensuring releaseConn is called + stop := false + for !stop { + select { + case <-entry.workingCh: + c.lock.Lock() + + // The entry was closed right after being opened, we must loop to check c.workingCh again. + if entry.working { + c.lock.Unlock() + continue + } + + // We acquired the entry as it was closing, let's reopen it. 
+ if entry.closed { + c.openEntry(key, entry, driver, shared, config) + c.lock.Unlock() + continue + } + + stop = true + case <-ctx.Done(): + c.lock.Lock() + err = ctx.Err() // Will always be non-nil, ensuring releaseEntry is called + stop = true + } } - // Lock again for accessing conn - c.lock.Lock() + // We've got the lock now and know entry.working is false defer c.lock.Unlock() if err == nil { - err = conn.err + err = entry.err } if err != nil { - c.releaseConn(key, conn) + c.releaseEntry(key, entry) return nil, nil, err } release := func() { c.lock.Lock() - c.releaseConn(key, conn) + c.releaseEntry(key, entry) c.lock.Unlock() } - return conn.handle, release, nil + return entry.handle, release, nil +} + +// EvictAll closes all connections for an instance. +func (c *connectionCache) EvictAll(ctx context.Context, instanceID string) { + c.lock.Lock() + defer c.lock.Unlock() + + if c.closed { + return + } + + for key, entry := range c.entries { + if entry.instanceID != instanceID { + continue + } + + c.closeEntry(key, entry) + } +} + +// acquireEntry increments an entry's refs and moves it out of the LRU if it's there. +// It should be called when holding the lock. +func (c *connectionCache) acquireEntry(key string, entry *connectionCacheEntry) { + entry.refs++ + if entry.refs == 1 { + // NOTE: lru.Remove is safe even if it's not in the LRU (should only happen if the entry is acquired for the first time) + _ = c.lru.Remove(key) + } } -func (c *connectionCache) releaseConn(key string, conn *connWithRef) { - conn.refs-- - if conn.refs == 0 { - // No longer referenced. Move from acquired to LRU. - if !c.closed { - delete(c.acquired, key) - c.lru.Add(key, conn) +// releaseEntry decrements an entry's refs and moves it to the LRU if nothing references it. +// It should be called when holding the lock. +func (c *connectionCache) releaseEntry(key string, entry *connectionCacheEntry) { + entry.refs-- + if entry.refs == 0 { + // No longer referenced. Move to LRU unless conn and/or cache is closed. + delete(c.entries, key) + if !c.closed && !entry.closed { + c.lru.Add(key, entry) } } } +// lruEvictionHandler is called by the LRU when evicting an entry. +// Note that the LRU only holds entries with refs == 0 (unless the entry is currently being moved to the acquired cache). +// Note also that this handler is called sync by the LRU, i.e. c.lock will be held. +func (c *connectionCache) lruEvictionHandler(key, value interface{}) { + entry := value.(*connectionCacheEntry) + + // The callback also gets called when removing from LRU during acquisition. + // We use conn.refs != 0 to signal that its being acquired and should not be closed. + if entry.refs != 0 { + return + } + + // Close the connection + c.closeEntry(key.(string), entry) +} + +// openEntry opens an entry's connection. It's safe to call for a previously closed entry. +// It's NOT safe to call for an entry that's currently working. +// It should be called when holding the lock (but the actual open and migrate will happen in the background). +func (c *connectionCache) openEntry(key string, entry *connectionCacheEntry, driver string, shared bool, config map[string]any) { + // Since whatever code that called openEntry may get cancelled/return before the connection is opened, we get our own reference to it. 
+ c.acquireEntry(key, entry) + + // Reset entry and set working + entry.working = true + entry.workingCh = make(chan struct{}) + entry.workingSince = time.Now() + entry.handle = nil + entry.err = nil + entry.closed = false + + // Open in the background + // NOTE: If closeEntry is called while it's opening, closeEntry will wait for the open to complete, so we don't need to handle that case here. + go func() { + handle, err := c.openAndMigrate(c.ctx, entry.instanceID, driver, shared, config) + + c.lock.Lock() + entry.working = false + close(entry.workingCh) + entry.workingSince = time.Time{} + entry.handle = handle + entry.err = err + c.releaseEntry(key, entry) + c.lock.Unlock() + }() +} + +// closeEntry closes an entry's connection. It's safe to call for an entry that's currently being closed/already closed. +// It should be called when holding the lock (but the actual close will happen in the background). +func (c *connectionCache) closeEntry(key string, entry *connectionCacheEntry) { + if entry.closed { + return + } + + c.acquireEntry(key, entry) + + wasWorking := entry.working + if !wasWorking { + entry.working = true + entry.workingCh = make(chan struct{}) + entry.workingSince = time.Now() + } + + go func() { + // If the entry was working when closeEntry was called, wait for it to finish before continuing. + if wasWorking { + stop := false + for !stop { + <-entry.workingCh + c.lock.Lock() + + // Bad luck, something else started working on the entry. Loop and wait again. + if entry.working { + c.lock.Unlock() + continue + } + + // Good luck, something else closed the entry. We're done. + if entry.closed { + c.lock.Unlock() + return + } + + // Our turn to start working it + entry.working = true + entry.workingCh = make(chan struct{}) + entry.workingSince = time.Now() + c.lock.Unlock() + stop = true + } + } + + // Close the connection + if entry.handle != nil { + err := entry.handle.Close() + if err != nil { + c.logger.Error("failed closing cached connection", zap.String("key", key), zap.Error(err)) + } + } + + // Mark closed + c.lock.Lock() + entry.working = false + close(entry.workingCh) + entry.workingSince = time.Time{} + entry.handle = nil + entry.err = errConnectionClosed + entry.closed = true + c.releaseEntry(key, entry) + c.lock.Unlock() + }() +} + +// openAndMigrate opens a connection and migrates it. func (c *connectionCache) openAndMigrate(ctx context.Context, instanceID, driver string, shared bool, config map[string]any) (drivers.Handle, error) { logger := c.logger if instanceID != "default" { @@ -275,53 +403,24 @@ func (c *connectionCache) openAndMigrate(ctx context.Context, instanceID, driver return handle, nil } -// evictAll closes all connections for an instance. 
-func (c *connectionCache) evictAll(ctx context.Context, instanceID string) { - c.lock.Lock() - defer c.lock.Unlock() - - if c.closed { - return - } - - for key, conn := range c.acquired { - if conn.instanceID != instanceID { - continue - } - - if conn.handle != nil { - err := conn.handle.Close() - if err != nil { - c.logger.Error("connection cache: failed to close cached connection", zap.Error(err), zap.String("instance", instanceID), observability.ZapCtx(ctx)) - } - conn.handle = nil - conn.err = fmt.Errorf("connection evicted") // Defensive, should never be accessed - } - - delete(c.acquired, key) - } - - for _, key := range c.lru.Keys() { - connT, ok := c.lru.Get(key) - if !ok { - panic("connection cache: key not found in LRU") - } - conn := connT.(*connWithRef) +// periodicallyCheckHangingConnections periodically checks for connection opens or closes that have been working for too long. +func (c *connectionCache) periodicallyCheckHangingConnections() { + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() - if conn.instanceID != instanceID { - continue - } - - if conn.handle != nil { - err := conn.handle.Close() - if err != nil { - c.logger.Error("connection cache: failed to close cached connection", zap.Error(err), zap.String("instance", instanceID), observability.ZapCtx(ctx)) + for { + select { + case <-ticker.C: + c.lock.Lock() + for key, entry := range c.entries { + if entry.working && time.Since(entry.workingSince) > hangingTimeout { + c.logger.Error("connection cache: connection open or close has been working for too long", zap.String("key", key), zap.Duration("duration", time.Since(entry.workingSince))) + } } - conn.handle = nil - conn.err = fmt.Errorf("connection evicted") // Defensive, should never be accessed + c.lock.Unlock() + case <-c.ctx.Done(): + return } - - c.lru.Remove(key) } } diff --git a/runtime/connection_cache_test.go b/runtime/connection_cache_test.go index 299021e1749..ff68a59fb1a 100644 --- a/runtime/connection_cache_test.go +++ b/runtime/connection_cache_test.go @@ -22,12 +22,12 @@ func TestConnectionCache(t *testing.T) { rt := newTestRuntimeWithInst(t) c := newConnectionCache(10, zap.NewNop(), rt, activity.NewNoopClient()) - conn1, release, err := c.get(ctx, id, "sqlite", map[string]any{"dsn": ":memory:"}, false) + conn1, release, err := c.Get(ctx, id, "sqlite", map[string]any{"dsn": ":memory:"}, false) require.NoError(t, err) release() require.NotNil(t, conn1) - conn2, release, err := c.get(ctx, id, "sqlite", map[string]any{"dsn": ":memory:"}, false) + conn2, release, err := c.Get(ctx, id, "sqlite", map[string]any{"dsn": ":memory:"}, false) require.NoError(t, err) release() require.NotNil(t, conn2) @@ -57,7 +57,7 @@ func TestConnectionCache(t *testing.T) { } require.NoError(t, rt.CreateInstance(context.Background(), inst)) - conn3, release, err := c.get(ctx, "default1", "sqlite", map[string]any{"dsn": ":memory:"}, false) + conn3, release, err := c.Get(ctx, "default1", "sqlite", map[string]any{"dsn": ":memory:"}, false) require.NoError(t, err) release() require.NotNil(t, conn3) @@ -71,24 +71,24 @@ func TestConnectionCacheWithAllShared(t *testing.T) { id := "default" c := newConnectionCache(1, zap.NewNop(), newTestRuntimeWithInst(t), activity.NewNoopClient()) - conn1, release, err := c.get(ctx, id, "sqlite", map[string]any{"dsn": ":memory:"}, true) + conn1, release, err := c.Get(ctx, id, "sqlite", map[string]any{"dsn": ":memory:"}, true) require.NoError(t, err) require.NotNil(t, conn1) defer release() - conn2, release, err := c.get(ctx, id, "sqlite", 
map[string]any{"dsn": ":memory:"}, true) + conn2, release, err := c.Get(ctx, id, "sqlite", map[string]any{"dsn": ":memory:"}, true) require.NoError(t, err) require.NotNil(t, conn2) defer release() - conn3, release, err := c.get(ctx, "default", "sqlite", map[string]any{"dsn": ":memory:"}, true) + conn3, release, err := c.Get(ctx, "default", "sqlite", map[string]any{"dsn": ":memory:"}, true) require.NoError(t, err) require.NotNil(t, conn3) defer release() require.True(t, conn1 == conn2) require.True(t, conn2 == conn3) - require.Equal(t, 1, len(c.acquired)) + require.Equal(t, 1, len(c.entries)) require.Equal(t, 0, c.lru.Len()) } @@ -97,30 +97,30 @@ func TestConnectionCacheWithAllOpen(t *testing.T) { rt := newTestRuntimeWithInst(t) c := newConnectionCache(1, zap.NewNop(), rt, activity.NewNoopClient()) - conn1, r1, err := c.get(ctx, "default", "sqlite", map[string]any{"dsn": ":memory:"}, false) + conn1, r1, err := c.Get(ctx, "default", "sqlite", map[string]any{"dsn": ":memory:"}, false) require.NoError(t, err) require.NotNil(t, conn1) createInstance(t, rt, "default1") - conn2, r2, err := c.get(ctx, "default1", "sqlite", map[string]any{"dsn": ":memory:"}, false) + conn2, r2, err := c.Get(ctx, "default1", "sqlite", map[string]any{"dsn": ":memory:"}, false) require.NoError(t, err) require.NotNil(t, conn2) createInstance(t, rt, "default2") - conn3, r3, err := c.get(ctx, "default2", "sqlite", map[string]any{"dsn": ":memory:"}, false) + conn3, r3, err := c.Get(ctx, "default2", "sqlite", map[string]any{"dsn": ":memory:"}, false) require.NoError(t, err) require.NotNil(t, conn3) - require.Equal(t, 3, len(c.acquired)) + require.Equal(t, 3, len(c.entries)) require.Equal(t, 0, c.lru.Len()) // release all connections r1() r2() r3() - require.Equal(t, 0, len(c.acquired)) + require.Equal(t, 1, len(c.entries)) require.Equal(t, 1, c.lru.Len()) _, val, _ := c.lru.GetOldest() - require.True(t, conn3 == val.(*connWithRef).handle) + require.True(t, conn3 == val.(*connectionCacheEntry).handle) } func TestConnectionCacheParallel(t *testing.T) { @@ -140,7 +140,7 @@ func TestConnectionCacheParallel(t *testing.T) { defer wg.Done() id := fmt.Sprintf("default%v", 100+j) createInstance(t, rt, id) - conn, _, err := c.get(ctx, id, "sqlite", map[string]any{"dsn": ":memory:"}, false) + conn, _, err := c.Get(ctx, id, "sqlite", map[string]any{"dsn": ":memory:"}, false) require.NoError(t, err) require.NotNil(t, conn) time.Sleep(100 * time.Millisecond) @@ -155,7 +155,7 @@ func TestConnectionCacheParallel(t *testing.T) { defer wg.Done() id := fmt.Sprintf("default%v", 200+j) createInstance(t, rt, id) - conn, r, err := c.get(ctx, id, "sqlite", map[string]any{"dsn": ":memory:"}, false) + conn, r, err := c.Get(ctx, id, "sqlite", map[string]any{"dsn": ":memory:"}, false) defer r() require.NoError(t, err) require.NotNil(t, conn) @@ -165,7 +165,7 @@ func TestConnectionCacheParallel(t *testing.T) { wg.Wait() // 10 connections were not released so should be present in in-use cache - require.Equal(t, 10, len(c.acquired)) + require.Equal(t, 15, len(c.entries)) // 20 connections were released so 15 should be evicted require.Equal(t, 5, c.lru.Len()) } @@ -175,25 +175,25 @@ func TestConnectionCacheMultipleConfigs(t *testing.T) { c := newConnectionCache(10, zap.NewNop(), newTestRuntimeWithInst(t), activity.NewNoopClient()) defer c.Close() - conn1, r1, err := c.get(ctx, "default", "sqlite", map[string]any{"dsn": ":memory:", "host": "localhost:8080", "allow_host_access": "true"}, true) + conn1, r1, err := c.Get(ctx, "default", "sqlite", 
map[string]any{"dsn": ":memory:", "host": "localhost:8080", "allow_host_access": "true"}, true) require.NoError(t, err) require.NotNil(t, conn1) - conn2, r2, err := c.get(ctx, "default", "sqlite", map[string]any{"dsn": ":memory:", "host": "localhost:8080", "allow_host_access": "true"}, true) + conn2, r2, err := c.Get(ctx, "default", "sqlite", map[string]any{"dsn": ":memory:", "host": "localhost:8080", "allow_host_access": "true"}, true) require.NoError(t, err) require.NotNil(t, conn2) - conn3, r3, err := c.get(ctx, "default", "sqlite", map[string]any{"dsn": ":memory:", "host": "localhost:8080", "allow_host_access": "true"}, true) + conn3, r3, err := c.Get(ctx, "default", "sqlite", map[string]any{"dsn": ":memory:", "host": "localhost:8080", "allow_host_access": "true"}, true) require.NoError(t, err) require.NotNil(t, conn3) - require.Equal(t, 1, len(c.acquired)) + require.Equal(t, 1, len(c.entries)) require.Equal(t, 0, c.lru.Len()) // release all connections r1() r2() r3() - require.Equal(t, 0, len(c.acquired)) + require.Equal(t, 1, len(c.entries)) require.Equal(t, 1, c.lru.Len()) } @@ -218,7 +218,7 @@ func TestConnectionCacheParallelCalls(t *testing.T) { for i := 0; i < 10; i++ { go func() { defer wg.Done() - conn, _, err := c.get(ctx, "default", "mock_driver", map[string]any{"sleep": int64(100)}, false) + conn, _, err := c.Get(ctx, "default", "mock_driver", map[string]any{"sleep": int64(100)}, false) require.NoError(t, err) require.NotNil(t, conn) }() @@ -226,7 +226,7 @@ func TestConnectionCacheParallelCalls(t *testing.T) { wg.Wait() require.Equal(t, int32(1), m.opened.Load()) - require.Equal(t, 1, len(c.acquired)) + require.Equal(t, 1, len(c.entries)) } func TestConnectionCacheBlockingCalls(t *testing.T) { @@ -249,7 +249,7 @@ func TestConnectionCacheBlockingCalls(t *testing.T) { // open 1 slow connection go func() { defer wg.Done() - conn, _, err := c.get(ctx, "default", "mock_driver", map[string]any{"sleep": int64(1000)}, false) + conn, _, err := c.Get(ctx, "default", "mock_driver", map[string]any{"sleep": int64(1000)}, false) require.NoError(t, err) require.NotNil(t, conn) }() @@ -259,7 +259,7 @@ func TestConnectionCacheBlockingCalls(t *testing.T) { j := i go func() { defer wg.Done() - conn, _, err := c.get(ctx, "default", "mock_driver", map[string]any{"sleep": int64(j + 10)}, false) + conn, _, err := c.Get(ctx, "default", "mock_driver", map[string]any{"sleep": int64(j + 10)}, false) require.NoError(t, err) require.NotNil(t, conn) }() diff --git a/runtime/connections.go b/runtime/connections.go index ba939063c07..d1e9a5b81df 100644 --- a/runtime/connections.go +++ b/runtime/connections.go @@ -19,7 +19,7 @@ func (r *Runtime) AcquireSystemHandle(ctx context.Context, connector string) (dr cfg[strings.ToLower(k)] = v } cfg["allow_host_access"] = r.opts.AllowHostAccess - return r.connCache.get(ctx, "", c.Type, cfg, true) + return r.connCache.Get(ctx, "", c.Type, cfg, true) } } return nil, nil, fmt.Errorf("connector %s doesn't exist", connector) @@ -36,7 +36,7 @@ func (r *Runtime) AcquireHandle(ctx context.Context, instanceID, connector strin // So we take this moment to make sure the ctx gets checked for cancellation at least every once in a while. 
return nil, nil, ctx.Err() } - return r.connCache.get(ctx, instanceID, driver, cfg, false) + return r.connCache.Get(ctx, instanceID, driver, cfg, false) } func (r *Runtime) Repo(ctx context.Context, instanceID string) (drivers.RepoStore, func(), error) { diff --git a/runtime/drivers/duckdb/duckdb.go b/runtime/drivers/duckdb/duckdb.go index cf045549d3b..3f694762819 100644 --- a/runtime/drivers/duckdb/duckdb.go +++ b/runtime/drivers/duckdb/duckdb.go @@ -806,7 +806,7 @@ func (c *connection) periodicallyCheckConnDurations(d time.Duration) { c.connTimesMu.Lock() for connID, connTime := range c.connTimes { if time.Since(connTime) > maxAcquiredConnDuration { - c.logger.Error("duckdb: a connection has been held for more longer than the maximum allowed duration", zap.Int("conn_id", connID), zap.Duration("duration", time.Since(connTime))) + c.logger.Error("duckdb: a connection has been held for longer than the maximum allowed duration", zap.Int("conn_id", connID), zap.Duration("duration", time.Since(connTime))) } } c.connTimesMu.Unlock() diff --git a/runtime/registry.go b/runtime/registry.go index ba521929843..1a08c6ab26b 100644 --- a/runtime/registry.go +++ b/runtime/registry.go @@ -375,7 +375,7 @@ func (r *registryCache) restartController(iwc *instanceWithController) { // So we want to evict all open connections for that instance, but it's unsafe to do so while the controller is running. // So this is the only place where we can do it safely. if r.baseCtx.Err() == nil { - r.rt.connCache.evictAll(r.baseCtx, iwc.instance.ID) + r.rt.connCache.EvictAll(r.baseCtx, iwc.instance.ID) } r.mu.Lock() diff --git a/runtime/registry_test.go b/runtime/registry_test.go index 5f305499fad..66729a0772a 100644 --- a/runtime/registry_test.go +++ b/runtime/registry_test.go @@ -474,7 +474,7 @@ func TestRuntime_DeleteInstance_DropCorrupted(t *testing.T) { require.NoError(t, err) // Close OLAP connection - rt.connCache.evictAll(ctx, inst.ID) + rt.connCache.EvictAll(ctx, inst.ID) // Corrupt database file err = os.WriteFile(dbpath, []byte("corrupted"), 0644) From 38850b533a1ac7045862cf4647879d2f6b56f808 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Benjamin=20Egelund-M=C3=BCller?= Date: Tue, 12 Dec 2023 20:08:32 +0100 Subject: [PATCH 02/12] Extract connection cache to pkg + use a singleflight --- runtime/connection_cache.go | 429 ++++------------------------- runtime/connection_cache_test.go | 417 ---------------------------- runtime/connections.go | 4 +- runtime/pkg/conncache/conncache.go | 334 ++++++++++++++++++++++ runtime/registry.go | 2 +- runtime/registry_test.go | 6 +- runtime/runtime.go | 7 +- 7 files changed, 402 insertions(+), 797 deletions(-) delete mode 100644 runtime/connection_cache_test.go create mode 100644 runtime/pkg/conncache/conncache.go diff --git a/runtime/connection_cache.go b/runtime/connection_cache.go index 9263f94890e..73a4e06d133 100644 --- a/runtime/connection_cache.go +++ b/runtime/connection_cache.go @@ -6,374 +6,80 @@ import ( "fmt" "slices" "strings" - "sync" "time" - "github.com/hashicorp/golang-lru/simplelru" "github.com/rilldata/rill/runtime/drivers" - "github.com/rilldata/rill/runtime/pkg/activity" + "github.com/rilldata/rill/runtime/pkg/conncache" "go.uber.org/zap" "golang.org/x/exp/maps" ) -var errConnectionCacheClosed = errors.New("connectionCache: closed") - -var errConnectionClosed = errors.New("connectionCache: connection closed") - -const migrateTimeout = 2 * time.Minute - -const hangingTimeout = 5 * time.Minute - -// connectionCache is a thread-safe cache for open connections. 
-// Connections should preferably be opened only via the connection cache. -type connectionCache struct { - size int - runtime *Runtime - logger *zap.Logger - activity activity.Client - closed bool - ctx context.Context // ctx used for background tasks - ctxCancel context.CancelFunc // cancel background ctx - lock sync.Mutex - entries map[string]*connectionCacheEntry - lru *simplelru.LRU // entries with no references (opened, but not in use) ready for eviction +type cachedConnectionConfig struct { + instanceID string + driver string + shared bool + config map[string]any } -type connectionCacheEntry struct { - instanceID string - refs int - working bool - workingCh chan struct{} - workingSince time.Time - handle drivers.Handle - err error - closed bool +// newConnectionCache returns a concurrency-safe cache for open connections. +// Connections should preferably be opened only via the connection cache. +// It's implementation handles issues such as concurrent open/close/eviction of a connection. +// It also monitors for hanging connections. +func (r *Runtime) newConnectionCache() conncache.Cache { + return conncache.New(conncache.Options{ + MaxConnectionsIdle: r.opts.ConnectionCacheSize, + OpenTimeout: 2 * time.Minute, + CloseTimeout: 5 * time.Minute, + OpenFunc: func(ctx context.Context, cfg any) (conncache.Connection, error) { + x := cfg.(cachedConnectionConfig) + return r.openAndMigrate(ctx, x) + }, + KeyFunc: func(cfg any) string { + x := cfg.(cachedConnectionConfig) + return generateKey(x) + }, + HangingFunc: func(cfg any, open bool) { + x := cfg.(cachedConnectionConfig) + r.logger.Error("connection cache: connection has been working for too long", zap.String("instance_id", x.instanceID), zap.String("driver", x.driver), zap.Bool("open", open)) + }, + }) } -func newConnectionCache(size int, logger *zap.Logger, rt *Runtime, ac activity.Client) *connectionCache { - ctx, cancel := context.WithCancel(context.Background()) - c := &connectionCache{ - size: size, - runtime: rt, - logger: logger, - activity: ac, - ctx: ctx, - ctxCancel: cancel, - entries: make(map[string]*connectionCacheEntry), +// getConnection returns a cached connection for the given driver configuration. +func (r *Runtime) getConnection(ctx context.Context, instanceID, driver string, config map[string]any, shared bool) (drivers.Handle, func(), error) { + cfg := cachedConnectionConfig{ + instanceID: instanceID, + driver: driver, + shared: shared, + config: config, } - var err error - c.lru, err = simplelru.NewLRU(size, c.lruEvictionHandler) + handle, release, err := r.connCache.Acquire(ctx, cfg) if err != nil { - panic(err) - } - - go c.periodicallyCheckHangingConnections() - - return c -} - -// Close closes all connections in the cache. -// While not strictly necessary, it's probably best to call this after all connections have been released. 
-func (c *connectionCache) Close() error { - c.lock.Lock() - - // Set closed - if c.closed { - c.lock.Unlock() - return errConnectionCacheClosed - } - c.closed = true - - // Cancel currently running migrations - c.ctxCancel() - - // Start closing all connections (will close in the background) - for key, entry := range c.entries { - c.closeEntry(key, entry) - } - - // Clear the LRU - might not be needed, but just to be sure - c.lru.Purge() - - // Unlock to allow entries to close and remove themselves from c.entries in the background - c.lock.Unlock() - - // Wait for c.entries to become empty - for { - c.lock.Lock() - var anyEntry *connectionCacheEntry - for _, e := range c.entries { - anyEntry = e - break - } - c.lock.Unlock() - - if anyEntry == nil { - // c.entries is empty, we can return - break - } - - <-anyEntry.workingCh - } - - return nil -} - -// Get opens and caches a connection. -// The caller should call the returned release function when done with the connection. -func (c *connectionCache) Get(ctx context.Context, instanceID, driver string, config map[string]any, shared bool) (drivers.Handle, func(), error) { - var key string - if shared { - // not using instanceID to ensure all instances share the same handle - key = driver + generateKey(config) - } else { - key = instanceID + driver + generateKey(config) - } - - c.lock.Lock() - if c.closed { - c.lock.Unlock() - return nil, nil, errConnectionCacheClosed - } - - // Get or create conn - entry, ok := c.entries[key] - if !ok { - // Cached conn not found, open a new one - entry = &connectionCacheEntry{instanceID: instanceID} - c.entries[key] = entry - - c.openEntry(key, entry, driver, shared, config) - - if len(c.entries) >= 2*c.size { - c.logger.Warn("connection cache: the number of open connections exceeds the cache size by more than 2x", zap.Int("entries", len(c.entries))) - } - } - - // Acquire the entry - c.acquireEntry(key, entry) - - // We can now release the lock and wait for the connection to be ready (it might already be) - c.lock.Unlock() - - // Wait for connection to be ready or context to be cancelled - var err error - stop := false - for !stop { - select { - case <-entry.workingCh: - c.lock.Lock() - - // The entry was closed right after being opened, we must loop to check c.workingCh again. - if entry.working { - c.lock.Unlock() - continue - } - - // We acquired the entry as it was closing, let's reopen it. - if entry.closed { - c.openEntry(key, entry, driver, shared, config) - c.lock.Unlock() - continue - } - - stop = true - case <-ctx.Done(): - c.lock.Lock() - err = ctx.Err() // Will always be non-nil, ensuring releaseEntry is called - stop = true - } - } - - // We've got the lock now and know entry.working is false - defer c.lock.Unlock() - - if err == nil { - err = entry.err - } - - if err != nil { - c.releaseEntry(key, entry) return nil, nil, err } - release := func() { - c.lock.Lock() - c.releaseEntry(key, entry) - c.lock.Unlock() - } - - return entry.handle, release, nil -} - -// EvictAll closes all connections for an instance. -func (c *connectionCache) EvictAll(ctx context.Context, instanceID string) { - c.lock.Lock() - defer c.lock.Unlock() - - if c.closed { - return - } - - for key, entry := range c.entries { - if entry.instanceID != instanceID { - continue - } - - c.closeEntry(key, entry) - } -} - -// acquireEntry increments an entry's refs and moves it out of the LRU if it's there. -// It should be called when holding the lock. 
-func (c *connectionCache) acquireEntry(key string, entry *connectionCacheEntry) { - entry.refs++ - if entry.refs == 1 { - // NOTE: lru.Remove is safe even if it's not in the LRU (should only happen if the entry is acquired for the first time) - _ = c.lru.Remove(key) - } -} - -// releaseEntry decrements an entry's refs and moves it to the LRU if nothing references it. -// It should be called when holding the lock. -func (c *connectionCache) releaseEntry(key string, entry *connectionCacheEntry) { - entry.refs-- - if entry.refs == 0 { - // No longer referenced. Move to LRU unless conn and/or cache is closed. - delete(c.entries, key) - if !c.closed && !entry.closed { - c.lru.Add(key, entry) - } - } -} - -// lruEvictionHandler is called by the LRU when evicting an entry. -// Note that the LRU only holds entries with refs == 0 (unless the entry is currently being moved to the acquired cache). -// Note also that this handler is called sync by the LRU, i.e. c.lock will be held. -func (c *connectionCache) lruEvictionHandler(key, value interface{}) { - entry := value.(*connectionCacheEntry) - - // The callback also gets called when removing from LRU during acquisition. - // We use conn.refs != 0 to signal that its being acquired and should not be closed. - if entry.refs != 0 { - return - } - - // Close the connection - c.closeEntry(key.(string), entry) + return handle.(drivers.Handle), release, nil } -// openEntry opens an entry's connection. It's safe to call for a previously closed entry. -// It's NOT safe to call for an entry that's currently working. -// It should be called when holding the lock (but the actual open and migrate will happen in the background). -func (c *connectionCache) openEntry(key string, entry *connectionCacheEntry, driver string, shared bool, config map[string]any) { - // Since whatever code that called openEntry may get cancelled/return before the connection is opened, we get our own reference to it. - c.acquireEntry(key, entry) - - // Reset entry and set working - entry.working = true - entry.workingCh = make(chan struct{}) - entry.workingSince = time.Now() - entry.handle = nil - entry.err = nil - entry.closed = false - - // Open in the background - // NOTE: If closeEntry is called while it's opening, closeEntry will wait for the open to complete, so we don't need to handle that case here. - go func() { - handle, err := c.openAndMigrate(c.ctx, entry.instanceID, driver, shared, config) - - c.lock.Lock() - entry.working = false - close(entry.workingCh) - entry.workingSince = time.Time{} - entry.handle = handle - entry.err = err - c.releaseEntry(key, entry) - c.lock.Unlock() - }() -} - -// closeEntry closes an entry's connection. It's safe to call for an entry that's currently being closed/already closed. -// It should be called when holding the lock (but the actual close will happen in the background). -func (c *connectionCache) closeEntry(key string, entry *connectionCacheEntry) { - if entry.closed { - return - } - - c.acquireEntry(key, entry) - - wasWorking := entry.working - if !wasWorking { - entry.working = true - entry.workingCh = make(chan struct{}) - entry.workingSince = time.Now() - } - - go func() { - // If the entry was working when closeEntry was called, wait for it to finish before continuing. - if wasWorking { - stop := false - for !stop { - <-entry.workingCh - c.lock.Lock() - - // Bad luck, something else started working on the entry. Loop and wait again. - if entry.working { - c.lock.Unlock() - continue - } - - // Good luck, something else closed the entry. 
We're done. - if entry.closed { - c.lock.Unlock() - return - } - - // Our turn to start working it - entry.working = true - entry.workingCh = make(chan struct{}) - entry.workingSince = time.Now() - c.lock.Unlock() - stop = true - } - } - - // Close the connection - if entry.handle != nil { - err := entry.handle.Close() - if err != nil { - c.logger.Error("failed closing cached connection", zap.String("key", key), zap.Error(err)) - } - } - - // Mark closed - c.lock.Lock() - entry.working = false - close(entry.workingCh) - entry.workingSince = time.Time{} - entry.handle = nil - entry.err = errConnectionClosed - entry.closed = true - c.releaseEntry(key, entry) - c.lock.Unlock() - }() +// evictInstanceConnections evicts all connections for the given instance. +func (r *Runtime) evictInstanceConnections(instanceID string) { + r.connCache.EvictWhere(func(cfg any) bool { + x := cfg.(cachedConnectionConfig) + return x.instanceID == instanceID + }) } // openAndMigrate opens a connection and migrates it. -func (c *connectionCache) openAndMigrate(ctx context.Context, instanceID, driver string, shared bool, config map[string]any) (drivers.Handle, error) { - logger := c.logger - if instanceID != "default" { - logger = c.logger.With(zap.String("instance_id", instanceID), zap.String("driver", driver)) +func (r *Runtime) openAndMigrate(ctx context.Context, cfg cachedConnectionConfig) (drivers.Handle, error) { + logger := r.logger + if cfg.instanceID != "default" { + logger = r.logger.With(zap.String("instance_id", cfg.instanceID), zap.String("driver", cfg.driver)) } - ctx, cancel := context.WithTimeout(ctx, migrateTimeout) - defer cancel() - - activityClient := c.activity - if !shared { - inst, err := c.runtime.Instance(ctx, instanceID) + activityClient := r.activity + if !cfg.shared { + inst, err := r.Instance(ctx, cfg.instanceID) if err != nil { return nil, err } @@ -384,9 +90,9 @@ func (c *connectionCache) openAndMigrate(ctx context.Context, instanceID, driver } } - handle, err := drivers.Open(driver, config, shared, activityClient, logger) + handle, err := drivers.Open(cfg.driver, cfg.config, cfg.shared, activityClient, logger) if err == nil && ctx.Err() != nil { - err = fmt.Errorf("timed out while opening driver %q", driver) + err = fmt.Errorf("timed out while opening driver %q", cfg.driver) } if err != nil { return nil, err @@ -396,42 +102,23 @@ func (c *connectionCache) openAndMigrate(ctx context.Context, instanceID, driver if err != nil { handle.Close() if errors.Is(err, ctx.Err()) { - err = fmt.Errorf("timed out while migrating driver %q: %w", driver, err) + err = fmt.Errorf("timed out while migrating driver %q: %w", cfg.driver, err) } return nil, err } return handle, nil } -// periodicallyCheckHangingConnections periodically checks for connection opens or closes that have been working for too long. 
-func (c *connectionCache) periodicallyCheckHangingConnections() { - ticker := time.NewTicker(time.Minute) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - c.lock.Lock() - for key, entry := range c.entries { - if entry.working && time.Since(entry.workingSince) > hangingTimeout { - c.logger.Error("connection cache: connection open or close has been working for too long", zap.String("key", key), zap.Duration("duration", time.Since(entry.workingSince))) - } - } - c.lock.Unlock() - case <-c.ctx.Done(): - return - } - } -} - -func generateKey(m map[string]any) string { +func generateKey(cfg cachedConnectionConfig) string { sb := strings.Builder{} - keys := maps.Keys(m) + sb.WriteString(cfg.instanceID) // Empty if cfg.shared + sb.WriteString(cfg.driver) + keys := maps.Keys(cfg.config) slices.Sort(keys) for _, key := range keys { sb.WriteString(key) sb.WriteString(":") - sb.WriteString(fmt.Sprint(m[key])) + sb.WriteString(fmt.Sprint(cfg.config[key])) sb.WriteString(" ") } return sb.String() diff --git a/runtime/connection_cache_test.go b/runtime/connection_cache_test.go deleted file mode 100644 index ff68a59fb1a..00000000000 --- a/runtime/connection_cache_test.go +++ /dev/null @@ -1,417 +0,0 @@ -package runtime - -import ( - "context" - "fmt" - "sync" - "sync/atomic" - "testing" - "time" - - runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" - "github.com/rilldata/rill/runtime/drivers" - _ "github.com/rilldata/rill/runtime/drivers/sqlite" - "github.com/rilldata/rill/runtime/pkg/activity" - "github.com/stretchr/testify/require" - "go.uber.org/zap" -) - -func TestConnectionCache(t *testing.T) { - ctx := context.Background() - id := "default" - - rt := newTestRuntimeWithInst(t) - c := newConnectionCache(10, zap.NewNop(), rt, activity.NewNoopClient()) - conn1, release, err := c.Get(ctx, id, "sqlite", map[string]any{"dsn": ":memory:"}, false) - require.NoError(t, err) - release() - require.NotNil(t, conn1) - - conn2, release, err := c.Get(ctx, id, "sqlite", map[string]any{"dsn": ":memory:"}, false) - require.NoError(t, err) - release() - require.NotNil(t, conn2) - - inst := &drivers.Instance{ - ID: "default1", - OLAPConnector: "duckdb", - RepoConnector: "repo", - CatalogConnector: "catalog", - Connectors: []*runtimev1.Connector{ - { - Type: "file", - Name: "repo", - Config: map[string]string{"dsn": ""}, - }, - { - Type: "duckdb", - Name: "duckdb", - Config: map[string]string{"dsn": ""}, - }, - { - Type: "sqlite", - Name: "catalog", - Config: map[string]string{"dsn": "file:rill?mode=memory&cache=shared"}, - }, - }, - } - require.NoError(t, rt.CreateInstance(context.Background(), inst)) - - conn3, release, err := c.Get(ctx, "default1", "sqlite", map[string]any{"dsn": ":memory:"}, false) - require.NoError(t, err) - release() - require.NotNil(t, conn3) - - require.True(t, conn1 == conn2) - require.False(t, conn2 == conn3) -} - -func TestConnectionCacheWithAllShared(t *testing.T) { - ctx := context.Background() - id := "default" - - c := newConnectionCache(1, zap.NewNop(), newTestRuntimeWithInst(t), activity.NewNoopClient()) - conn1, release, err := c.Get(ctx, id, "sqlite", map[string]any{"dsn": ":memory:"}, true) - require.NoError(t, err) - require.NotNil(t, conn1) - defer release() - - conn2, release, err := c.Get(ctx, id, "sqlite", map[string]any{"dsn": ":memory:"}, true) - require.NoError(t, err) - require.NotNil(t, conn2) - defer release() - - conn3, release, err := c.Get(ctx, "default", "sqlite", map[string]any{"dsn": ":memory:"}, true) - require.NoError(t, err) - 
require.NotNil(t, conn3) - defer release() - - require.True(t, conn1 == conn2) - require.True(t, conn2 == conn3) - require.Equal(t, 1, len(c.entries)) - require.Equal(t, 0, c.lru.Len()) -} - -func TestConnectionCacheWithAllOpen(t *testing.T) { - ctx := context.Background() - - rt := newTestRuntimeWithInst(t) - c := newConnectionCache(1, zap.NewNop(), rt, activity.NewNoopClient()) - conn1, r1, err := c.Get(ctx, "default", "sqlite", map[string]any{"dsn": ":memory:"}, false) - require.NoError(t, err) - require.NotNil(t, conn1) - - createInstance(t, rt, "default1") - conn2, r2, err := c.Get(ctx, "default1", "sqlite", map[string]any{"dsn": ":memory:"}, false) - require.NoError(t, err) - require.NotNil(t, conn2) - - createInstance(t, rt, "default2") - conn3, r3, err := c.Get(ctx, "default2", "sqlite", map[string]any{"dsn": ":memory:"}, false) - require.NoError(t, err) - require.NotNil(t, conn3) - - require.Equal(t, 3, len(c.entries)) - require.Equal(t, 0, c.lru.Len()) - // release all connections - r1() - r2() - r3() - require.Equal(t, 1, len(c.entries)) - require.Equal(t, 1, c.lru.Len()) - _, val, _ := c.lru.GetOldest() - require.True(t, conn3 == val.(*connectionCacheEntry).handle) -} - -func TestConnectionCacheParallel(t *testing.T) { - ctx := context.Background() - - rt := newTestRuntimeWithInst(t) - c := newConnectionCache(5, zap.NewNop(), rt, activity.NewNoopClient()) - defer c.Close() - - var wg sync.WaitGroup - wg.Add(30) - // open 10 connections and do not release - go func() { - for i := 0; i < 10; i++ { - j := i - go func() { - defer wg.Done() - id := fmt.Sprintf("default%v", 100+j) - createInstance(t, rt, id) - conn, _, err := c.Get(ctx, id, "sqlite", map[string]any{"dsn": ":memory:"}, false) - require.NoError(t, err) - require.NotNil(t, conn) - time.Sleep(100 * time.Millisecond) - }() - } - }() - - // open 20 connections and release - for i := 0; i < 20; i++ { - j := i - go func() { - defer wg.Done() - id := fmt.Sprintf("default%v", 200+j) - createInstance(t, rt, id) - conn, r, err := c.Get(ctx, id, "sqlite", map[string]any{"dsn": ":memory:"}, false) - defer r() - require.NoError(t, err) - require.NotNil(t, conn) - time.Sleep(100 * time.Millisecond) - }() - } - wg.Wait() - - // 10 connections were not released so should be present in in-use cache - require.Equal(t, 15, len(c.entries)) - // 20 connections were released so 15 should be evicted - require.Equal(t, 5, c.lru.Len()) -} - -func TestConnectionCacheMultipleConfigs(t *testing.T) { - ctx := context.Background() - - c := newConnectionCache(10, zap.NewNop(), newTestRuntimeWithInst(t), activity.NewNoopClient()) - defer c.Close() - conn1, r1, err := c.Get(ctx, "default", "sqlite", map[string]any{"dsn": ":memory:", "host": "localhost:8080", "allow_host_access": "true"}, true) - require.NoError(t, err) - require.NotNil(t, conn1) - - conn2, r2, err := c.Get(ctx, "default", "sqlite", map[string]any{"dsn": ":memory:", "host": "localhost:8080", "allow_host_access": "true"}, true) - require.NoError(t, err) - require.NotNil(t, conn2) - - conn3, r3, err := c.Get(ctx, "default", "sqlite", map[string]any{"dsn": ":memory:", "host": "localhost:8080", "allow_host_access": "true"}, true) - require.NoError(t, err) - require.NotNil(t, conn3) - - require.Equal(t, 1, len(c.entries)) - require.Equal(t, 0, c.lru.Len()) - // release all connections - r1() - r2() - r3() - require.Equal(t, 1, len(c.entries)) - require.Equal(t, 1, c.lru.Len()) -} - -func TestConnectionCacheParallelCalls(t *testing.T) { - ctx := context.Background() - - m := &mockDriver{} - 
drivers.Register("mock_driver", m) - defer func() { - delete(drivers.Drivers, "mock_driver") - }() - - rt := newTestRuntimeWithInst(t) - defer rt.Close() - - c := newConnectionCache(10, zap.NewNop(), rt, activity.NewNoopClient()) - defer c.Close() - - var wg sync.WaitGroup - wg.Add(10) - // open 10 connections and verify no error - for i := 0; i < 10; i++ { - go func() { - defer wg.Done() - conn, _, err := c.Get(ctx, "default", "mock_driver", map[string]any{"sleep": int64(100)}, false) - require.NoError(t, err) - require.NotNil(t, conn) - }() - } - wg.Wait() - - require.Equal(t, int32(1), m.opened.Load()) - require.Equal(t, 1, len(c.entries)) -} - -func TestConnectionCacheBlockingCalls(t *testing.T) { - ctx := context.Background() - - m := &mockDriver{} - drivers.Register("mock_driver", m) - defer func() { - delete(drivers.Drivers, "mock_driver") - }() - - rt := newTestRuntimeWithInst(t) - defer rt.Close() - - c := newConnectionCache(10, zap.NewNop(), rt, activity.NewNoopClient()) - defer c.Close() - - var wg sync.WaitGroup - wg.Add(12) - // open 1 slow connection - go func() { - defer wg.Done() - conn, _, err := c.Get(ctx, "default", "mock_driver", map[string]any{"sleep": int64(1000)}, false) - require.NoError(t, err) - require.NotNil(t, conn) - }() - - // open 10 fast different connections(takes 10-20 ms to open) and verify not blocked - for i := 0; i < 10; i++ { - j := i - go func() { - defer wg.Done() - conn, _, err := c.Get(ctx, "default", "mock_driver", map[string]any{"sleep": int64(j + 10)}, false) - require.NoError(t, err) - require.NotNil(t, conn) - }() - } - - // verify that after 200 ms 11 connections have been opened - go func() { - time.Sleep(200 * time.Millisecond) - wg.Done() - }() - wg.Wait() - - require.Equal(t, int32(11), m.opened.Load()) -} - -type mockDriver struct { - opened atomic.Int32 -} - -// Drop implements drivers.Driver. -func (*mockDriver) Drop(config map[string]any, logger *zap.Logger) error { - panic("unimplemented") -} - -// HasAnonymousSourceAccess implements drivers.Driver. -func (*mockDriver) HasAnonymousSourceAccess(ctx context.Context, src map[string]any, logger *zap.Logger) (bool, error) { - panic("unimplemented") -} - -func (*mockDriver) TertiarySourceConnectors(ctx context.Context, src map[string]any, logger *zap.Logger) ([]string, error) { - return nil, nil -} - -// Open implements drivers.Driver. -func (m *mockDriver) Open(config map[string]any, shared bool, client activity.Client, logger *zap.Logger) (drivers.Handle, error) { - m.opened.Add(1) - sleep := config["sleep"].(int64) - time.Sleep(time.Duration(sleep) * time.Millisecond) - return &mockHandle{}, nil -} - -// Spec implements drivers.Driver. -func (*mockDriver) Spec() drivers.Spec { - panic("unimplemented") -} - -var _ drivers.Driver = &mockDriver{} - -type mockHandle struct { -} - -// AsCatalogStore implements drivers.Handle. -func (*mockHandle) AsCatalogStore(instanceID string) (drivers.CatalogStore, bool) { - panic("unimplemented") -} - -// AsFileStore implements drivers.Handle. -func (*mockHandle) AsFileStore() (drivers.FileStore, bool) { - panic("unimplemented") -} - -// AsOLAP implements drivers.Handle. -func (*mockHandle) AsOLAP(instanceID string) (drivers.OLAPStore, bool) { - panic("unimplemented") -} - -// AsObjectStore implements drivers.Handle. -func (*mockHandle) AsObjectStore() (drivers.ObjectStore, bool) { - panic("unimplemented") -} - -// AsRegistry implements drivers.Handle. 
-func (*mockHandle) AsRegistry() (drivers.RegistryStore, bool) { - panic("unimplemented") -} - -// AsRepoStore implements drivers.Handle. -func (*mockHandle) AsRepoStore(instanceID string) (drivers.RepoStore, bool) { - panic("unimplemented") -} - -// AsAdmin implements drivers.Handle. -func (*mockHandle) AsAdmin(instanceID string) (drivers.AdminService, bool) { - panic("unimplemented") -} - -// AsSQLStore implements drivers.Handle. -func (*mockHandle) AsSQLStore() (drivers.SQLStore, bool) { - panic("unimplemented") -} - -// AsTransporter implements drivers.Handle. -func (*mockHandle) AsTransporter(from drivers.Handle, to drivers.Handle) (drivers.Transporter, bool) { - panic("unimplemented") -} - -// Close implements drivers.Handle. -func (*mockHandle) Close() error { - return nil -} - -// Config implements drivers.Handle. -func (*mockHandle) Config() map[string]any { - panic("unimplemented") -} - -// Driver implements drivers.Handle. -func (*mockHandle) Driver() string { - panic("unimplemented") -} - -// Migrate implements drivers.Handle. -func (*mockHandle) Migrate(ctx context.Context) error { - return nil -} - -// MigrationStatus implements drivers.Handle. -func (*mockHandle) MigrationStatus(ctx context.Context) (current int, desired int, err error) { - panic("unimplemented") -} - -var _ drivers.Handle = &mockHandle{} - -func newTestRuntimeWithInst(t *testing.T) *Runtime { - rt := newTestRuntime(t) - createInstance(t, rt, "default") - return rt -} - -func createInstance(t *testing.T, rt *Runtime, instanceId string) { - inst := &drivers.Instance{ - ID: instanceId, - OLAPConnector: "duckdb", - RepoConnector: "repo", - CatalogConnector: "catalog", - Connectors: []*runtimev1.Connector{ - { - Type: "file", - Name: "repo", - Config: map[string]string{"dsn": ""}, - }, - { - Type: "duckdb", - Name: "duckdb", - Config: map[string]string{"dsn": ""}, - }, - { - Type: "sqlite", - Name: "catalog", - Config: map[string]string{"dsn": "file:rill?mode=memory&cache=shared"}, - }, - }, - } - require.NoError(t, rt.CreateInstance(context.Background(), inst)) -} diff --git a/runtime/connections.go b/runtime/connections.go index d1e9a5b81df..b72c3618551 100644 --- a/runtime/connections.go +++ b/runtime/connections.go @@ -19,7 +19,7 @@ func (r *Runtime) AcquireSystemHandle(ctx context.Context, connector string) (dr cfg[strings.ToLower(k)] = v } cfg["allow_host_access"] = r.opts.AllowHostAccess - return r.connCache.Get(ctx, "", c.Type, cfg, true) + return r.getConnection(ctx, "", c.Type, cfg, true) } } return nil, nil, fmt.Errorf("connector %s doesn't exist", connector) @@ -36,7 +36,7 @@ func (r *Runtime) AcquireHandle(ctx context.Context, instanceID, connector strin // So we take this moment to make sure the ctx gets checked for cancellation at least every once in a while. return nil, nil, ctx.Err() } - return r.connCache.Get(ctx, instanceID, driver, cfg, false) + return r.getConnection(ctx, instanceID, driver, cfg, false) } func (r *Runtime) Repo(ctx context.Context, instanceID string) (drivers.RepoStore, func(), error) { diff --git a/runtime/pkg/conncache/conncache.go b/runtime/pkg/conncache/conncache.go new file mode 100644 index 00000000000..40536188d7e --- /dev/null +++ b/runtime/pkg/conncache/conncache.go @@ -0,0 +1,334 @@ +package conncache + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/hashicorp/golang-lru/simplelru" + "github.com/rilldata/rill/runtime/pkg/singleflight" +) + +// Cache is a concurrency-safe cache of stateful connection objects. 
+// It differs from a connection pool in that it's designed for caching heterogeneous connections.
+// The cache will at most open one connection per key, even under concurrent access.
+// The cache automatically evicts connections that are not in use ("acquired") using a least-recently-used policy.
+type Cache interface {
+ // Acquire retrieves or opens a connection for the given key. The returned ReleaseFunc must be called when the connection is no longer needed.
+ // While a connection is acquired, it will not be closed unless Evict or Close is called.
+ // If Acquire is called while the underlying connection is being evicted, it will wait for the close to complete and then open a new connection.
+ // If opening the connection fails, Acquire may return the error on subsequent calls without trying to open again until the entry is evicted.
+ Acquire(ctx context.Context, cfg any) (Connection, ReleaseFunc, error)
+
+ // EvictWhere closes the connections that match the predicate.
+ // It immediately closes the connections, even those that are currently acquired.
+ // It returns immediately and does not wait for the connections to finish closing.
+ EvictWhere(predicate func(cfg any) bool)
+
+ // Close closes all open connections and prevents new connections from being acquired.
+ // It returns when all cached connections have been closed or when the provided ctx is cancelled.
+ Close(ctx context.Context) error
+}
+
+// Connection is a connection that may be cached.
+type Connection interface {
+ Close() error
+}
+
+// ReleaseFunc is a function that must be called when an acquired connection is no longer needed.
+type ReleaseFunc func()
+
+// Options configures a new connection cache.
+type Options struct {
+ // MaxConnectionsIdle is the maximum number of non-acquired connections that will be kept open.
+ MaxConnectionsIdle int
+ // OpenTimeout is the maximum amount of time to wait for a connection to open.
+ OpenTimeout time.Duration
+ // CloseTimeout is the maximum amount of time to wait for a connection to close.
+ CloseTimeout time.Duration
+ // OpenFunc opens a connection.
+ OpenFunc func(ctx context.Context, cfg any) (Connection, error)
+ // KeyFunc computes a comparable key for a connection configuration.
+ KeyFunc func(cfg any) string
+ // HangingFunc is called when an open or close exceeds its timeout and does not respond to context cancellation.
+ HangingFunc func(cfg any, open bool) +} + +type cacheImpl struct { + opts Options + closed bool + singleflight *singleflight.Group[string, *entry] + ctx context.Context + cancel context.CancelFunc + mu sync.Mutex + entries map[string]*entry + lru *simplelru.LRU +} + +type entry struct { + cfg any + refs int + status entryStatus + since time.Time + handle Connection + err error +} + +type entryStatus int + +const ( + entryStatusUnspecified entryStatus = iota + entryStatusOpening + entryStatusOpen + entryStatusClosing + entryStatusClosed +) + +func New(opts Options) Cache { + ctx, cancel := context.WithCancel(context.Background()) + c := &cacheImpl{ + opts: opts, + ctx: ctx, + cancel: cancel, + entries: make(map[string]*entry), + } + + var err error + c.lru, err = simplelru.NewLRU(opts.MaxConnectionsIdle, c.lruEvictionHandler) + if err != nil { + panic(err) + } + + go c.periodicallyCheckHangingConnections() + + return c +} + +func (c *cacheImpl) Acquire(ctx context.Context, cfg any) (Connection, ReleaseFunc, error) { + k := c.opts.KeyFunc(cfg) + + c.mu.Lock() + if c.closed { + c.mu.Unlock() + return nil, nil, fmt.Errorf("conncache: closed") + } + + e, ok := c.entries[k] + if !ok { + e = &entry{cfg: cfg, since: time.Now()} + c.entries[k] = e + } + + c.retainEntry(k, e) + + if e.status == entryStatusOpen { + defer c.mu.Unlock() + if e.err != nil { + return nil, nil, e.err + } + return e.handle, c.releaseFunc(k, e), nil + } + + c.mu.Unlock() + + for attempt := 0; attempt < 2; attempt++ { + _, err := c.singleflight.Do(ctx, k, func(_ context.Context) (*entry, error) { + c.mu.Lock() + c.retainEntry(k, e) + e.status = entryStatusOpening + e.since = time.Now() + e.handle = nil + e.err = nil + c.mu.Unlock() + + ctx, cancel := context.WithTimeout(c.ctx, c.opts.OpenTimeout) + handle, err := c.opts.OpenFunc(ctx, cfg) + cancel() + + c.mu.Lock() + c.releaseEntry(k, e) + e.status = entryStatusOpen + e.since = time.Now() + e.handle = handle + e.err = err + c.mu.Unlock() + + return e, nil + }) + if err != nil { + // TODO: if err is not ctx.Err(), it's a panic. Should we handle panics? + return nil, nil, err + } + + c.mu.Lock() + if e.status == entryStatusOpen { + break + } + c.mu.Unlock() + } + + defer c.mu.Unlock() + + if e.err != nil { + return nil, nil, e.err + } + return e.handle, c.releaseFunc(k, e), nil +} + +func (c *cacheImpl) EvictWhere(predicate func(cfg any) bool) { + c.mu.Lock() + defer c.mu.Unlock() + for k, e := range c.entries { + if predicate(e.cfg) { + c.beginClose(k, e) + } + } +} + +func (c *cacheImpl) Close(ctx context.Context) error { + c.mu.Lock() + if c.closed { + c.mu.Unlock() + return fmt.Errorf("conncache: already closed") + } + c.closed = true + + c.cancel() + + for k, e := range c.entries { + c.beginClose(k, e) + } + + // TODO: Purge? I don't think so. + + c.mu.Unlock() + + for { + c.mu.Lock() + var anyK string + var anyE *entry + for k, e := range c.entries { + anyK = k + anyE = e + break + } + c.mu.Unlock() + + if anyE == nil { + // c.entries is empty, we can return + break + } + + // TODO: What if this blocks before the close? Probably better to wait for a close channel on the entry. + _, _ = c.singleflight.Do(context.Background(), anyK, func(_ context.Context) (*entry, error) { + return nil, nil + }) + } + + return nil +} + +// beginClose must be called while c.mu is held. 
+func (c *cacheImpl) beginClose(k string, e *entry) { + if e.status != entryStatusOpening && e.status != entryStatusOpen { + return + } + + c.retainEntry(k, e) + + go func() { + for attempt := 0; attempt < 2; attempt++ { + _, _ = c.singleflight.Do(context.Background(), k, func(_ context.Context) (*entry, error) { + c.mu.Lock() + e.status = entryStatusClosing + e.since = time.Now() + c.mu.Unlock() + + err := e.handle.Close() + + c.mu.Lock() + e.status = entryStatusClosed + e.since = time.Now() + e.handle = nil + e.err = err + c.mu.Unlock() + + return e, nil + }) + // TODO: can return err on panic in Close. Should we handle panics? + + c.mu.Lock() + if e.status == entryStatusClosed { + break + } + c.mu.Unlock() + } + + c.mu.Lock() + c.releaseEntry(k, e) + c.mu.Unlock() + }() +} + +func (c *cacheImpl) lruEvictionHandler(key, value any) { + k := key.(string) + e := value.(*entry) + + // The callback also gets called when removing from LRU during acquisition. + // We use conn.refs != 0 to signal that its being acquired and should not be closed. + if e.refs == 0 { + c.beginClose(k, e) + } +} + +func (c *cacheImpl) retainEntry(key string, e *entry) { + e.refs++ + if e.refs == 1 { + // NOTE: lru.Remove is safe even if it's not in the LRU (should only happen if the entry is acquired for the first time) + _ = c.lru.Remove(key) + } +} + +func (c *cacheImpl) releaseEntry(key string, e *entry) { + e.refs-- + if e.refs == 0 { + // If open, keep entry and put in LRU. Else remove entirely. + if e.status != entryStatusClosing && e.status != entryStatusClosed { + c.lru.Add(key, e) + } else { + delete(c.entries, key) + } + } +} + +func (c *cacheImpl) releaseFunc(key string, e *entry) ReleaseFunc { + return func() { + c.mu.Lock() + c.releaseEntry(key, e) + c.mu.Unlock() + } +} + +func (c *cacheImpl) periodicallyCheckHangingConnections() { + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + c.mu.Lock() + for _, e := range c.entries { + if e.status == entryStatusOpening && time.Since(e.since) >= c.opts.OpenTimeout { + c.opts.HangingFunc(e.cfg, true) + } + if e.status == entryStatusClosing && time.Since(e.since) >= c.opts.CloseTimeout { + c.opts.HangingFunc(e.cfg, false) + } + } + c.mu.Unlock() + case <-c.ctx.Done(): + return + } + } +} diff --git a/runtime/registry.go b/runtime/registry.go index 1a08c6ab26b..093a48e4c57 100644 --- a/runtime/registry.go +++ b/runtime/registry.go @@ -375,7 +375,7 @@ func (r *registryCache) restartController(iwc *instanceWithController) { // So we want to evict all open connections for that instance, but it's unsafe to do so while the controller is running. // So this is the only place where we can do it safely. 
if r.baseCtx.Err() == nil { - r.rt.connCache.EvictAll(r.baseCtx, iwc.instance.ID) + r.rt.evictInstanceConnections(iwc.instance.ID) } r.mu.Lock() diff --git a/runtime/registry_test.go b/runtime/registry_test.go index 66729a0772a..a715575736b 100644 --- a/runtime/registry_test.go +++ b/runtime/registry_test.go @@ -424,8 +424,8 @@ func TestRuntime_DeleteInstance(t *testing.T) { require.Error(t, err) // verify older olap connection is closed and cache updated - require.False(t, rt.connCache.lru.Contains(inst.ID+"duckdb"+fmt.Sprintf("dsn:%s ", dbFile))) - require.False(t, rt.connCache.lru.Contains(inst.ID+"file"+fmt.Sprintf("dsn:%s ", repodsn))) + // require.False(t, rt.connCache.lru.Contains(inst.ID+"duckdb"+fmt.Sprintf("dsn:%s ", dbFile))) + // require.False(t, rt.connCache.lru.Contains(inst.ID+"file"+fmt.Sprintf("dsn:%s ", repodsn))) err = olap.Exec(context.Background(), &drivers.Statement{Query: "SELECT COUNT(*) FROM rill.migration_version"}) require.True(t, err != nil) @@ -474,7 +474,7 @@ func TestRuntime_DeleteInstance_DropCorrupted(t *testing.T) { require.NoError(t, err) // Close OLAP connection - rt.connCache.EvictAll(ctx, inst.ID) + rt.evictInstanceConnections(inst.ID) // Corrupt database file err = os.WriteFile(dbpath, []byte("corrupted"), 0644) diff --git a/runtime/runtime.go b/runtime/runtime.go index 46601ac66c1..07e28bc1d5f 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -9,6 +9,7 @@ import ( runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" "github.com/rilldata/rill/runtime/drivers" "github.com/rilldata/rill/runtime/pkg/activity" + "github.com/rilldata/rill/runtime/pkg/conncache" "github.com/rilldata/rill/runtime/pkg/email" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" @@ -36,7 +37,7 @@ type Runtime struct { activity activity.Client metastore drivers.Handle registryCache *registryCache - connCache *connectionCache + connCache conncache.Cache queryCache *queryCache securityEngine *securityEngine } @@ -55,7 +56,7 @@ func New(ctx context.Context, opts *Options, logger *zap.Logger, ac activity.Cli securityEngine: newSecurityEngine(opts.SecurityEngineCacheSize, logger), } - rt.connCache = newConnectionCache(opts.ConnectionCacheSize, logger, rt, ac) + rt.connCache = rt.newConnectionCache() store, _, err := rt.AcquireSystemHandle(ctx, opts.MetastoreConnector) if err != nil { @@ -88,7 +89,7 @@ func (r *Runtime) Close() error { defer cancel() err1 := r.registryCache.close(ctx) err2 := r.queryCache.close() - err3 := r.connCache.Close() // Also closes metastore // TODO: Propagate ctx cancellation + err3 := r.connCache.Close(ctx) // Also closes metastore // TODO: Propagate ctx cancellation return errors.Join(err1, err2, err3) } From bff58830952d9f1a6677657c836c6bcab3a8ef12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Benjamin=20Egelund-M=C3=BCller?= Date: Wed, 13 Dec 2023 15:13:39 +0100 Subject: [PATCH 03/12] Add tests --- runtime/pkg/conncache/conncache.go | 45 +++-- runtime/pkg/conncache/conncache_test.go | 237 ++++++++++++++++++++++++ 2 files changed, 264 insertions(+), 18 deletions(-) create mode 100644 runtime/pkg/conncache/conncache_test.go diff --git a/runtime/pkg/conncache/conncache.go b/runtime/pkg/conncache/conncache.go index 40536188d7e..ff76b706546 100644 --- a/runtime/pkg/conncache/conncache.go +++ b/runtime/pkg/conncache/conncache.go @@ -58,12 +58,12 @@ type Options struct { type cacheImpl struct { opts Options closed bool - singleflight *singleflight.Group[string, *entry] - ctx context.Context - cancel context.CancelFunc mu sync.Mutex 
entries map[string]*entry lru *simplelru.LRU + singleflight *singleflight.Group[string, any] + ctx context.Context + cancel context.CancelFunc } type entry struct { @@ -88,10 +88,11 @@ const ( func New(opts Options) Cache { ctx, cancel := context.WithCancel(context.Background()) c := &cacheImpl{ - opts: opts, - ctx: ctx, - cancel: cancel, - entries: make(map[string]*entry), + opts: opts, + entries: make(map[string]*entry), + singleflight: &singleflight.Group[string, any]{}, + ctx: ctx, + cancel: cancel, } var err error @@ -133,7 +134,7 @@ func (c *cacheImpl) Acquire(ctx context.Context, cfg any) (Connection, ReleaseFu c.mu.Unlock() for attempt := 0; attempt < 2; attempt++ { - _, err := c.singleflight.Do(ctx, k, func(_ context.Context) (*entry, error) { + _, err := c.singleflight.Do(ctx, k, func(_ context.Context) (any, error) { c.mu.Lock() c.retainEntry(k, e) e.status = entryStatusOpening @@ -142,9 +143,15 @@ func (c *cacheImpl) Acquire(ctx context.Context, cfg any) (Connection, ReleaseFu e.err = nil c.mu.Unlock() - ctx, cancel := context.WithTimeout(c.ctx, c.opts.OpenTimeout) - handle, err := c.opts.OpenFunc(ctx, cfg) - cancel() + var handle Connection + var err error + if c.opts.OpenTimeout == 0 { + handle, err = c.opts.OpenFunc(ctx, cfg) + } else { + ctx, cancel := context.WithTimeout(c.ctx, c.opts.OpenTimeout) + handle, err = c.opts.OpenFunc(ctx, cfg) + cancel() + } c.mu.Lock() c.releaseEntry(k, e) @@ -154,7 +161,7 @@ func (c *cacheImpl) Acquire(ctx context.Context, cfg any) (Connection, ReleaseFu e.err = err c.mu.Unlock() - return e, nil + return nil, nil }) if err != nil { // TODO: if err is not ctx.Err(), it's a panic. Should we handle panics? @@ -221,7 +228,7 @@ func (c *cacheImpl) Close(ctx context.Context) error { } // TODO: What if this blocks before the close? Probably better to wait for a close channel on the entry. - _, _ = c.singleflight.Do(context.Background(), anyK, func(_ context.Context) (*entry, error) { + _, _ = c.singleflight.Do(context.Background(), anyK, func(_ context.Context) (any, error) { return nil, nil }) } @@ -239,7 +246,7 @@ func (c *cacheImpl) beginClose(k string, e *entry) { go func() { for attempt := 0; attempt < 2; attempt++ { - _, _ = c.singleflight.Do(context.Background(), k, func(_ context.Context) (*entry, error) { + _, _ = c.singleflight.Do(context.Background(), k, func(_ context.Context) (any, error) { c.mu.Lock() e.status = entryStatusClosing e.since = time.Now() @@ -254,7 +261,7 @@ func (c *cacheImpl) beginClose(k string, e *entry) { e.err = err c.mu.Unlock() - return e, nil + return nil, nil }) // TODO: can return err on panic in Close. Should we handle panics? 
@@ -310,8 +317,10 @@ func (c *cacheImpl) releaseFunc(key string, e *entry) ReleaseFunc { } } +var checkHangingInterval = time.Minute + func (c *cacheImpl) periodicallyCheckHangingConnections() { - ticker := time.NewTicker(time.Minute) + ticker := time.NewTicker(checkHangingInterval) defer ticker.Stop() for { @@ -319,10 +328,10 @@ func (c *cacheImpl) periodicallyCheckHangingConnections() { case <-ticker.C: c.mu.Lock() for _, e := range c.entries { - if e.status == entryStatusOpening && time.Since(e.since) >= c.opts.OpenTimeout { + if c.opts.OpenTimeout != 0 && e.status == entryStatusOpening && time.Since(e.since) >= c.opts.OpenTimeout { c.opts.HangingFunc(e.cfg, true) } - if e.status == entryStatusClosing && time.Since(e.since) >= c.opts.CloseTimeout { + if c.opts.CloseTimeout != 0 && e.status == entryStatusClosing && time.Since(e.since) >= c.opts.CloseTimeout { c.opts.HangingFunc(e.cfg, false) } } diff --git a/runtime/pkg/conncache/conncache_test.go b/runtime/pkg/conncache/conncache_test.go new file mode 100644 index 00000000000..535675aa8bd --- /dev/null +++ b/runtime/pkg/conncache/conncache_test.go @@ -0,0 +1,237 @@ +package conncache + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +type mockConn struct { + cfg string + closeDelay time.Duration + closeHang bool + closeCalled bool +} + +func (c *mockConn) Close() error { + c.closeCalled = true + if c.closeHang { + select {} + } + time.Sleep(c.closeDelay) + return nil +} + +func TestBasic(t *testing.T) { + opens := atomic.Int64{} + + c := New(Options{ + MaxConnectionsIdle: 2, + OpenFunc: func(ctx context.Context, cfg any) (Connection, error) { + opens.Add(1) + return &mockConn{cfg: cfg.(string)}, nil + }, + KeyFunc: func(cfg any) string { + return cfg.(string) + }, + }) + + // Get "foo" + m1, r1, err := c.Acquire(context.Background(), "foo") + require.NoError(t, err) + require.Equal(t, int64(1), opens.Load()) + + // Get "foo" again + m2, r2, err := c.Acquire(context.Background(), "foo") + require.NoError(t, err) + require.Equal(t, int64(1), opens.Load()) + + // Check that they're the same + require.Equal(t, m1, m2) + + // Release the "foo"s and get "foo" again, check it's the same + r1() + r2() + m3, r3, err := c.Acquire(context.Background(), "foo") + require.NoError(t, err) + require.Equal(t, int64(1), opens.Load()) + require.Equal(t, m1, m3) + r3() + + // Open and release two more conns, check "foo" is closed (since LRU size is 2) + for i := 0; i < 2; i++ { + _, r, err := c.Acquire(context.Background(), fmt.Sprintf("bar%d", i)) + require.NoError(t, err) + require.Equal(t, int64(1+i+1), opens.Load()) + r() + } + require.Equal(t, true, m1.(*mockConn).closeCalled) + + // Close cache + require.NoError(t, c.Close(context.Background())) +} + +func TestConcurrentOpen(t *testing.T) { + opens := atomic.Int64{} + + c := New(Options{ + MaxConnectionsIdle: 2, + OpenFunc: func(ctx context.Context, cfg any) (Connection, error) { + opens.Add(1) + time.Sleep(time.Second) + return &mockConn{cfg: cfg.(string)}, nil + }, + KeyFunc: func(cfg any) string { + return cfg.(string) + }, + }) + + var m1, m2 Connection + + wg := sync.WaitGroup{} + wg.Add(2) + go func() { + defer wg.Done() + m, _, err := c.Acquire(context.Background(), "foo") + require.NoError(t, err) + m1 = m + }() + go func() { + defer wg.Done() + m, _, err := c.Acquire(context.Background(), "foo") + require.NoError(t, err) + m2 = m + }() + + wg.Wait() + require.NotNil(t, m1) + require.Equal(t, m1, m2) + require.Equal(t, 
int64(1), opens.Load()) + + // Close cache + require.NoError(t, c.Close(context.Background())) +} + +func TestOpenDuringClose(t *testing.T) { + opens := atomic.Int64{} + + c := New(Options{ + MaxConnectionsIdle: 2, + OpenFunc: func(ctx context.Context, cfg any) (Connection, error) { + opens.Add(1) + return &mockConn{ + cfg: cfg.(string), + closeDelay: time.Second, // Closes hang for 1s + }, nil + }, + KeyFunc: func(cfg any) string { + return cfg.(string) + }, + }) + + // Create conn + m1, r1, err := c.Acquire(context.Background(), "foo") + require.NoError(t, err) + require.Equal(t, int64(1), opens.Load()) + r1() + + // Evict it so it starts closing + c.EvictWhere(func(cfg any) bool { return true }) + // closeCalled is set immediately, but it will take 1s to actually close + require.True(t, m1.(*mockConn).closeCalled) + + // Open again, check it takes ~1s to do so + start := time.Now() + m2, r2, err := c.Acquire(context.Background(), "foo") + require.NoError(t, err) + require.Greater(t, time.Since(start), 500*time.Millisecond) + require.Equal(t, int64(2), opens.Load()) + require.NotEqual(t, m1, m2) + r2() + + // Close cache + require.NoError(t, c.Close(context.Background())) +} + +func TestCloseInUse(t *testing.T) { + opens := atomic.Int64{} + + c := New(Options{ + MaxConnectionsIdle: 2, + OpenFunc: func(ctx context.Context, cfg any) (Connection, error) { + opens.Add(1) + return &mockConn{cfg: cfg.(string)}, nil + }, + KeyFunc: func(cfg any) string { + return cfg.(string) + }, + }) + + // Open conn "foo" + m1, r1, err := c.Acquire(context.Background(), "foo") + require.NoError(t, err) + require.Equal(t, int64(1), opens.Load()) + + // Evict it, check it's closed even though still in use (r1 not called) + c.EvictWhere(func(cfg any) bool { return true }) + time.Sleep(time.Second) + require.Equal(t, true, m1.(*mockConn).closeCalled) + + // Open "foo" again, check it opens a new one + m2, r2, err := c.Acquire(context.Background(), "foo") + require.NoError(t, err) + require.Equal(t, int64(2), opens.Load()) + require.NotEqual(t, m1, m2) + + // Check that releasing m1 doesn't fail (though it's been closed) + r1() + r2() +} + +func TestHanging(t *testing.T) { + // Make it check for hanging conns every 100ms + checkHangingInterval = 100 * time.Millisecond + + hangingOpens := atomic.Int64{} + hangingCloses := atomic.Int64{} + + c := New(Options{ + MaxConnectionsIdle: 2, + OpenTimeout: 100 * time.Millisecond, + CloseTimeout: 100 * time.Millisecond, + OpenFunc: func(ctx context.Context, cfg any) (Connection, error) { + time.Sleep(time.Second) + return &mockConn{ + cfg: cfg.(string), + closeDelay: time.Second, // Make closes hang for 1s + }, nil + }, + KeyFunc: func(cfg any) string { + return cfg.(string) + }, + HangingFunc: func(cfg any, open bool) { + if open { + hangingOpens.Add(1) + } else { + hangingCloses.Add(1) + } + }, + }) + + // Open conn "foo" + m1, r1, err := c.Acquire(context.Background(), "foo") + require.NoError(t, err) + require.GreaterOrEqual(t, hangingOpens.Load(), int64(1)) + r1() + + // Evict it, check it's closed even though still in use (r1 not called) + c.EvictWhere(func(cfg any) bool { return true }) + time.Sleep(time.Second) + require.Equal(t, true, m1.(*mockConn).closeCalled) + require.GreaterOrEqual(t, hangingCloses.Load(), int64(1)) +} From 6b382069262ead3ffc84bee91ccd91690812bae7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Benjamin=20Egelund-M=C3=BCller?= Date: Wed, 13 Dec 2023 21:57:34 +0100 Subject: [PATCH 04/12] Make tests pass --- runtime/pkg/conncache/conncache.go | 87 
+++++++++++++++++-------- runtime/pkg/conncache/conncache_test.go | 4 +- 2 files changed, 64 insertions(+), 27 deletions(-) diff --git a/runtime/pkg/conncache/conncache.go b/runtime/pkg/conncache/conncache.go index ff76b706546..26498e082bc 100644 --- a/runtime/pkg/conncache/conncache.go +++ b/runtime/pkg/conncache/conncache.go @@ -2,7 +2,7 @@ package conncache import ( "context" - "fmt" + "errors" "sync" "time" @@ -55,6 +55,14 @@ type Options struct { HangingFunc func(cfg any, open bool) } +var _ Cache = (*cacheImpl)(nil) + +// cacheImpl implements Cache. +// It leverages a singleflight to ensure at most one open/close action runs against a connection at a time. +// It also uses an LRU to pool unused connections and eventually close them. +// The implementation heavily depends on implementation details of singleflight.Group. +// Notably, it will in different places invoke the singleflight with different callbacks for the same key. +// It also relies on singleflight.Do always invoking the callback even if the passed ctx is already cancelled. type cacheImpl struct { opts Options closed bool @@ -67,12 +75,13 @@ type cacheImpl struct { } type entry struct { - cfg any - refs int - status entryStatus - since time.Time - handle Connection - err error + cfg any + refs int + status entryStatus + since time.Time + closedCh chan struct{} // Not set for regular evictions, only used when Cache.Close() is called. + handle Connection + err error } type entryStatus int @@ -80,7 +89,7 @@ type entryStatus int const ( entryStatusUnspecified entryStatus = iota entryStatusOpening - entryStatusOpen + entryStatusOpen // Also used for cases where open errored (i.e. entry.err != nil) entryStatusClosing entryStatusClosed ) @@ -112,7 +121,7 @@ func (c *cacheImpl) Acquire(ctx context.Context, cfg any) (Connection, ReleaseFu c.mu.Lock() if c.closed { c.mu.Unlock() - return nil, nil, fmt.Errorf("conncache: closed") + return nil, nil, errors.New("conncache: closed") } e, ok := c.entries[k] @@ -126,6 +135,7 @@ func (c *cacheImpl) Acquire(ctx context.Context, cfg any) (Connection, ReleaseFu if e.status == entryStatusOpen { defer c.mu.Unlock() if e.err != nil { + c.releaseEntry(k, e) return nil, nil, e.err } return e.handle, c.releaseFunc(k, e), nil @@ -136,6 +146,14 @@ func (c *cacheImpl) Acquire(ctx context.Context, cfg any) (Connection, ReleaseFu for attempt := 0; attempt < 2; attempt++ { _, err := c.singleflight.Do(ctx, k, func(_ context.Context) (any, error) { c.mu.Lock() + if c.closed { + c.mu.Unlock() + return nil, errors.New("conncache: closed") + } + if e.status == entryStatusOpen { + c.mu.Unlock() + return nil, nil + } c.retainEntry(k, e) e.status = entryStatusOpening e.since = time.Now() @@ -164,7 +182,7 @@ func (c *cacheImpl) Acquire(ctx context.Context, cfg any) (Connection, ReleaseFu return nil, nil }) if err != nil { - // TODO: if err is not ctx.Err(), it's a panic. Should we handle panics? + // TODO: could be a caught panic. Should we handle panics? 
return nil, nil, err } @@ -178,6 +196,7 @@ func (c *cacheImpl) Acquire(ctx context.Context, cfg any) (Connection, ReleaseFu defer c.mu.Unlock() if e.err != nil { + c.releaseEntry(k, e) return nil, nil, e.err } return e.handle, c.releaseFunc(k, e), nil @@ -197,7 +216,7 @@ func (c *cacheImpl) Close(ctx context.Context) error { c.mu.Lock() if c.closed { c.mu.Unlock() - return fmt.Errorf("conncache: already closed") + return errors.New("conncache: already closed") } c.closed = true @@ -207,30 +226,32 @@ func (c *cacheImpl) Close(ctx context.Context) error { c.beginClose(k, e) } - // TODO: Purge? I don't think so. - c.mu.Unlock() for { + if ctx.Err() != nil { + return ctx.Err() + } + c.mu.Lock() - var anyK string var anyE *entry - for k, e := range c.entries { - anyK = k - anyE = e - break + for _, e := range c.entries { + if e.status != entryStatusClosed { + anyE = e + break + } } - c.mu.Unlock() if anyE == nil { - // c.entries is empty, we can return + c.mu.Unlock() + // all entries are closed, we can return break } - // TODO: What if this blocks before the close? Probably better to wait for a close channel on the entry. - _, _ = c.singleflight.Do(context.Background(), anyK, func(_ context.Context) (any, error) { - return nil, nil - }) + anyE.closedCh = make(chan struct{}) + c.mu.Unlock() + + <-anyE.closedCh } return nil @@ -238,7 +259,7 @@ func (c *cacheImpl) Close(ctx context.Context) error { // beginClose must be called while c.mu is held. func (c *cacheImpl) beginClose(k string, e *entry) { - if e.status != entryStatusOpening && e.status != entryStatusOpen { + if e.status == entryStatusClosing || e.status == entryStatusClosed { return } @@ -248,15 +269,28 @@ func (c *cacheImpl) beginClose(k string, e *entry) { for attempt := 0; attempt < 2; attempt++ { _, _ = c.singleflight.Do(context.Background(), k, func(_ context.Context) (any, error) { c.mu.Lock() + if e.status == entryStatusClosed { + c.mu.Unlock() + return nil, nil + } e.status = entryStatusClosing e.since = time.Now() c.mu.Unlock() - err := e.handle.Close() + var err error + if e.handle != nil { + err = e.handle.Close() + } + if err == nil { + err = errors.New("conncache: connection closed") + } c.mu.Lock() e.status = entryStatusClosed e.since = time.Now() + if e.closedCh != nil { + close(e.closedCh) + } e.handle = nil e.err = err c.mu.Unlock() @@ -267,6 +301,7 @@ func (c *cacheImpl) beginClose(k string, e *entry) { c.mu.Lock() if e.status == entryStatusClosed { + c.mu.Unlock() break } c.mu.Unlock() diff --git a/runtime/pkg/conncache/conncache_test.go b/runtime/pkg/conncache/conncache_test.go index 535675aa8bd..a749d17d12e 100644 --- a/runtime/pkg/conncache/conncache_test.go +++ b/runtime/pkg/conncache/conncache_test.go @@ -70,6 +70,7 @@ func TestBasic(t *testing.T) { require.Equal(t, int64(1+i+1), opens.Load()) r() } + time.Sleep(time.Second) require.Equal(t, true, m1.(*mockConn).closeCalled) // Close cache @@ -142,7 +143,8 @@ func TestOpenDuringClose(t *testing.T) { // Evict it so it starts closing c.EvictWhere(func(cfg any) bool { return true }) - // closeCalled is set immediately, but it will take 1s to actually close + // closeCalled is set before mockConn.Close hangs, but it will take 1s to actually close + time.Sleep(100 * time.Millisecond) require.True(t, m1.(*mockConn).closeCalled) // Open again, check it takes ~1s to do so From cb7bda492713623cb55f8f92ce52108154df417a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Benjamin=20Egelund-M=C3=BCller?= Date: Wed, 13 Dec 2023 22:18:38 +0100 Subject: [PATCH 05/12] increase test sleeps 
for close --- runtime/registry_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/runtime/registry_test.go b/runtime/registry_test.go index a715575736b..746d424fd4a 100644 --- a/runtime/registry_test.go +++ b/runtime/registry_test.go @@ -331,7 +331,7 @@ func TestRuntime_EditInstance(t *testing.T) { } // Wait for controller restart - time.Sleep(500 * time.Millisecond) + time.Sleep(2 * time.Second) _, err = rt.Controller(ctx, inst.ID) require.NoError(t, err) @@ -426,6 +426,7 @@ func TestRuntime_DeleteInstance(t *testing.T) { // verify older olap connection is closed and cache updated // require.False(t, rt.connCache.lru.Contains(inst.ID+"duckdb"+fmt.Sprintf("dsn:%s ", dbFile))) // require.False(t, rt.connCache.lru.Contains(inst.ID+"file"+fmt.Sprintf("dsn:%s ", repodsn))) + time.Sleep(2 * time.Second) err = olap.Exec(context.Background(), &drivers.Statement{Query: "SELECT COUNT(*) FROM rill.migration_version"}) require.True(t, err != nil) From 4833bf0c4c4d070b7de5f45ffb88c27132c741e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Benjamin=20Egelund-M=C3=BCller?= Date: Thu, 14 Dec 2023 14:59:13 +0100 Subject: [PATCH 06/12] Fix various race conditions --- runtime/connection_cache.go | 7 +-- runtime/pkg/conncache/conncache.go | 72 +++++++++++++++---------- runtime/pkg/conncache/conncache_test.go | 26 ++++----- 3 files changed, 59 insertions(+), 46 deletions(-) diff --git a/runtime/connection_cache.go b/runtime/connection_cache.go index 73a4e06d133..50fc3b43ced 100644 --- a/runtime/connection_cache.go +++ b/runtime/connection_cache.go @@ -27,9 +27,10 @@ type cachedConnectionConfig struct { // It also monitors for hanging connections. func (r *Runtime) newConnectionCache() conncache.Cache { return conncache.New(conncache.Options{ - MaxConnectionsIdle: r.opts.ConnectionCacheSize, - OpenTimeout: 2 * time.Minute, - CloseTimeout: 5 * time.Minute, + MaxConnectionsIdle: r.opts.ConnectionCacheSize, + OpenTimeout: 2 * time.Minute, + CloseTimeout: 5 * time.Minute, + CheckHangingInterval: time.Minute, OpenFunc: func(ctx context.Context, cfg any) (conncache.Connection, error) { x := cfg.(cachedConnectionConfig) return r.openAndMigrate(ctx, x) }, diff --git a/runtime/pkg/conncache/conncache.go b/runtime/pkg/conncache/conncache.go index 26498e082bc..a939e5148c2 100644 --- a/runtime/pkg/conncache/conncache.go +++ b/runtime/pkg/conncache/conncache.go @@ -47,6 +47,8 @@ type Options struct { OpenTimeout time.Duration // CloseTimeout is the maximum amount of time to wait for a connection to close. CloseTimeout time.Duration + // CheckHangingInterval is the interval at which to check for hanging open/close calls. + CheckHangingInterval time.Duration // OpenFunc opens a connection. OpenFunc func(ctx context.Context, cfg any) (Connection, error) // KeyFunc computes a comparable key for a connection
@@ -69,7 +71,7 @@ type cacheImpl struct { mu sync.Mutex entries map[string]*entry lru *simplelru.LRU - singleflight *singleflight.Group[string, any] + singleflight *singleflight.Group[string, entryStatus] ctx context.Context cancel context.CancelFunc } @@ -99,7 +101,7 @@ func New(opts Options) Cache { c := &cacheImpl{ opts: opts, entries: make(map[string]*entry), - singleflight: &singleflight.Group[string, any]{}, + singleflight: &singleflight.Group[string, entryStatus]{}, ctx: ctx, cancel: cancel, } @@ -110,7 +112,9 @@ func New(opts Options) Cache { panic(err) } - go c.periodicallyCheckHangingConnections() + if opts.CheckHangingInterval != 0 { + go c.periodicallyCheckHangingConnections() + } return c } @@ -143,16 +147,19 @@ func (c *cacheImpl) Acquire(ctx context.Context, cfg any) (Connection, ReleaseFu c.mu.Unlock() + // We use the same singleflight key for opening and closing. This ensures only one operation can run at a time per entry. + // We need to retry once since the singleflight might currently be closing the same entry. + // We don't retry more than once to avoid potentially infinite open/close loops. for attempt := 0; attempt < 2; attempt++ { - _, err := c.singleflight.Do(ctx, k, func(_ context.Context) (any, error) { + stat, err := c.singleflight.Do(ctx, k, func(_ context.Context) (entryStatus, error) { c.mu.Lock() if c.closed { c.mu.Unlock() - return nil, errors.New("conncache: closed") + return entryStatusUnspecified, errors.New("conncache: closed") } if e.status == entryStatusOpen { c.mu.Unlock() - return nil, nil + return entryStatusOpen, nil } c.retainEntry(k, e) e.status = entryStatusOpening @@ -179,27 +186,40 @@ func (c *cacheImpl) Acquire(ctx context.Context, cfg any) (Connection, ReleaseFu e.err = err c.mu.Unlock() - return nil, nil + return entryStatusOpen, nil }) if err != nil { // TODO: could be a caught panic. Should we handle panics? return nil, nil, err } + if stat != entryStatusOpen { + // TODO: Too fast + continue + } + c.mu.Lock() - if e.status == entryStatusOpen { - break + if e.status != entryStatusOpen { + c.releaseEntry(k, e) + c.mu.Unlock() + return nil, nil, errors.New("conncache: connection was immediately closed after being opened") + } + handle := e.handle + err = e.err + if e.err != nil { + c.releaseEntry(k, e) + c.mu.Unlock() + return nil, nil, err } c.mu.Unlock() + return handle, c.releaseFunc(k, e), nil } - defer c.mu.Unlock() + c.mu.Lock() + c.releaseEntry(k, e) + c.mu.Unlock() - if e.err != nil { - c.releaseEntry(k, e) - return nil, nil, e.err - } - return e.handle, c.releaseFunc(k, e), nil + return nil, nil, errors.New("conncache: connection was closed repeatedly while trying to open it") } func (c *cacheImpl) EvictWhere(predicate func(cfg any) bool) { @@ -266,12 +286,15 @@ func (c *cacheImpl) beginClose(k string, e *entry) { c.retainEntry(k, e) go func() { + // We use the same singleflight key for opening and closing. This ensures only one operation can run at a time per entry. + // We need to retry once since the singleflight might currently be opening the same entry. + // We don't retry more than once to avoid potentially infinite open/close loops. 
for attempt := 0; attempt < 2; attempt++ { - _, _ = c.singleflight.Do(context.Background(), k, func(_ context.Context) (any, error) { + stat, _ := c.singleflight.Do(context.Background(), k, func(_ context.Context) (entryStatus, error) { c.mu.Lock() if e.status == entryStatusClosed { c.mu.Unlock() - return nil, nil + return entryStatusClosed, nil } e.status = entryStatusClosing e.since = time.Now() @@ -295,16 +318,13 @@ func (c *cacheImpl) beginClose(k string, e *entry) { e.err = err c.mu.Unlock() - return nil, nil + return entryStatusClosed, nil }) // TODO: can return err on panic in Close. Should we handle panics? - c.mu.Lock() - if e.status == entryStatusClosed { - c.mu.Unlock() + if stat == entryStatusClosed { break } - c.mu.Unlock() } c.mu.Lock() @@ -352,10 +372,8 @@ func (c *cacheImpl) releaseFunc(key string, e *entry) ReleaseFunc { } } -var checkHangingInterval = time.Minute - func (c *cacheImpl) periodicallyCheckHangingConnections() { - ticker := time.NewTicker(checkHangingInterval) + ticker := time.NewTicker(c.opts.CheckHangingInterval) defer ticker.Stop() for { @@ -363,10 +381,10 @@ func (c *cacheImpl) periodicallyCheckHangingConnections() { case <-ticker.C: c.mu.Lock() for _, e := range c.entries { - if c.opts.OpenTimeout != 0 && e.status == entryStatusOpening && time.Since(e.since) >= c.opts.OpenTimeout { + if c.opts.OpenTimeout != 0 && e.status == entryStatusOpening && time.Since(e.since) > c.opts.OpenTimeout { c.opts.HangingFunc(e.cfg, true) } - if c.opts.CloseTimeout != 0 && e.status == entryStatusClosing && time.Since(e.since) >= c.opts.CloseTimeout { + if c.opts.CloseTimeout != 0 && e.status == entryStatusClosing && time.Since(e.since) > c.opts.CloseTimeout { c.opts.HangingFunc(e.cfg, false) } } diff --git a/runtime/pkg/conncache/conncache_test.go b/runtime/pkg/conncache/conncache_test.go index a749d17d12e..62958658529 100644 --- a/runtime/pkg/conncache/conncache_test.go +++ b/runtime/pkg/conncache/conncache_test.go @@ -14,15 +14,11 @@ import ( type mockConn struct { cfg string closeDelay time.Duration - closeHang bool - closeCalled bool + closeCalled atomic.Bool } func (c *mockConn) Close() error { - c.closeCalled = true - if c.closeHang { - select {} - } + c.closeCalled.Store(true) time.Sleep(c.closeDelay) return nil } @@ -71,7 +67,7 @@ func TestBasic(t *testing.T) { r() } time.Sleep(time.Second) - require.Equal(t, true, m1.(*mockConn).closeCalled) + require.Equal(t, true, m1.(*mockConn).closeCalled.Load()) // Close cache require.NoError(t, c.Close(context.Background())) @@ -145,7 +141,7 @@ func TestOpenDuringClose(t *testing.T) { c.EvictWhere(func(cfg any) bool { return true }) // closeCalled is set before mockConn.Close hangs, but it will take 1s to actually close time.Sleep(100 * time.Millisecond) - require.True(t, m1.(*mockConn).closeCalled) + require.True(t, m1.(*mockConn).closeCalled.Load()) // Open again, check it takes ~1s to do so start := time.Now() @@ -182,7 +178,7 @@ func TestCloseInUse(t *testing.T) { // Evict it, check it's closed even though still in use (r1 not called) c.EvictWhere(func(cfg any) bool { return true }) time.Sleep(time.Second) - require.Equal(t, true, m1.(*mockConn).closeCalled) + require.Equal(t, true, m1.(*mockConn).closeCalled.Load()) // Open "foo" again, check it opens a new one m2, r2, err := c.Acquire(context.Background(), "foo") @@ -196,16 +192,14 @@ func TestCloseInUse(t *testing.T) { } func TestHanging(t *testing.T) { - // Make it check for hanging conns every 100ms - checkHangingInterval = 100 * time.Millisecond - hangingOpens := 
atomic.Int64{} hangingCloses := atomic.Int64{} c := New(Options{ - MaxConnectionsIdle: 2, - OpenTimeout: 100 * time.Millisecond, - CloseTimeout: 100 * time.Millisecond, + MaxConnectionsIdle: 2, + OpenTimeout: 100 * time.Millisecond, + CloseTimeout: 100 * time.Millisecond, + CheckHangingInterval: 100 * time.Millisecond, OpenFunc: func(ctx context.Context, cfg any) (Connection, error) { time.Sleep(time.Second) return &mockConn{ @@ -234,6 +228,6 @@ func TestHanging(t *testing.T) { // Evict it, check it's closed even though still in use (r1 not called) c.EvictWhere(func(cfg any) bool { return true }) time.Sleep(time.Second) - require.Equal(t, true, m1.(*mockConn).closeCalled) + require.Equal(t, true, m1.(*mockConn).closeCalled.Load()) require.GreaterOrEqual(t, hangingCloses.Load(), int64(1)) } From fd3d077d5fb20fb3d6a6935f2155a60dc93a3f2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Benjamin=20Egelund-M=C3=BCller?= Date: Thu, 14 Dec 2023 16:08:53 +0100 Subject: [PATCH 07/12] Integrate singleflight with conncache's mutex --- runtime/pkg/conncache/conncache.go | 252 +++++++++++++++-------------- 1 file changed, 128 insertions(+), 124 deletions(-) diff --git a/runtime/pkg/conncache/conncache.go b/runtime/pkg/conncache/conncache.go index a939e5148c2..0f53ac25497 100644 --- a/runtime/pkg/conncache/conncache.go +++ b/runtime/pkg/conncache/conncache.go @@ -7,7 +7,6 @@ import ( "time" "github.com/hashicorp/golang-lru/simplelru" - "github.com/rilldata/rill/runtime/pkg/singleflight" ) // Cache is a concurrency-safe cache of stateful connection objects. @@ -59,31 +58,31 @@ type Options struct { var _ Cache = (*cacheImpl)(nil) -// cacheImpl implements Cache. -// It leverages a singleflight to ensure at most one open/close action runs against a connection at a time. -// It also uses an LRU to pool unused connections and eventually close them. -// The implementation heavily depends on implementation details of singleflight.Group. -// Notably, it will in different places invoke the singleflight with different callbacks for the same key. -// It also relies on singleflight.Do always invoking the callback even if the passed ctx is already cancelled. +// cacheImpl implements Cache. Implementation notes: +// - It uses an LRU to pool unused connections and eventually close them. +// - It leverages a singleflight pattern to ensure at most one open/close action runs against a connection at a time. +// - An entry will only have entryStatusOpening or entryStatusClosing if a singleflight call is currently running for it. +// - Any code that keeps a reference to an entry after the mutex is released must call retainEntry/releaseEntry. +// - If the ctx for an open call is cancelled, the entry will continue opening in the background (and will be put in the LRU). +// - If attempting to open a closing entry, or close an opening entry, we wait for the singleflight to complete and then retry once. To avoid infinite loops, we don't retry more than once. type cacheImpl struct { opts Options closed bool mu sync.Mutex entries map[string]*entry lru *simplelru.LRU - singleflight *singleflight.Group[string, entryStatus] + singleflight map[string]chan struct{} ctx context.Context cancel context.CancelFunc } type entry struct { - cfg any - refs int - status entryStatus - since time.Time - closedCh chan struct{} // Not set for regular evictions, only used when Cache.Close() is called. 
- handle Connection - err error + cfg any + refs int + status entryStatus + since time.Time + handle Connection + err error } type entryStatus int @@ -101,7 +100,7 @@ func New(opts Options) Cache { c := &cacheImpl{ opts: opts, entries: make(map[string]*entry), - singleflight: &singleflight.Group[string, entryStatus]{}, + singleflight: make(map[string]chan struct{}), ctx: ctx, cancel: cancel, } @@ -145,33 +144,48 @@ func (c *cacheImpl) Acquire(ctx context.Context, cfg any) (Connection, ReleaseFu return e.handle, c.releaseFunc(k, e), nil } - c.mu.Unlock() + ch, ok := c.singleflight[k] - // We use the same singleflight key for opening and closing. This ensures only one operation can run at a time per entry. - // We need to retry once since the singleflight might currently be closing the same entry. - // We don't retry more than once to avoid potentially infinite open/close loops. - for attempt := 0; attempt < 2; attempt++ { - stat, err := c.singleflight.Do(ctx, k, func(_ context.Context) (entryStatus, error) { - c.mu.Lock() - if c.closed { - c.mu.Unlock() - return entryStatusUnspecified, errors.New("conncache: closed") - } - if e.status == entryStatusOpen { - c.mu.Unlock() - return entryStatusOpen, nil - } - c.retainEntry(k, e) - e.status = entryStatusOpening - e.since = time.Now() - e.handle = nil - e.err = nil + if ok && e.status == entryStatusClosing { + c.mu.Unlock() + <-ch + c.mu.Lock() + + // Since we released the lock, need to check c.closed and e.status again. + if c.closed { + c.releaseEntry(k, e) c.mu.Unlock() + return nil, nil, errors.New("conncache: closed") + } + if e.status == entryStatusOpen { + defer c.mu.Unlock() + if e.err != nil { + c.releaseEntry(k, e) + return nil, nil, e.err + } + return e.handle, c.releaseFunc(k, e), nil + } + + ch, ok = c.singleflight[k] + } + + if !ok { + c.retainEntry(k, e) // Retain again to count the goroutine's reference independently (in case ctx is cancelled while the Open continues in the background) + + ch = make(chan struct{}) + c.singleflight[k] = ch + + e.status = entryStatusOpening + e.since = time.Now() + e.handle = nil + e.err = nil + + go func() { var handle Connection var err error if c.opts.OpenTimeout == 0 { - handle, err = c.opts.OpenFunc(ctx, cfg) + handle, err = c.opts.OpenFunc(c.ctx, cfg) } else { ctx, cancel := context.WithTimeout(c.ctx, c.opts.OpenTimeout) handle, err = c.opts.OpenFunc(ctx, cfg) @@ -179,47 +193,38 @@ func (c *cacheImpl) Acquire(ctx context.Context, cfg any) (Connection, ReleaseFu } c.mu.Lock() - c.releaseEntry(k, e) + defer c.mu.Unlock() + e.status = entryStatusOpen e.since = time.Now() e.handle = handle e.err = err - c.mu.Unlock() - return entryStatusOpen, nil - }) - if err != nil { - // TODO: could be a caught panic. Should we handle panics? 
- return nil, nil, err - } - - if stat != entryStatusOpen { - // TODO: Too fast - continue - } + delete(c.singleflight, k) + close(ch) - c.mu.Lock() - if e.status != entryStatusOpen { - c.releaseEntry(k, e) - c.mu.Unlock() - return nil, nil, errors.New("conncache: connection was immediately closed after being opened") - } - handle := e.handle - err = e.err - if e.err != nil { c.releaseEntry(k, e) - c.mu.Unlock() - return nil, nil, err - } - c.mu.Unlock() - return handle, c.releaseFunc(k, e), nil + }() } - c.mu.Lock() - c.releaseEntry(k, e) c.mu.Unlock() - return nil, nil, errors.New("conncache: connection was closed repeatedly while trying to open it") + <-ch + + c.mu.Lock() + defer c.mu.Unlock() + + if e.status != entryStatusOpen { + c.releaseEntry(k, e) + return nil, nil, errors.New("conncache: connection was immediately closed after being opened") + } + + if e.err != nil { + c.releaseEntry(k, e) + return nil, nil, e.err + } + + return e.handle, c.releaseFunc(k, e), nil } func (c *cacheImpl) EvictWhere(predicate func(cfg any) bool) { @@ -249,32 +254,26 @@ func (c *cacheImpl) Close(ctx context.Context) error { c.mu.Unlock() for { - if ctx.Err() != nil { - return ctx.Err() - } - c.mu.Lock() - var anyE *entry - for _, e := range c.entries { - if e.status != entryStatusClosed { - anyE = e - break - } + var anyCh chan struct{} + for _, ch := range c.singleflight { + anyCh = ch + break } + c.mu.Unlock() - if anyE == nil { - c.mu.Unlock() + if anyCh == nil { // all entries are closed, we can return - break + return nil } - anyE.closedCh = make(chan struct{}) - c.mu.Unlock() - - <-anyE.closedCh + select { + case <-anyCh: + // continue + case <-ctx.Done(): + return ctx.Err() + } } - - return nil } // beginClose must be called while c.mu is held. @@ -285,51 +284,55 @@ func (c *cacheImpl) beginClose(k string, e *entry) { c.retainEntry(k, e) - go func() { - // We use the same singleflight key for opening and closing. This ensures only one operation can run at a time per entry. - // We need to retry once since the singleflight might currently be opening the same entry. - // We don't retry more than once to avoid potentially infinite open/close loops. - for attempt := 0; attempt < 2; attempt++ { - stat, _ := c.singleflight.Do(context.Background(), k, func(_ context.Context) (entryStatus, error) { - c.mu.Lock() - if e.status == entryStatusClosed { - c.mu.Unlock() - return entryStatusClosed, nil - } - e.status = entryStatusClosing - e.since = time.Now() - c.mu.Unlock() + ch, ok := c.singleflight[k] + if ok { + c.mu.Unlock() + <-ch + c.mu.Lock() - var err error - if e.handle != nil { - err = e.handle.Close() - } - if err == nil { - err = errors.New("conncache: connection closed") - } + _, ok = c.singleflight[k] + if ok { + // Probably, another goroutine started closing it. Very unlikely, it was closed and re-opened again. + // Either way, we did our part. + c.releaseEntry(k, e) + return + } - c.mu.Lock() - e.status = entryStatusClosed - e.since = time.Now() - if e.closedCh != nil { - close(e.closedCh) - } - e.handle = nil - e.err = err - c.mu.Unlock() + // Since we released the lock, need to check e.status again. + // (Doesn't need to check entryStatusClosing since we now know that the singleflight is empty.) + if e.status == entryStatusClosed { + c.releaseEntry(k, e) + return + } + } - return entryStatusClosed, nil - }) - // TODO: can return err on panic in Close. Should we handle panics? 
+ ch = make(chan struct{}) + c.singleflight[k] = ch - if stat == entryStatusClosed { - break - } + e.status = entryStatusClosing + e.since = time.Now() + + go func() { + var err error + if e.handle != nil { + err = e.handle.Close() + } + if err == nil { + err = errors.New("conncache: connection closed") } c.mu.Lock() + defer c.mu.Unlock() + + e.status = entryStatusClosed + e.since = time.Now() + e.handle = nil + e.err = err + + delete(c.singleflight, k) + close(ch) + c.releaseEntry(k, e) - c.mu.Unlock() }() } @@ -380,7 +383,8 @@ func (c *cacheImpl) periodicallyCheckHangingConnections() { select { case <-ticker.C: c.mu.Lock() - for _, e := range c.entries { + for k := range c.singleflight { + e := c.entries[k] if c.opts.OpenTimeout != 0 && e.status == entryStatusOpening && time.Since(e.since) > c.opts.OpenTimeout { c.opts.HangingFunc(e.cfg, true) } From 3b23f69232b76b88d28e1eedb564b74984e415d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Benjamin=20Egelund-M=C3=BCller?= Date: Thu, 14 Dec 2023 16:26:33 +0100 Subject: [PATCH 08/12] Increase timeouts --- runtime/connection_cache.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runtime/connection_cache.go b/runtime/connection_cache.go index 50fc3b43ced..a43f5afaf55 100644 --- a/runtime/connection_cache.go +++ b/runtime/connection_cache.go @@ -28,8 +28,8 @@ type cachedConnectionConfig struct { func (r *Runtime) newConnectionCache() conncache.Cache { return conncache.New(conncache.Options{ MaxConnectionsIdle: r.opts.ConnectionCacheSize, - OpenTimeout: 2 * time.Minute, - CloseTimeout: 5 * time.Minute, + OpenTimeout: 10 * time.Minute, + CloseTimeout: 10 * time.Minute, CheckHangingInterval: time.Minute, OpenFunc: func(ctx context.Context, cfg any) (conncache.Connection, error) { x := cfg.(cachedConnectionConfig) From 62ecca1e4acf134d580dc4b566e89b9604b062b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Benjamin=20Egelund-M=C3=BCller?= Date: Thu, 14 Dec 2023 16:34:29 +0100 Subject: [PATCH 09/12] Better comments --- runtime/pkg/conncache/conncache.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/runtime/pkg/conncache/conncache.go b/runtime/pkg/conncache/conncache.go index 0f53ac25497..10b7bff6672 100644 --- a/runtime/pkg/conncache/conncache.go +++ b/runtime/pkg/conncache/conncache.go @@ -14,15 +14,15 @@ import ( // The cache will at most open one connection per key, even under concurrent access. // The cache automatically evicts connections that are not in use ("acquired") using a least-recently-used policy. type Cache interface { - // Acquire retrieves or opens a connection for the given key. The returned ReleaseFunc must be called when the connection is no longer needed. - // While a connection is acquired, it will not be closed unless Evict or Close is called. + // Acquire retrieves or opens a connection for the given config. The returned ReleaseFunc must be called when the connection is no longer needed. + // While a connection is acquired, it will not be closed unless EvictWhere or Close is called. // If Acquire is called while the underlying connection is being evicted, it will wait for the close to complete and then open a new connection. // If opening the connection fails, Acquire may return the error on subsequent calls without trying to open again until the entry is evicted. Acquire(ctx context.Context, cfg any) (Connection, ReleaseFunc, error) // EvictWhere closes the connections that match the predicate. - // It immediately closes the connections, even those that are currently acquired. 
- // It returns immediately and does not wait for the connections to finish closing. + // It immediately starts closing the connections, even those that are currently acquired. + // It returns quickly and does not wait for connections to finish closing in the background. EvictWhere(predicate func(cfg any) bool) // Close closes all open connections and prevents new connections from being acquired. @@ -50,7 +50,7 @@ type Options struct { CheckHangingInterval time.Duration // OpenFunc opens a connection. OpenFunc func(ctx context.Context, cfg any) (Connection, error) - // KeyFunc computes a comparable key for a connection configuration. + // KeyFunc computes a comparable key for a connection config. KeyFunc func(cfg any) string // HangingFunc is called when an open or close exceeds its timeout and does not respond to context cancellation. HangingFunc func(cfg any, open bool) @@ -61,6 +61,7 @@ var _ Cache = (*cacheImpl)(nil) // cacheImpl implements Cache. Implementation notes: // - It uses an LRU to pool unused connections and eventually close them. // - It leverages a singleflight pattern to ensure at most one open/close action runs against a connection at a time. +// - It directly implements a singleflight (instead of using a library) because it needs to use the same mutex for the singleflight and the map/LRU to avoid race conditions. // - An entry will only have entryStatusOpening or entryStatusClosing if a singleflight call is currently running for it. // - Any code that keeps a reference to an entry after the mutex is released must call retainEntry/releaseEntry. // - If the ctx for an open call is cancelled, the entry will continue opening in the background (and will be put in the LRU). From ac71051db6c54b95ef19b1eada382d1a805ed912 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Benjamin=20Egelund-M=C3=BCller?= Date: Thu, 14 Dec 2023 19:28:21 +0100 Subject: [PATCH 10/12] Prevent deadlock when closing while opening --- runtime/pkg/conncache/conncache.go | 62 ++++++++++++++----------- runtime/pkg/conncache/conncache_test.go | 35 ++++++++++++++ 2 files changed, 70 insertions(+), 27 deletions(-) diff --git a/runtime/pkg/conncache/conncache.go b/runtime/pkg/conncache/conncache.go index 10b7bff6672..b72414de16c 100644 --- a/runtime/pkg/conncache/conncache.go +++ b/runtime/pkg/conncache/conncache.go @@ -78,12 +78,13 @@ type cacheImpl struct { } type entry struct { - cfg any - refs int - status entryStatus - since time.Time - handle Connection - err error + cfg any + refs int + status entryStatus + since time.Time + closeAfterOpening bool + handle Connection + err error } type entryStatus int @@ -149,7 +150,14 @@ func (c *cacheImpl) Acquire(ctx context.Context, cfg any) (Connection, ReleaseFu if ok && e.status == entryStatusClosing { c.mu.Unlock() - <-ch + select { + case <-ch: + case <-ctx.Done(): + c.mu.Lock() + c.releaseEntry(k, e) + c.mu.Unlock() + return nil, nil, ctx.Err() + } c.mu.Lock() // Since we released the lock, need to check c.closed and e.status again. 
@@ -204,13 +212,25 @@ func (c *cacheImpl) Acquire(ctx context.Context, cfg any) (Connection, ReleaseFu delete(c.singleflight, k) close(ch) + if e.closeAfterOpening { + e.closeAfterOpening = false + c.beginClose(k, e) + } + c.releaseEntry(k, e) }() } c.mu.Unlock() - <-ch + select { + case <-ch: + case <-ctx.Done(): + c.mu.Lock() + c.releaseEntry(k, e) + c.mu.Unlock() + return nil, nil, ctx.Err() + } c.mu.Lock() defer c.mu.Unlock() @@ -283,30 +303,18 @@ func (c *cacheImpl) beginClose(k string, e *entry) { return } + if e.status == entryStatusOpening { + e.closeAfterOpening = true + return + } + c.retainEntry(k, e) ch, ok := c.singleflight[k] if ok { - c.mu.Unlock() - <-ch - c.mu.Lock() - - _, ok = c.singleflight[k] - if ok { - // Probably, another goroutine started closing it. Very unlikely, it was closed and re-opened again. - // Either way, we did our part. - c.releaseEntry(k, e) - return - } - - // Since we released the lock, need to check e.status again. - // (Doesn't need to check entryStatusClosing since we now know that the singleflight is empty.) - if e.status == entryStatusClosed { - c.releaseEntry(k, e) - return - } + // Should never happen, but checking since it would be pretty bad. + panic(errors.New("conncache: singleflight exists for entry that is neither opening nor closing")) } - ch = make(chan struct{}) c.singleflight[k] = ch diff --git a/runtime/pkg/conncache/conncache_test.go b/runtime/pkg/conncache/conncache_test.go index 62958658529..b5d41c79b0b 100644 --- a/runtime/pkg/conncache/conncache_test.go +++ b/runtime/pkg/conncache/conncache_test.go @@ -156,6 +156,41 @@ func TestOpenDuringClose(t *testing.T) { require.NoError(t, c.Close(context.Background())) } +func TestCloseDuringOpen(t *testing.T) { + opens := atomic.Int64{} + m := &mockConn{cfg: "foo"} + + c := New(Options{ + MaxConnectionsIdle: 2, + OpenFunc: func(ctx context.Context, cfg any) (Connection, error) { + time.Sleep(time.Second) + opens.Add(1) + return m, nil + }, + KeyFunc: func(cfg any) string { + return cfg.(string) + }, + }) + + // Start opening + go func() { + _, _, err := c.Acquire(context.Background(), "foo") + require.ErrorContains(t, err, "immediately closed") + require.Equal(t, int64(1), opens.Load()) + }() + + // Evict it so it starts closing + time.Sleep(100 * time.Millisecond) // Give it time to start opening + c.EvictWhere(func(cfg any) bool { return true }) + + // It will let the open finish before closing it, so will take ~1s + time.Sleep(2 * time.Second) + require.True(t, m.closeCalled.Load()) + + // Close cache + require.NoError(t, c.Close(context.Background())) +} + func TestCloseInUse(t *testing.T) { opens := atomic.Int64{} From a95ed075b4ff138a4b2e5adeae74641e1ba73323 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Benjamin=20Egelund-M=C3=BCller?= Date: Fri, 15 Dec 2023 12:01:13 +0100 Subject: [PATCH 11/12] Address review comments --- runtime/connection_cache.go | 21 ++++++++++- runtime/pkg/conncache/conncache.go | 50 +++++++++++++++++++++++-- runtime/pkg/conncache/conncache_test.go | 12 +++--- 3 files changed, 73 insertions(+), 10 deletions(-) diff --git a/runtime/connection_cache.go b/runtime/connection_cache.go index a43f5afaf55..e926b6db2e8 100644 --- a/runtime/connection_cache.go +++ b/runtime/connection_cache.go @@ -10,10 +10,21 @@ import ( "github.com/rilldata/rill/runtime/drivers" "github.com/rilldata/rill/runtime/pkg/conncache" + "github.com/rilldata/rill/runtime/pkg/observability" + "go.opentelemetry.io/otel/metric" "go.uber.org/zap" "golang.org/x/exp/maps" ) +var ( + connCacheOpens = 
observability.Must(meter.Int64Counter("connnection_cache.opens")) + connCacheCloses = observability.Must(meter.Int64Counter("connnection_cache.closes")) + connCacheSizeTotal = observability.Must(meter.Int64UpDownCounter("connnection_cache.size_total")) + connCacheSizeLRU = observability.Must(meter.Int64UpDownCounter("connnection_cache.size_lru")) + connCacheOpenLatencyMS = observability.Must(meter.Int64Histogram("connnection_cache.open_latency", metric.WithUnit("ms"))) + connCacheCloseLatencyMS = observability.Must(meter.Int64Histogram("connnection_cache.close_latency", metric.WithUnit("ms"))) +) + type cachedConnectionConfig struct { instanceID string driver string @@ -27,7 +38,7 @@ type cachedConnectionConfig struct { // It also monitors for hanging connections. func (r *Runtime) newConnectionCache() conncache.Cache { return conncache.New(conncache.Options{ - MaxConnectionsIdle: r.opts.ConnectionCacheSize, + MaxIdleConnections: r.opts.ConnectionCacheSize, OpenTimeout: 10 * time.Minute, CloseTimeout: 10 * time.Minute, CheckHangingInterval: time.Minute, @@ -43,6 +54,14 @@ func (r *Runtime) newConnectionCache() conncache.Cache { x := cfg.(cachedConnectionConfig) r.logger.Error("connection cache: connection has been working for too long", zap.String("instance_id", x.instanceID), zap.String("driver", x.driver), zap.Bool("open", open)) }, + Metrics: conncache.Metrics{ + Opens: connCacheOpens, + Closes: connCacheCloses, + SizeTotal: connCacheSizeTotal, + SizeLRU: connCacheSizeLRU, + OpenLatencyMS: connCacheOpenLatencyMS, + CloseLatencyMS: connCacheCloseLatencyMS, + }, }) } diff --git a/runtime/pkg/conncache/conncache.go b/runtime/pkg/conncache/conncache.go index b72414de16c..452cff37397 100644 --- a/runtime/pkg/conncache/conncache.go +++ b/runtime/pkg/conncache/conncache.go @@ -7,6 +7,7 @@ import ( "time" "github.com/hashicorp/golang-lru/simplelru" + "go.opentelemetry.io/otel/metric" ) // Cache is a concurrency-safe cache of stateful connection objects. @@ -40,8 +41,8 @@ type ReleaseFunc func() // Options configures a new connection cache. type Options struct { - // MaxConnectionsIdle is the maximum number of non-acquired connections that will be kept open. - MaxConnectionsIdle int + // MaxIdleConnections is the maximum number of non-acquired connections that will be kept open. + MaxIdleConnections int // OpenTimeout is the maximum amount of time to wait for a connection to open. OpenTimeout time.Duration // CloseTimeout is the maximum amount of time to wait for a connection to close. @@ -54,6 +55,18 @@ type Options struct { KeyFunc func(cfg any) string // HangingFunc is called when an open or close exceeds its timeout and does not respond to context cancellation. HangingFunc func(cfg any, open bool) + // Metrics are optional instruments for observability. + Metrics Metrics +} + +// Metrics are optional instruments for observability. If an instrument is nil, it will not be collected. 
+type Metrics struct { + Opens metric.Int64Counter + Closes metric.Int64Counter + SizeTotal metric.Int64UpDownCounter + SizeLRU metric.Int64UpDownCounter + OpenLatencyMS metric.Int64Histogram + CloseLatencyMS metric.Int64Histogram } var _ Cache = (*cacheImpl)(nil) @@ -108,7 +121,7 @@ func New(opts Options) Cache { } var err error - c.lru, err = simplelru.NewLRU(opts.MaxConnectionsIdle, c.lruEvictionHandler) + c.lru, err = simplelru.NewLRU(opts.MaxIdleConnections, c.lruEvictionHandler) if err != nil { panic(err) } @@ -133,6 +146,9 @@ func (c *cacheImpl) Acquire(ctx context.Context, cfg any) (Connection, ReleaseFu if !ok { e = &entry{cfg: cfg, since: time.Now()} c.entries[k] = e + if c.opts.Metrics.SizeTotal != nil { + c.opts.Metrics.SizeTotal.Add(c.ctx, 1) + } } c.retainEntry(k, e) @@ -191,6 +207,7 @@ func (c *cacheImpl) Acquire(ctx context.Context, cfg any) (Connection, ReleaseFu e.err = nil go func() { + start := time.Now() var handle Connection var err error if c.opts.OpenTimeout == 0 { @@ -201,6 +218,13 @@ func (c *cacheImpl) Acquire(ctx context.Context, cfg any) (Connection, ReleaseFu cancel() } + if c.opts.Metrics.Opens != nil { + c.opts.Metrics.Opens.Add(c.ctx, 1) + } + if c.opts.Metrics.OpenLatencyMS != nil { + c.opts.Metrics.OpenLatencyMS.Record(c.ctx, time.Since(start).Milliseconds()) + } + c.mu.Lock() defer c.mu.Unlock() @@ -322,6 +346,7 @@ func (c *cacheImpl) beginClose(k string, e *entry) { e.since = time.Now() go func() { + start := time.Now() var err error if e.handle != nil { err = e.handle.Close() @@ -330,6 +355,13 @@ func (c *cacheImpl) beginClose(k string, e *entry) { err = errors.New("conncache: connection closed") } + if c.opts.Metrics.Closes != nil { + c.opts.Metrics.Closes.Add(c.ctx, 1) + } + if c.opts.Metrics.CloseLatencyMS != nil { + c.opts.Metrics.CloseLatencyMS.Record(c.ctx, time.Since(start).Milliseconds()) + } + c.mu.Lock() defer c.mu.Unlock() @@ -361,6 +393,9 @@ func (c *cacheImpl) retainEntry(key string, e *entry) { if e.refs == 1 { // NOTE: lru.Remove is safe even if it's not in the LRU (should only happen if the entry is acquired for the first time) _ = c.lru.Remove(key) + if c.opts.Metrics.SizeLRU != nil { + c.opts.Metrics.SizeLRU.Add(c.ctx, -1) + } } } @@ -370,8 +405,14 @@ func (c *cacheImpl) releaseEntry(key string, e *entry) { // If open, keep entry and put in LRU. Else remove entirely. 
if e.status != entryStatusClosing && e.status != entryStatusClosed { c.lru.Add(key, e) + if c.opts.Metrics.SizeLRU != nil { + c.opts.Metrics.SizeLRU.Add(c.ctx, 1) + } } else { delete(c.entries, key) + if c.opts.Metrics.SizeTotal != nil { + c.opts.Metrics.SizeTotal.Add(c.ctx, -1) + } } } } @@ -392,12 +433,15 @@ func (c *cacheImpl) periodicallyCheckHangingConnections() { select { case <-ticker.C: c.mu.Lock() + hanging := 0 for k := range c.singleflight { e := c.entries[k] if c.opts.OpenTimeout != 0 && e.status == entryStatusOpening && time.Since(e.since) > c.opts.OpenTimeout { + hanging++ c.opts.HangingFunc(e.cfg, true) } if c.opts.CloseTimeout != 0 && e.status == entryStatusClosing && time.Since(e.since) > c.opts.CloseTimeout { + hanging++ c.opts.HangingFunc(e.cfg, false) } } diff --git a/runtime/pkg/conncache/conncache_test.go b/runtime/pkg/conncache/conncache_test.go index b5d41c79b0b..8c818830907 100644 --- a/runtime/pkg/conncache/conncache_test.go +++ b/runtime/pkg/conncache/conncache_test.go @@ -27,7 +27,7 @@ func TestBasic(t *testing.T) { opens := atomic.Int64{} c := New(Options{ - MaxConnectionsIdle: 2, + MaxIdleConnections: 2, OpenFunc: func(ctx context.Context, cfg any) (Connection, error) { opens.Add(1) return &mockConn{cfg: cfg.(string)}, nil @@ -77,7 +77,7 @@ func TestConcurrentOpen(t *testing.T) { opens := atomic.Int64{} c := New(Options{ - MaxConnectionsIdle: 2, + MaxIdleConnections: 2, OpenFunc: func(ctx context.Context, cfg any) (Connection, error) { opens.Add(1) time.Sleep(time.Second) @@ -118,7 +118,7 @@ func TestOpenDuringClose(t *testing.T) { opens := atomic.Int64{} c := New(Options{ - MaxConnectionsIdle: 2, + MaxIdleConnections: 2, OpenFunc: func(ctx context.Context, cfg any) (Connection, error) { opens.Add(1) return &mockConn{ @@ -161,7 +161,7 @@ func TestCloseDuringOpen(t *testing.T) { m := &mockConn{cfg: "foo"} c := New(Options{ - MaxConnectionsIdle: 2, + MaxIdleConnections: 2, OpenFunc: func(ctx context.Context, cfg any) (Connection, error) { time.Sleep(time.Second) opens.Add(1) @@ -195,7 +195,7 @@ func TestCloseInUse(t *testing.T) { opens := atomic.Int64{} c := New(Options{ - MaxConnectionsIdle: 2, + MaxIdleConnections: 2, OpenFunc: func(ctx context.Context, cfg any) (Connection, error) { opens.Add(1) return &mockConn{cfg: cfg.(string)}, nil @@ -231,7 +231,7 @@ func TestHanging(t *testing.T) { hangingCloses := atomic.Int64{} c := New(Options{ - MaxConnectionsIdle: 2, + MaxIdleConnections: 2, OpenTimeout: 100 * time.Millisecond, CloseTimeout: 100 * time.Millisecond, CheckHangingInterval: 100 * time.Millisecond, From 377b30654f06355dd47e0ef422f4d5cdb3c25648 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Benjamin=20Egelund-M=C3=BCller?= Date: Fri, 15 Dec 2023 12:28:32 +0100 Subject: [PATCH 12/12] Remove redundant var --- runtime/pkg/conncache/conncache.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/runtime/pkg/conncache/conncache.go b/runtime/pkg/conncache/conncache.go index 452cff37397..b746f46a163 100644 --- a/runtime/pkg/conncache/conncache.go +++ b/runtime/pkg/conncache/conncache.go @@ -433,15 +433,12 @@ func (c *cacheImpl) periodicallyCheckHangingConnections() { select { case <-ticker.C: c.mu.Lock() - hanging := 0 for k := range c.singleflight { e := c.entries[k] if c.opts.OpenTimeout != 0 && e.status == entryStatusOpening && time.Since(e.since) > c.opts.OpenTimeout { - hanging++ c.opts.HangingFunc(e.cfg, true) } if c.opts.CloseTimeout != 0 && e.status == entryStatusClosing && time.Since(e.since) > c.opts.CloseTimeout { - hanging++ c.opts.HangingFunc(e.cfg, 
false) } }
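
A rough usage sketch (not part of the diffs above) of how a caller might wire up the reworked cache after patches 11 and 12: it exercises the renamed MaxIdleConnections option, the open/close timeouts and HangingFunc, and leaves the optional Metrics at its zero value so no instruments are recorded. The conncache identifiers (New, Options, Metrics, Connection, Acquire, EvictWhere, Close) come from the diffs; fakeConn and the DSN strings are placeholders, and the sketch assumes the Connection interface only requires Close() error and that ReleaseFunc takes no arguments.

package main

import (
	"context"
	"log"
	"time"

	"github.com/rilldata/rill/runtime/pkg/conncache"
)

// fakeConn is a placeholder connection type for illustration only.
type fakeConn struct{ dsn string }

func (c *fakeConn) Close() error { return nil }

func main() {
	cache := conncache.New(conncache.Options{
		MaxIdleConnections:   2,                // renamed from MaxConnectionsIdle in patch 11
		OpenTimeout:          10 * time.Minute, // opens running longer are reported via HangingFunc
		CloseTimeout:         10 * time.Minute, // same for closes
		CheckHangingInterval: time.Minute,
		OpenFunc: func(ctx context.Context, cfg any) (conncache.Connection, error) {
			return &fakeConn{dsn: cfg.(string)}, nil
		},
		KeyFunc: func(cfg any) string { return cfg.(string) },
		HangingFunc: func(cfg any, open bool) {
			log.Printf("conncache: hanging connection (open=%v): %v", open, cfg)
		},
		// Metrics is optional; nil instruments are simply not recorded.
	})

	conn, release, err := cache.Acquire(context.Background(), "dsn://example")
	if err != nil {
		log.Fatal(err)
	}
	_ = conn  // use the connection...
	release() // ...then hand it back; unreferenced entries move to the idle LRU

	// Evict and close matching entries in the background (e.g. on instance deletion).
	cache.EvictWhere(func(cfg any) bool { return cfg.(string) == "dsn://example" })

	_ = cache.Close(context.Background())
}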
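
One behavioural consequence worth calling out: the select on ctx.Done() added to Acquire in the first hunk above lets a blocked caller abandon a slow open through its own context, while the open itself continues in the background and the caller's reference on the entry is released. A fragment continuing the sketch, which would slot in before the cache.Close call above and assumes errors is imported and that opening the hypothetical dsn://slow config takes longer than 50ms:

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	if _, _, err := cache.Acquire(ctx, "dsn://slow"); errors.Is(err, context.DeadlineExceeded) {
		// The open keeps running in the background; this caller's reference was
		// released, so the entry can still be adopted by later acquirers or evicted.
		log.Println("gave up waiting for the connection to open")
	}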