forked from argoproj/argo-cd
-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: support new reporting component (#265)
* move reporter to new place * move reporter to new place * move reporter to new place * move reporter to new place * move reporter to new place * move reporter to new place * move reporter to new place * move reporter to new place * take token from envs (#252) * feat: reporting v2 manifest generation (#254) * move reporter to new place * manifests generation * manifests generation * manifests generation and naming fixes (#255) * feat: codefresh client (#251) * codefresh client * event type, cmd arguments * fix * fix * update --------- Co-authored-by: pasha-codefresh <[email protected]> * fetch argocd token from cluster (#256) * add parameters to manifest (#257) * update image * feat/reporter: metrics (#253) * reporter: metrics * reporter: default metrics port * reporter: renamed variable * reporter: fixed metrics server config * reporter: token * reporter: update metrics * cleanup * updated ports * change deployment to statefulset + change variables (#260) * fix bind adress (#259) * logs (#261) * adjust manifests (#263) * feat/reporting-v2-switching (#262) * reporter: fixed shard env variable * improve logs about skipped shard and fix metric * add application name to metrics * feat/reporting-v2-sharding-app-distribution (#264) * reporter: added 'app-distribution' query with conditional query param 'shardings' * reporter: moved request handlers to specific package * cleanup * integrate feature manager for report from v2 or v1 * reporter: fixed unit tests * resolve PR comments * resolve PR comments * resolve PR comments * resolve PR comments --------- Co-authored-by: Denis Melnik <[email protected]> Co-authored-by: Andrii Shaforostov <[email protected]> Co-authored-by: Oleksandr Saulyak <[email protected]> Co-authored-by: Yaroslav Drachenko <[email protected]>
- Loading branch information
1 parent
b82303b
commit e1c2b03
Showing
30 changed files
with
3,761 additions
and
12 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
package commands | ||
|
||
const ( | ||
// cliName is the name of the CLI | ||
cliName = "event-reporter-server" | ||
) |
213 changes: 213 additions & 0 deletions
213
cmd/event-reporter-server/commands/event_reporter_server.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,213 @@ | ||
package commands | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"math" | ||
"time" | ||
|
||
"github.com/argoproj/argo-cd/v2/event_reporter" | ||
"github.com/argoproj/argo-cd/v2/event_reporter/codefresh" | ||
"github.com/argoproj/argo-cd/v2/pkg/apiclient" | ||
|
||
"github.com/argoproj/pkg/stats" | ||
"github.com/redis/go-redis/v9" | ||
log "github.com/sirupsen/logrus" | ||
"github.com/spf13/cobra" | ||
"k8s.io/client-go/kubernetes" | ||
"k8s.io/client-go/tools/clientcmd" | ||
|
||
cmdutil "github.com/argoproj/argo-cd/v2/cmd/util" | ||
"github.com/argoproj/argo-cd/v2/common" | ||
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" | ||
appclientset "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned" | ||
repoapiclient "github.com/argoproj/argo-cd/v2/reposerver/apiclient" | ||
servercache "github.com/argoproj/argo-cd/v2/server/cache" | ||
"github.com/argoproj/argo-cd/v2/util/cli" | ||
"github.com/argoproj/argo-cd/v2/util/env" | ||
"github.com/argoproj/argo-cd/v2/util/errors" | ||
"github.com/argoproj/argo-cd/v2/util/kube" | ||
"github.com/argoproj/argo-cd/v2/util/tls" | ||
) | ||
|
||
const ( | ||
failureRetryCountEnv = "EVENT_REPORTER_K8S_RETRY_COUNT" | ||
failureRetryPeriodMilliSecondsEnv = "EVENT_REPORTE_K8S_RETRY_DURATION_MILLISECONDS" | ||
) | ||
|
||
var ( | ||
failureRetryCount = 0 | ||
failureRetryPeriodMilliSeconds = 100 | ||
) | ||
|
||
func init() { | ||
failureRetryCount = env.ParseNumFromEnv(failureRetryCountEnv, failureRetryCount, 0, 10) | ||
failureRetryPeriodMilliSeconds = env.ParseNumFromEnv(failureRetryPeriodMilliSecondsEnv, failureRetryPeriodMilliSeconds, 0, 1000) | ||
} | ||
|
||
// NewCommand returns a new instance of an event reporter command | ||
func NewCommand() *cobra.Command { | ||
var ( | ||
redisClient *redis.Client | ||
insecure bool | ||
listenHost string | ||
listenPort int | ||
metricsHost string | ||
metricsPort int | ||
glogLevel int | ||
clientConfig clientcmd.ClientConfig | ||
repoServerTimeoutSeconds int | ||
repoServerAddress string | ||
applicationServerAddress string | ||
cacheSrc func() (*servercache.Cache, error) | ||
contentSecurityPolicy string | ||
repoServerPlaintext bool | ||
repoServerStrictTLS bool | ||
applicationNamespaces []string | ||
argocdToken string | ||
codefreshUrl string | ||
codefreshToken string | ||
shardingAlgorithm string | ||
) | ||
var command = &cobra.Command{ | ||
Use: cliName, | ||
Short: "Run the Event Reporter server", | ||
Long: "The Event reporter is a server that listens to Kubernetes events and reports them to the Codefresh server.", | ||
DisableAutoGenTag: true, | ||
Run: func(c *cobra.Command, args []string) { | ||
ctx := c.Context() | ||
|
||
vers := common.GetVersion() | ||
namespace, _, err := clientConfig.Namespace() | ||
errors.CheckError(err) | ||
vers.LogStartupInfo( | ||
"Event Reporter Server", | ||
map[string]any{ | ||
"namespace": namespace, | ||
"port": listenPort, | ||
}, | ||
) | ||
|
||
cli.SetLogFormat(cmdutil.LogFormat) | ||
cli.SetLogLevel(cmdutil.LogLevel) | ||
cli.SetGLogLevel(glogLevel) | ||
|
||
config, err := clientConfig.ClientConfig() | ||
errors.CheckError(err) | ||
errors.CheckError(v1alpha1.SetK8SConfigDefaults(config)) | ||
|
||
cache, err := cacheSrc() | ||
errors.CheckError(err) | ||
|
||
kubeclientset := kubernetes.NewForConfigOrDie(config) | ||
|
||
appclientsetConfig, err := clientConfig.ClientConfig() | ||
errors.CheckError(err) | ||
errors.CheckError(v1alpha1.SetK8SConfigDefaults(appclientsetConfig)) | ||
config.UserAgent = fmt.Sprintf("argocd-server/%s (%s)", vers.Version, vers.Platform) | ||
|
||
if failureRetryCount > 0 { | ||
appclientsetConfig = kube.AddFailureRetryWrapper(appclientsetConfig, failureRetryCount, failureRetryPeriodMilliSeconds) | ||
} | ||
appClientSet := appclientset.NewForConfigOrDie(appclientsetConfig) | ||
tlsConfig := repoapiclient.TLSConfiguration{ | ||
DisableTLS: repoServerPlaintext, | ||
StrictValidation: repoServerStrictTLS, | ||
} | ||
|
||
// Load CA information to use for validating connections to the | ||
// repository server, if strict TLS validation was requested. | ||
if !repoServerPlaintext && repoServerStrictTLS { | ||
pool, err := tls.LoadX509CertPool( | ||
fmt.Sprintf("%s/server/tls/tls.crt", env.StringFromEnv(common.EnvAppConfigPath, common.DefaultAppConfigPath)), | ||
fmt.Sprintf("%s/server/tls/ca.crt", env.StringFromEnv(common.EnvAppConfigPath, common.DefaultAppConfigPath)), | ||
) | ||
if err != nil { | ||
log.Fatalf("%v", err) | ||
} | ||
tlsConfig.Certificates = pool | ||
} | ||
|
||
repoclientset := repoapiclient.NewRepoServerClientset(repoServerAddress, repoServerTimeoutSeconds, tlsConfig) | ||
|
||
applicationClientSet, err := apiclient.NewClient(&apiclient.ClientOptions{ | ||
ServerAddr: applicationServerAddress, | ||
Insecure: true, | ||
GRPCWeb: true, | ||
PlainText: true, | ||
AuthToken: argocdToken, | ||
}) | ||
|
||
errors.CheckError(err) | ||
|
||
closer, applicationClient, err := applicationClientSet.NewApplicationClient() | ||
|
||
errors.CheckError(err) | ||
|
||
defer func() { | ||
_ = closer.Close() | ||
}() | ||
|
||
eventReporterServerOpts := event_reporter.EventReporterServerOpts{ | ||
ListenPort: listenPort, | ||
ListenHost: listenHost, | ||
MetricsPort: metricsPort, | ||
MetricsHost: metricsHost, | ||
Namespace: namespace, | ||
KubeClientset: kubeclientset, | ||
AppClientset: appClientSet, | ||
RepoClientset: repoclientset, | ||
Cache: cache, | ||
RedisClient: redisClient, | ||
ApplicationNamespaces: applicationNamespaces, | ||
ApplicationServiceClient: applicationClient, | ||
CodefreshConfig: &codefresh.CodefreshConfig{ | ||
BaseURL: codefreshUrl, | ||
AuthToken: codefreshToken, | ||
}, | ||
} | ||
|
||
stats.RegisterStackDumper() | ||
stats.StartStatsTicker(10 * time.Minute) | ||
stats.RegisterHeapDumper("memprofile") | ||
eventReporterServer := event_reporter.NewEventReporterServer(ctx, eventReporterServerOpts) | ||
eventReporterServer.Init(ctx) | ||
lns, err := eventReporterServer.Listen() | ||
errors.CheckError(err) | ||
for { | ||
var closer func() | ||
ctx, cancel := context.WithCancel(ctx) | ||
eventReporterServer.Run(ctx, lns) | ||
cancel() | ||
if closer != nil { | ||
closer() | ||
} | ||
} | ||
}, | ||
} | ||
|
||
clientConfig = cli.AddKubectlFlagsToCmd(command) | ||
command.Flags().BoolVar(&insecure, "insecure", env.ParseBoolFromEnv("EVENT_REPORTER_INSECURE", false), "Run server without TLS") | ||
command.Flags().StringVar(&cmdutil.LogFormat, "logformat", env.StringFromEnv("EVENT_REPORTER_LOGFORMAT", "text"), "Set the logging format. One of: text|json") | ||
command.Flags().StringVar(&cmdutil.LogLevel, "loglevel", env.StringFromEnv("EVENT_REPORTER_LOG_LEVEL", "info"), "Set the logging level. One of: debug|info|warn|error") | ||
command.Flags().IntVar(&glogLevel, "gloglevel", 0, "Set the glog logging level") | ||
command.Flags().StringVar(&applicationServerAddress, "application-server", env.StringFromEnv("EVENT_REPORTER_APPLICATION_SERVER", common.DefaultApplicationServerAddr), "Application server address") | ||
command.Flags().StringVar(&argocdToken, "argocd-token", env.StringFromEnv("ARGOCD_TOKEN", ""), "ArgoCD server JWT token") | ||
command.Flags().StringVar(&repoServerAddress, "repo-server", env.StringFromEnv("EVENT_REPORTER_REPO_SERVER", common.DefaultRepoServerAddr), "Repo server address") | ||
command.AddCommand(cli.NewVersionCmd(cliName)) | ||
command.Flags().StringVar(&listenHost, "address", env.StringFromEnv("EVENT_REPORTER_LISTEN_ADDRESS", common.DefaultAddressEventReporterServer), "Listen on given address") | ||
command.Flags().IntVar(&listenPort, "port", common.DefaultPortEventReporterServer, "Listen on given port") | ||
command.Flags().StringVar(&metricsHost, env.StringFromEnv("EVENT_REPORTER_METRICS_LISTEN_ADDRESS", "metrics-address"), common.DefaultAddressEventReporterServerMetrics, "Listen for metrics on given address") | ||
command.Flags().IntVar(&metricsPort, "metrics-port", common.DefaultPortEventReporterServerMetrics, "Start metrics on given port") | ||
command.Flags().IntVar(&repoServerTimeoutSeconds, "repo-server-timeout-seconds", env.ParseNumFromEnv("EVENT_REPORTER_REPO_SERVER_TIMEOUT_SECONDS", 60, 0, math.MaxInt64), "Repo server RPC call timeout seconds.") | ||
command.Flags().StringVar(&contentSecurityPolicy, "content-security-policy", env.StringFromEnv("EVENT_REPORTER_CONTENT_SECURITY_POLICY", "frame-ancestors 'self';"), "Set Content-Security-Policy header in HTTP responses to `value`. To disable, set to \"\".") | ||
command.Flags().BoolVar(&repoServerPlaintext, "repo-server-plaintext", env.ParseBoolFromEnv("EVENT_REPORTER_REPO_SERVER_PLAINTEXT", false), "Use a plaintext client (non-TLS) to connect to repository server") | ||
command.Flags().BoolVar(&repoServerStrictTLS, "repo-server-strict-tls", env.ParseBoolFromEnv("EVENT_REPORTER_REPO_SERVER_STRICT_TLS", false), "Perform strict validation of TLS certificates when connecting to repo server") | ||
command.Flags().StringVar(&codefreshUrl, "codefresh-url", env.StringFromEnv("CODEFRESH_URL", "https://g.codefresh.io"), "Codefresh API url") | ||
command.Flags().StringVar(&codefreshToken, "codefresh-token", env.StringFromEnv("CODEFRESH_TOKEN", ""), "Codefresh token") | ||
command.Flags().StringVar(&shardingAlgorithm, "sharding-method", env.StringFromEnv(common.EnvEventReporterShardingAlgorithm, common.DefaultEventReporterShardingAlgorithm), "Enables choice of sharding method. Supported sharding methods are : [legacy] ") | ||
cacheSrc = servercache.AddCacheFlagsToCmd(command, func(client *redis.Client) { | ||
redisClient = client | ||
}) | ||
return command | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
package codefresh | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"encoding/json" | ||
"fmt" | ||
"io" | ||
"net/http" | ||
"time" | ||
|
||
"github.com/argoproj/argo-cd/v2/pkg/apiclient/events" | ||
"github.com/pkg/errors" | ||
log "github.com/sirupsen/logrus" | ||
) | ||
|
||
type CodefreshConfig struct { | ||
BaseURL string | ||
AuthToken string | ||
} | ||
|
||
type codefreshClient struct { | ||
cfConfig *CodefreshConfig | ||
httpClient *http.Client | ||
} | ||
|
||
type CodefreshClient interface { | ||
Send(ctx context.Context, appName string, event *events.Event) error | ||
} | ||
|
||
func NewCodefreshClient(cfConfig *CodefreshConfig) CodefreshClient { | ||
return &codefreshClient{ | ||
cfConfig: cfConfig, | ||
httpClient: &http.Client{ | ||
Timeout: 30 * time.Second, | ||
}, | ||
} | ||
} | ||
|
||
func (cc *codefreshClient) Send(ctx context.Context, appName string, event *events.Event) error { | ||
return WithRetry(&DefaultBackoff, func() error { | ||
url := cc.cfConfig.BaseURL + "/2.0/api/events" | ||
log.Infof("Sending application event for %s", appName) | ||
|
||
wrappedPayload := map[string]json.RawMessage{ | ||
"data": event.Payload, | ||
} | ||
|
||
newPayloadBytes, err := json.Marshal(wrappedPayload) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(newPayloadBytes)) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
req.Header.Set("Content-Type", "application/json") | ||
req.Header.Set("Authorization", cc.cfConfig.AuthToken) | ||
|
||
res, err := cc.httpClient.Do(req) | ||
if err != nil { | ||
return errors.Wrap(err, fmt.Sprintf("failed reporting to Codefresh, event: %s", string(event.Payload))) | ||
} | ||
defer res.Body.Close() | ||
|
||
isStatusOK := res.StatusCode >= 200 && res.StatusCode < 300 | ||
if !isStatusOK { | ||
b, _ := io.ReadAll(res.Body) | ||
return errors.Errorf("failed reporting to Codefresh, got response: status code %d and body %s, original request body: %s", | ||
res.StatusCode, string(b), string(event.Payload)) | ||
} | ||
|
||
log.Infof("Application event for %s successfully sent", appName) | ||
return nil | ||
}) | ||
} |
Oops, something went wrong.