Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Multi agent #5844

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion api/v1/testkube.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7157,6 +7157,16 @@ components:
context:
type: string
description: Context value depending from its type
runnerIds:
type: array
items:
type: string
tags:
type: object
additionalProperties:
type: string



Webhook:
description: CRD based webhook data
Expand Down Expand Up @@ -8001,6 +8011,10 @@ components:
TestWorkflowExecutionRequest:
type: object
properties:
id:
type: string
description: custom execution id
default: ""
name:
type: string
description: custom execution name
Expand All @@ -8015,6 +8029,8 @@ components:
default: false
tags:
$ref: "#/components/schemas/TestWorkflowTagValue"
runningContext:
$ref: "#/components/schemas/RunningContext"

TestWorkflowWithExecution:
type: object
Expand Down Expand Up @@ -8108,6 +8124,15 @@ components:
- false
tags:
$ref: "#/components/schemas/TestWorkflowTagValue"
runnerId:
type: string
description: Runner id that executed the test workflow
runningContext:
$ref: "#/components/schemas/RunningContext"
groupId:
type: string
description: test workflow correlation id when run against multiple agent in On Prem mode

required:
- id
- name
Expand All @@ -8120,7 +8145,15 @@ components:
type: string
description: unique execution identifier
format: bson objectId
example: "62f395e004109209b50edfc1"
example: "twes0m33x3cut10n1d"
runnerId:
type: string
description: runner identifier
example: "some-unique-id-per-env-1"
groupId:
type: string
description: group identifier when run against multiple runner
example: "twgs0m3gr0up1d"
name:
type: string
description: execution name
Expand All @@ -8142,6 +8175,8 @@ components:
$ref: "#/components/schemas/TestWorkflowSummary"
tags:
$ref: "#/components/schemas/TestWorkflowTagValue"
runningContext:
$ref: "#/components/schemas/RunningContext"
required:
- id
- name
Expand Down
63 changes: 46 additions & 17 deletions cmd/api-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

corev1 "k8s.io/api/core/v1"

"github.com/kubeshop/testkube/pkg/agent/handlers"
"github.com/kubeshop/testkube/pkg/cache"

"github.com/nats-io/nats.go"
Expand All @@ -27,6 +28,7 @@ import (
testworkflow2 "github.com/kubeshop/testkube/pkg/repository/testworkflow"
"github.com/kubeshop/testkube/pkg/secretmanager"
"github.com/kubeshop/testkube/pkg/tcl/checktcl"
"github.com/kubeshop/testkube/pkg/tcl/controlplanetcl"
"github.com/kubeshop/testkube/pkg/tcl/schedulertcl"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/presets"

Expand Down Expand Up @@ -122,7 +124,7 @@ func runMigrations() (err error) {
return migrations.Migrator.Run(version.Version, migrator.MigrationTypeServer)
}

func runMongoMigrations(ctx context.Context, db *mongo.Database, migrationsDir string) error {
func runMongoMigrations(ctx context.Context, db *mongo.Database, _ string) error {
migrationsCollectionName := "__migrations"
activeMigrations, err := dbmigrator.GetDbMigrationsFromFs(dbmigrations.MongoMigrationsFs)
if err != nil {
Expand All @@ -147,13 +149,18 @@ func main() {
cfg.CleanLegacyVars()
exitOnError("error getting application config", err)

md := metadata.Pairs("api-key", cfg.TestkubeProAPIKey, "runner-id", cfg.TestkubeProRunnerId)
ctx := metadata.NewOutgoingContext(context.Background(), md)

features, err := featureflags.Get()
exitOnError("error getting application feature flags", err)

log.DefaultLogger.Infow("Feature flags configured", "ff", features)
logger := log.DefaultLogger.With("apiVersion", version.Version)

logger.Infow("Feature flags configured", "ff", features)

// Run services within an errgroup to propagate errors between services.
g, ctx := errgroup.WithContext(context.Background())
g, ctx := errgroup.WithContext(ctx)

// Cancel the errgroup context on SIGINT and SIGTERM,
// which shuts everything down gracefully.
Expand Down Expand Up @@ -271,18 +278,18 @@ func main() {
var artifactStorage domainstorage.ArtifactsStorage
var storageClient domainstorage.Client
if mode == common.ModeAgent {
resultsRepository = cloudresult.NewCloudResultRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey)
testResultsRepository = cloudtestresult.NewCloudRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey)
configRepository = cloudconfig.NewCloudResultRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey)
resultsRepository = cloudresult.NewCloudResultRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey, cfg.TestkubeProRunnerId)
testResultsRepository = cloudtestresult.NewCloudRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey, cfg.TestkubeProRunnerId)
configRepository = cloudconfig.NewCloudResultRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey, cfg.TestkubeProRunnerId)
// Pro edition only (tcl protected code)
testWorkflowResultsRepository = cloudtestworkflow.NewCloudRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey)
testWorkflowResultsRepository = cloudtestworkflow.NewCloudRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey, cfg.TestkubeProRunnerId)
var opts []cloudtestworkflow.Option
if cfg.StorageSkipVerify {
opts = append(opts, cloudtestworkflow.WithSkipVerify())
}
testWorkflowOutputRepository = cloudtestworkflow.NewCloudOutputRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey, opts...)
testWorkflowOutputRepository = cloudtestworkflow.NewCloudOutputRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey, cfg.TestkubeProRunnerId, opts...)
triggerLeaseBackend = triggers.NewAcquireAlwaysLeaseBackend()
artifactStorage = cloudartifacts.NewCloudArtifactsStorage(grpcClient, grpcConn, cfg.TestkubeProAPIKey)
artifactStorage = cloudartifacts.NewCloudArtifactsStorage(grpcClient, grpcConn, cfg.TestkubeProAPIKey, cfg.TestkubeProRunnerId)
} else {
mongoSSLConfig := getMongoSSLConfig(cfg, secretClient)
db, err := storage.GetMongoDatabase(cfg.APIMongoDSN, cfg.APIMongoDB, cfg.APIMongoDBType, cfg.APIMongoAllowTLS, mongoSSLConfig)
Expand Down Expand Up @@ -431,13 +438,24 @@ func main() {
exitOnError("Creating job templates", err)
}

proContext := newProContext(cfg, grpcClient)
proContext := newProContext(ctx, cfg, grpcClient)
proContext.ClusterId = clusterId

// Check Pro/Enterprise subscription
var subscriptionChecker checktcl.SubscriptionChecker
if mode == common.ModeAgent {
subscriptionChecker, err = checktcl.NewSubscriptionChecker(ctx, proContext, grpcClient, grpcConn)
exitOnError("Failed creating subscription checker", err)

// Load environment/org details based on token grpc call
environment, err := controlplanetcl.GetEnvironment(ctx, proContext, grpcClient, grpcConn)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

executor need to follow loading it from this one - not from inlined env variables

warnOnError("Getting environment details from control plane", err)
proContext.EnvID = environment.Id
proContext.EnvName = environment.Name
proContext.EnvSlug = environment.Slug
proContext.OrgID = environment.OrganizationId
proContext.OrgName = environment.OrganizationName
proContext.OrgSlug = environment.OrganizationSlug
}

serviceAccountNames := map[string]string{
Expand Down Expand Up @@ -599,6 +617,7 @@ func main() {
cfg.ImageDataPersistentCacheKey,
cfg.TestkubeDashboardURI,
clusterId,
proContext.RunnerId,
)

go testWorkflowExecutor.Recover(context.Background())
Expand Down Expand Up @@ -661,6 +680,11 @@ func main() {
features,
proContext,
)

// register agent handlers (this one will remove need of API server in centralized environment mode)
// commands will be routed through grpc `Command` field
agentHandle.RegisterCommandHandler(cloud.ExecuteCommand, handlers.NewExecuteTestWorkflowHandler(testWorkflowExecutor, testWorkflowsClient, logger))

if err != nil {
exitOnError("Starting agent", err)
}
Expand Down Expand Up @@ -895,9 +919,10 @@ func newGRPCTransportCredentials(cfg *config.Config) (credentials.TransportCrede
})
}

func newProContext(cfg *config.Config, grpcClient cloud.TestKubeCloudAPIClient) config.ProContext {
func newProContext(ctx context.Context, cfg *config.Config, grpcClient cloud.TestKubeCloudAPIClient) config.ProContext {
proContext := config.ProContext{
APIKey: cfg.TestkubeProAPIKey,
RunnerId: cfg.TestkubeProRunnerId,
URL: cfg.TestkubeProURL,
TLSInsecure: cfg.TestkubeProTLSInsecure,
WorkerCount: cfg.TestkubeProWorkerCount,
Expand All @@ -915,22 +940,20 @@ func newProContext(cfg *config.Config, grpcClient cloud.TestKubeCloudAPIClient)
return proContext
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
md := metadata.Pairs("api-key", cfg.TestkubeProAPIKey)
ctx = metadata.NewOutgoingContext(ctx, md)
ctx, cancel := context.WithTimeout(ctx, time.Second*3)
defer cancel()
getProContext, err := grpcClient.GetProContext(ctx, &emptypb.Empty{})
proContextResponse, err := grpcClient.GetProContext(ctx, &emptypb.Empty{})
if err != nil {
log.DefaultLogger.Warnf("cannot fetch pro-context from cloud: %s", err)
return proContext
}

if proContext.EnvID == "" {
proContext.EnvID = getProContext.EnvId
proContext.EnvID = proContextResponse.EnvId
}

if proContext.OrgID == "" {
proContext.OrgID = getProContext.OrgId
proContext.OrgID = proContextResponse.OrgId
}

return proContext
Expand All @@ -942,3 +965,9 @@ func exitOnError(title string, err error) {
os.Exit(1)
}
}

func warnOnError(title string, err error) {
if err != nil {
log.DefaultLogger.Errorw(title, "error", err)
}
}
1 change: 1 addition & 0 deletions cmd/kubectl-testkube/commands/common/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func PopulateMasterFlags(cmd *cobra.Command, opts *HelmOptions) {
cmd.Flags().StringVar(&opts.Master.AgentToken, "agent-token", "", "Testkube Pro agent key [required for centralized mode]")
cmd.Flags().StringVar(&opts.Master.OrgId, "org-id", "", "Testkube Pro organization id [required for centralized mode]")
cmd.Flags().StringVar(&opts.Master.EnvId, "env-id", "", "Testkube Pro environment id [required for centralized mode]")
cmd.Flags().StringVar(&opts.Master.RunnerId, "runner-id", "", "Testkube Pro Multi Runner id [required for centralized mode]")

cmd.Flags().BoolVar(&opts.Master.Features.LogsV2, "feature-logs-v2", false, "Logs v2 feature flag")
}
Expand Down
4 changes: 4 additions & 0 deletions cmd/kubectl-testkube/commands/common/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ func prepareTestkubeProHelmArgs(options HelmOptions, isMigration bool) []string
args = append(args, "--set", fmt.Sprintf("testkube-logs.pro.envId=%s", options.Master.EnvId))
}

if options.Master.RunnerId != "" {
args = append(args, "--set", fmt.Sprintf("testkube-api.cloud.runnerId=%s", options.Master.RunnerId))
}

if options.Master.OrgId != "" {
args = append(args, "--set", fmt.Sprintf("testkube-api.cloud.orgId=%s", options.Master.OrgId))
args = append(args, "--set", fmt.Sprintf("testkube-logs.pro.orgId=%s", options.Master.OrgId))
Expand Down
1 change: 1 addition & 0 deletions cmd/kubectl-testkube/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func init() {
RootCmd.AddCommand(NewGenerateCmd())

RootCmd.AddCommand(NewInitCmd())
RootCmd.AddCommand(NewRunnerCmd())
RootCmd.AddCommand(NewUpgradeCmd())
RootCmd.AddCommand(NewPurgeCmd())
RootCmd.AddCommand(NewWatchCmd())
Expand Down
22 changes: 22 additions & 0 deletions cmd/kubectl-testkube/commands/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package commands

import (
"github.com/spf13/cobra"

"github.com/kubeshop/testkube/cmd/kubectl-testkube/commands/pro"
)

func NewRunnerCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "runner <command>",
Aliases: []string{""},
Short: "Testkube Runner related commands",
Run: func(cmd *cobra.Command, args []string) {
cmd.Help()
},
}

cmd.AddCommand(pro.NewInitCmd())

return cmd
}
1 change: 1 addition & 0 deletions cmd/kubectl-testkube/config/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ type Master struct {
IdToken string `json:"idToken,omitempty"`
OrgId string `json:"orgId,omitempty"`
EnvId string `json:"envId,omitempty"`
RunnerId string `json:"runnerId,omitempty"`
Insecure bool `json:"insecure,omitempty"`
UiUrlPrefix string `json:"uiUrlPrefix,omitempty"`
AgentUrlPrefix string `json:"agentUrlPrefix,omitempty"`
Expand Down
2 changes: 1 addition & 1 deletion cmd/testworkflow-init/obfuscator/obfuscator.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (s *obfuscator) Write(p []byte) (n int, err error) {

// Write the rest of data
if len(p) > 0 {
nn, err = s.dst.Write(p)
_, err = s.dst.Write(p)
return size, err
}
return size, nil
Expand Down
2 changes: 1 addition & 1 deletion cmd/testworkflow-toolkit/commands/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func NewArtifactsCmd() *cobra.Command {
if env.CloudEnabled() {
ctx, cancel := context.WithTimeout(cmd.Context(), 30*time.Second)
defer cancel()
ctx = agent.AddAPIKeyMeta(ctx, env.Config().Cloud.ApiKey)
ctx = agent.AddContextMetadata(ctx, env.Config().Cloud.ApiKey, "")
executor, client := env.Cloud(ctx)
proContext, err := client.GetProContext(ctx, &emptypb.Empty{})
var supported []*cloud.Capability
Expand Down
2 changes: 1 addition & 1 deletion cmd/testworkflow-toolkit/env/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,5 +90,5 @@ func Cloud(ctx context.Context) (cloudexecutor.Executor, cloud.TestKubeCloudAPIC
ui.Fail(fmt.Errorf("failed to connect with Cloud: %w", err))
}
grpcClient := cloud.NewTestKubeCloudAPIClient(grpcConn)
return cloudexecutor.NewCloudGRPCExecutor(grpcClient, grpcConn, cfg.ApiKey), grpcClient
return cloudexecutor.NewCloudGRPCExecutor(grpcClient, grpcConn, cfg.ApiKey, cfg.RunnerId), grpcClient
}
1 change: 1 addition & 0 deletions cmd/testworkflow-toolkit/env/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type envCloudConfig struct {
UiUrl string `envconfig:"TK_C_UI_URL"`
OrgId string `envconfig:"TK_C_ORG_ID"`
EnvId string `envconfig:"TK_C_ENV_ID"`
RunnerId string `envconfig:"TK_C_RUNNER_ID" default:""`
SkipVerify bool `envconfig:"TK_C_SKIP_VERIFY" default:"false"`
TlsInsecure bool `envconfig:"TK_C_TLS_INSECURE" default:"false"`
}
Expand Down
7 changes: 6 additions & 1 deletion internal/app/api/v1/testworkflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (

testworkflowsv1 "github.com/kubeshop/testkube-operator/api/testworkflows/v1"
"github.com/kubeshop/testkube/internal/common"
"github.com/kubeshop/testkube/pkg/agent"
"github.com/kubeshop/testkube/pkg/api/v1/testkube"
"github.com/kubeshop/testkube/pkg/log"
"github.com/kubeshop/testkube/pkg/mapper/testworkflows"
"github.com/kubeshop/testkube/pkg/scheduler"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowresolver"
Expand Down Expand Up @@ -354,7 +356,8 @@ func (s *TestkubeAPI) PreviewTestWorkflowHandler() fiber.Handler {
// TODO: Add metrics
func (s *TestkubeAPI) ExecuteTestWorkflowHandler() fiber.Handler {
return func(c *fiber.Ctx) (err error) {
ctx := c.Context()
// pass metadata to context
ctx := agent.Context(c.Context(), *s.proContext)
name := c.Params("id")
errPrefix := fmt.Sprintf("failed to execute test workflow '%s'", name)
workflow, err := s.TestWorkflowsClient.Get(name)
Expand All @@ -369,6 +372,8 @@ func (s *TestkubeAPI) ExecuteTestWorkflowHandler() fiber.Handler {
return s.BadRequest(c, errPrefix, "invalid body", err)
}

log.DefaultLogger.Infow("TestWorkflow execution request", "name", name, "request", request)

var results []testkube.TestWorkflowExecution
var errs []error

Expand Down
1 change: 1 addition & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type Config struct {
TestkubeOAuthProvider string `envconfig:"TESTKUBE_OAUTH_PROVIDER" default:""`
TestkubeOAuthScopes string `envconfig:"TESTKUBE_OAUTH_SCOPES" default:""`
TestkubeProAPIKey string `envconfig:"TESTKUBE_PRO_API_KEY" default:""`
TestkubeProRunnerId string `envconfig:"TESTKUBE_PRO_RUNNER_ID" default:"default-runner"`
TestkubeProURL string `envconfig:"TESTKUBE_PRO_URL" default:""`
TestkubeProTLSInsecure bool `envconfig:"TESTKUBE_PRO_TLS_INSECURE" default:"false"`
TestkubeProWorkerCount int `envconfig:"TESTKUBE_PRO_WORKER_COUNT" default:"50"`
Expand Down
6 changes: 6 additions & 0 deletions internal/config/procontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,14 @@ type ProContext struct {
WorkflowNotificationsWorkerCount int
SkipVerify bool
EnvID string
EnvName string
EnvSlug string
OrgID string
OrgName string
OrgSlug string
Migrate string
ConnectionTimeout int
DashboardURI string
ClusterId string
RunnerId string
}
Loading
Loading