Skip to content

Commit

Permalink
feat: added runner handlers to the agent
Browse files Browse the repository at this point in the history
  • Loading branch information
exu committed Sep 26, 2024
1 parent 4e2a0ee commit 6e13778
Show file tree
Hide file tree
Showing 51 changed files with 854 additions and 303 deletions.
37 changes: 36 additions & 1 deletion api/v1/testkube.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7157,6 +7157,16 @@ components:
context:
type: string
description: Context value depending from its type
runnerIds:
type: array
items:
type: string
tags:
type: object
additionalProperties:
type: string



Webhook:
description: CRD based webhook data
Expand Down Expand Up @@ -8001,6 +8011,10 @@ components:
TestWorkflowExecutionRequest:
type: object
properties:
id:
type: string
description: custom execution id
defatult: ""
name:
type: string
description: custom execution name
Expand All @@ -8015,6 +8029,8 @@ components:
default: false
tags:
$ref: "#/components/schemas/TestWorkflowTagValue"
runningContext:
$ref: "#/components/schemas/RunningContext"

TestWorkflowWithExecution:
type: object
Expand Down Expand Up @@ -8108,6 +8124,15 @@ components:
- false
tags:
$ref: "#/components/schemas/TestWorkflowTagValue"
runnerId:
type: string
description: Runner id that executed the test workflow
runningContext:
$ref: "#/components/schemas/RunningContext"
groupId:
type: string
description: test workflow correlation id when run against multiple agent in On Prem mode

required:
- id
- name
Expand All @@ -8120,7 +8145,15 @@ components:
type: string
description: unique execution identifier
format: bson objectId
example: "62f395e004109209b50edfc1"
example: "twes0m33x3cut10n1d"
runnerId:
type: string
description: runner identifier
example: "some-unique-id-per-env-1"
groupId:
type: string
description: group identifier when run against multiple runner
example: "twgs0m3gr0up1d"
name:
type: string
description: execution name
Expand All @@ -8142,6 +8175,8 @@ components:
$ref: "#/components/schemas/TestWorkflowSummary"
tags:
$ref: "#/components/schemas/TestWorkflowTagValue"
runningContext:
$ref: "#/components/schemas/RunningContext"
required:
- id
- name
Expand Down
63 changes: 46 additions & 17 deletions cmd/api-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

corev1 "k8s.io/api/core/v1"

"github.com/kubeshop/testkube/pkg/agent/handlers"
"github.com/kubeshop/testkube/pkg/cache"

"github.com/nats-io/nats.go"
Expand All @@ -27,6 +28,7 @@ import (
testworkflow2 "github.com/kubeshop/testkube/pkg/repository/testworkflow"
"github.com/kubeshop/testkube/pkg/secretmanager"
"github.com/kubeshop/testkube/pkg/tcl/checktcl"
"github.com/kubeshop/testkube/pkg/tcl/controlplanetcl"
"github.com/kubeshop/testkube/pkg/tcl/schedulertcl"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/presets"

Expand Down Expand Up @@ -122,7 +124,7 @@ func runMigrations() (err error) {
return migrations.Migrator.Run(version.Version, migrator.MigrationTypeServer)
}

func runMongoMigrations(ctx context.Context, db *mongo.Database, migrationsDir string) error {
func runMongoMigrations(ctx context.Context, db *mongo.Database, _ string) error {
migrationsCollectionName := "__migrations"
activeMigrations, err := dbmigrator.GetDbMigrationsFromFs(dbmigrations.MongoMigrationsFs)
if err != nil {
Expand All @@ -147,13 +149,18 @@ func main() {
cfg.CleanLegacyVars()
exitOnError("error getting application config", err)

md := metadata.Pairs("api-key", cfg.TestkubeProAPIKey, "runner-id", cfg.TestkubeProRunnerId)
ctx := metadata.NewOutgoingContext(context.Background(), md)

features, err := featureflags.Get()
exitOnError("error getting application feature flags", err)

log.DefaultLogger.Infow("Feature flags configured", "ff", features)
logger := log.DefaultLogger.With("apiVersion", version.Version)

logger.Infow("Feature flags configured", "ff", features)

// Run services within an errgroup to propagate errors between services.
g, ctx := errgroup.WithContext(context.Background())
g, ctx := errgroup.WithContext(ctx)

// Cancel the errgroup context on SIGINT and SIGTERM,
// which shuts everything down gracefully.
Expand Down Expand Up @@ -271,18 +278,18 @@ func main() {
var artifactStorage domainstorage.ArtifactsStorage
var storageClient domainstorage.Client
if mode == common.ModeAgent {
resultsRepository = cloudresult.NewCloudResultRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey)
testResultsRepository = cloudtestresult.NewCloudRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey)
configRepository = cloudconfig.NewCloudResultRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey)
resultsRepository = cloudresult.NewCloudResultRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey, cfg.TestkubeProRunnerId)
testResultsRepository = cloudtestresult.NewCloudRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey, cfg.TestkubeProRunnerId)
configRepository = cloudconfig.NewCloudResultRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey, cfg.TestkubeProRunnerId)
// Pro edition only (tcl protected code)
testWorkflowResultsRepository = cloudtestworkflow.NewCloudRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey)
testWorkflowResultsRepository = cloudtestworkflow.NewCloudRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey, cfg.TestkubeProRunnerId)
var opts []cloudtestworkflow.Option
if cfg.StorageSkipVerify {
opts = append(opts, cloudtestworkflow.WithSkipVerify())
}
testWorkflowOutputRepository = cloudtestworkflow.NewCloudOutputRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey, opts...)
testWorkflowOutputRepository = cloudtestworkflow.NewCloudOutputRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey, cfg.TestkubeProRunnerId, opts...)
triggerLeaseBackend = triggers.NewAcquireAlwaysLeaseBackend()
artifactStorage = cloudartifacts.NewCloudArtifactsStorage(grpcClient, grpcConn, cfg.TestkubeProAPIKey)
artifactStorage = cloudartifacts.NewCloudArtifactsStorage(grpcClient, grpcConn, cfg.TestkubeProAPIKey, cfg.TestkubeProRunnerId)
} else {
mongoSSLConfig := getMongoSSLConfig(cfg, secretClient)
db, err := storage.GetMongoDatabase(cfg.APIMongoDSN, cfg.APIMongoDB, cfg.APIMongoDBType, cfg.APIMongoAllowTLS, mongoSSLConfig)
Expand Down Expand Up @@ -431,13 +438,24 @@ func main() {
exitOnError("Creating job templates", err)
}

proContext := newProContext(cfg, grpcClient)
proContext := newProContext(ctx, cfg, grpcClient)
proContext.ClusterId = clusterId

// Check Pro/Enterprise subscription
var subscriptionChecker checktcl.SubscriptionChecker
if mode == common.ModeAgent {
subscriptionChecker, err = checktcl.NewSubscriptionChecker(ctx, proContext, grpcClient, grpcConn)
exitOnError("Failed creating subscription checker", err)

// Load environment/org details based on token grpc call
environment, err := controlplanetcl.GetEnvironment(ctx, proContext, grpcClient, grpcConn)
warnOnError("Getting environment details from control plane", err)
proContext.EnvID = environment.Id
proContext.EnvName = environment.Name
proContext.EnvSlug = environment.Slug
proContext.OrgID = environment.OrganizationId
proContext.OrgName = environment.OrganizationName
proContext.OrgSlug = environment.OrganizationSlug
}

serviceAccountNames := map[string]string{
Expand Down Expand Up @@ -599,6 +617,7 @@ func main() {
cfg.ImageDataPersistentCacheKey,
cfg.TestkubeDashboardURI,
clusterId,
proContext.RunnerId,
)

go testWorkflowExecutor.Recover(context.Background())
Expand Down Expand Up @@ -661,6 +680,11 @@ func main() {
features,
proContext,
)

// register agent handlers (this one will remove need of API server in centralized environment mode)
// commands will be routed through grpc `Command` field
agentHandle.RegisterCommandHandler(cloud.ExecuteCommand, handlers.NewExecuteTestWorkflowHandler(testWorkflowExecutor, testWorkflowsClient, logger))

if err != nil {
exitOnError("Starting agent", err)
}
Expand Down Expand Up @@ -895,9 +919,10 @@ func newGRPCTransportCredentials(cfg *config.Config) (credentials.TransportCrede
})
}

func newProContext(cfg *config.Config, grpcClient cloud.TestKubeCloudAPIClient) config.ProContext {
func newProContext(ctx context.Context, cfg *config.Config, grpcClient cloud.TestKubeCloudAPIClient) config.ProContext {
proContext := config.ProContext{
APIKey: cfg.TestkubeProAPIKey,
RunnerId: cfg.TestkubeProRunnerId,
URL: cfg.TestkubeProURL,
TLSInsecure: cfg.TestkubeProTLSInsecure,
WorkerCount: cfg.TestkubeProWorkerCount,
Expand All @@ -915,22 +940,20 @@ func newProContext(cfg *config.Config, grpcClient cloud.TestKubeCloudAPIClient)
return proContext
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
md := metadata.Pairs("api-key", cfg.TestkubeProAPIKey)
ctx = metadata.NewOutgoingContext(ctx, md)
ctx, cancel := context.WithTimeout(ctx, time.Second*3)
defer cancel()
getProContext, err := grpcClient.GetProContext(ctx, &emptypb.Empty{})
proContextResponse, err := grpcClient.GetProContext(ctx, &emptypb.Empty{})
if err != nil {
log.DefaultLogger.Warnf("cannot fetch pro-context from cloud: %s", err)
return proContext
}

if proContext.EnvID == "" {
proContext.EnvID = getProContext.EnvId
proContext.EnvID = proContextResponse.EnvId
}

if proContext.OrgID == "" {
proContext.OrgID = getProContext.OrgId
proContext.OrgID = proContextResponse.OrgId
}

return proContext
Expand All @@ -942,3 +965,9 @@ func exitOnError(title string, err error) {
os.Exit(1)
}
}

func warnOnError(title string, err error) {
if err != nil {
log.DefaultLogger.Errorw(title, "error", err)
}
}
1 change: 1 addition & 0 deletions cmd/kubectl-testkube/commands/common/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func PopulateMasterFlags(cmd *cobra.Command, opts *HelmOptions) {
cmd.Flags().StringVar(&opts.Master.AgentToken, "agent-token", "", "Testkube Pro agent key [required for centralized mode]")
cmd.Flags().StringVar(&opts.Master.OrgId, "org-id", "", "Testkube Pro organization id [required for centralized mode]")
cmd.Flags().StringVar(&opts.Master.EnvId, "env-id", "", "Testkube Pro environment id [required for centralized mode]")
cmd.Flags().StringVar(&opts.Master.RunnerId, "runner-id", "", "Testkube Pro Multi Runner id [required for centralized mode]")

cmd.Flags().BoolVar(&opts.Master.Features.LogsV2, "feature-logs-v2", false, "Logs v2 feature flag")
}
Expand Down
4 changes: 4 additions & 0 deletions cmd/kubectl-testkube/commands/common/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ func prepareTestkubeProHelmArgs(options HelmOptions, isMigration bool) []string
args = append(args, "--set", fmt.Sprintf("testkube-logs.pro.envId=%s", options.Master.EnvId))
}

if options.Master.RunnerId != "" {
args = append(args, "--set", fmt.Sprintf("testkube-api.cloud.runnerId=%s", options.Master.RunnerId))
}

if options.Master.OrgId != "" {
args = append(args, "--set", fmt.Sprintf("testkube-api.cloud.orgId=%s", options.Master.OrgId))
args = append(args, "--set", fmt.Sprintf("testkube-logs.pro.orgId=%s", options.Master.OrgId))
Expand Down
1 change: 1 addition & 0 deletions cmd/kubectl-testkube/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func init() {
RootCmd.AddCommand(NewGenerateCmd())

RootCmd.AddCommand(NewInitCmd())
RootCmd.AddCommand(NewRunnerCmd())
RootCmd.AddCommand(NewUpgradeCmd())
RootCmd.AddCommand(NewPurgeCmd())
RootCmd.AddCommand(NewWatchCmd())
Expand Down
22 changes: 22 additions & 0 deletions cmd/kubectl-testkube/commands/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package commands

import (
"github.com/spf13/cobra"

"github.com/kubeshop/testkube/cmd/kubectl-testkube/commands/pro"
)

func NewRunnerCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "runner <command>",
Aliases: []string{""},
Short: "Testkube Runner related commands",
Run: func(cmd *cobra.Command, args []string) {
cmd.Help()
},
}

cmd.AddCommand(pro.NewInitCmd())

return cmd
}
1 change: 1 addition & 0 deletions cmd/kubectl-testkube/config/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ type Master struct {
IdToken string `json:"idToken,omitempty"`
OrgId string `json:"orgId,omitempty"`
EnvId string `json:"envId,omitempty"`
RunnerId string `json:"runnerId,omitempty"`
Insecure bool `json:"insecure,omitempty"`
UiUrlPrefix string `json:"uiUrlPrefix,omitempty"`
AgentUrlPrefix string `json:"agentUrlPrefix,omitempty"`
Expand Down
2 changes: 1 addition & 1 deletion cmd/testworkflow-init/obfuscator/obfuscator.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (s *obfuscator) Write(p []byte) (n int, err error) {

// Write the rest of data
if len(p) > 0 {
nn, err = s.dst.Write(p)
_, err = s.dst.Write(p)
return size, err
}
return size, nil
Expand Down
2 changes: 1 addition & 1 deletion cmd/testworkflow-toolkit/commands/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func NewArtifactsCmd() *cobra.Command {
if env.CloudEnabled() {
ctx, cancel := context.WithTimeout(cmd.Context(), 30*time.Second)
defer cancel()
ctx = agent.AddAPIKeyMeta(ctx, env.Config().Cloud.ApiKey)
ctx = agent.AddContextMetadata(ctx, env.Config().Cloud.ApiKey, "")
executor, client := env.Cloud(ctx)
proContext, err := client.GetProContext(ctx, &emptypb.Empty{})
var supported []*cloud.Capability
Expand Down
2 changes: 1 addition & 1 deletion cmd/testworkflow-toolkit/env/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,5 +90,5 @@ func Cloud(ctx context.Context) (cloudexecutor.Executor, cloud.TestKubeCloudAPIC
ui.Fail(fmt.Errorf("failed to connect with Cloud: %w", err))
}
grpcClient := cloud.NewTestKubeCloudAPIClient(grpcConn)
return cloudexecutor.NewCloudGRPCExecutor(grpcClient, grpcConn, cfg.ApiKey), grpcClient
return cloudexecutor.NewCloudGRPCExecutor(grpcClient, grpcConn, cfg.ApiKey, cfg.RunnerId), grpcClient
}
1 change: 1 addition & 0 deletions cmd/testworkflow-toolkit/env/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type envCloudConfig struct {
UiUrl string `envconfig:"TK_C_UI_URL"`
OrgId string `envconfig:"TK_C_ORG_ID"`
EnvId string `envconfig:"TK_C_ENV_ID"`
RunnerId string `envconfig:"TK_C_RUNNER_ID" default:""`
SkipVerify bool `envconfig:"TK_C_SKIP_VERIFY" default:"false"`
TlsInsecure bool `envconfig:"TK_C_TLS_INSECURE" default:"false"`
}
Expand Down
Loading

0 comments on commit 6e13778

Please sign in to comment.