Skip to content

Commit

Permalink
Graceful Shutdown: Updates function signatures to match new API, and …
Browse files Browse the repository at this point in the history
…add context.TODO()s (dapr#5831)

* Updates function signatures to match new API, and add context.TODO()s

Signed-off-by: joshvanl <[email protected]>

* Revert state Features to not take a context.Context, see
dapr/components-contrib#2474 (comment)

Signed-off-by: joshvanl <[email protected]>

* Update config interface to satisfy contrib requirements

Signed-off-by: joshvanl <[email protected]>

* Updated pinned components-contrib

Signed-off-by: ItalyPaleAle <[email protected]>

* 💄

Signed-off-by: ItalyPaleAle <[email protected]>

---------

Signed-off-by: joshvanl <[email protected]>
Signed-off-by: ItalyPaleAle <[email protected]>
Co-authored-by: ItalyPaleAle <[email protected]>
  • Loading branch information
JoshVanL and ItalyPaleAle authored Feb 17, 2023
1 parent 71b1088 commit 3a5aac4
Show file tree
Hide file tree
Showing 41 changed files with 119 additions and 71 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/dapr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ jobs:
runs-on: ${{ matrix.os }}
env:
GOVER: "^1.19.2"
GOLANGCILINT_VER: v1.50.1
GOLANGCILINT_VER: "v1.51.1"
PROTOC_VERSION: "21.12"
GOOS: ${{ matrix.target_os }}
GOARCH: ${{ matrix.target_arch }}
Expand Down
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ linters:
disable:
# TODO Enforce the below linters later
- nosnakecase
- musttag
- dupl
- errcheck
- funlen
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,8 @@ test-race:
################################################################################
# Target: lint #
################################################################################
# Please use golangci-lint version v1.50.1 , otherwise you might encounter errors.
# You can download version v1.50.1 at https://github.com/golangci/golangci-lint/releases/tag/v1.50.1
# Please use golangci-lint version v1.51.1 , otherwise you might encounter errors.
# You can download version v1.51.1 at https://github.com/golangci/golangci-lint/releases/tag/v1.51.1
.PHONY: lint
lint:
$(GOLANGCI_LINT) run --timeout=20m
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ limitations under the License.
package components

import (
"github.com/dapr/components-contrib/bindings/alicloud/dubbo"
"github.com/dapr/components-contrib/bindings/dubbo"
bindingsLoader "github.com/dapr/dapr/pkg/components/bindings"
)

func init() {
bindingsLoader.DefaultRegistry.RegisterOutputBinding(dubbo.NewDubboOutput, "alicloud.dubbo")
bindingsLoader.DefaultRegistry.RegisterOutputBinding(dubbo.NewDubboOutput, "dubbo", "alicloud.dubbo")
}
4 changes: 3 additions & 1 deletion cmd/daprd/components/middleware_http_bearer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ limitations under the License.
package components

import (
"context"

"github.com/dapr/components-contrib/middleware"
"github.com/dapr/components-contrib/middleware/http/bearer"
httpMiddlewareLoader "github.com/dapr/dapr/pkg/components/middleware/http"
Expand All @@ -24,7 +26,7 @@ import (
func init() {
httpMiddlewareLoader.DefaultRegistry.RegisterComponent(func(log logger.Logger) httpMiddlewareLoader.FactoryMethod {
return func(metadata middleware.Metadata) (httpMiddleware.Middleware, error) {
return bearer.NewBearerMiddleware(log).GetHandler(metadata)
return bearer.NewBearerMiddleware(log).GetHandler(context.TODO(), metadata)
}
}, "bearer")
}
4 changes: 3 additions & 1 deletion cmd/daprd/components/middleware_http_oauth2.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ limitations under the License.
package components

import (
"context"

"github.com/dapr/components-contrib/middleware"
"github.com/dapr/components-contrib/middleware/http/oauth2"
httpMiddlewareLoader "github.com/dapr/dapr/pkg/components/middleware/http"
Expand All @@ -24,7 +26,7 @@ import (
func init() {
httpMiddlewareLoader.DefaultRegistry.RegisterComponent(func(log logger.Logger) httpMiddlewareLoader.FactoryMethod {
return func(metadata middleware.Metadata) (httpMiddleware.Middleware, error) {
return oauth2.NewOAuth2Middleware(log).GetHandler(metadata)
return oauth2.NewOAuth2Middleware(log).GetHandler(context.TODO(), metadata)
}
}, "oauth2")
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ limitations under the License.
package components

import (
"context"

"github.com/dapr/components-contrib/middleware"
"github.com/dapr/components-contrib/middleware/http/oauth2clientcredentials"
httpMiddlewareLoader "github.com/dapr/dapr/pkg/components/middleware/http"
Expand All @@ -24,7 +26,7 @@ import (
func init() {
httpMiddlewareLoader.DefaultRegistry.RegisterComponent(func(log logger.Logger) httpMiddlewareLoader.FactoryMethod {
return func(metadata middleware.Metadata) (httpMiddleware.Middleware, error) {
return oauth2clientcredentials.NewOAuth2ClientCredentialsMiddleware(log).GetHandler(metadata)
return oauth2clientcredentials.NewOAuth2ClientCredentialsMiddleware(log).GetHandler(context.TODO(), metadata)
}
}, "oauth2clientcredentials")
}
4 changes: 3 additions & 1 deletion cmd/daprd/components/middleware_http_opa.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ limitations under the License.
package components

import (
"context"

"github.com/dapr/components-contrib/middleware"
"github.com/dapr/components-contrib/middleware/http/opa"
httpMiddlewareLoader "github.com/dapr/dapr/pkg/components/middleware/http"
Expand All @@ -24,7 +26,7 @@ import (
func init() {
httpMiddlewareLoader.DefaultRegistry.RegisterComponent(func(log logger.Logger) httpMiddlewareLoader.FactoryMethod {
return func(metadata middleware.Metadata) (httpMiddleware.Middleware, error) {
return opa.NewMiddleware(log).GetHandler(metadata)
return opa.NewMiddleware(log).GetHandler(context.TODO(), metadata)
}
}, "opa")
}
4 changes: 3 additions & 1 deletion cmd/daprd/components/middleware_http_ratelimit.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ limitations under the License.
package components

import (
"context"

"github.com/dapr/components-contrib/middleware"
"github.com/dapr/components-contrib/middleware/http/ratelimit"
httpMiddlewareLoader "github.com/dapr/dapr/pkg/components/middleware/http"
Expand All @@ -24,7 +26,7 @@ import (
func init() {
httpMiddlewareLoader.DefaultRegistry.RegisterComponent(func(log logger.Logger) httpMiddlewareLoader.FactoryMethod {
return func(metadata middleware.Metadata) (httpMiddleware.Middleware, error) {
return ratelimit.NewRateLimitMiddleware(log).GetHandler(metadata)
return ratelimit.NewRateLimitMiddleware(log).GetHandler(context.TODO(), metadata)
}
}, "ratelimit")
}
4 changes: 3 additions & 1 deletion cmd/daprd/components/middleware_http_routeralias.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ limitations under the License.
package components

import (
"context"

"github.com/dapr/components-contrib/middleware"
"github.com/dapr/components-contrib/middleware/http/routeralias"
httpMiddlewareLoader "github.com/dapr/dapr/pkg/components/middleware/http"
Expand All @@ -24,7 +26,7 @@ import (
func init() {
httpMiddlewareLoader.DefaultRegistry.RegisterComponent(func(log logger.Logger) httpMiddlewareLoader.FactoryMethod {
return func(metadata middleware.Metadata) (httpMiddleware.Middleware, error) {
return routeralias.NewMiddleware(log).GetHandler(metadata)
return routeralias.NewMiddleware(log).GetHandler(context.TODO(), metadata)
}
}, "routeralias")
}
4 changes: 3 additions & 1 deletion cmd/daprd/components/middleware_http_routerchecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ limitations under the License.
package components

import (
"context"

"github.com/dapr/components-contrib/middleware"
"github.com/dapr/components-contrib/middleware/http/routerchecker"
httpMiddlewareLoader "github.com/dapr/dapr/pkg/components/middleware/http"
Expand All @@ -24,7 +26,7 @@ import (
func init() {
httpMiddlewareLoader.DefaultRegistry.RegisterComponent(func(log logger.Logger) httpMiddlewareLoader.FactoryMethod {
return func(metadata middleware.Metadata) (httpMiddleware.Middleware, error) {
return routerchecker.NewMiddleware(log).GetHandler(metadata)
return routerchecker.NewMiddleware(log).GetHandler(context.TODO(), metadata)
}
}, "routerchecker")
}
4 changes: 3 additions & 1 deletion cmd/daprd/components/middleware_http_sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ limitations under the License.
package components

import (
"context"

"github.com/dapr/components-contrib/middleware"
"github.com/dapr/components-contrib/middleware/http/sentinel"
httpMiddlewareLoader "github.com/dapr/dapr/pkg/components/middleware/http"
Expand All @@ -24,7 +26,7 @@ import (
func init() {
httpMiddlewareLoader.DefaultRegistry.RegisterComponent(func(log logger.Logger) httpMiddlewareLoader.FactoryMethod {
return func(metadata middleware.Metadata) (httpMiddleware.Middleware, error) {
return sentinel.NewMiddleware(log).GetHandler(metadata)
return sentinel.NewMiddleware(log).GetHandler(context.TODO(), metadata)
}
}, "sentinel")
}
4 changes: 3 additions & 1 deletion cmd/daprd/components/middleware_http_webassembly.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ limitations under the License.
package components

import (
"context"

"github.com/dapr/components-contrib/middleware"
"github.com/dapr/components-contrib/middleware/http/wasm"
httpMiddlewareLoader "github.com/dapr/dapr/pkg/components/middleware/http"
Expand All @@ -24,7 +26,7 @@ import (
func init() {
httpMiddlewareLoader.DefaultRegistry.RegisterComponent(func(log logger.Logger) httpMiddlewareLoader.FactoryMethod {
return func(metadata middleware.Metadata) (httpMiddleware.Middleware, error) {
return wasm.NewMiddleware(log).GetHandler(metadata)
return wasm.NewMiddleware(log).GetHandler(context.TODO(), metadata)
}
}, "wasm")
}
2 changes: 1 addition & 1 deletion docker/Dockerfile-dev
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ ARG DAPR_CLI_VERSION="latest"
ARG PROTOC_VERSION="21.12"
ARG PROTOC_GEN_GO_VERSION="1.28.1"
ARG PROTOC_GEN_GO_GRPC_VERSION="1.2.0"
ARG GOLANGCI_LINT_VERSION="1.50.1"
ARG GOLANGCI_LINT_VERSION="1.51.1"

# This Dockerfile adds a non-root 'dapr' user with sudo access. However, for Linux,
# this user's GID/UID must match your local user UID/GID to avoid permission issues
Expand Down
2 changes: 1 addition & 1 deletion docker/custom-scripts/install-dapr-tools.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ DAPR_CLI_VERSION=${4:-""}
PROTOC_VERSION=${5:-"21.12"}
PROTOC_GEN_GO_VERSION=${6:-"1.28.1"}
PROTOC_GEN_GO_GRPC_VERSION=${7:-"1.2.0"}
GOLANGCI_LINT_VERSION=${8:-"1.50.1"}
GOLANGCI_LINT_VERSION=${8:-"1.51.1"}

set -e

Expand Down
2 changes: 1 addition & 1 deletion docs/development/developing-dapr.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ This command will:
- format, test and lint all the code
- check if you forgot to `git commit` something

Note: To run linter locally, please use golangci-lint version v1.50.1, otherwise you might encounter errors. You can download version v1.50.1 [here](https://github.com/golangci/golangci-lint/releases/tag/v1.50.1).
Note: To run linter locally, please use golangci-lint version v1.51.1, otherwise you might encounter errors. You can download version v1.51.1 [here](https://github.com/golangci/golangci-lint/releases/tag/v1.51.1).

## Debug Dapr

Expand Down
2 changes: 1 addition & 1 deletion docs/development/setup-dapr-development-env.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ This document helps you get started developing Dapr. If you find any problems wh
2. Install [Delve](https://github.com/go-delve/delve/tree/master/Documentation/installation) for Go debugging, if desired.
3. Install [golangci-lint](https://golangci-lint.run/usage/install) version 1.50.1.
3. Install [golangci-lint](https://golangci-lint.run/usage/install) version 1.51.1.
## Setup a Kubernetes development environment
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/PuerkitoBio/purell v1.2.0
github.com/benbjohnson/clock v1.3.0
github.com/cenkalti/backoff/v4 v4.2.0
github.com/dapr/components-contrib v1.10.0-rc.1
github.com/dapr/components-contrib v1.10.0-rc.1.0.20230216221835-d098e38d6a4c
github.com/dapr/kit v0.0.4
github.com/fasthttp/router v1.4.15
github.com/ghodss/yaml v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -716,8 +716,8 @@ github.com/dancannon/gorethink v4.0.0+incompatible h1:KFV7Gha3AuqT+gr0B/eKvGhbjm
github.com/dancannon/gorethink v4.0.0+incompatible/go.mod h1:BLvkat9KmZc1efyYwhz3WnybhRZtgF1K929FD8z1avU=
github.com/danieljoos/wincred v1.1.2 h1:QLdCxFs1/Yl4zduvBdcHB8goaYk9RARS2SgLLRuAyr0=
github.com/danieljoos/wincred v1.1.2/go.mod h1:GijpziifJoIBfYh+S7BbkdUTU4LfM+QnGqR5Vl2tAx0=
github.com/dapr/components-contrib v1.10.0-rc.1 h1:ZkUzIe+D4HcDVu06c5FPfNdJvmrt1Xl6i8BNkQfYkos=
github.com/dapr/components-contrib v1.10.0-rc.1/go.mod h1:8OvVS4m3kK/ocDqhVYKdu4j7OS4gPlbXMi+eRjlPAwc=
github.com/dapr/components-contrib v1.10.0-rc.1.0.20230216221835-d098e38d6a4c h1:gqj+iTHX9zcsQxlDEqht79e8f5K7DsEg6YxGpUe4InY=
github.com/dapr/components-contrib v1.10.0-rc.1.0.20230216221835-d098e38d6a4c/go.mod h1:xe+Gh1MW0GFqzidHR66W77KF6kKTRpCdNatA0bpxOHI=
github.com/dapr/kit v0.0.4 h1:i+7TIN4crC1Mo0JFyWpIkwAE8orlliA0O6/ibvs2AaE=
github.com/dapr/kit v0.0.4/go.mod h1:RFN6r5pZzhrelB0SUr8Dha44ckRBl7t+B01X5aw8WeE=
github.com/dave/jennifer v1.4.0/go.mod h1:fIb+770HOpJ2fmN9EPPKOqm1vMGhB+TwXKMZhrIygKg=
Expand Down
2 changes: 1 addition & 1 deletion pkg/actors/actors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (f *fakeStateStore) newItem(data []byte) *fakeStateStoreItem {
}
}

func (f *fakeStateStore) Init(metadata state.Metadata) error {
func (f *fakeStateStore) Init(ctx context.Context, metadata state.Metadata) error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/channel/http/http_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (h *Channel) invokeMethodV1(ctx context.Context, req *invokev1.InvokeMethod
}
}))
execPipeline.ServeHTTP(rw, channelReq)
resp = rw.Result()
resp = rw.Result() //nolint:bodyclose
} else {
// Send request to user application
// (Body is closed below, but linter isn't detecting that)
Expand Down
33 changes: 31 additions & 2 deletions pkg/components/bindings/input_pluggable.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"io"
"sync"
"sync/atomic"

"github.com/dapr/components-contrib/bindings"
"github.com/dapr/dapr/pkg/components/pluggable"
Expand All @@ -30,10 +31,14 @@ type grpcInputBinding struct {
*pluggable.GRPCConnector[proto.InputBindingClient]
bindings.InputBinding
logger logger.Logger

closed atomic.Bool
wg sync.WaitGroup
closeCh chan struct{}
}

// Init initializes the grpc inputbinding passing out the metadata to the grpc component.
func (b *grpcInputBinding) Init(metadata bindings.Metadata) error {
func (b *grpcInputBinding) Init(ctx context.Context, metadata bindings.Metadata) error {
if err := b.Dial(metadata.Name); err != nil {
return err
}
Expand Down Expand Up @@ -104,7 +109,18 @@ func (b *grpcInputBinding) Read(ctx context.Context, handler bindings.Handler) e
streamCtx, cancel := context.WithCancel(readStream.Context())
handle := b.adaptHandler(streamCtx, readStream, handler)

b.wg.Add(2)
// Cancel on input binding close.
go func() {
defer b.wg.Done()
defer cancel()
select {
case <-b.closeCh:
case <-streamCtx.Done():
}
}()
go func() {
defer b.wg.Done()
defer cancel()
for {
msg, err := readStream.Recv()
Expand All @@ -117,13 +133,25 @@ func (b *grpcInputBinding) Read(ctx context.Context, handler bindings.Handler) e
b.logger.Errorf("failed to receive message: %v", err)
return
}
go handle(msg)
b.wg.Add(1)
go func() {
defer b.wg.Done()
handle(msg)
}()
}
}()

return nil
}

func (b *grpcInputBinding) Close() error {
defer b.wg.Wait()
if b.closed.CompareAndSwap(false, true) {
close(b.closeCh)
}
return b.InputBinding.Close()
}

// Returns the component metadata options
func (b *grpcInputBinding) GetComponentMetadata() map[string]string {
// GetComponentMetadata does not apply to pluggable components as there is no standard metadata to return
Expand All @@ -135,6 +163,7 @@ func inputFromConnector(l logger.Logger, connector *pluggable.GRPCConnector[prot
return &grpcInputBinding{
GRPCConnector: connector,
logger: l,
closeCh: make(chan struct{}),
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/components/bindings/input_pluggable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func TestInputBindingCalls(t *testing.T) {
}()

conn := inputFromConnector(testLogger, connector)
err = conn.Init(bindings.Metadata{
err = conn.Init(context.Background(), bindings.Metadata{
Base: contribMetadata.Base{},
})

Expand Down
2 changes: 1 addition & 1 deletion pkg/components/bindings/output_pluggable.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type grpcOutputBinding struct {
}

// Init initializes the grpc outputbinding passing out the metadata to the grpc component.
func (b *grpcOutputBinding) Init(metadata bindings.Metadata) error {
func (b *grpcOutputBinding) Init(ctx context.Context, metadata bindings.Metadata) error {
if err := b.Dial(metadata.Name); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/components/bindings/output_pluggable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func TestOutputBindingCalls(t *testing.T) {
}()

conn := outputFromConnector(testLogger, connector)
err = conn.Init(bindings.Metadata{
err = conn.Init(context.Background(), bindings.Metadata{
Base: contribMetadata.Base{},
})

Expand Down
2 changes: 1 addition & 1 deletion pkg/components/pubsub/pluggable.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type grpcPubSub struct {

// Init initializes the grpc pubsub passing out the metadata to the grpc component.
// It also fetches and set the component features.
func (p *grpcPubSub) Init(metadata pubsub.Metadata) error {
func (p *grpcPubSub) Init(ctx context.Context, metadata pubsub.Metadata) error {
if err := p.Dial(metadata.Name); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/components/pubsub/pluggable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestPubSubPluggableCalls(t *testing.T) {
}()

ps := fromConnector(testLogger, connector)
err = ps.Init(pubsub.Metadata{
err = ps.Init(context.Background(), pubsub.Metadata{
Base: contribMetadata.Base{},
})

Expand Down
Loading

0 comments on commit 3a5aac4

Please sign in to comment.