Skip to content

Commit

Permalink
Revert "Revert "feat: support new reporting component (#265)""
Browse files Browse the repository at this point in the history
This reverts commit 3514011.
  • Loading branch information
pasha-codefresh committed Nov 28, 2023
1 parent 3514011 commit e2cc015
Show file tree
Hide file tree
Showing 30 changed files with 3,761 additions and 12 deletions.
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ COPY --from=argocd-build /go/src/github.com/argoproj/argo-cd/dist/argocd* /usr/l
USER root
RUN ln -s /usr/local/bin/argocd /usr/local/bin/argocd-server && \
ln -s /usr/local/bin/argocd /usr/local/bin/argocd-repo-server && \
ln -s /usr/local/bin/argocd /usr/local/bin/event-reporter-server && \
ln -s /usr/local/bin/argocd /usr/local/bin/argocd-cmp-server && \
ln -s /usr/local/bin/argocd /usr/local/bin/argocd-application-controller && \
ln -s /usr/local/bin/argocd /usr/local/bin/argocd-dex && \
Expand Down
6 changes: 6 additions & 0 deletions cmd/event-reporter-server/commands/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package commands

const (
// cliName is the name of the CLI
cliName = "event-reporter-server"
)
213 changes: 213 additions & 0 deletions cmd/event-reporter-server/commands/event_reporter_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
package commands

import (
"context"
"fmt"
"math"
"time"

"github.com/argoproj/argo-cd/v2/event_reporter"
"github.com/argoproj/argo-cd/v2/event_reporter/codefresh"
"github.com/argoproj/argo-cd/v2/pkg/apiclient"

"github.com/argoproj/pkg/stats"
"github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"

cmdutil "github.com/argoproj/argo-cd/v2/cmd/util"
"github.com/argoproj/argo-cd/v2/common"
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
appclientset "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned"
repoapiclient "github.com/argoproj/argo-cd/v2/reposerver/apiclient"
servercache "github.com/argoproj/argo-cd/v2/server/cache"
"github.com/argoproj/argo-cd/v2/util/cli"
"github.com/argoproj/argo-cd/v2/util/env"
"github.com/argoproj/argo-cd/v2/util/errors"
"github.com/argoproj/argo-cd/v2/util/kube"
"github.com/argoproj/argo-cd/v2/util/tls"
)

const (
failureRetryCountEnv = "EVENT_REPORTER_K8S_RETRY_COUNT"
failureRetryPeriodMilliSecondsEnv = "EVENT_REPORTE_K8S_RETRY_DURATION_MILLISECONDS"
)

var (
failureRetryCount = 0
failureRetryPeriodMilliSeconds = 100
)

func init() {
failureRetryCount = env.ParseNumFromEnv(failureRetryCountEnv, failureRetryCount, 0, 10)
failureRetryPeriodMilliSeconds = env.ParseNumFromEnv(failureRetryPeriodMilliSecondsEnv, failureRetryPeriodMilliSeconds, 0, 1000)
}

// NewCommand returns a new instance of an event reporter command
func NewCommand() *cobra.Command {
var (
redisClient *redis.Client
insecure bool
listenHost string
listenPort int
metricsHost string
metricsPort int
glogLevel int
clientConfig clientcmd.ClientConfig
repoServerTimeoutSeconds int
repoServerAddress string
applicationServerAddress string
cacheSrc func() (*servercache.Cache, error)
contentSecurityPolicy string
repoServerPlaintext bool
repoServerStrictTLS bool
applicationNamespaces []string
argocdToken string
codefreshUrl string
codefreshToken string
shardingAlgorithm string
)
var command = &cobra.Command{
Use: cliName,
Short: "Run the Event Reporter server",
Long: "The Event reporter is a server that listens to Kubernetes events and reports them to the Codefresh server.",
DisableAutoGenTag: true,
Run: func(c *cobra.Command, args []string) {
ctx := c.Context()

vers := common.GetVersion()
namespace, _, err := clientConfig.Namespace()
errors.CheckError(err)
vers.LogStartupInfo(
"Event Reporter Server",
map[string]any{
"namespace": namespace,
"port": listenPort,
},
)

cli.SetLogFormat(cmdutil.LogFormat)
cli.SetLogLevel(cmdutil.LogLevel)
cli.SetGLogLevel(glogLevel)

config, err := clientConfig.ClientConfig()
errors.CheckError(err)
errors.CheckError(v1alpha1.SetK8SConfigDefaults(config))

cache, err := cacheSrc()
errors.CheckError(err)

kubeclientset := kubernetes.NewForConfigOrDie(config)

appclientsetConfig, err := clientConfig.ClientConfig()
errors.CheckError(err)
errors.CheckError(v1alpha1.SetK8SConfigDefaults(appclientsetConfig))
config.UserAgent = fmt.Sprintf("argocd-server/%s (%s)", vers.Version, vers.Platform)

if failureRetryCount > 0 {
appclientsetConfig = kube.AddFailureRetryWrapper(appclientsetConfig, failureRetryCount, failureRetryPeriodMilliSeconds)
}
appClientSet := appclientset.NewForConfigOrDie(appclientsetConfig)
tlsConfig := repoapiclient.TLSConfiguration{
DisableTLS: repoServerPlaintext,
StrictValidation: repoServerStrictTLS,
}

// Load CA information to use for validating connections to the
// repository server, if strict TLS validation was requested.
if !repoServerPlaintext && repoServerStrictTLS {
pool, err := tls.LoadX509CertPool(
fmt.Sprintf("%s/server/tls/tls.crt", env.StringFromEnv(common.EnvAppConfigPath, common.DefaultAppConfigPath)),
fmt.Sprintf("%s/server/tls/ca.crt", env.StringFromEnv(common.EnvAppConfigPath, common.DefaultAppConfigPath)),
)
if err != nil {
log.Fatalf("%v", err)
}
tlsConfig.Certificates = pool
}

repoclientset := repoapiclient.NewRepoServerClientset(repoServerAddress, repoServerTimeoutSeconds, tlsConfig)

applicationClientSet, err := apiclient.NewClient(&apiclient.ClientOptions{
ServerAddr: applicationServerAddress,
Insecure: true,
GRPCWeb: true,
PlainText: true,
AuthToken: argocdToken,
})

errors.CheckError(err)

closer, applicationClient, err := applicationClientSet.NewApplicationClient()

errors.CheckError(err)

defer func() {
_ = closer.Close()
}()

eventReporterServerOpts := event_reporter.EventReporterServerOpts{
ListenPort: listenPort,
ListenHost: listenHost,
MetricsPort: metricsPort,
MetricsHost: metricsHost,
Namespace: namespace,
KubeClientset: kubeclientset,
AppClientset: appClientSet,
RepoClientset: repoclientset,
Cache: cache,
RedisClient: redisClient,
ApplicationNamespaces: applicationNamespaces,
ApplicationServiceClient: applicationClient,
CodefreshConfig: &codefresh.CodefreshConfig{
BaseURL: codefreshUrl,
AuthToken: codefreshToken,
},
}

stats.RegisterStackDumper()
stats.StartStatsTicker(10 * time.Minute)
stats.RegisterHeapDumper("memprofile")
eventReporterServer := event_reporter.NewEventReporterServer(ctx, eventReporterServerOpts)
eventReporterServer.Init(ctx)
lns, err := eventReporterServer.Listen()
errors.CheckError(err)
for {
var closer func()
ctx, cancel := context.WithCancel(ctx)
eventReporterServer.Run(ctx, lns)
cancel()
if closer != nil {
closer()
}
}
},
}

clientConfig = cli.AddKubectlFlagsToCmd(command)
command.Flags().BoolVar(&insecure, "insecure", env.ParseBoolFromEnv("EVENT_REPORTER_INSECURE", false), "Run server without TLS")
command.Flags().StringVar(&cmdutil.LogFormat, "logformat", env.StringFromEnv("EVENT_REPORTER_LOGFORMAT", "text"), "Set the logging format. One of: text|json")
command.Flags().StringVar(&cmdutil.LogLevel, "loglevel", env.StringFromEnv("EVENT_REPORTER_LOG_LEVEL", "info"), "Set the logging level. One of: debug|info|warn|error")
command.Flags().IntVar(&glogLevel, "gloglevel", 0, "Set the glog logging level")
command.Flags().StringVar(&applicationServerAddress, "application-server", env.StringFromEnv("EVENT_REPORTER_APPLICATION_SERVER", common.DefaultApplicationServerAddr), "Application server address")
command.Flags().StringVar(&argocdToken, "argocd-token", env.StringFromEnv("ARGOCD_TOKEN", ""), "ArgoCD server JWT token")
command.Flags().StringVar(&repoServerAddress, "repo-server", env.StringFromEnv("EVENT_REPORTER_REPO_SERVER", common.DefaultRepoServerAddr), "Repo server address")
command.AddCommand(cli.NewVersionCmd(cliName))
command.Flags().StringVar(&listenHost, "address", env.StringFromEnv("EVENT_REPORTER_LISTEN_ADDRESS", common.DefaultAddressEventReporterServer), "Listen on given address")
command.Flags().IntVar(&listenPort, "port", common.DefaultPortEventReporterServer, "Listen on given port")
command.Flags().StringVar(&metricsHost, env.StringFromEnv("EVENT_REPORTER_METRICS_LISTEN_ADDRESS", "metrics-address"), common.DefaultAddressEventReporterServerMetrics, "Listen for metrics on given address")
command.Flags().IntVar(&metricsPort, "metrics-port", common.DefaultPortEventReporterServerMetrics, "Start metrics on given port")
command.Flags().IntVar(&repoServerTimeoutSeconds, "repo-server-timeout-seconds", env.ParseNumFromEnv("EVENT_REPORTER_REPO_SERVER_TIMEOUT_SECONDS", 60, 0, math.MaxInt64), "Repo server RPC call timeout seconds.")
command.Flags().StringVar(&contentSecurityPolicy, "content-security-policy", env.StringFromEnv("EVENT_REPORTER_CONTENT_SECURITY_POLICY", "frame-ancestors 'self';"), "Set Content-Security-Policy header in HTTP responses to `value`. To disable, set to \"\".")
command.Flags().BoolVar(&repoServerPlaintext, "repo-server-plaintext", env.ParseBoolFromEnv("EVENT_REPORTER_REPO_SERVER_PLAINTEXT", false), "Use a plaintext client (non-TLS) to connect to repository server")
command.Flags().BoolVar(&repoServerStrictTLS, "repo-server-strict-tls", env.ParseBoolFromEnv("EVENT_REPORTER_REPO_SERVER_STRICT_TLS", false), "Perform strict validation of TLS certificates when connecting to repo server")
command.Flags().StringVar(&codefreshUrl, "codefresh-url", env.StringFromEnv("CODEFRESH_URL", "https://g.codefresh.io"), "Codefresh API url")
command.Flags().StringVar(&codefreshToken, "codefresh-token", env.StringFromEnv("CODEFRESH_TOKEN", ""), "Codefresh token")
command.Flags().StringVar(&shardingAlgorithm, "sharding-method", env.StringFromEnv(common.EnvEventReporterShardingAlgorithm, common.DefaultEventReporterShardingAlgorithm), "Enables choice of sharding method. Supported sharding methods are : [legacy] ")
cacheSrc = servercache.AddCacheFlagsToCmd(command, func(client *redis.Client) {
redisClient = client
})
return command
}
3 changes: 3 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
reposerver "github.com/argoproj/argo-cd/v2/cmd/argocd-repo-server/commands"
apiserver "github.com/argoproj/argo-cd/v2/cmd/argocd-server/commands"
cli "github.com/argoproj/argo-cd/v2/cmd/argocd/commands"
eventreporterserver "github.com/argoproj/argo-cd/v2/cmd/event-reporter-server/commands"
)

const (
Expand All @@ -34,6 +35,8 @@ func main() {
command = cli.NewCommand()
case "argocd-server":
command = apiserver.NewCommand()
case "event-reporter-server":
command = eventreporterserver.NewCommand()
case "argocd-application-controller":
command = appcontroller.NewCommand()
case "argocd-repo-server":
Expand Down
39 changes: 28 additions & 11 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import (
// Default service addresses and URLS of Argo CD internal services
const (
// DefaultRepoServerAddr is the gRPC address of the Argo CD repo server
DefaultRepoServerAddr = "argocd-repo-server:8081"
DefaultRepoServerAddr = "argocd-repo-server:8081"
DefaultApplicationServerAddr = "argo-cd-server:80"
// DefaultDexServerAddr is the HTTP address of the Dex OIDC server, which we run a reverse proxy against
DefaultDexServerAddr = "argocd-dex-server:5556"
// DefaultRedisAddr is the default redis address
Expand Down Expand Up @@ -44,20 +45,24 @@ const (

// Default listener ports for ArgoCD components
const (
DefaultPortAPIServer = 8080
DefaultPortRepoServer = 8081
DefaultPortArgoCDMetrics = 8082
DefaultPortArgoCDAPIServerMetrics = 8083
DefaultPortRepoServerMetrics = 8084
DefaultPortAPIServer = 8080
DefaultPortRepoServer = 8081
DefaultPortArgoCDMetrics = 8082
DefaultPortArgoCDAPIServerMetrics = 8083
DefaultPortRepoServerMetrics = 8084
DefaultPortEventReporterServerMetrics = 8087
DefaultPortEventReporterServer = 8088
)

// DefaultAddressAPIServer for ArgoCD components
const (
DefaultAddressAdminDashboard = "localhost"
DefaultAddressAPIServer = "0.0.0.0"
DefaultAddressAPIServerMetrics = "0.0.0.0"
DefaultAddressRepoServer = "0.0.0.0"
DefaultAddressRepoServerMetrics = "0.0.0.0"
DefaultAddressAdminDashboard = "localhost"
DefaultAddressAPIServer = "0.0.0.0"
DefaultAddressAPIServerMetrics = "0.0.0.0"
DefaultAddressRepoServer = "0.0.0.0"
DefaultAddressRepoServerMetrics = "0.0.0.0"
DefaultAddressEventReporterServer = "0.0.0.0"
DefaultAddressEventReporterServerMetrics = "0.0.0.0"
)

// Default paths on the pod's file system
Expand Down Expand Up @@ -237,6 +242,12 @@ const (
EnvCMPWorkDir = "ARGOCD_CMP_WORKDIR"
// EnvGPGDataPath overrides the location where GPG keyring for signature verification is stored
EnvGPGDataPath = "ARGOCD_GPG_DATA_PATH"
// EnvEventReporterShardingAlgorithm is the distribution sharding algorithm to be used: legacy
EnvEventReporterShardingAlgorithm = "EVENT_REPORTER_SHARDING_ALGORITHM"
// EnvEventReporterReplicas is the number of EventReporter replicas
EnvEventReporterReplicas = "EVENT_REPORTER_REPLICAS"
// EnvEventReporterShard is the shard number that should be handled by reporter
EnvEventReporterShard = "EVENT_REPORTER_SHARD"
)

// Config Management Plugin related constants
Expand Down Expand Up @@ -345,3 +356,9 @@ const TokenVerificationError = "failed to verify the token"
var TokenVerificationErr = errors.New(TokenVerificationError)

var PermissionDeniedAPIError = status.Error(codes.PermissionDenied, "permission denied")

// Event reporter constants
const (
EventReporterLegacyShardingAlgorithm = "legacy"
DefaultEventReporterShardingAlgorithm = EventReporterLegacyShardingAlgorithm
)
78 changes: 78 additions & 0 deletions event_reporter/codefresh/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package codefresh

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"

"github.com/argoproj/argo-cd/v2/pkg/apiclient/events"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)

type CodefreshConfig struct {
BaseURL string
AuthToken string
}

type codefreshClient struct {
cfConfig *CodefreshConfig
httpClient *http.Client
}

type CodefreshClient interface {
Send(ctx context.Context, appName string, event *events.Event) error
}

func NewCodefreshClient(cfConfig *CodefreshConfig) CodefreshClient {
return &codefreshClient{
cfConfig: cfConfig,
httpClient: &http.Client{
Timeout: 30 * time.Second,
},
}
}

func (cc *codefreshClient) Send(ctx context.Context, appName string, event *events.Event) error {
return WithRetry(&DefaultBackoff, func() error {
url := cc.cfConfig.BaseURL + "/2.0/api/events"
log.Infof("Sending application event for %s", appName)

wrappedPayload := map[string]json.RawMessage{
"data": event.Payload,
}

newPayloadBytes, err := json.Marshal(wrappedPayload)
if err != nil {
return err
}

req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(newPayloadBytes))
if err != nil {
return err
}

req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", cc.cfConfig.AuthToken)

res, err := cc.httpClient.Do(req)
if err != nil {
return errors.Wrap(err, fmt.Sprintf("failed reporting to Codefresh, event: %s", string(event.Payload)))
}
defer res.Body.Close()

isStatusOK := res.StatusCode >= 200 && res.StatusCode < 300
if !isStatusOK {
b, _ := io.ReadAll(res.Body)
return errors.Errorf("failed reporting to Codefresh, got response: status code %d and body %s, original request body: %s",
res.StatusCode, string(b), string(event.Payload))
}

log.Infof("Application event for %s successfully sent", appName)
return nil
})
}
Loading

0 comments on commit e2cc015

Please sign in to comment.