diff --git a/api/v1/testkube.yaml b/api/v1/testkube.yaml index 8cb1486e53f..18dddf01766 100644 --- a/api/v1/testkube.yaml +++ b/api/v1/testkube.yaml @@ -7157,6 +7157,16 @@ components: context: type: string description: Context value depending from its type + runnerIds: + type: array + items: + type: string + tags: + type: object + additionalProperties: + type: string + + Webhook: description: CRD based webhook data @@ -8001,6 +8011,10 @@ components: TestWorkflowExecutionRequest: type: object properties: + id: + type: string + description: custom execution id + default: "" name: type: string description: custom execution name @@ -8015,6 +8029,8 @@ components: default: false tags: $ref: "#/components/schemas/TestWorkflowTagValue" + runningContext: + $ref: "#/components/schemas/RunningContext" TestWorkflowWithExecution: type: object @@ -8108,6 +8124,15 @@ components: - false tags: $ref: "#/components/schemas/TestWorkflowTagValue" + runnerId: + type: string + description: Runner id that executed the test workflow + runningContext: + $ref: "#/components/schemas/RunningContext" + groupId: + type: string + description: test workflow correlation id when run against multiple agent in On Prem mode + required: - id - name @@ -8120,7 +8145,15 @@ components: type: string description: unique execution identifier format: bson objectId - example: "62f395e004109209b50edfc1" + example: "twes0m33x3cut10n1d" + runnerId: + type: string + description: runner identifier + example: "some-unique-id-per-env-1" + groupId: + type: string + description: group identifier when run against multiple runner + example: "twgs0m3gr0up1d" name: type: string description: execution name @@ -8142,6 +8175,8 @@ components: $ref: "#/components/schemas/TestWorkflowSummary" tags: $ref: "#/components/schemas/TestWorkflowTagValue" + runningContext: + $ref: "#/components/schemas/RunningContext" required: - id - name diff --git a/cmd/api-server/main.go b/cmd/api-server/main.go index ad57c53c700..866cef50206 100644 --- a/cmd/api-server/main.go +++ b/cmd/api-server/main.go @@ -15,6 +15,7 @@ import ( corev1 "k8s.io/api/core/v1" + "github.com/kubeshop/testkube/pkg/agent/handlers" "github.com/kubeshop/testkube/pkg/cache" "github.com/nats-io/nats.go" @@ -27,6 +28,7 @@ import ( testworkflow2 "github.com/kubeshop/testkube/pkg/repository/testworkflow" "github.com/kubeshop/testkube/pkg/secretmanager" "github.com/kubeshop/testkube/pkg/tcl/checktcl" + "github.com/kubeshop/testkube/pkg/tcl/controlplanetcl" "github.com/kubeshop/testkube/pkg/tcl/schedulertcl" "github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/presets" @@ -122,7 +124,7 @@ func runMigrations() (err error) { return migrations.Migrator.Run(version.Version, migrator.MigrationTypeServer) } -func runMongoMigrations(ctx context.Context, db *mongo.Database, migrationsDir string) error { +func runMongoMigrations(ctx context.Context, db *mongo.Database, _ string) error { migrationsCollectionName := "__migrations" activeMigrations, err := dbmigrator.GetDbMigrationsFromFs(dbmigrations.MongoMigrationsFs) if err != nil { @@ -147,13 +149,18 @@ func main() { cfg.CleanLegacyVars() exitOnError("error getting application config", err) + md := metadata.Pairs("api-key", cfg.TestkubeProAPIKey, "runner-id", cfg.TestkubeProRunnerId) + ctx := metadata.NewOutgoingContext(context.Background(), md) + features, err := featureflags.Get() exitOnError("error getting application feature flags", err) - log.DefaultLogger.Infow("Feature flags configured", "ff", features) + logger := log.DefaultLogger.With("apiVersion", version.Version) + + logger.Infow("Feature flags configured", "ff", features) // Run services within an errgroup to propagate errors between services. - g, ctx := errgroup.WithContext(context.Background()) + g, ctx := errgroup.WithContext(ctx) // Cancel the errgroup context on SIGINT and SIGTERM, // which shuts everything down gracefully. @@ -271,18 +278,18 @@ func main() { var artifactStorage domainstorage.ArtifactsStorage var storageClient domainstorage.Client if mode == common.ModeAgent { - resultsRepository = cloudresult.NewCloudResultRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey) - testResultsRepository = cloudtestresult.NewCloudRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey) - configRepository = cloudconfig.NewCloudResultRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey) + resultsRepository = cloudresult.NewCloudResultRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey, cfg.TestkubeProRunnerId) + testResultsRepository = cloudtestresult.NewCloudRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey, cfg.TestkubeProRunnerId) + configRepository = cloudconfig.NewCloudResultRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey, cfg.TestkubeProRunnerId) // Pro edition only (tcl protected code) - testWorkflowResultsRepository = cloudtestworkflow.NewCloudRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey) + testWorkflowResultsRepository = cloudtestworkflow.NewCloudRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey, cfg.TestkubeProRunnerId) var opts []cloudtestworkflow.Option if cfg.StorageSkipVerify { opts = append(opts, cloudtestworkflow.WithSkipVerify()) } - testWorkflowOutputRepository = cloudtestworkflow.NewCloudOutputRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey, opts...) + testWorkflowOutputRepository = cloudtestworkflow.NewCloudOutputRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey, cfg.TestkubeProRunnerId, opts...) triggerLeaseBackend = triggers.NewAcquireAlwaysLeaseBackend() - artifactStorage = cloudartifacts.NewCloudArtifactsStorage(grpcClient, grpcConn, cfg.TestkubeProAPIKey) + artifactStorage = cloudartifacts.NewCloudArtifactsStorage(grpcClient, grpcConn, cfg.TestkubeProAPIKey, cfg.TestkubeProRunnerId) } else { mongoSSLConfig := getMongoSSLConfig(cfg, secretClient) db, err := storage.GetMongoDatabase(cfg.APIMongoDSN, cfg.APIMongoDB, cfg.APIMongoDBType, cfg.APIMongoAllowTLS, mongoSSLConfig) @@ -431,13 +438,24 @@ func main() { exitOnError("Creating job templates", err) } - proContext := newProContext(cfg, grpcClient) + proContext := newProContext(ctx, cfg, grpcClient) + proContext.ClusterId = clusterId // Check Pro/Enterprise subscription var subscriptionChecker checktcl.SubscriptionChecker if mode == common.ModeAgent { subscriptionChecker, err = checktcl.NewSubscriptionChecker(ctx, proContext, grpcClient, grpcConn) exitOnError("Failed creating subscription checker", err) + + // Load environment/org details based on token grpc call + environment, err := controlplanetcl.GetEnvironment(ctx, proContext, grpcClient, grpcConn) + warnOnError("Getting environment details from control plane", err) + proContext.EnvID = environment.Id + proContext.EnvName = environment.Name + proContext.EnvSlug = environment.Slug + proContext.OrgID = environment.OrganizationId + proContext.OrgName = environment.OrganizationName + proContext.OrgSlug = environment.OrganizationSlug } serviceAccountNames := map[string]string{ @@ -599,6 +617,7 @@ func main() { cfg.ImageDataPersistentCacheKey, cfg.TestkubeDashboardURI, clusterId, + proContext.RunnerId, ) go testWorkflowExecutor.Recover(context.Background()) @@ -661,6 +680,11 @@ func main() { features, proContext, ) + + // register agent handlers (this one will remove need of API server in centralized environment mode) + // commands will be routed through grpc `Command` field + agentHandle.RegisterCommandHandler(cloud.ExecuteCommand, handlers.NewExecuteTestWorkflowHandler(testWorkflowExecutor, testWorkflowsClient, logger)) + if err != nil { exitOnError("Starting agent", err) } @@ -895,9 +919,10 @@ func newGRPCTransportCredentials(cfg *config.Config) (credentials.TransportCrede }) } -func newProContext(cfg *config.Config, grpcClient cloud.TestKubeCloudAPIClient) config.ProContext { +func newProContext(ctx context.Context, cfg *config.Config, grpcClient cloud.TestKubeCloudAPIClient) config.ProContext { proContext := config.ProContext{ APIKey: cfg.TestkubeProAPIKey, + RunnerId: cfg.TestkubeProRunnerId, URL: cfg.TestkubeProURL, TLSInsecure: cfg.TestkubeProTLSInsecure, WorkerCount: cfg.TestkubeProWorkerCount, @@ -915,22 +940,20 @@ func newProContext(cfg *config.Config, grpcClient cloud.TestKubeCloudAPIClient) return proContext } - ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) - md := metadata.Pairs("api-key", cfg.TestkubeProAPIKey) - ctx = metadata.NewOutgoingContext(ctx, md) + ctx, cancel := context.WithTimeout(ctx, time.Second*3) defer cancel() - getProContext, err := grpcClient.GetProContext(ctx, &emptypb.Empty{}) + proContextResponse, err := grpcClient.GetProContext(ctx, &emptypb.Empty{}) if err != nil { log.DefaultLogger.Warnf("cannot fetch pro-context from cloud: %s", err) return proContext } if proContext.EnvID == "" { - proContext.EnvID = getProContext.EnvId + proContext.EnvID = proContextResponse.EnvId } if proContext.OrgID == "" { - proContext.OrgID = getProContext.OrgId + proContext.OrgID = proContextResponse.OrgId } return proContext @@ -942,3 +965,9 @@ func exitOnError(title string, err error) { os.Exit(1) } } + +func warnOnError(title string, err error) { + if err != nil { + log.DefaultLogger.Errorw(title, "error", err) + } +} diff --git a/cmd/kubectl-testkube/commands/common/flags.go b/cmd/kubectl-testkube/commands/common/flags.go index 0b48f38db59..5437a789133 100644 --- a/cmd/kubectl-testkube/commands/common/flags.go +++ b/cmd/kubectl-testkube/commands/common/flags.go @@ -90,6 +90,7 @@ func PopulateMasterFlags(cmd *cobra.Command, opts *HelmOptions) { cmd.Flags().StringVar(&opts.Master.AgentToken, "agent-token", "", "Testkube Pro agent key [required for centralized mode]") cmd.Flags().StringVar(&opts.Master.OrgId, "org-id", "", "Testkube Pro organization id [required for centralized mode]") cmd.Flags().StringVar(&opts.Master.EnvId, "env-id", "", "Testkube Pro environment id [required for centralized mode]") + cmd.Flags().StringVar(&opts.Master.RunnerId, "runner-id", "", "Testkube Pro Multi Runner id [required for centralized mode]") cmd.Flags().BoolVar(&opts.Master.Features.LogsV2, "feature-logs-v2", false, "Logs v2 feature flag") } diff --git a/cmd/kubectl-testkube/commands/common/helper.go b/cmd/kubectl-testkube/commands/common/helper.go index fb5b6363385..eec4609bcb9 100644 --- a/cmd/kubectl-testkube/commands/common/helper.go +++ b/cmd/kubectl-testkube/commands/common/helper.go @@ -207,6 +207,10 @@ func prepareTestkubeProHelmArgs(options HelmOptions, isMigration bool) []string args = append(args, "--set", fmt.Sprintf("testkube-logs.pro.envId=%s", options.Master.EnvId)) } + if options.Master.RunnerId != "" { + args = append(args, "--set", fmt.Sprintf("testkube-api.cloud.runnerId=%s", options.Master.RunnerId)) + } + if options.Master.OrgId != "" { args = append(args, "--set", fmt.Sprintf("testkube-api.cloud.orgId=%s", options.Master.OrgId)) args = append(args, "--set", fmt.Sprintf("testkube-logs.pro.orgId=%s", options.Master.OrgId)) diff --git a/cmd/kubectl-testkube/commands/root.go b/cmd/kubectl-testkube/commands/root.go index 464f6657f65..96c2dd3ffce 100644 --- a/cmd/kubectl-testkube/commands/root.go +++ b/cmd/kubectl-testkube/commands/root.go @@ -48,6 +48,7 @@ func init() { RootCmd.AddCommand(NewGenerateCmd()) RootCmd.AddCommand(NewInitCmd()) + RootCmd.AddCommand(NewRunnerCmd()) RootCmd.AddCommand(NewUpgradeCmd()) RootCmd.AddCommand(NewPurgeCmd()) RootCmd.AddCommand(NewWatchCmd()) diff --git a/cmd/kubectl-testkube/commands/runner.go b/cmd/kubectl-testkube/commands/runner.go new file mode 100644 index 00000000000..5351bbef682 --- /dev/null +++ b/cmd/kubectl-testkube/commands/runner.go @@ -0,0 +1,22 @@ +package commands + +import ( + "github.com/spf13/cobra" + + "github.com/kubeshop/testkube/cmd/kubectl-testkube/commands/pro" +) + +func NewRunnerCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "runner ", + Aliases: []string{""}, + Short: "Testkube Runner related commands", + Run: func(cmd *cobra.Command, args []string) { + cmd.Help() + }, + } + + cmd.AddCommand(pro.NewInitCmd()) + + return cmd +} diff --git a/cmd/kubectl-testkube/config/master.go b/cmd/kubectl-testkube/config/master.go index 05de6762290..7e2ceb97a55 100644 --- a/cmd/kubectl-testkube/config/master.go +++ b/cmd/kubectl-testkube/config/master.go @@ -7,6 +7,7 @@ type Master struct { IdToken string `json:"idToken,omitempty"` OrgId string `json:"orgId,omitempty"` EnvId string `json:"envId,omitempty"` + RunnerId string `json:"runnerId,omitempty"` Insecure bool `json:"insecure,omitempty"` UiUrlPrefix string `json:"uiUrlPrefix,omitempty"` AgentUrlPrefix string `json:"agentUrlPrefix,omitempty"` diff --git a/cmd/testworkflow-init/obfuscator/obfuscator.go b/cmd/testworkflow-init/obfuscator/obfuscator.go index 0b6b5607bb2..74667d3b5eb 100644 --- a/cmd/testworkflow-init/obfuscator/obfuscator.go +++ b/cmd/testworkflow-init/obfuscator/obfuscator.go @@ -135,7 +135,7 @@ func (s *obfuscator) Write(p []byte) (n int, err error) { // Write the rest of data if len(p) > 0 { - nn, err = s.dst.Write(p) + _, err = s.dst.Write(p) return size, err } return size, nil diff --git a/cmd/testworkflow-toolkit/commands/artifacts.go b/cmd/testworkflow-toolkit/commands/artifacts.go index 615cd6daf8f..d4b6d5f78b0 100644 --- a/cmd/testworkflow-toolkit/commands/artifacts.go +++ b/cmd/testworkflow-toolkit/commands/artifacts.go @@ -101,7 +101,7 @@ func NewArtifactsCmd() *cobra.Command { if env.CloudEnabled() { ctx, cancel := context.WithTimeout(cmd.Context(), 30*time.Second) defer cancel() - ctx = agent.AddAPIKeyMeta(ctx, env.Config().Cloud.ApiKey) + ctx = agent.AddContextMetadata(ctx, env.Config().Cloud.ApiKey, "") executor, client := env.Cloud(ctx) proContext, err := client.GetProContext(ctx, &emptypb.Empty{}) var supported []*cloud.Capability diff --git a/cmd/testworkflow-toolkit/env/client.go b/cmd/testworkflow-toolkit/env/client.go index 879f9cf129d..3d459e7d6f7 100644 --- a/cmd/testworkflow-toolkit/env/client.go +++ b/cmd/testworkflow-toolkit/env/client.go @@ -90,5 +90,5 @@ func Cloud(ctx context.Context) (cloudexecutor.Executor, cloud.TestKubeCloudAPIC ui.Fail(fmt.Errorf("failed to connect with Cloud: %w", err)) } grpcClient := cloud.NewTestKubeCloudAPIClient(grpcConn) - return cloudexecutor.NewCloudGRPCExecutor(grpcClient, grpcConn, cfg.ApiKey), grpcClient + return cloudexecutor.NewCloudGRPCExecutor(grpcClient, grpcConn, cfg.ApiKey, cfg.RunnerId), grpcClient } diff --git a/cmd/testworkflow-toolkit/env/config.go b/cmd/testworkflow-toolkit/env/config.go index 3b952c2ca03..057b05a5884 100644 --- a/cmd/testworkflow-toolkit/env/config.go +++ b/cmd/testworkflow-toolkit/env/config.go @@ -33,6 +33,7 @@ type envCloudConfig struct { UiUrl string `envconfig:"TK_C_UI_URL"` OrgId string `envconfig:"TK_C_ORG_ID"` EnvId string `envconfig:"TK_C_ENV_ID"` + RunnerId string `envconfig:"TK_C_RUNNER_ID" default:""` SkipVerify bool `envconfig:"TK_C_SKIP_VERIFY" default:"false"` TlsInsecure bool `envconfig:"TK_C_TLS_INSECURE" default:"false"` } diff --git a/internal/app/api/v1/testworkflows.go b/internal/app/api/v1/testworkflows.go index b59a24f0755..cc926feeabd 100644 --- a/internal/app/api/v1/testworkflows.go +++ b/internal/app/api/v1/testworkflows.go @@ -13,7 +13,9 @@ import ( testworkflowsv1 "github.com/kubeshop/testkube-operator/api/testworkflows/v1" "github.com/kubeshop/testkube/internal/common" + "github.com/kubeshop/testkube/pkg/agent" "github.com/kubeshop/testkube/pkg/api/v1/testkube" + "github.com/kubeshop/testkube/pkg/log" "github.com/kubeshop/testkube/pkg/mapper/testworkflows" "github.com/kubeshop/testkube/pkg/scheduler" "github.com/kubeshop/testkube/pkg/testworkflows/testworkflowresolver" @@ -354,7 +356,8 @@ func (s *TestkubeAPI) PreviewTestWorkflowHandler() fiber.Handler { // TODO: Add metrics func (s *TestkubeAPI) ExecuteTestWorkflowHandler() fiber.Handler { return func(c *fiber.Ctx) (err error) { - ctx := c.Context() + // pass metadata to context + ctx := agent.Context(c.Context(), *s.proContext) name := c.Params("id") errPrefix := fmt.Sprintf("failed to execute test workflow '%s'", name) workflow, err := s.TestWorkflowsClient.Get(name) @@ -369,6 +372,8 @@ func (s *TestkubeAPI) ExecuteTestWorkflowHandler() fiber.Handler { return s.BadRequest(c, errPrefix, "invalid body", err) } + log.DefaultLogger.Infow("TestWorkflow execution request", "name", name, "request", request) + var results []testkube.TestWorkflowExecution var errs []error diff --git a/internal/config/config.go b/internal/config/config.go index 7f48a517350..e36d05c7e4e 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -67,6 +67,7 @@ type Config struct { TestkubeOAuthProvider string `envconfig:"TESTKUBE_OAUTH_PROVIDER" default:""` TestkubeOAuthScopes string `envconfig:"TESTKUBE_OAUTH_SCOPES" default:""` TestkubeProAPIKey string `envconfig:"TESTKUBE_PRO_API_KEY" default:""` + TestkubeProRunnerId string `envconfig:"TESTKUBE_PRO_RUNNER_ID" default:"default-runner"` TestkubeProURL string `envconfig:"TESTKUBE_PRO_URL" default:""` TestkubeProTLSInsecure bool `envconfig:"TESTKUBE_PRO_TLS_INSECURE" default:"false"` TestkubeProWorkerCount int `envconfig:"TESTKUBE_PRO_WORKER_COUNT" default:"50"` diff --git a/internal/config/procontext.go b/internal/config/procontext.go index aa3b090be81..89827cfe5d1 100644 --- a/internal/config/procontext.go +++ b/internal/config/procontext.go @@ -9,8 +9,14 @@ type ProContext struct { WorkflowNotificationsWorkerCount int SkipVerify bool EnvID string + EnvName string + EnvSlug string OrgID string + OrgName string + OrgSlug string Migrate string ConnectionTimeout int DashboardURI string + ClusterId string + RunnerId string } diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 8d98b4ea2b5..47baae3db75 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -32,17 +32,20 @@ import ( ) const ( - timeout = 10 * time.Second - apiKeyMeta = "api-key" - clusterIDMeta = "cluster-id" - cloudMigrateMeta = "migrate" - orgIdMeta = "environment-id" - envIdMeta = "organization-id" - healthcheckCommand = "healthcheck" -) + timeout = 10 * time.Second + + HeaderApiKey = "api-key" + HeaderRunnerId = "runner-id" + HeaderClusterId = "cluster-id" + HeaderMigrate = "migrate" + HeaderOrgId = "environment-id" + HeaderEnvID = "organization-id" + + HealthcheckCommand = "healthcheck" -// buffer up to five messages per worker -const bufferSizePerWorker = 5 + // buffer up to five messages per worker + bufferSizePerWorker = 5 +) func NewGRPCConnection( ctx context.Context, @@ -130,7 +133,6 @@ type Agent struct { client cloud.TestKubeCloudAPIClient handler fasthttp.RequestHandler logger *zap.SugaredLogger - apiKey string workerCount int requestBuffer chan *cloud.ExecuteRequest @@ -157,8 +159,12 @@ type Agent struct { features featureflags.FeatureFlags proContext config.ProContext + + commandHandlers map[cloud.Command]CommandHandler } +type CommandHandler func(ctx context.Context, c *cloud.ExecuteRequest) *cloud.ExecuteResponse + func NewAgent(logger *zap.SugaredLogger, handler fasthttp.RequestHandler, client cloud.TestKubeCloudAPIClient, @@ -173,7 +179,6 @@ func NewAgent(logger *zap.SugaredLogger, return &Agent{ handler: handler, logger: logger.With("service", "Agent", "environmentId", proContext.EnvID), - apiKey: proContext.APIKey, client: client, events: make(chan testkube.Event), workerCount: proContext.WorkerCount, @@ -195,9 +200,14 @@ func NewAgent(logger *zap.SugaredLogger, envs: envs, features: features, proContext: proContext, + commandHandlers: make(map[cloud.Command]CommandHandler), }, nil } +func (ag *Agent) RegisterCommandHandler(c cloud.Command, f func(ctx context.Context, c *cloud.ExecuteRequest) *cloud.ExecuteResponse) { + ag.commandHandlers[c] = f +} + func (ag *Agent) Run(ctx context.Context) error { for { if ctx.Err() != nil { @@ -212,36 +222,45 @@ func (ag *Agent) Run(ctx context.Context) error { } } -func (ag *Agent) run(ctx context.Context) (err error) { - g, groupCtx := errgroup.WithContext(ctx) +// updateContextWithMetadata adds metadata to the context +func (ag *Agent) updateContextWithMetadata(ctx context.Context) context.Context { + return Context(ctx, ag.proContext) +} +func (ag *Agent) run(parent context.Context) (err error) { + g, ctx := errgroup.WithContext(ag.updateContextWithMetadata(parent)) + g.Go(func() error { - return ag.runCommandLoop(groupCtx) + return ag.runCommandLoop(ctx) }) g.Go(func() error { - return ag.runWorkers(groupCtx, ag.workerCount) + return ag.runWorkers(ctx, ag.workerCount) }) g.Go(func() error { - return ag.runEventLoop(groupCtx) + return ag.runEventLoop(ctx) }) if !ag.features.LogsV2 { g.Go(func() error { - return ag.runLogStreamLoop(groupCtx) + return ag.runLogStreamLoop(ctx) }) g.Go(func() error { - return ag.runLogStreamWorker(groupCtx, ag.logStreamWorkerCount) + return ag.runLogStreamWorker(ctx, ag.logStreamWorkerCount) }) } g.Go(func() error { - return ag.runTestWorkflowNotificationsLoop(groupCtx) + return ag.runTestWorkflowNotificationsLoop(ctx) }) g.Go(func() error { - return ag.runTestWorkflowNotificationsWorker(groupCtx, ag.testWorkflowNotificationsWorkerCount) + return ag.runTestWorkflowNotificationsWorker(ctx, ag.testWorkflowNotificationsWorkerCount) }) + mdIn, _ := metadata.FromIncomingContext(ctx) + mdOut, _ := metadata.FromOutgoingContext(ctx) + ag.logger.Infow("initiating agent", "metadataIn", mdIn, "metadataOut", mdOut) + err = g.Wait() return err @@ -291,7 +310,7 @@ func (ag *Agent) receiveCommand(ctx context.Context, stream cloud.TestKubeCloudA err := resp.err if err != nil { - ag.logger.Errorf("agent stream receive: %v", err) + ag.logger.Errorf("received error from control plane: %v", err) return nil, err } case <-ctx.Done(): @@ -308,13 +327,6 @@ func (ag *Agent) receiveCommand(ctx context.Context, stream cloud.TestKubeCloudA } func (ag *Agent) runCommandLoop(ctx context.Context) error { - ctx = AddAPIKeyMeta(ctx, ag.proContext.APIKey) - - ctx = metadata.AppendToOutgoingContext(ctx, clusterIDMeta, ag.clusterID) - ctx = metadata.AppendToOutgoingContext(ctx, cloudMigrateMeta, ag.proContext.Migrate) - ctx = metadata.AppendToOutgoingContext(ctx, envIdMeta, ag.proContext.EnvID) - ctx = metadata.AppendToOutgoingContext(ctx, orgIdMeta, ag.proContext.OrgID) - ag.logger.Infow("initiating streaming connection with control plane") // creates a new Stream from the client side. ctx is used for the lifetime of the stream. opts := []grpc.CallOption{grpc.UseCompressor(gzip.Name), grpc.MaxCallRecvMsgSize(math.MaxInt32)} @@ -364,6 +376,7 @@ func (ag *Agent) runWorkers(ctx context.Context, numWorkers int) error { for { select { case cmd := <-ag.requestBuffer: + select { case ag.responseBuffer <- ag.executeCommand(groupCtx, cmd): case <-groupCtx.Done(): @@ -378,10 +391,31 @@ func (ag *Agent) runWorkers(ctx context.Context, numWorkers int) error { return g.Wait() } +// handleCommand handles command from control plane, it's bypassing old proxy call to the HTTP API when PB command field is set +func (ag *Agent) handleCommand(ctx context.Context, c *cloud.ExecuteRequest) *cloud.ExecuteResponse { + cmd := cloud.Command(c.Command) + if handler, ok := ag.commandHandlers[cmd]; ok { + ag.logger.Infow("executing command", "command", c.Command) + return handler(ctx, c) + } + + var names string + for name := range ag.commandHandlers { + names += " " + string(name) + } + + ag.logger.Errorf("command not found", "availableHandlers", names, "command", c.Command) + return &cloud.ExecuteResponse{MessageId: c.MessageId, Status: 404, Body: []byte("command not found, you've passed unhandled GRPC request command, check agent / control plane versions for API consistency, available handlers: " + names)} +} + func (ag *Agent) executeCommand(ctx context.Context, cmd *cloud.ExecuteRequest) *cloud.ExecuteResponse { switch { - case cmd.Url == healthcheckCommand: + case cmd.Url == HealthcheckCommand || cmd.Command == string(cloud.HealthcheckCommand): return &cloud.ExecuteResponse{MessageId: cmd.MessageId, Status: 0} + // if command is set, we handle it as a command bypassing HTTP proxy + case cmd.Command != "": + return ag.handleCommand(ctx, cmd) + // defaults fallback to HTTP proxy requests were passed to API server, new mode will pass directly to the handlers above default: req := &fasthttp.RequestCtx{} r := fasthttp.AcquireRequest() @@ -426,8 +460,8 @@ func (ag *Agent) executeCommand(ctx context.Context, cmd *cloud.ExecuteRequest) } } -func AddAPIKeyMeta(ctx context.Context, apiKey string) context.Context { - md := metadata.Pairs(apiKeyMeta, apiKey) +func AddContextMetadata(ctx context.Context, apiKey, runnerId string) context.Context { + md := metadata.Pairs(HeaderApiKey, apiKey, HeaderRunnerId, runnerId) return metadata.NewOutgoingContext(ctx, md) } @@ -435,3 +469,16 @@ type cloudResponse struct { resp *cloud.ExecuteRequest err error } + +// Context returns new enriched context with meteadata from ProContext +func Context(ctx context.Context, proContext config.ProContext) context.Context { + return metadata.NewOutgoingContext(ctx, metadata.Pairs( + HeaderApiKey, proContext.APIKey, + HeaderClusterId, proContext.ClusterId, + HeaderMigrate, proContext.Migrate, + HeaderEnvID, proContext.EnvID, + HeaderOrgId, proContext.OrgID, + HeaderRunnerId, proContext.RunnerId, + )) + +} diff --git a/pkg/agent/events.go b/pkg/agent/events.go index 2af87a793d2..5e8216c1737 100644 --- a/pkg/agent/events.go +++ b/pkg/agent/events.go @@ -9,7 +9,6 @@ import ( "github.com/pkg/errors" "google.golang.org/grpc" "google.golang.org/grpc/encoding/gzip" - "google.golang.org/grpc/metadata" "github.com/kubeshop/testkube/pkg/api/v1/testkube" "github.com/kubeshop/testkube/pkg/cloud" @@ -61,11 +60,7 @@ func (ag *Agent) Notify(event testkube.Event) (result testkube.EventResult) { } func (ag *Agent) runEventLoop(ctx context.Context) error { - opts := []grpc.CallOption{grpc.UseCompressor(gzip.Name)} - md := metadata.Pairs(apiKeyMeta, ag.apiKey) - ctx = metadata.NewOutgoingContext(ctx, md) - - stream, err := ag.client.Send(ctx, opts...) + stream, err := ag.client.Send(ctx, grpc.UseCompressor(gzip.Name)) if err != nil { ag.logger.Errorf("failed to execute: %v", err) return errors.Wrap(err, "failed to setup stream") diff --git a/pkg/agent/handlers/execute.go b/pkg/agent/handlers/execute.go new file mode 100644 index 00000000000..16159ec5355 --- /dev/null +++ b/pkg/agent/handlers/execute.go @@ -0,0 +1,52 @@ +package handlers + +import ( + "context" + + "github.com/pkg/errors" + "go.uber.org/zap" + + testworkflowsv1 "github.com/kubeshop/testkube-operator/pkg/client/testworkflows/v1" + "github.com/kubeshop/testkube/pkg/agent" + "github.com/kubeshop/testkube/pkg/api/v1/testkube" + "github.com/kubeshop/testkube/pkg/cloud" + "github.com/kubeshop/testkube/pkg/testworkflows/testworkflowexecutor" + "github.com/kubeshop/testkube/pkg/utils/codec" +) + +// TODO - valid error handling + +func NewExecuteTestWorkflowHandler( + testWorkflowExecutor testworkflowexecutor.TestWorkflowExecutor, + testWorkflowsClient testworkflowsv1.Interface, + log *zap.SugaredLogger, +) agent.CommandHandler { + return func(ctx context.Context, c *cloud.ExecuteRequest) *cloud.ExecuteResponse { + + log = log.With("messageId", c.MessageId) + request, err := codec.FromJSONBytes[testkube.TestWorkflowExecutionRequest](c.Body) + if err != nil { + return BadRequestResponse(cloud.ExecuteCommand, c.MessageId, errors.Wrap(err, "can't decode request body")) + } + log.Infow("got execute request", "request", request) + + workflow, err := testWorkflowsClient.Get(request.TestWorkflowExecutionName) + if err != nil { + return NotFoundResponse(cloud.ExecuteCommand, c.MessageId, errors.Wrap(err, "can't get workflow")) + } + log.Infow("got workflow", "request", request) + + execution, err := testWorkflowExecutor.Execute(ctx, *workflow, request) + if err != nil { + return InternalServerErrorResponse(cloud.ExecuteCommand, c.MessageId, errors.Wrap(err, "can't execute workflow")) + } + + log.Infow("executed workflow", "execution", execution) + + body, err := codec.ToJSONBytes(execution) + if err != nil { + return InternalServerErrorResponse(cloud.ExecuteCommand, c.MessageId, errors.Wrap(err, "can't encode response data to JSON")) + } + return SuccessResponse(cloud.ExecuteCommand, c.MessageId, body) + } +} diff --git a/pkg/agent/handlers/response.go b/pkg/agent/handlers/response.go new file mode 100644 index 00000000000..442d0b1c148 --- /dev/null +++ b/pkg/agent/handlers/response.go @@ -0,0 +1,47 @@ +package handlers + +import ( + "fmt" + + "github.com/kubeshop/testkube/pkg/cloud" + "github.com/kubeshop/testkube/pkg/problem" + "github.com/kubeshop/testkube/pkg/utils/codec" +) + +func BadRequestResponse(c cloud.Command, messageId string, err error) *cloud.ExecuteResponse { + return &cloud.ExecuteResponse{MessageId: messageId, Status: 400, Body: errToJSONProblemBytes(c, 400, "Bad Request", err)} +} + +func NotFoundResponse(c cloud.Command, messageId string, err error) *cloud.ExecuteResponse { + return &cloud.ExecuteResponse{MessageId: messageId, Status: 404, Body: errToJSONProblemBytes(c, 404, "Not Found", err)} +} + +func InternalServerErrorResponse(c cloud.Command, messageId string, err error) *cloud.ExecuteResponse { + return &cloud.ExecuteResponse{MessageId: messageId, Status: 500, Body: errToJSONProblemBytes(c, 500, "", err)} +} + +func SuccessResponse(c cloud.Command, messageId string, body any) *cloud.ExecuteResponse { + bytes, err := codec.ToJSONBytes(body) + if err != nil { + return &cloud.ExecuteResponse{MessageId: messageId, Status: 500, Body: errToJSONProblemBytes(c, 500, "can't encode response data to JSON", err)} + } + + return &cloud.ExecuteResponse{MessageId: messageId, Status: 200, Body: bytes} +} + +func errToJSONProblemBytes(c cloud.Command, status int, title string, err error) []byte { + var pr problem.Problem + body, err := problem.CommandErrorJSONBytes(c, status, title, err) + if err != nil { + pr = problem.New(status, fmt.Sprintf("%s: %s", title, err)) + } else { + pr = problem.Problem{Status: status, Title: title, Detail: string(body)} + } + + bytes, err := codec.ToJSONBytes(pr) + if err != nil { + return []byte(fmt.Sprintf("error encoding problem to JSON: %s", err)) + } + return bytes + +} diff --git a/pkg/agent/logs.go b/pkg/agent/logs.go index 4040e6f6f68..71f47be33ed 100644 --- a/pkg/agent/logs.go +++ b/pkg/agent/logs.go @@ -18,8 +18,6 @@ import ( const logStreamRetryCount = 10 func (ag *Agent) runLogStreamLoop(ctx context.Context) error { - ctx = AddAPIKeyMeta(ctx, ag.apiKey) - ag.logger.Infow("initiating log streaming connection with control plane") // creates a new Stream from the client side. ctx is used for the lifetime of the stream. opts := []grpc.CallOption{grpc.UseCompressor(gzip.Name), grpc.MaxCallRecvMsgSize(math.MaxInt32)} @@ -167,7 +165,7 @@ func (ag *Agent) receiveLogStreamRequest(ctx context.Context, stream cloud.TestK err := resp.err if err != nil { - ag.logger.Errorf("agent stream receive: %v", err) + ag.logger.Errorf("received error from control plane: %v", err) return nil, err } case <-ctx.Done(): diff --git a/pkg/agent/testworkflows.go b/pkg/agent/testworkflows.go index 10f42021538..dec022ba9a9 100644 --- a/pkg/agent/testworkflows.go +++ b/pkg/agent/testworkflows.go @@ -29,8 +29,6 @@ func getTestWorkflowNotificationType(n testkube.TestWorkflowExecutionNotificatio } func (ag *Agent) runTestWorkflowNotificationsLoop(ctx context.Context) error { - ctx = AddAPIKeyMeta(ctx, ag.apiKey) - ag.logger.Infow("initiating workflow notifications streaming connection with Cloud API") // creates a new Stream from the client side. ctx is used for the lifetime of the stream. opts := []grpc.CallOption{grpc.UseCompressor(gzip.Name), grpc.MaxCallRecvMsgSize(math.MaxInt32)} @@ -175,7 +173,7 @@ func (ag *Agent) receiveTestWorkflowNotificationsRequest(ctx context.Context, st err := resp.err if err != nil { - ag.logger.Errorf("agent stream receive: %v", err) + ag.logger.Errorf("received error from control plane: %v", err) return nil, err } case <-ctx.Done(): diff --git a/pkg/api/v1/testkube/model_running_context.go b/pkg/api/v1/testkube/model_running_context.go index b9a02879623..31f9655f3d4 100644 --- a/pkg/api/v1/testkube/model_running_context.go +++ b/pkg/api/v1/testkube/model_running_context.go @@ -14,5 +14,7 @@ type RunningContext struct { // One of possible context types Type_ string `json:"type"` // Context value depending from its type - Context string `json:"context,omitempty"` + Context string `json:"context,omitempty"` + RunnerIds []string `json:"runnerIds,omitempty"` + Tags map[string]string `json:"tags,omitempty"` } diff --git a/pkg/api/v1/testkube/model_test_workflow_execution.go b/pkg/api/v1/testkube/model_test_workflow_execution.go index a07d14927b4..c520e5e2b34 100644 --- a/pkg/api/v1/testkube/model_test_workflow_execution.go +++ b/pkg/api/v1/testkube/model_test_workflow_execution.go @@ -40,4 +40,9 @@ type TestWorkflowExecution struct { // whether webhooks on the execution of this test workflow are disabled DisableWebhooks bool `json:"disableWebhooks,omitempty"` Tags map[string]string `json:"tags,omitempty"` + // Runner id that executed the test workflow + RunnerId string `json:"runnerId,omitempty"` + RunningContext *RunningContext `json:"runningContext,omitempty"` + // test workflow correlation id when run against multiple agent in On Prem mode + GroupId string `json:"groupId,omitempty"` } diff --git a/pkg/api/v1/testkube/model_test_workflow_execution_request.go b/pkg/api/v1/testkube/model_test_workflow_execution_request.go index aa5840710f5..67041c5f50e 100644 --- a/pkg/api/v1/testkube/model_test_workflow_execution_request.go +++ b/pkg/api/v1/testkube/model_test_workflow_execution_request.go @@ -10,6 +10,8 @@ package testkube type TestWorkflowExecutionRequest struct { + // custom execution id + Id string `json:"id,omitempty"` // custom execution name Name string `json:"name,omitempty"` Config map[string]string `json:"config,omitempty"` @@ -18,4 +20,5 @@ type TestWorkflowExecutionRequest struct { // whether webhooks on the execution of this test workflow are disabled DisableWebhooks bool `json:"disableWebhooks,omitempty"` Tags map[string]string `json:"tags,omitempty"` + RunningContext *RunningContext `json:"runningContext,omitempty"` } diff --git a/pkg/api/v1/testkube/model_test_workflow_execution_summary.go b/pkg/api/v1/testkube/model_test_workflow_execution_summary.go index f52ed9dc025..afd66e5ac5e 100644 --- a/pkg/api/v1/testkube/model_test_workflow_execution_summary.go +++ b/pkg/api/v1/testkube/model_test_workflow_execution_summary.go @@ -16,6 +16,10 @@ import ( type TestWorkflowExecutionSummary struct { // unique execution identifier Id string `json:"id"` + // runner identifier + RunnerId string `json:"runnerId,omitempty"` + // group identifier when run against multiple runner + GroupId string `json:"groupId,omitempty"` // execution name Name string `json:"name"` // sequence number for the execution @@ -23,8 +27,9 @@ type TestWorkflowExecutionSummary struct { // when the execution has been scheduled to run ScheduledAt time.Time `json:"scheduledAt,omitempty"` // when the execution result's status has changed last time (queued, passed, failed) - StatusAt time.Time `json:"statusAt,omitempty"` - Result *TestWorkflowResultSummary `json:"result,omitempty"` - Workflow *TestWorkflowSummary `json:"workflow"` - Tags map[string]string `json:"tags,omitempty"` + StatusAt time.Time `json:"statusAt,omitempty"` + Result *TestWorkflowResultSummary `json:"result,omitempty"` + Workflow *TestWorkflowSummary `json:"workflow"` + Tags map[string]string `json:"tags,omitempty"` + RunningContext *RunningContext `json:"runningContext,omitempty"` } diff --git a/pkg/cloud/commands.go b/pkg/cloud/commands.go new file mode 100644 index 00000000000..533614c594f --- /dev/null +++ b/pkg/cloud/commands.go @@ -0,0 +1,12 @@ +package cloud + +// Command is a type for cloud commands +// passed in centralized anvironments instead of HTTP agetn API Proxy requests +type Command string + +const ( + ExecuteCommand Command = "execute" + // TODO check if this is even needed anymore + HealthcheckCommand Command = "healthcheck" + LogsCommand Command = "logs" +) diff --git a/pkg/cloud/data/artifact/artifacts_storage.go b/pkg/cloud/data/artifact/artifacts_storage.go index a6410a86eb3..ab98f6c3641 100644 --- a/pkg/cloud/data/artifact/artifacts_storage.go +++ b/pkg/cloud/data/artifact/artifacts_storage.go @@ -21,8 +21,8 @@ type CloudArtifactsStorage struct { var ErrOperationNotSupported = errors.New("operation not supported") -func NewCloudArtifactsStorage(cloudClient cloud.TestKubeCloudAPIClient, grpcConn *grpc.ClientConn, apiKey string) *CloudArtifactsStorage { - return &CloudArtifactsStorage{executor: executor.NewCloudGRPCExecutor(cloudClient, grpcConn, apiKey)} +func NewCloudArtifactsStorage(cloudClient cloud.TestKubeCloudAPIClient, grpcConn *grpc.ClientConn, apiKey, runnerId string) *CloudArtifactsStorage { + return &CloudArtifactsStorage{executor: executor.NewCloudGRPCExecutor(cloudClient, grpcConn, apiKey, runnerId)} } func (c *CloudArtifactsStorage) ListFiles(ctx context.Context, executionID, testName, testSuiteName, testWorkflowName string) ([]testkube.Artifact, error) { diff --git a/pkg/cloud/data/config/config.go b/pkg/cloud/data/config/config.go index 62ad485e3f2..93458f25ecf 100644 --- a/pkg/cloud/data/config/config.go +++ b/pkg/cloud/data/config/config.go @@ -18,8 +18,8 @@ type CloudRepository struct { executor executor.Executor } -func NewCloudResultRepository(cloudClient cloud.TestKubeCloudAPIClient, grpcConn *grpc.ClientConn, apiKey string) *CloudRepository { - return &CloudRepository{executor: executor.NewCloudGRPCExecutor(cloudClient, grpcConn, apiKey)} +func NewCloudResultRepository(cloudClient cloud.TestKubeCloudAPIClient, grpcConn *grpc.ClientConn, apiKey, runnerId string) *CloudRepository { + return &CloudRepository{executor: executor.NewCloudGRPCExecutor(cloudClient, grpcConn, apiKey, runnerId)} } func (r *CloudRepository) GetUniqueClusterId(ctx context.Context) (string, error) { diff --git a/pkg/cloud/data/controlplane/commands.go b/pkg/cloud/data/controlplane/commands.go new file mode 100644 index 00000000000..7a2b9cf0664 --- /dev/null +++ b/pkg/cloud/data/controlplane/commands.go @@ -0,0 +1,7 @@ +package controlplane + +import "github.com/kubeshop/testkube/pkg/cloud/data/executor" + +const ( + CmdControlPlaneGetEnvironment executor.Command = "get_environment" +) diff --git a/pkg/cloud/data/executor/executor.go b/pkg/cloud/data/executor/executor.go index 0ef9c5c7337..a029a72d8e6 100644 --- a/pkg/cloud/data/executor/executor.go +++ b/pkg/cloud/data/executor/executor.go @@ -13,22 +13,15 @@ import ( "github.com/kubeshop/testkube/pkg/cloud" ) -type Command string - -//go:generate mockgen -destination=./mock_executor.go -package=executor "github.com/kubeshop/testkube/pkg/cloud/data/executor" Executor -type Executor interface { - Execute(ctx context.Context, command Command, payload any) (response []byte, err error) - Close() error -} - type CloudGRPCExecutor struct { - client cloud.TestKubeCloudAPIClient - conn *grpc.ClientConn - apiKey string + client cloud.TestKubeCloudAPIClient + conn *grpc.ClientConn + apiKey string + runnerId string } -func NewCloudGRPCExecutor(client cloud.TestKubeCloudAPIClient, grpcConn *grpc.ClientConn, apiKey string) *CloudGRPCExecutor { - return &CloudGRPCExecutor{client: client, conn: grpcConn, apiKey: apiKey} +func NewCloudGRPCExecutor(client cloud.TestKubeCloudAPIClient, grpcConn *grpc.ClientConn, apiKey, runnerId string) *CloudGRPCExecutor { + return &CloudGRPCExecutor{client: client, conn: grpcConn, apiKey: apiKey, runnerId: runnerId} } func (e *CloudGRPCExecutor) Execute(ctx context.Context, command Command, payload any) (response []byte, err error) { @@ -44,7 +37,7 @@ func (e *CloudGRPCExecutor) Execute(ctx context.Context, command Command, payloa Command: string(command), Payload: &s, } - ctx = agent.AddAPIKeyMeta(ctx, e.apiKey) + ctx = agent.AddContextMetadata(ctx, e.apiKey, e.runnerId) opts := []grpc.CallOption{grpc.UseCompressor(gzip.Name), grpc.MaxCallRecvMsgSize(math.MaxInt32)} cmdResponse, err := e.client.Call(ctx, &req, opts...) if err != nil { @@ -56,3 +49,8 @@ func (e *CloudGRPCExecutor) Execute(ctx context.Context, command Command, payloa func (e *CloudGRPCExecutor) Close() error { return e.conn.Close() } + +func ToResponse[T any](in []byte) (response T, err error) { + err = json.Unmarshal(in, &response) + return +} diff --git a/pkg/cloud/data/executor/interface.go b/pkg/cloud/data/executor/interface.go new file mode 100644 index 00000000000..645af7d3e1c --- /dev/null +++ b/pkg/cloud/data/executor/interface.go @@ -0,0 +1,11 @@ +package executor + +import "context" + +type Command string + +//go:generate mockgen -destination=./mock_executor.go -package=executor "github.com/kubeshop/testkube/pkg/cloud/data/executor" Executor +type Executor interface { + Execute(ctx context.Context, command Command, payload any) (response []byte, err error) + Close() error +} diff --git a/pkg/cloud/data/result/result.go b/pkg/cloud/data/result/result.go index 40ba4610376..58585eae9d2 100644 --- a/pkg/cloud/data/result/result.go +++ b/pkg/cloud/data/result/result.go @@ -25,8 +25,8 @@ type CloudRepository struct { executor executor.Executor } -func NewCloudResultRepository(cloudClient cloud.TestKubeCloudAPIClient, grpcConn *grpc.ClientConn, apiKey string) *CloudRepository { - return &CloudRepository{executor: executor.NewCloudGRPCExecutor(cloudClient, grpcConn, apiKey)} +func NewCloudResultRepository(cloudClient cloud.TestKubeCloudAPIClient, grpcConn *grpc.ClientConn, apiKey, runnerId string) *CloudRepository { + return &CloudRepository{executor: executor.NewCloudGRPCExecutor(cloudClient, grpcConn, apiKey, runnerId)} } func (r *CloudRepository) GetNextExecutionNumber(ctx context.Context, testName string) (int32, error) { diff --git a/pkg/cloud/data/testresult/testresult.go b/pkg/cloud/data/testresult/testresult.go index 9f5c5cd4ba5..2b66f064fe2 100644 --- a/pkg/cloud/data/testresult/testresult.go +++ b/pkg/cloud/data/testresult/testresult.go @@ -21,8 +21,8 @@ type CloudRepository struct { executor executor.Executor } -func NewCloudRepository(client cloud.TestKubeCloudAPIClient, grpcConn *grpc.ClientConn, apiKey string) *CloudRepository { - return &CloudRepository{executor: executor.NewCloudGRPCExecutor(client, grpcConn, apiKey)} +func NewCloudRepository(client cloud.TestKubeCloudAPIClient, grpcConn *grpc.ClientConn, apiKey, runnerId string) *CloudRepository { + return &CloudRepository{executor: executor.NewCloudGRPCExecutor(client, grpcConn, apiKey, runnerId)} } func (r *CloudRepository) Get(ctx context.Context, id string) (testkube.TestSuiteExecution, error) { diff --git a/pkg/cloud/data/testworkflow/execution.go b/pkg/cloud/data/testworkflow/execution.go index 46b0c17c6de..5893b42a3d0 100644 --- a/pkg/cloud/data/testworkflow/execution.go +++ b/pkg/cloud/data/testworkflow/execution.go @@ -20,8 +20,8 @@ type CloudRepository struct { executor executor.Executor } -func NewCloudRepository(client cloud.TestKubeCloudAPIClient, grpcConn *grpc.ClientConn, apiKey string) *CloudRepository { - return &CloudRepository{executor: executor.NewCloudGRPCExecutor(client, grpcConn, apiKey)} +func NewCloudRepository(client cloud.TestKubeCloudAPIClient, grpcConn *grpc.ClientConn, apiKey, runnerId string) *CloudRepository { + return &CloudRepository{executor: executor.NewCloudGRPCExecutor(client, grpcConn, apiKey, runnerId)} } func (r *CloudRepository) Get(ctx context.Context, id string) (testkube.TestWorkflowExecution, error) { diff --git a/pkg/cloud/data/testworkflow/output.go b/pkg/cloud/data/testworkflow/output.go index a08be8a0ee5..0c4e0242cb8 100644 --- a/pkg/cloud/data/testworkflow/output.go +++ b/pkg/cloud/data/testworkflow/output.go @@ -32,8 +32,8 @@ func WithSkipVerify() Option { } } -func NewCloudOutputRepository(client cloud.TestKubeCloudAPIClient, grpcConn *grpc.ClientConn, apiKey string, opts ...Option) *CloudOutputRepository { - r := &CloudOutputRepository{executor: executor.NewCloudGRPCExecutor(client, grpcConn, apiKey), httpClient: http.DefaultClient} +func NewCloudOutputRepository(client cloud.TestKubeCloudAPIClient, grpcConn *grpc.ClientConn, apiKey, runnerId string, opts ...Option) *CloudOutputRepository { + r := &CloudOutputRepository{executor: executor.NewCloudGRPCExecutor(client, grpcConn, apiKey, runnerId), httpClient: http.DefaultClient} for _, opt := range opts { opt(r) } diff --git a/pkg/cloud/service.pb.go b/pkg/cloud/service.pb.go index 7a140af6a52..f9ddab789fd 100644 --- a/pkg/cloud/service.pb.go +++ b/pkg/cloud/service.pb.go @@ -7,13 +7,12 @@ package cloud import ( - reflect "reflect" - sync "sync" - protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" emptypb "google.golang.org/protobuf/types/known/emptypb" structpb "google.golang.org/protobuf/types/known/structpb" + reflect "reflect" + sync "sync" ) const ( @@ -465,6 +464,7 @@ type ExecuteRequest struct { Headers map[string]*HeaderValue `protobuf:"bytes,3,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` Body []byte `protobuf:"bytes,4,opt,name=body,proto3" json:"body,omitempty"` MessageId string `protobuf:"bytes,5,opt,name=message_id,json=messageId,proto3" json:"message_id,omitempty"` + Command string `protobuf:"bytes,6,opt,name=command,proto3" json:"command,omitempty"` } func (x *ExecuteRequest) Reset() { @@ -534,6 +534,13 @@ func (x *ExecuteRequest) GetMessageId() string { return "" } +func (x *ExecuteRequest) GetCommand() string { + if x != nil { + return x.Command + } + return "" +} + type TestWorkflowNotificationsRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -1010,7 +1017,7 @@ var file_proto_service_proto_rawDesc = []byte{ 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x2d, 0x0a, 0x0f, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x22, 0xfb, 0x01, 0x0a, 0x0e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, + 0x6e, 0x73, 0x65, 0x22, 0x95, 0x02, 0x0a, 0x0e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x6d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6c, @@ -1021,129 +1028,131 @@ var file_proto_service_proto_rawDesc = []byte{ 0x0a, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x12, 0x1d, 0x0a, 0x0a, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x49, - 0x64, 0x1a, 0x4e, 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, - 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, - 0x6b, 0x65, 0x79, 0x12, 0x28, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, - 0x72, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, - 0x01, 0x22, 0xb2, 0x01, 0x0a, 0x20, 0x54, 0x65, 0x73, 0x74, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, - 0x6f, 0x77, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, - 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x74, 0x72, 0x65, 0x61, - 0x6d, 0x49, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x65, 0x78, 0x65, 0x63, 0x75, 0x74, 0x69, 0x6f, 0x6e, - 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x65, 0x78, 0x65, 0x63, 0x75, - 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x4e, 0x0a, 0x0c, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x2b, 0x2e, 0x63, - 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x54, 0x65, 0x73, 0x74, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, + 0x64, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x18, 0x06, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x1a, 0x4e, 0x0a, 0x0c, 0x48, + 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, + 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x28, 0x0a, + 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x63, + 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x56, 0x61, 0x6c, 0x75, 0x65, + 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xb2, 0x01, 0x0a, 0x20, + 0x54, 0x65, 0x73, 0x74, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x4e, 0x6f, 0x74, 0x69, + 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x64, 0x12, 0x21, 0x0a, + 0x0c, 0x65, 0x78, 0x65, 0x63, 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x0b, 0x65, 0x78, 0x65, 0x63, 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, + 0x12, 0x4e, 0x0a, 0x0c, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x74, 0x79, 0x70, 0x65, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x2b, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x54, + 0x65, 0x73, 0x74, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x4e, 0x6f, 0x74, 0x69, 0x66, + 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, + 0x79, 0x70, 0x65, 0x52, 0x0b, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x79, 0x70, 0x65, + 0x22, 0xda, 0x01, 0x0a, 0x21, 0x54, 0x65, 0x73, 0x74, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x79, 0x70, 0x65, 0x52, 0x0b, 0x72, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x54, 0x79, 0x70, 0x65, 0x22, 0xda, 0x01, 0x0a, 0x21, 0x54, 0x65, 0x73, 0x74, 0x57, - 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, - 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1b, 0x0a, 0x09, - 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x08, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x64, 0x12, 0x15, 0x0a, 0x06, 0x73, 0x65, 0x71, - 0x5f, 0x6e, 0x6f, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x73, 0x65, 0x71, 0x4e, 0x6f, - 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x03, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x10, - 0x0a, 0x03, 0x72, 0x65, 0x66, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x72, 0x65, 0x66, - 0x12, 0x37, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x23, - 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x54, 0x65, 0x73, 0x74, 0x57, 0x6f, 0x72, 0x6b, 0x66, - 0x6c, 0x6f, 0x77, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, - 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, - 0x61, 0x67, 0x65, 0x22, 0x79, 0x0a, 0x12, 0x50, 0x72, 0x6f, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, - 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x15, 0x0a, 0x06, 0x6f, 0x72, 0x67, - 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x6f, 0x72, 0x67, 0x49, 0x64, - 0x12, 0x15, 0x0a, 0x06, 0x65, 0x6e, 0x76, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x05, 0x65, 0x6e, 0x76, 0x49, 0x64, 0x12, 0x35, 0x0a, 0x0c, 0x63, 0x61, 0x70, 0x61, 0x62, - 0x69, 0x6c, 0x69, 0x74, 0x69, 0x65, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x11, 0x2e, - 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x43, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, - 0x52, 0x0c, 0x63, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x69, 0x65, 0x73, 0x22, 0x3a, - 0x0a, 0x0a, 0x43, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x12, 0x12, 0x0a, 0x04, - 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, - 0x12, 0x18, 0x0a, 0x07, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x08, 0x52, 0x07, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x22, 0x25, 0x0a, 0x0b, 0x48, 0x65, - 0x61, 0x64, 0x65, 0x72, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x68, 0x65, 0x61, - 0x64, 0x65, 0x72, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x06, 0x68, 0x65, 0x61, 0x64, 0x65, - 0x72, 0x22, 0xeb, 0x01, 0x0a, 0x0f, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x3d, 0x0a, - 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x23, - 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, - 0x74, 0x72, 0x79, 0x52, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x12, 0x12, 0x0a, 0x04, - 0x62, 0x6f, 0x64, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x62, 0x6f, 0x64, 0x79, - 0x12, 0x1d, 0x0a, 0x0a, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x04, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x49, 0x64, 0x1a, - 0x4e, 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, - 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, - 0x79, 0x12, 0x28, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x12, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x56, - 0x61, 0x6c, 0x75, 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, - 0x4a, 0x0a, 0x0d, 0x57, 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x44, 0x61, 0x74, 0x61, - 0x12, 0x25, 0x0a, 0x06, 0x6f, 0x70, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, - 0x32, 0x0d, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x4f, 0x70, 0x63, 0x6f, 0x64, 0x65, 0x52, - 0x06, 0x6f, 0x70, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x2a, 0x48, 0x0a, 0x15, 0x4c, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, + 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x49, 0x64, 0x12, 0x15, 0x0a, 0x06, 0x73, 0x65, 0x71, 0x5f, 0x6e, 0x6f, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0d, 0x52, 0x05, 0x73, 0x65, 0x71, 0x4e, 0x6f, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x69, + 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x74, + 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x10, 0x0a, 0x03, 0x72, 0x65, 0x66, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x72, 0x65, 0x66, 0x12, 0x37, 0x0a, 0x04, 0x74, 0x79, + 0x70, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x23, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, + 0x2e, 0x54, 0x65, 0x73, 0x74, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x4e, 0x6f, 0x74, + 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, + 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x06, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x79, 0x0a, + 0x12, 0x50, 0x72, 0x6f, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x15, 0x0a, 0x06, 0x6f, 0x72, 0x67, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x05, 0x6f, 0x72, 0x67, 0x49, 0x64, 0x12, 0x15, 0x0a, 0x06, 0x65, 0x6e, + 0x76, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x6e, 0x76, 0x49, + 0x64, 0x12, 0x35, 0x0a, 0x0c, 0x63, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x69, 0x65, + 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, + 0x43, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x52, 0x0c, 0x63, 0x61, 0x70, 0x61, + 0x62, 0x69, 0x6c, 0x69, 0x74, 0x69, 0x65, 0x73, 0x22, 0x3a, 0x0a, 0x0a, 0x43, 0x61, 0x70, 0x61, + 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x65, 0x6e, + 0x61, 0x62, 0x6c, 0x65, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x65, 0x6e, 0x61, + 0x62, 0x6c, 0x65, 0x64, 0x22, 0x25, 0x0a, 0x0b, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x56, 0x61, + 0x6c, 0x75, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x01, 0x20, + 0x03, 0x28, 0x09, 0x52, 0x06, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x22, 0xeb, 0x01, 0x0a, 0x0f, + 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, + 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x3d, 0x0a, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, + 0x72, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, + 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x68, + 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x12, 0x12, 0x0a, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x12, 0x1d, 0x0a, 0x0a, 0x6d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, + 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x49, 0x64, 0x1a, 0x4e, 0x0a, 0x0c, 0x48, 0x65, 0x61, + 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x28, 0x0a, 0x05, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x63, 0x6c, 0x6f, + 0x75, 0x64, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x05, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x4a, 0x0a, 0x0d, 0x57, 0x65, 0x62, + 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x44, 0x61, 0x74, 0x61, 0x12, 0x25, 0x0a, 0x06, 0x6f, 0x70, + 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0d, 0x2e, 0x63, 0x6c, 0x6f, + 0x75, 0x64, 0x2e, 0x4f, 0x70, 0x63, 0x6f, 0x64, 0x65, 0x52, 0x06, 0x6f, 0x70, 0x63, 0x6f, 0x64, + 0x65, 0x12, 0x12, 0x0a, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, + 0x04, 0x62, 0x6f, 0x64, 0x79, 0x2a, 0x48, 0x0a, 0x15, 0x4c, 0x6f, 0x67, 0x73, 0x53, 0x74, 0x72, + 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x16, + 0x0a, 0x12, 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, 0x5f, 0x4c, 0x4f, 0x47, 0x5f, 0x4d, 0x45, 0x53, + 0x53, 0x41, 0x47, 0x45, 0x10, 0x00, 0x12, 0x17, 0x0a, 0x13, 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, + 0x5f, 0x48, 0x45, 0x41, 0x4c, 0x54, 0x48, 0x5f, 0x43, 0x48, 0x45, 0x43, 0x4b, 0x10, 0x01, 0x2a, + 0x69, 0x0a, 0x24, 0x54, 0x65, 0x73, 0x74, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x4e, + 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1f, 0x0a, 0x1b, 0x57, 0x4f, 0x52, 0x4b, 0x46, + 0x4c, 0x4f, 0x57, 0x5f, 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, 0x5f, 0x4c, 0x4f, 0x47, 0x5f, 0x4d, + 0x45, 0x53, 0x53, 0x41, 0x47, 0x45, 0x10, 0x00, 0x12, 0x20, 0x0a, 0x1c, 0x57, 0x4f, 0x52, 0x4b, + 0x46, 0x4c, 0x4f, 0x57, 0x5f, 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, 0x5f, 0x48, 0x45, 0x41, 0x4c, + 0x54, 0x48, 0x5f, 0x43, 0x48, 0x45, 0x43, 0x4b, 0x10, 0x01, 0x2a, 0x8a, 0x01, 0x0a, 0x1c, 0x54, + 0x65, 0x73, 0x74, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x4e, 0x6f, 0x74, 0x69, 0x66, + 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x12, 0x19, 0x0a, 0x15, 0x57, + 0x4f, 0x52, 0x4b, 0x46, 0x4c, 0x4f, 0x57, 0x5f, 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, 0x5f, 0x45, + 0x52, 0x52, 0x4f, 0x52, 0x10, 0x00, 0x12, 0x17, 0x0a, 0x13, 0x57, 0x4f, 0x52, 0x4b, 0x46, 0x4c, + 0x4f, 0x57, 0x5f, 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, 0x5f, 0x4c, 0x4f, 0x47, 0x10, 0x01, 0x12, + 0x1a, 0x0a, 0x16, 0x57, 0x4f, 0x52, 0x4b, 0x46, 0x4c, 0x4f, 0x57, 0x5f, 0x53, 0x54, 0x52, 0x45, + 0x41, 0x4d, 0x5f, 0x52, 0x45, 0x53, 0x55, 0x4c, 0x54, 0x10, 0x02, 0x12, 0x1a, 0x0a, 0x16, 0x57, + 0x4f, 0x52, 0x4b, 0x46, 0x4c, 0x4f, 0x57, 0x5f, 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, 0x5f, 0x4f, + 0x55, 0x54, 0x50, 0x55, 0x54, 0x10, 0x03, 0x2a, 0x4c, 0x0a, 0x06, 0x4f, 0x70, 0x63, 0x6f, 0x64, + 0x65, 0x12, 0x0e, 0x0a, 0x0a, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x46, 0x49, 0x45, 0x44, 0x10, + 0x00, 0x12, 0x0e, 0x0a, 0x0a, 0x54, 0x45, 0x58, 0x54, 0x5f, 0x46, 0x52, 0x41, 0x4d, 0x45, 0x10, + 0x01, 0x12, 0x10, 0x0a, 0x0c, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x5f, 0x46, 0x52, 0x41, 0x4d, + 0x45, 0x10, 0x02, 0x12, 0x10, 0x0a, 0x0c, 0x48, 0x45, 0x41, 0x4c, 0x54, 0x48, 0x5f, 0x43, 0x48, + 0x45, 0x43, 0x4b, 0x10, 0x03, 0x32, 0x8d, 0x04, 0x0a, 0x10, 0x54, 0x65, 0x73, 0x74, 0x4b, 0x75, + 0x62, 0x65, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x41, 0x50, 0x49, 0x12, 0x3c, 0x0a, 0x07, 0x45, 0x78, + 0x65, 0x63, 0x75, 0x74, 0x65, 0x12, 0x16, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x45, 0x78, + 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x1a, 0x15, 0x2e, + 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, 0x30, 0x01, 0x12, 0x36, 0x0a, 0x04, 0x53, 0x65, 0x6e, 0x64, + 0x12, 0x14, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x57, 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, + 0x65, 0x74, 0x44, 0x61, 0x74, 0x61, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x28, 0x01, + 0x12, 0x35, 0x0a, 0x04, 0x43, 0x61, 0x6c, 0x6c, 0x12, 0x15, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, + 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x16, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x41, 0x0a, 0x0c, 0x45, 0x78, 0x65, 0x63, 0x75, + 0x74, 0x65, 0x41, 0x73, 0x79, 0x6e, 0x63, 0x12, 0x16, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, + 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x1a, + 0x15, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, 0x30, 0x01, 0x12, 0x48, 0x0a, 0x0d, 0x47, 0x65, + 0x74, 0x4c, 0x6f, 0x67, 0x73, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x19, 0x2e, 0x63, 0x6c, + 0x6f, 0x75, 0x64, 0x2e, 0x4c, 0x6f, 0x67, 0x73, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x1a, 0x18, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x4c, 0x6f, 0x67, 0x73, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x54, 0x79, 0x70, 0x65, 0x12, 0x16, 0x0a, 0x12, 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, 0x5f, 0x4c, - 0x4f, 0x47, 0x5f, 0x4d, 0x45, 0x53, 0x53, 0x41, 0x47, 0x45, 0x10, 0x00, 0x12, 0x17, 0x0a, 0x13, - 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, 0x5f, 0x48, 0x45, 0x41, 0x4c, 0x54, 0x48, 0x5f, 0x43, 0x48, - 0x45, 0x43, 0x4b, 0x10, 0x01, 0x2a, 0x69, 0x0a, 0x24, 0x54, 0x65, 0x73, 0x74, 0x57, 0x6f, 0x72, - 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, - 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1f, 0x0a, - 0x1b, 0x57, 0x4f, 0x52, 0x4b, 0x46, 0x4c, 0x4f, 0x57, 0x5f, 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, - 0x5f, 0x4c, 0x4f, 0x47, 0x5f, 0x4d, 0x45, 0x53, 0x53, 0x41, 0x47, 0x45, 0x10, 0x00, 0x12, 0x20, - 0x0a, 0x1c, 0x57, 0x4f, 0x52, 0x4b, 0x46, 0x4c, 0x4f, 0x57, 0x5f, 0x53, 0x54, 0x52, 0x45, 0x41, - 0x4d, 0x5f, 0x48, 0x45, 0x41, 0x4c, 0x54, 0x48, 0x5f, 0x43, 0x48, 0x45, 0x43, 0x4b, 0x10, 0x01, - 0x2a, 0x8a, 0x01, 0x0a, 0x1c, 0x54, 0x65, 0x73, 0x74, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, - 0x77, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, - 0x65, 0x12, 0x19, 0x0a, 0x15, 0x57, 0x4f, 0x52, 0x4b, 0x46, 0x4c, 0x4f, 0x57, 0x5f, 0x53, 0x54, - 0x52, 0x45, 0x41, 0x4d, 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x10, 0x00, 0x12, 0x17, 0x0a, 0x13, - 0x57, 0x4f, 0x52, 0x4b, 0x46, 0x4c, 0x4f, 0x57, 0x5f, 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, 0x5f, - 0x4c, 0x4f, 0x47, 0x10, 0x01, 0x12, 0x1a, 0x0a, 0x16, 0x57, 0x4f, 0x52, 0x4b, 0x46, 0x4c, 0x4f, - 0x57, 0x5f, 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, 0x5f, 0x52, 0x45, 0x53, 0x55, 0x4c, 0x54, 0x10, - 0x02, 0x12, 0x1a, 0x0a, 0x16, 0x57, 0x4f, 0x52, 0x4b, 0x46, 0x4c, 0x4f, 0x57, 0x5f, 0x53, 0x54, - 0x52, 0x45, 0x41, 0x4d, 0x5f, 0x4f, 0x55, 0x54, 0x50, 0x55, 0x54, 0x10, 0x03, 0x2a, 0x4c, 0x0a, - 0x06, 0x4f, 0x70, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x0e, 0x0a, 0x0a, 0x55, 0x4e, 0x53, 0x50, 0x45, - 0x43, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0e, 0x0a, 0x0a, 0x54, 0x45, 0x58, 0x54, 0x5f, - 0x46, 0x52, 0x41, 0x4d, 0x45, 0x10, 0x01, 0x12, 0x10, 0x0a, 0x0c, 0x42, 0x49, 0x4e, 0x41, 0x52, - 0x59, 0x5f, 0x46, 0x52, 0x41, 0x4d, 0x45, 0x10, 0x02, 0x12, 0x10, 0x0a, 0x0c, 0x48, 0x45, 0x41, - 0x4c, 0x54, 0x48, 0x5f, 0x43, 0x48, 0x45, 0x43, 0x4b, 0x10, 0x03, 0x32, 0x8d, 0x04, 0x0a, 0x10, - 0x54, 0x65, 0x73, 0x74, 0x4b, 0x75, 0x62, 0x65, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x41, 0x50, 0x49, - 0x12, 0x3c, 0x0a, 0x07, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x12, 0x16, 0x2e, 0x63, 0x6c, - 0x6f, 0x75, 0x64, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x1a, 0x15, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x45, 0x78, 0x65, 0x63, - 0x75, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, 0x30, 0x01, 0x12, 0x36, - 0x0a, 0x04, 0x53, 0x65, 0x6e, 0x64, 0x12, 0x14, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x57, - 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x44, 0x61, 0x74, 0x61, 0x1a, 0x16, 0x2e, 0x67, - 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, - 0x6d, 0x70, 0x74, 0x79, 0x28, 0x01, 0x12, 0x35, 0x0a, 0x04, 0x43, 0x61, 0x6c, 0x6c, 0x12, 0x15, - 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x43, 0x6f, - 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x41, 0x0a, - 0x0c, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x41, 0x73, 0x79, 0x6e, 0x63, 0x12, 0x16, 0x2e, - 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x1a, 0x15, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x45, 0x78, - 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, 0x30, 0x01, - 0x12, 0x48, 0x0a, 0x0d, 0x47, 0x65, 0x74, 0x4c, 0x6f, 0x67, 0x73, 0x53, 0x74, 0x72, 0x65, 0x61, - 0x6d, 0x12, 0x19, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x4c, 0x6f, 0x67, 0x73, 0x53, 0x74, - 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x1a, 0x18, 0x2e, 0x63, - 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x4c, 0x6f, 0x67, 0x73, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, 0x30, 0x01, 0x12, 0x7b, 0x0a, 0x22, 0x47, 0x65, - 0x74, 0x54, 0x65, 0x73, 0x74, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x4e, 0x6f, 0x74, - 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, - 0x12, 0x28, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x54, 0x65, 0x73, 0x74, 0x57, 0x6f, 0x72, - 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, - 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x1a, 0x27, 0x2e, 0x63, 0x6c, 0x6f, + 0x28, 0x01, 0x30, 0x01, 0x12, 0x7b, 0x0a, 0x22, 0x47, 0x65, 0x74, 0x54, 0x65, 0x73, 0x74, 0x57, + 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x28, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x54, 0x65, 0x73, 0x74, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x4e, - 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x28, 0x01, 0x30, 0x01, 0x12, 0x42, 0x0a, 0x0d, 0x47, 0x65, 0x74, 0x50, 0x72, - 0x6f, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, - 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, - 0x1a, 0x19, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x50, 0x72, 0x6f, 0x43, 0x6f, 0x6e, 0x74, - 0x65, 0x78, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x0b, 0x5a, 0x09, 0x70, - 0x6b, 0x67, 0x2f, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x1a, 0x27, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x54, 0x65, 0x73, + 0x74, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, 0x30, + 0x01, 0x12, 0x42, 0x0a, 0x0d, 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x43, 0x6f, 0x6e, 0x74, 0x65, + 0x78, 0x74, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x19, 0x2e, 0x63, 0x6c, 0x6f, + 0x75, 0x64, 0x2e, 0x50, 0x72, 0x6f, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x0b, 0x5a, 0x09, 0x70, 0x6b, 0x67, 0x2f, 0x63, 0x6c, 0x6f, + 0x75, 0x64, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/pkg/cloud/service_grpc.pb.go b/pkg/cloud/service_grpc.pb.go index 8aab7a21191..40a960ee3c0 100644 --- a/pkg/cloud/service_grpc.pb.go +++ b/pkg/cloud/service_grpc.pb.go @@ -8,7 +8,6 @@ package cloud import ( context "context" - grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" diff --git a/pkg/event/bus/nats.go b/pkg/event/bus/nats.go index 79e61d1ebff..3607ea7beac 100644 --- a/pkg/event/bus/nats.go +++ b/pkg/event/bus/nats.go @@ -70,10 +70,6 @@ func NewNATSEncodedConnection(cfg ConnectionConfig, opts ...nats.Option) (*nats. return nil, err } - if err != nil { - log.DefaultLogger.Errorw("error creating NATS connection", "error", err) - } - return ec, nil } diff --git a/pkg/executor/scraper/factory/factory.go b/pkg/executor/scraper/factory/factory.go index 1b8d89bff00..e474338af4d 100644 --- a/pkg/executor/scraper/factory/factory.go +++ b/pkg/executor/scraper/factory/factory.go @@ -118,7 +118,7 @@ func getRemoteStorageUploader(ctx context.Context, params envs.Params) (uploader output.PrintLogf("%s Connected to Agent API", ui.IconCheckMark) grpcClient := cloud.NewTestKubeCloudAPIClient(grpcConn) - cloudExecutor := cloudexecutor.NewCloudGRPCExecutor(grpcClient, grpcConn, params.ProAPIKey) + cloudExecutor := cloudexecutor.NewCloudGRPCExecutor(grpcClient, grpcConn, params.ProAPIKey, "") return cloudscraper.NewCloudUploader(cloudExecutor, params.SkipVerify), nil } diff --git a/pkg/logs/client/stream.go b/pkg/logs/client/stream.go index 02b404d2d06..90e7f3bd2ab 100644 --- a/pkg/logs/client/stream.go +++ b/pkg/logs/client/stream.go @@ -155,7 +155,7 @@ func (c NatsLogStream) handleJetstreamMessage(log *zap.SugaredLogger, ch chan ev } // syncCall sends request to given subject and waits for response -func (c NatsLogStream) syncCall(ctx context.Context, subject, id string) (resp StreamResponse, err error) { +func (c NatsLogStream) syncCall(_ context.Context, subject, id string) (resp StreamResponse, err error) { b, err := json.Marshal(events.NewTrigger(id)) if err != nil { return resp, err diff --git a/pkg/logs/pb/logs.pb.go b/pkg/logs/pb/logs.pb.go index c7359be3ddf..938eabd8a09 100644 --- a/pkg/logs/pb/logs.pb.go +++ b/pkg/logs/pb/logs.pb.go @@ -1,18 +1,17 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.32.0 +// protoc-gen-go v1.28.1 // protoc v3.19.4 // source: pkg/logs/pb/logs.proto package pb import ( - reflect "reflect" - sync "sync" - protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" timestamppb "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" + sync "sync" ) const ( diff --git a/pkg/logs/pb/logs_grpc.pb.go b/pkg/logs/pb/logs_grpc.pb.go index 77bc3950ef9..720ec076511 100644 --- a/pkg/logs/pb/logs_grpc.pb.go +++ b/pkg/logs/pb/logs_grpc.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.3.0 +// - protoc-gen-go-grpc v1.2.0 // - protoc v3.19.4 // source: pkg/logs/pb/logs.proto @@ -8,7 +8,6 @@ package pb import ( context "context" - grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" @@ -19,10 +18,6 @@ import ( // Requires gRPC-Go v1.32.0 or later. const _ = grpc.SupportPackageIsVersion7 -const ( - LogsService_Logs_FullMethodName = "/logs.LogsService/Logs" -) - // LogsServiceClient is the client API for LogsService service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. @@ -39,7 +34,7 @@ func NewLogsServiceClient(cc grpc.ClientConnInterface) LogsServiceClient { } func (c *logsServiceClient) Logs(ctx context.Context, in *LogRequest, opts ...grpc.CallOption) (LogsService_LogsClient, error) { - stream, err := c.cc.NewStream(ctx, &LogsService_ServiceDesc.Streams[0], LogsService_Logs_FullMethodName, opts...) + stream, err := c.cc.NewStream(ctx, &LogsService_ServiceDesc.Streams[0], "/logs.LogsService/Logs", opts...) if err != nil { return nil, err } @@ -136,11 +131,6 @@ var LogsService_ServiceDesc = grpc.ServiceDesc{ Metadata: "pkg/logs/pb/logs.proto", } -const ( - CloudLogsService_Stream_FullMethodName = "/logs.CloudLogsService/Stream" - CloudLogsService_Logs_FullMethodName = "/logs.CloudLogsService/Logs" -) - // CloudLogsServiceClient is the client API for CloudLogsService service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. @@ -158,7 +148,7 @@ func NewCloudLogsServiceClient(cc grpc.ClientConnInterface) CloudLogsServiceClie } func (c *cloudLogsServiceClient) Stream(ctx context.Context, opts ...grpc.CallOption) (CloudLogsService_StreamClient, error) { - stream, err := c.cc.NewStream(ctx, &CloudLogsService_ServiceDesc.Streams[0], CloudLogsService_Stream_FullMethodName, opts...) + stream, err := c.cc.NewStream(ctx, &CloudLogsService_ServiceDesc.Streams[0], "/logs.CloudLogsService/Stream", opts...) if err != nil { return nil, err } @@ -192,7 +182,7 @@ func (x *cloudLogsServiceStreamClient) CloseAndRecv() (*StreamResponse, error) { } func (c *cloudLogsServiceClient) Logs(ctx context.Context, in *CloudLogRequest, opts ...grpc.CallOption) (CloudLogsService_LogsClient, error) { - stream, err := c.cc.NewStream(ctx, &CloudLogsService_ServiceDesc.Streams[1], CloudLogsService_Logs_FullMethodName, opts...) + stream, err := c.cc.NewStream(ctx, &CloudLogsService_ServiceDesc.Streams[1], "/logs.CloudLogsService/Logs", opts...) if err != nil { return nil, err } diff --git a/pkg/problem/problem.go b/pkg/problem/problem.go index 133a4adc663..6849ad4865e 100644 --- a/pkg/problem/problem.go +++ b/pkg/problem/problem.go @@ -2,6 +2,9 @@ package problem import ( "github.com/moogar0880/problems" + + "github.com/kubeshop/testkube/pkg/cloud" + "github.com/kubeshop/testkube/pkg/utils/codec" ) // Porblem is struct defining RFC7807 Problem Details @@ -11,3 +14,16 @@ func New(status int, details string) Problem { pr := problems.NewDetailedProblem(status, details) return Problem(*pr) } + +func CommandErrorJSONBytes(command cloud.Command, status int, title string, err error) ([]byte, error) { + var errString string + if err != nil { + errString = err.Error() + } + pr := problems.NewDetailedProblem(status, errString) + pr.Type = "Command Error" + pr.Title = title + pr.Instance = string(command) + + return codec.ToJSONBytes(Problem(*pr)) +} diff --git a/pkg/repository/testworkflow/filter.go b/pkg/repository/testworkflow/filter.go index 2f4c5c9d87f..da45b319f30 100644 --- a/pkg/repository/testworkflow/filter.go +++ b/pkg/repository/testworkflow/filter.go @@ -6,6 +6,8 @@ import ( "github.com/kubeshop/testkube/pkg/api/v1/testkube" ) +var _ Filter = &FilterImpl{} + type FilterImpl struct { FName string FNames []string @@ -19,10 +21,13 @@ type FilterImpl struct { FSelector string FTagSelector string FLabelSelector *LabelSelector + FRunnerIds []string + FTags map[string]string + FRunnerTags map[string]string } func NewExecutionsFilter() *FilterImpl { - result := FilterImpl{FPage: 0, FPageSize: PageDefaultLimit} + result := FilterImpl{FPage: 0, FPageSize: PageDefaultLimit, FRunnerTags: make(map[string]string)} return &result } @@ -89,6 +94,21 @@ func (f *FilterImpl) WithLabelSelector(selector *LabelSelector) *FilterImpl { return f } +func (f *FilterImpl) WithTags(tags map[string]string) *FilterImpl { + f.FTags = tags + return f +} + +func (f *FilterImpl) WithRunnerTags(tags map[string]string) *FilterImpl { + f.FRunnerTags = tags + return f +} + +func (f *FilterImpl) WithRunnerIds(runnerIds []string) *FilterImpl { + f.FRunnerIds = runnerIds + return f +} + func (f FilterImpl) Name() string { return f.FName } @@ -164,3 +184,11 @@ func (f FilterImpl) TagSelector() string { func (f FilterImpl) LabelSelector() *LabelSelector { return f.FLabelSelector } + +func (f FilterImpl) RunnerIds() []string { + return f.FRunnerIds +} + +func (f FilterImpl) RunnerTags() map[string]string { + return f.FRunnerTags +} diff --git a/pkg/repository/testworkflow/interface.go b/pkg/repository/testworkflow/interface.go index 05cf41a47f5..b51811aaa25 100644 --- a/pkg/repository/testworkflow/interface.go +++ b/pkg/repository/testworkflow/interface.go @@ -41,6 +41,8 @@ type Filter interface { Selector() string TagSelector() string LabelSelector() *LabelSelector + RunnerIds() []string + RunnerTags() map[string]string } //go:generate mockgen -destination=./mock_repository.go -package=testworkflow "github.com/kubeshop/testkube/pkg/repository/testworkflow" Repository diff --git a/pkg/repository/testworkflow/mongo.go b/pkg/repository/testworkflow/mongo.go index 94ef6dffe9b..dbd22a29044 100644 --- a/pkg/repository/testworkflow/mongo.go +++ b/pkg/repository/testworkflow/mongo.go @@ -376,6 +376,7 @@ func composeQueryAndOpts(filter Filter) (bson.M, *options.FindOptions) { } else if len(elements) == 1 { existsValues["tags."+utils.EscapeDots(elements[0])] = struct{}{} } + } subquery := bson.A{} for tag, values := range inValues { @@ -417,6 +418,20 @@ func composeQueryAndOpts(filter Filter) (bson.M, *options.FindOptions) { query["$or"] = subquery } + if len(filter.RunnerIds()) > 0 { + query["runnerid"] = bson.M{"$in": filter.RunnerIds()} + } + + // TODO - use tags for tags and runnerId - remove it from the model + // this one needs wildard index or changing the model to {k:X v:Y} + if len(filter.RunnerTags()) > 0 { + q := []bson.M{} + for k, v := range filter.RunnerTags() { + q = append(q, bson.M{"runningcontext.tags." + k: v}) + } + query["$and"] = q + } + opts.SetSkip(int64(filter.Page() * filter.PageSize())) opts.SetLimit(int64(filter.PageSize())) opts.SetSort(bson.D{{Key: "scheduledat", Value: -1}}) diff --git a/pkg/tcl/checktcl/subscription.go b/pkg/tcl/checktcl/subscription.go index cb426e96b80..2236427ed63 100644 --- a/pkg/tcl/checktcl/subscription.go +++ b/pkg/tcl/checktcl/subscription.go @@ -29,7 +29,7 @@ type SubscriptionChecker struct { // NewSubscriptionChecker creates a new subscription checker using the agent token func NewSubscriptionChecker(ctx context.Context, proContext config.ProContext, cloudClient cloud.TestKubeCloudAPIClient, grpcConn *grpc.ClientConn) (SubscriptionChecker, error) { - executor := executor.NewCloudGRPCExecutor(cloudClient, grpcConn, proContext.APIKey) + executor := executor.NewCloudGRPCExecutor(cloudClient, grpcConn, proContext.APIKey, proContext.RunnerId) req := GetOrganizationPlanRequest{} response, err := executor.Execute(ctx, cloudconfig.CmdConfigGetOrganizationPlan, req) diff --git a/pkg/tcl/controlplanetcl/environments.go b/pkg/tcl/controlplanetcl/environments.go new file mode 100644 index 00000000000..9abcab8fd9f --- /dev/null +++ b/pkg/tcl/controlplanetcl/environments.go @@ -0,0 +1,36 @@ +package controlplanetcl + +import ( + "context" + + "google.golang.org/grpc" + + "github.com/kubeshop/testkube/internal/config" + "github.com/kubeshop/testkube/pkg/cloud" + "github.com/kubeshop/testkube/pkg/cloud/data/controlplane" + "github.com/kubeshop/testkube/pkg/cloud/data/executor" +) + +func GetEnvironment(ctx context.Context, proContext config.ProContext, cloudClient cloud.TestKubeCloudAPIClient, grpcConn *grpc.ClientConn) (resp GetEnvironmentResponse, err error) { + ex := executor.NewCloudGRPCExecutor(cloudClient, grpcConn, proContext.APIKey, proContext.RunnerId) + bytes, err := ex.Execute(ctx, controlplane.CmdControlPlaneGetEnvironment, GetEnvironmentRequest{}) + if err != nil { + return resp, err + } + return executor.ToResponse[GetEnvironmentResponse](bytes) +} + +// GetEnvironmentRequest represents a request to get an environment by the token +type GetEnvironmentRequest struct { +} + +// GetEnvironmentResponse represents a response with env org data of connected runner +type GetEnvironmentResponse struct { + Id string + Name string + Slug string + + OrganizationId string + OrganizationSlug string + OrganizationName string +} diff --git a/pkg/testworkflows/testworkflowexecutor/executor.go b/pkg/testworkflows/testworkflowexecutor/executor.go index bd28841fbac..41d70583711 100644 --- a/pkg/testworkflows/testworkflowexecutor/executor.go +++ b/pkg/testworkflows/testworkflowexecutor/executor.go @@ -72,6 +72,8 @@ type executor struct { imageDataPersistentCacheKey string dashboardURI string clusterID string + runnerID string + isRuner bool serviceAccountNames map[string]string } @@ -88,7 +90,7 @@ func New(emitter *event.Emitter, secretManager secretmanager.SecretManager, serviceAccountNames map[string]string, globalTemplateName, namespace, apiUrl, defaultRegistry string, - enableImageDataPersistentCache bool, imageDataPersistentCacheKey, dashboardURI, clusterID string) TestWorkflowExecutor { + enableImageDataPersistentCache bool, imageDataPersistentCacheKey, dashboardURI, clusterID, runnerID string) TestWorkflowExecutor { if serviceAccountNames == nil { serviceAccountNames = make(map[string]string) } @@ -114,6 +116,8 @@ func New(emitter *event.Emitter, imageDataPersistentCacheKey: imageDataPersistentCacheKey, dashboardURI: dashboardURI, clusterID: clusterID, + runnerID: runnerID, + isRuner: runnerID != "", } } @@ -329,8 +333,14 @@ func (e *executor) Control(ctx context.Context, testWorkflow *testworkflowsv1.Te return nil } -func (e *executor) Execute(ctx context.Context, workflow testworkflowsv1.TestWorkflow, request testkube.TestWorkflowExecutionRequest) ( - execution testkube.TestWorkflowExecution, err error) { +func (e *executor) Execute(ctx context.Context, workflow testworkflowsv1.TestWorkflow, request testkube.TestWorkflowExecutionRequest) (execution testkube.TestWorkflowExecution, err error) { + + // We'll get basic execuition data from the request (from Control Plane as it manages the data in runner mode) + id := primitive.NewObjectID().Hex() + if request.Id != "" { + id = request.Id + } + // Delete unnecessary data delete(workflow.Annotations, "kubectl.kubernetes.io/last-applied-configuration") @@ -377,9 +387,6 @@ func (e *executor) Execute(ctx context.Context, workflow testworkflowsv1.TestWor return execution, fmt.Errorf("not supported execution namespace %s", namespace) } - // Build the basic Execution data - id := primitive.NewObjectID().Hex() - // Handle secrets auto-creation secrets := e.secretManager.Batch(namespace, "twe-", id) @@ -416,6 +423,8 @@ func (e *executor) Execute(ctx context.Context, workflow testworkflowsv1.TestWor cloudUiUrl, env.Config().Cloud.OrgId, env.Config().Cloud.EnvId) } + // Build the basic Execution data + now := time.Now() labels := make(map[string]string) for key, value := range workflow.Labels { @@ -512,25 +521,6 @@ func (e *executor) Execute(ctx context.Context, workflow testworkflowsv1.TestWor return execution, errors.Wrap(err, "processing error") } - // Load execution identifier data - number, err := e.repository.GetNextExecutionNumber(context.Background(), workflow.Name) - if err != nil { - log.DefaultLogger.Errorw("failed to retrieve TestWorkflow execution number", "id", id, "error", err) - } - - executionName := request.Name - if executionName == "" { - executionName = fmt.Sprintf("%s-%d", workflow.Name, number) - } - - testWorkflowExecutionName := request.TestWorkflowExecutionName - // Ensure it is unique name - // TODO: Consider if we shouldn't make name unique across all TestWorkflows - next, _ := e.repository.GetByNameAndTestWorkflow(ctx, executionName, workflow.Name) - if next.Name == executionName { - return execution, errors.Wrap(err, "execution name already exists") - } - var tags map[string]string if workflow.Spec.Execution != nil { tags = workflow.Spec.Execution.Tags @@ -542,11 +532,55 @@ func (e *executor) Execute(ctx context.Context, workflow testworkflowsv1.TestWor log.DefaultLogger.Errorw("failed to encode tags", "id", id, "error", err) } + // Get (for centralized mode) TW execution or create it + if request.Id != "" { + execution, err = e.repository.Get(ctx, request.Id) + if err != nil { + return execution, errors.Wrap(err, "can't get execution "+request.Id+" from control plane") + } + } else { + // TODO: Consider storing "config" as well + + // Load execution identifier data + number, err := e.repository.GetNextExecutionNumber(context.Background(), workflow.Name) + if err != nil { + log.DefaultLogger.Errorw("failed to retrieve TestWorkflow execution number", "id", id, "error", err) + } + + executionName := request.Name + if executionName == "" { + executionName = fmt.Sprintf("%s-%d", workflow.Name, number) + } + + testWorkflowExecutionName := request.TestWorkflowExecutionName + // Ensure it is unique name + // TODO: Consider if we shouldn't make name unique across all TestWorkflows + next, _ := e.repository.GetByNameAndTestWorkflow(ctx, executionName, workflow.Name) + if next.Name == executionName { + return execution, errors.Wrap(err, "execution name already exists") + } + + execution = testkube.TestWorkflowExecution{ + Id: id, + Name: executionName, + Number: number, + TestWorkflowExecutionName: testWorkflowExecutionName, + DisableWebhooks: request.DisableWebhooks, + RunnerId: e.runnerID, + RunningContext: request.RunningContext, + } + } + + // Load rest data + execution.Namespace = namespace + execution.ScheduledAt = now + execution.StatusAt = now + // Build machine with actual execution data executionMachine := expressions.NewMachine().Register("execution", map[string]interface{}{ - "id": id, - "name": executionName, - "number": number, + "id": execution.Id, + "name": execution.Name, + "number": execution.Number, "scheduledAt": now.UTC().Format(constants.RFC3339Millis), "disableWebhooks": request.DisableWebhooks, "tags": tagsData, @@ -558,34 +592,38 @@ func (e *executor) Execute(ctx context.Context, workflow testworkflowsv1.TestWor return execution, errors.Wrap(err, "processing error") } - // Build Execution entity - // TODO: Consider storing "config" as well - execution = testkube.TestWorkflowExecution{ - Id: id, - Name: executionName, - Namespace: namespace, - Number: number, - ScheduledAt: now, - StatusAt: now, - Signature: stage.MapSignatureListToInternal(bundle.Signature), - Result: &testkube.TestWorkflowResult{ - Status: common.Ptr(testkube.QUEUED_TestWorkflowStatus), - PredictedStatus: common.Ptr(testkube.PASSED_TestWorkflowStatus), - Initialization: &testkube.TestWorkflowStepResult{ - Status: common.Ptr(testkube.QUEUED_TestWorkflowStepStatus), - }, - Steps: stage.MapSignatureListToStepResults(bundle.Signature), + execution.Signature = stage.MapSignatureListToInternal(bundle.Signature) + execution.Result = &testkube.TestWorkflowResult{ + Status: common.Ptr(testkube.QUEUED_TestWorkflowStatus), + PredictedStatus: common.Ptr(testkube.PASSED_TestWorkflowStatus), + Initialization: &testkube.TestWorkflowStepResult{ + Status: common.Ptr(testkube.QUEUED_TestWorkflowStepStatus), }, - Output: []testkube.TestWorkflowOutput{}, - Workflow: testworkflowmappers.MapKubeToAPI(initialWorkflow), - ResolvedWorkflow: testworkflowmappers.MapKubeToAPI(resolvedWorkflow), - TestWorkflowExecutionName: testWorkflowExecutionName, - DisableWebhooks: request.DisableWebhooks, - Tags: tags, - } - err = e.repository.Insert(ctx, execution) - if err != nil { - return execution, errors.Wrap(err, "inserting execution to storage") + Steps: stage.MapSignatureListToStepResults(bundle.Signature), + } + + execution.Output = []testkube.TestWorkflowOutput{} + execution.Workflow = testworkflowmappers.MapKubeToAPI(initialWorkflow) + execution.ResolvedWorkflow = testworkflowmappers.MapKubeToAPI(resolvedWorkflow) + execution.Tags = tags + + // Insert or save execution + if request.Id != "" { + execution.Id = request.Id + + err = e.repository.Update(ctx, execution) + if err != nil { + return execution, errors.Wrap(err, "inserting execution to storage") + } + + log.DefaultLogger.Infof("testworkflow execution %s updated", execution.Id) + } else { + err = e.repository.Insert(ctx, execution) + if err != nil { + return execution, errors.Wrap(err, "inserting execution to storage") + } + + log.DefaultLogger.Infof("testworkflow execution %s created", execution.Id) } // Inform about execution start diff --git a/pkg/utils/codec/json.go b/pkg/utils/codec/json.go new file mode 100644 index 00000000000..8ebc8374010 --- /dev/null +++ b/pkg/utils/codec/json.go @@ -0,0 +1,19 @@ +package codec + +import "encoding/json" + +// Convert any data to JSON bytes using generics +func ToJSONBytes[T any](data T) ([]byte, error) { + jsonBytes, err := json.Marshal(data) + if err != nil { + return nil, err + } + return jsonBytes, nil +} + +// Convert JSON bytes back to Go data using generics +func FromJSONBytes[T any](jsonBytes []byte) (T, error) { + var result T + err := json.Unmarshal(jsonBytes, &result) + return result, err +} diff --git a/proto/service.proto b/proto/service.proto index 6911fb50b5f..81a6638c1d1 100644 --- a/proto/service.proto +++ b/proto/service.proto @@ -64,6 +64,7 @@ message ExecuteRequest { map headers = 3; bytes body = 4; string message_id = 5; + string command = 6; } message TestWorkflowNotificationsRequest {