From 6f15b3bd9fffe873b8495eb46e3567c4439cf2ae Mon Sep 17 00:00:00 2001 From: pashakostohrys Date: Tue, 12 Dec 2023 16:44:44 +0200 Subject: [PATCH] feat: support multiple transports --- Procfile | 2 +- .../commands/event_reporter_server.go | 48 +++--- event_reporter/application/client.go | 148 ++++++++++++++++++ event_reporter/controller/controller.go | 6 +- event_reporter/handlers/handlers.go | 5 +- .../reporter/application_event_reporter.go | 8 +- event_reporter/server.go | 4 +- 7 files changed, 190 insertions(+), 31 deletions(-) create mode 100644 event_reporter/application/client.go diff --git a/Procfile b/Procfile index 2bb26a086fb1d..b3545698c000b 100644 --- a/Procfile +++ b/Procfile @@ -1,7 +1,7 @@ controller: [ "$BIN_MODE" = 'true' ] && COMMAND=./dist/argocd || COMMAND='go run ./cmd/main.go' && sh -c "FORCE_LOG_COLORS=1 ARGOCD_FAKE_IN_CLUSTER=true ARGOCD_TLS_DATA_PATH=${ARGOCD_TLS_DATA_PATH:-/tmp/argocd-local/tls} ARGOCD_SSH_DATA_PATH=${ARGOCD_SSH_DATA_PATH:-/tmp/argocd-local/ssh} ARGOCD_BINARY_NAME=argocd-application-controller $COMMAND --loglevel debug --redis localhost:${ARGOCD_E2E_REDIS_PORT:-6379} --repo-server localhost:${ARGOCD_E2E_REPOSERVER_PORT:-8081} --otlp-address=${ARGOCD_OTLP_ADDRESS} --application-namespaces=${ARGOCD_APPLICATION_NAMESPACES:-''}" api-server: [ "$BIN_MODE" = 'true' ] && COMMAND=./dist/argocd || COMMAND='go run ./cmd/main.go' && sh -c "FORCE_LOG_COLORS=1 ARGOCD_FAKE_IN_CLUSTER=true ARGOCD_TLS_DATA_PATH=${ARGOCD_TLS_DATA_PATH:-/tmp/argocd-local/tls} ARGOCD_SSH_DATA_PATH=${ARGOCD_SSH_DATA_PATH:-/tmp/argocd-local/ssh} ARGOCD_BINARY_NAME=argocd-server $COMMAND --loglevel debug --redis localhost:${ARGOCD_E2E_REDIS_PORT:-6379} --disable-auth=${ARGOCD_E2E_DISABLE_AUTH:-'true'} --insecure --dex-server http://localhost:${ARGOCD_E2E_DEX_PORT:-5556} --repo-server localhost:${ARGOCD_E2E_REPOSERVER_PORT:-8081} --port ${ARGOCD_E2E_APISERVER_PORT:-8080} --otlp-address=${ARGOCD_OTLP_ADDRESS} --application-namespaces=${ARGOCD_APPLICATION_NAMESPACES:-''}" dex: sh -c "ARGOCD_BINARY_NAME=argocd-dex go run github.com/argoproj/argo-cd/v2/cmd gendexcfg -o `pwd`/dist/dex.yaml && (test -f dist/dex.yaml || { echo 'Failed to generate dex configuration'; exit 1; }) && docker run --rm -p ${ARGOCD_E2E_DEX_PORT:-5556}:${ARGOCD_E2E_DEX_PORT:-5556} -v `pwd`/dist/dex.yaml:/dex.yaml ghcr.io/dexidp/dex:$(grep "image: ghcr.io/dexidp/dex" manifests/base/dex/argocd-dex-server-deployment.yaml | cut -d':' -f3) dex serve /dex.yaml" -redis: bash -c "if [ \"$ARGOCD_REDIS_LOCAL\" = 'true' ]; then redis-server --save '' --appendonly no --port ${ARGOCD_E2E_REDIS_PORT:-6379}; else docker run --rm --name argocd-redis -i -p ${ARGOCD_E2E_REDIS_PORT:-6379}:${ARGOCD_E2E_REDIS_PORT:-6379} docker.io/library/redis:$(grep "image: redis" manifests/base/redis/argocd-redis-deployment.yaml | cut -d':' -f3) --save '' --appendonly no --port ${ARGOCD_E2E_REDIS_PORT:-6379}; fi" +redis: bash -c "if [ \"$ARGOCD_REDIS_LOCAL\" = 'true' ]; then redis-server --save '' --appendonly no --port ${ARGOCD_E2E_REDIS_PORT:-6379}; else docker run --rm --name argocd-redis -i -p ${ARGOCD_E2E_REDIS_PORT:-6379}:${ARGOCD_E2E_REDIS_PORT:-6379} docker.io/library/redis:7.0.14-alpine --save '' --appendonly no --port ${ARGOCD_E2E_REDIS_PORT:-6379}; fi" repo-server: [ "$BIN_MODE" = 'true' ] && COMMAND=./dist/argocd || COMMAND='go run ./cmd/main.go' && sh -c "FORCE_LOG_COLORS=1 ARGOCD_FAKE_IN_CLUSTER=true ARGOCD_GNUPGHOME=${ARGOCD_GNUPGHOME:-/tmp/argocd-local/gpg/keys} ARGOCD_PLUGINSOCKFILEPATH=${ARGOCD_PLUGINSOCKFILEPATH:-./test/cmp} ARGOCD_GPG_DATA_PATH=${ARGOCD_GPG_DATA_PATH:-/tmp/argocd-local/gpg/source} ARGOCD_TLS_DATA_PATH=${ARGOCD_TLS_DATA_PATH:-/tmp/argocd-local/tls} ARGOCD_SSH_DATA_PATH=${ARGOCD_SSH_DATA_PATH:-/tmp/argocd-local/ssh} ARGOCD_BINARY_NAME=argocd-repo-server ARGOCD_GPG_ENABLED=${ARGOCD_GPG_ENABLED:-false} $COMMAND --loglevel debug --port ${ARGOCD_E2E_REPOSERVER_PORT:-8081} --redis localhost:${ARGOCD_E2E_REDIS_PORT:-6379} --otlp-address=${ARGOCD_OTLP_ADDRESS}" cmp-server: [ "$ARGOCD_E2E_TEST" = 'true' ] && exit 0 || [ "$BIN_MODE" = 'true' ] && COMMAND=./dist/argocd || COMMAND='go run ./cmd/main.go' && sh -c "FORCE_LOG_COLORS=1 ARGOCD_FAKE_IN_CLUSTER=true ARGOCD_BINARY_NAME=argocd-cmp-server ARGOCD_PLUGINSOCKFILEPATH=${ARGOCD_PLUGINSOCKFILEPATH:-./test/cmp} $COMMAND --config-dir-path ./test/cmp --loglevel debug --otlp-address=${ARGOCD_OTLP_ADDRESS}" ui: sh -c 'cd ui && ${ARGOCD_E2E_YARN_CMD:-yarn} start' diff --git a/cmd/event-reporter-server/commands/event_reporter_server.go b/cmd/event-reporter-server/commands/event_reporter_server.go index 700e6360351fc..833e4fe33349f 100644 --- a/cmd/event-reporter-server/commands/event_reporter_server.go +++ b/cmd/event-reporter-server/commands/event_reporter_server.go @@ -7,6 +7,7 @@ import ( "time" "github.com/argoproj/argo-cd/v2/event_reporter" + appclient "github.com/argoproj/argo-cd/v2/event_reporter/application" "github.com/argoproj/argo-cd/v2/event_reporter/codefresh" "github.com/argoproj/argo-cd/v2/pkg/apiclient" @@ -45,6 +46,31 @@ func init() { failureRetryPeriodMilliSeconds = env.ParseNumFromEnv(failureRetryPeriodMilliSecondsEnv, failureRetryPeriodMilliSeconds, 0, 1000) } +func getApplicationClient(useGrpc bool, address, token string) appclient.ApplicationClient { + if useGrpc { + applicationClientSet, err := apiclient.NewClient(&apiclient.ClientOptions{ + ServerAddr: address, + Insecure: true, + GRPCWeb: true, + PlainText: true, + AuthToken: token, + }) + + errors.CheckError(err) + + closer, applicationClient, err := applicationClientSet.NewApplicationClient() + + errors.CheckError(err) + + defer func() { + _ = closer.Close() + }() + + return applicationClient + } + return appclient.NewHttpApplicationClient(token, address) +} + // NewCommand returns a new instance of an event reporter command func NewCommand() *cobra.Command { var ( @@ -68,6 +94,7 @@ func NewCommand() *cobra.Command { codefreshUrl string codefreshToken string shardingAlgorithm string + useGrpc bool ) var command = &cobra.Command{ Use: cliName, @@ -130,24 +157,6 @@ func NewCommand() *cobra.Command { repoclientset := repoapiclient.NewRepoServerClientset(repoServerAddress, repoServerTimeoutSeconds, tlsConfig) - applicationClientSet, err := apiclient.NewClient(&apiclient.ClientOptions{ - ServerAddr: applicationServerAddress, - Insecure: true, - GRPCWeb: true, - PlainText: true, - AuthToken: argocdToken, - }) - - errors.CheckError(err) - - closer, applicationClient, err := applicationClientSet.NewApplicationClient() - - errors.CheckError(err) - - defer func() { - _ = closer.Close() - }() - eventReporterServerOpts := event_reporter.EventReporterServerOpts{ ListenPort: listenPort, ListenHost: listenHost, @@ -160,7 +169,7 @@ func NewCommand() *cobra.Command { Cache: cache, RedisClient: redisClient, ApplicationNamespaces: applicationNamespaces, - ApplicationServiceClient: applicationClient, + ApplicationServiceClient: getApplicationClient(useGrpc, applicationServerAddress, argocdToken), CodefreshConfig: &codefresh.CodefreshConfig{ BaseURL: codefreshUrl, AuthToken: codefreshToken, @@ -206,6 +215,7 @@ func NewCommand() *cobra.Command { command.Flags().StringVar(&codefreshUrl, "codefresh-url", env.StringFromEnv("CODEFRESH_URL", "https://g.codefresh.io"), "Codefresh API url") command.Flags().StringVar(&codefreshToken, "codefresh-token", env.StringFromEnv("CODEFRESH_TOKEN", ""), "Codefresh token") command.Flags().StringVar(&shardingAlgorithm, "sharding-method", env.StringFromEnv(common.EnvEventReporterShardingAlgorithm, common.DefaultEventReporterShardingAlgorithm), "Enables choice of sharding method. Supported sharding methods are : [legacy] ") + command.Flags().BoolVar(&useGrpc, "grpc", env.ParseBoolFromEnv("USE_GRPC", true), "Use grpc for interact with argocd server") cacheSrc = servercache.AddCacheFlagsToCmd(command, func(client *redis.Client) { redisClient = client }) diff --git a/event_reporter/application/client.go b/event_reporter/application/client.go new file mode 100644 index 0000000000000..6eef4babb6e45 --- /dev/null +++ b/event_reporter/application/client.go @@ -0,0 +1,148 @@ +package application + +import ( + "context" + "encoding/json" + "errors" + "fmt" + appclient "github.com/argoproj/argo-cd/v2/pkg/apiclient/application" + "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" + repoapiclient "github.com/argoproj/argo-cd/v2/reposerver/apiclient" + "google.golang.org/grpc" + "io" + "net/http" + "strings" + "time" +) + +type ApplicationClient interface { + Get(ctx context.Context, in *appclient.ApplicationQuery, opts ...grpc.CallOption) (*v1alpha1.Application, error) + + RevisionMetadata(ctx context.Context, in *appclient.RevisionMetadataQuery, opts ...grpc.CallOption) (*v1alpha1.RevisionMetadata, error) + + GetManifests(ctx context.Context, in *appclient.ApplicationManifestQuery, opts ...grpc.CallOption) (*repoapiclient.ManifestResponse, error) + + ResourceTree(ctx context.Context, in *appclient.ResourcesQuery, opts ...grpc.CallOption) (*v1alpha1.ApplicationTree, error) + + GetResource(ctx context.Context, in *appclient.ApplicationResourceRequest, opts ...grpc.CallOption) (*appclient.ApplicationResourceResponse, error) + + List(ctx context.Context, in *appclient.ApplicationQuery, opts ...grpc.CallOption) (*v1alpha1.ApplicationList, error) +} + +type httpApplicationClient struct { + httpClient *http.Client + baseUrl string + token string +} + +func NewHttpApplicationClient(token string, address string) ApplicationClient { + if !strings.Contains(address, "http") { + address = "http://" + address + } + + return &httpApplicationClient{ + httpClient: &http.Client{ + Timeout: 30 * time.Second, + }, + baseUrl: address, + token: token, + } +} + +func (c *httpApplicationClient) execute(ctx context.Context, url string, result interface{}, printBody ...bool) error { + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + return err + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+c.token) + + res, err := c.httpClient.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + + b, _ := io.ReadAll(res.Body) + + isStatusOK := res.StatusCode >= 200 && res.StatusCode < 300 + if !isStatusOK { + return errors.New(fmt.Sprintf("argocd server respond with code %d, msg is: %s", res.StatusCode, string(b))) + } + + err = json.Unmarshal(b, &result) + if err != nil { + return err + } + return nil +} + +func (c *httpApplicationClient) Get(ctx context.Context, in *appclient.ApplicationQuery, opts ...grpc.CallOption) (*v1alpha1.Application, error) { + url := fmt.Sprintf("%s/api/v1/applications/%s", c.baseUrl, *in.Name) + application := &v1alpha1.Application{} + err := c.execute(ctx, url, application) + if err != nil { + return nil, err + } + return application, nil +} + +func (c *httpApplicationClient) RevisionMetadata(ctx context.Context, in *appclient.RevisionMetadataQuery, opts ...grpc.CallOption) (*v1alpha1.RevisionMetadata, error) { + url := fmt.Sprintf("%s/api/v1/applications/%s/revisions/%s/metadata", c.baseUrl, *in.Name, *in.Revision) + revisionMetadata := &v1alpha1.RevisionMetadata{} + err := c.execute(ctx, url, revisionMetadata) + if err != nil { + return nil, err + } + return revisionMetadata, nil +} + +func (c *httpApplicationClient) GetManifests(ctx context.Context, in *appclient.ApplicationManifestQuery, opts ...grpc.CallOption) (*repoapiclient.ManifestResponse, error) { + url := fmt.Sprintf("%s/api/v1/applications/%s/manifests", c.baseUrl, *in.Name) + + manifest := &repoapiclient.ManifestResponse{} + err := c.execute(ctx, url, manifest) + if err != nil { + return nil, err + } + return manifest, nil +} + +func (c *httpApplicationClient) ResourceTree(ctx context.Context, in *appclient.ResourcesQuery, opts ...grpc.CallOption) (*v1alpha1.ApplicationTree, error) { + url := fmt.Sprintf("%s/api/v1/applications/%s/resource-tree", c.baseUrl, *in.ApplicationName) + tree := &v1alpha1.ApplicationTree{} + err := c.execute(ctx, url, tree) + if err != nil { + return nil, err + } + return tree, nil +} + +func (c *httpApplicationClient) GetResource(ctx context.Context, in *appclient.ApplicationResourceRequest, opts ...grpc.CallOption) (*appclient.ApplicationResourceResponse, error) { + params := fmt.Sprintf("?namespace=%s&resourceName=%s&version=%s&group=%s&kind=%s", + *in.Namespace, + *in.ResourceName, + *in.Version, + *in.Group, + *in.Kind) + url := fmt.Sprintf("%s/api/v1/applications/%s/resource%s", c.baseUrl, *in.Name, params) + + applicationResource := &appclient.ApplicationResourceResponse{} + err := c.execute(ctx, url, applicationResource, true) + if err != nil { + return nil, err + } + return applicationResource, nil +} + +func (c *httpApplicationClient) List(ctx context.Context, in *appclient.ApplicationQuery, opts ...grpc.CallOption) (*v1alpha1.ApplicationList, error) { + url := fmt.Sprintf("%s/api/v1/applications", c.baseUrl) + + apps := &v1alpha1.ApplicationList{} + err := c.execute(ctx, url, apps) + if err != nil { + return nil, err + } + return apps, nil +} diff --git a/event_reporter/controller/controller.go b/event_reporter/controller/controller.go index 564712e742758..018a14074e6a6 100644 --- a/event_reporter/controller/controller.go +++ b/event_reporter/controller/controller.go @@ -2,6 +2,7 @@ package controller import ( "context" + appclient "github.com/argoproj/argo-cd/v2/event_reporter/application" "math" "strings" "time" @@ -10,7 +11,6 @@ import ( "github.com/argoproj/argo-cd/v2/event_reporter/codefresh" "github.com/argoproj/argo-cd/v2/event_reporter/metrics" "github.com/argoproj/argo-cd/v2/event_reporter/reporter" - applicationpkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/application" appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1" servercache "github.com/argoproj/argo-cd/v2/server/cache" @@ -37,11 +37,11 @@ type eventReporterController struct { applicationEventReporter reporter.ApplicationEventReporter cache *servercache.Cache appLister applisters.ApplicationLister - applicationServiceClient applicationpkg.ApplicationServiceClient + applicationServiceClient appclient.ApplicationClient metricsServer *metrics.MetricsServer } -func NewEventReporterController(appInformer cache.SharedIndexInformer, cache *servercache.Cache, settingsMgr *settings.SettingsManager, applicationServiceClient applicationpkg.ApplicationServiceClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer, featureManager *reporter.FeatureManager) EventReporterController { +func NewEventReporterController(appInformer cache.SharedIndexInformer, cache *servercache.Cache, settingsMgr *settings.SettingsManager, applicationServiceClient appclient.ApplicationClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer, featureManager *reporter.FeatureManager) EventReporterController { appBroadcaster := reporter.NewBroadcaster(featureManager) appInformer.AddEventHandler(appBroadcaster) return &eventReporterController{ diff --git a/event_reporter/handlers/handlers.go b/event_reporter/handlers/handlers.go index 951af200a0ad8..d196a9280fc32 100644 --- a/event_reporter/handlers/handlers.go +++ b/event_reporter/handlers/handlers.go @@ -2,6 +2,7 @@ package handlers import ( "encoding/json" + appclient "github.com/argoproj/argo-cd/v2/event_reporter/application" "github.com/argoproj/argo-cd/v2/event_reporter/sharding" applicationpkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/application" "net/http" @@ -10,10 +11,10 @@ import ( ) type RequestHandlers struct { - ApplicationServiceClient applicationpkg.ApplicationServiceClient + ApplicationServiceClient appclient.ApplicationClient } -func GetRequestHandlers(applicationServiceClient applicationpkg.ApplicationServiceClient) *RequestHandlers { +func GetRequestHandlers(applicationServiceClient appclient.ApplicationClient) *RequestHandlers { return &RequestHandlers{ ApplicationServiceClient: applicationServiceClient, } diff --git a/event_reporter/reporter/application_event_reporter.go b/event_reporter/reporter/application_event_reporter.go index b5af74a741a77..5ff504d3c4303 100644 --- a/event_reporter/reporter/application_event_reporter.go +++ b/event_reporter/reporter/application_event_reporter.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/argoproj/argo-cd/v2/reposerver/apiclient" "math" "reflect" "strings" @@ -12,7 +13,6 @@ import ( argocommon "github.com/argoproj/argo-cd/v2/common" "github.com/argoproj/argo-cd/v2/event_reporter/codefresh" "github.com/argoproj/argo-cd/v2/event_reporter/metrics" - applicationpkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/application" applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1" servercache "github.com/argoproj/argo-cd/v2/server/cache" "github.com/argoproj/argo-cd/v2/util/env" @@ -28,11 +28,11 @@ import ( "k8s.io/apimachinery/pkg/watch" "sigs.k8s.io/yaml" + appclient "github.com/argoproj/argo-cd/v2/event_reporter/application" "github.com/argoproj/argo-cd/v2/pkg/apiclient/application" "github.com/argoproj/argo-cd/v2/pkg/apiclient/events" appv1reg "github.com/argoproj/argo-cd/v2/pkg/apis/application" appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" - "github.com/argoproj/argo-cd/v2/reposerver/apiclient" ) var ( @@ -43,7 +43,7 @@ type applicationEventReporter struct { cache *servercache.Cache codefreshClient codefresh.CodefreshClient appLister applisters.ApplicationLister - applicationServiceClient applicationpkg.ApplicationServiceClient + applicationServiceClient appclient.ApplicationClient metricsServer *metrics.MetricsServer } @@ -59,7 +59,7 @@ type ApplicationEventReporter interface { ShouldSendApplicationEvent(ae *appv1.ApplicationWatchEvent) (shouldSend bool, syncStatusChanged bool) } -func NewApplicationEventReporter(cache *servercache.Cache, applicationServiceClient applicationpkg.ApplicationServiceClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer) ApplicationEventReporter { +func NewApplicationEventReporter(cache *servercache.Cache, applicationServiceClient appclient.ApplicationClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer) ApplicationEventReporter { return &applicationEventReporter{ cache: cache, applicationServiceClient: applicationServiceClient, diff --git a/event_reporter/server.go b/event_reporter/server.go index 9f76a3b7e4920..628c7ad6afa71 100644 --- a/event_reporter/server.go +++ b/event_reporter/server.go @@ -4,6 +4,7 @@ import ( "context" "crypto/tls" "fmt" + appclient "github.com/argoproj/argo-cd/v2/event_reporter/application" "github.com/argoproj/argo-cd/v2/event_reporter/reporter" "net" "net/http" @@ -16,7 +17,6 @@ import ( event_reporter "github.com/argoproj/argo-cd/v2/event_reporter/controller" "github.com/argoproj/argo-cd/v2/event_reporter/handlers" "github.com/argoproj/argo-cd/v2/event_reporter/metrics" - applicationpkg "github.com/argoproj/argo-cd/v2/pkg/apiclient/application" appclientset "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned" appinformer "github.com/argoproj/argo-cd/v2/pkg/client/informers/externalversions" applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1" @@ -86,7 +86,7 @@ type EventReporterServerOpts struct { KubeClientset kubernetes.Interface AppClientset appclientset.Interface RepoClientset repoapiclient.Clientset - ApplicationServiceClient applicationpkg.ApplicationServiceClient + ApplicationServiceClient appclient.ApplicationClient Cache *servercache.Cache RedisClient *redis.Client ApplicationNamespaces []string