Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: [TKC-2882] stream service and parallel logs #6052

Open
wants to merge 33 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
bd39964
feat: api methods for service logs
vsukhin Nov 25, 2024
2c77962
fix: client for get service logs
vsukhin Nov 25, 2024
c71f5c8
fix: change log
vsukhin Nov 25, 2024
3e70a83
fix: disable hints
vsukhin Nov 25, 2024
aa9acb3
fix: routing
vsukhin Nov 26, 2024
68fe09f
fix: show service logs
vsukhin Nov 26, 2024
c55187f
fix: check service name
vsukhin Nov 27, 2024
93b7ac6
fix: check service name
vsukhin Nov 27, 2024
cf2dd76
fix: log comment
vsukhin Nov 27, 2024
fff63c0
fix: check for testworkflow service
vsukhin Nov 27, 2024
c7bed19
fix: friendly error
vsukhin Nov 27, 2024
f34e5d1
fix: add spinner
vsukhin Nov 27, 2024
07ce308
feat: proto for service notifications
vsukhin Nov 27, 2024
9c03fe4
feat: add cloud grpc method for server notifications
vsukhin Nov 27, 2024
9d85b35
fix: change timeeout
vsukhin Nov 28, 2024
57db7a0
fix: waiting for service pod
vsukhin Nov 28, 2024
cccfcb7
fix: typo
vsukhin Nov 28, 2024
c4658c5
fix: waiting for service pod
vsukhin Nov 28, 2024
3768550
fix: add service name check
vsukhin Nov 28, 2024
c2fd448
fix: adjust help
vsukhin Nov 29, 2024
b46bb41
fix: add method to parallel step
vsukhin Nov 29, 2024
59bdce3
fix: use retry library
vsukhin Nov 29, 2024
4224930
fix: rename const
vsukhin Nov 29, 2024
bcda47f
fix: 0 attempts
vsukhin Nov 29, 2024
97b5bd3
fix: use option
vsukhin Nov 29, 2024
3e2c45f
fix: remove ctx
vsukhin Nov 29, 2024
2819e43
feat: add cli support for parallel steps
vsukhin Dec 2, 2024
81b0295
fix: rename url
vsukhin Dec 2, 2024
c71fb91
feat: api methods for parallel steps
vsukhin Dec 2, 2024
6003cde
fix: tune parallel step detection
vsukhin Dec 2, 2024
5136c31
fix: typo
vsukhin Dec 2, 2024
6b89ab0
fix: cli
vsukhin Dec 2, 2024
bcffbd5
feat: add proto for parallel step logs
vsukhin Dec 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions cmd/api-server/commons/commons.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,18 +282,19 @@ func ReadDefaultExecutors(cfg *config.Config) (executors []testkube.ExecutorDeta

func ReadProContext(ctx context.Context, cfg *config.Config, grpcClient cloud.TestKubeCloudAPIClient) config.ProContext {
proContext := config.ProContext{
APIKey: cfg.TestkubeProAPIKey,
URL: cfg.TestkubeProURL,
TLSInsecure: cfg.TestkubeProTLSInsecure,
WorkerCount: cfg.TestkubeProWorkerCount,
LogStreamWorkerCount: cfg.TestkubeProLogStreamWorkerCount,
WorkflowNotificationsWorkerCount: cfg.TestkubeProWorkflowNotificationsWorkerCount,
SkipVerify: cfg.TestkubeProSkipVerify,
EnvID: cfg.TestkubeProEnvID,
OrgID: cfg.TestkubeProOrgID,
Migrate: cfg.TestkubeProMigrate,
ConnectionTimeout: cfg.TestkubeProConnectionTimeout,
DashboardURI: cfg.TestkubeDashboardURI,
APIKey: cfg.TestkubeProAPIKey,
URL: cfg.TestkubeProURL,
TLSInsecure: cfg.TestkubeProTLSInsecure,
WorkerCount: cfg.TestkubeProWorkerCount,
LogStreamWorkerCount: cfg.TestkubeProLogStreamWorkerCount,
WorkflowNotificationsWorkerCount: cfg.TestkubeProWorkflowNotificationsWorkerCount,
WorkflowServiceNotificationsWorkerCount: cfg.TestkubeProWorkflowServiceNotificationsWorkerCount,
SkipVerify: cfg.TestkubeProSkipVerify,
EnvID: cfg.TestkubeProEnvID,
OrgID: cfg.TestkubeProOrgID,
Migrate: cfg.TestkubeProMigrate,
ConnectionTimeout: cfg.TestkubeProConnectionTimeout,
DashboardURI: cfg.TestkubeDashboardURI,
}

if cfg.TestkubeProAPIKey == "" || grpcClient == nil {
Expand Down
22 changes: 22 additions & 0 deletions cmd/api-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,27 @@ func main() {
}
return notifications.Channel(), nil
}
getTestWorkflowServiceNotificationsStream := func(ctx context.Context, executionID, serviceName string, serviceIndex int) (<-chan testkube.TestWorkflowExecutionNotification, error) {
execution, err := testWorkflowResultsRepository.Get(ctx, executionID)
if err != nil {
return nil, errors.Join(err, agent.ErrGetTestWorkflowExecution)
}

if execution.Result != nil && execution.Result.IsFinished() {
return nil, agent.ErrFinishedTestWorkflowExecution
}

notifications := executionWorker.Notifications(ctx, fmt.Sprintf("%s-%s-%d", execution.Id, serviceName, serviceIndex), executionworkertypes.NotificationsOptions{
Hints: executionworkertypes.Hints{
Namespace: execution.Namespace,
ScheduledAt: common.Ptr(execution.ScheduledAt),
},
})
if notifications.Err() != nil {
return nil, notifications.Err()
}
return notifications.Channel(), nil
}
getDeprecatedLogStream := func(ctx context.Context, executionID string) (chan output.Output, error) {
return nil, errors.New("deprecated features have been disabled")
}
Expand All @@ -337,6 +358,7 @@ func main() {
grpcClient,
getDeprecatedLogStream,
getTestWorkflowNotificationsStream,
getTestWorkflowServiceNotificationsStream,
clusterId,
cfg.TestkubeClusterName,
features,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func NewGetTestWorkflowExecutionsCmd() *cobra.Command {
ui.Info("Getting logs for test workflow execution", executionID)

logs, err := client.GetTestWorkflowExecutionLogs(executionID)
ui.ExitOnError("getting logs from executor", err)
ui.ExitOnError("getting logs from test workflow", err)

sigs := flattenSignatures(execution.Signature)

Expand Down
93 changes: 80 additions & 13 deletions cmd/kubectl-testkube/commands/testworkflows/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ import (
)

const (
LogTimestampLength = 30 // time.RFC3339Nano without 00:00 timezone
apiErrorMessage = "processing error:"
LogTimestampLength = 30 // time.RFC3339Nano without 00:00 timezone
apiErrorMessage = "processing error:"
serviceLogsCheckDelay = 100 * time.Millisecond
)

var (
Expand All @@ -47,6 +48,8 @@ func NewRunTestWorkflowCmd() *cobra.Command {
masks []string
tags map[string]string
selectors []string
serviceName string
serviceIndex int
)

cmd := &cobra.Command{
Expand Down Expand Up @@ -146,7 +149,7 @@ func NewRunTestWorkflowCmd() *cobra.Command {
ui.NL()
if !execution.FailedToInitialize() {
if watchEnabled && len(args) > 0 {
exitCode = uiWatch(execution, client)
exitCode = uiWatch(execution, serviceName, serviceIndex, client)
ui.NL()
if downloadArtifactsEnabled {
tests.DownloadTestWorkflowArtifacts(execution.Id, downloadDir, format, masks, client, outputPretty)
Expand Down Expand Up @@ -181,12 +184,30 @@ func NewRunTestWorkflowCmd() *cobra.Command {
cmd.Flags().StringArrayVarP(&masks, "mask", "", []string{}, "regexp to filter downloaded files, single or comma separated, like report/.* or .*\\.json,.*\\.js$")
cmd.Flags().StringToStringVarP(&tags, "tag", "", map[string]string{}, "execution tags in a form of name1=val1 passed to executor")
cmd.Flags().StringSliceVarP(&selectors, "label", "l", nil, "label key value pair: --label key1=value1 or label expression")
cmd.Flags().StringVar(&serviceName, "service-name", "", "test workflow service name")
cmd.Flags().IntVar(&serviceIndex, "service-index", 0, "test workflow service index")

return cmd
}

func uiWatch(execution testkube.TestWorkflowExecution, client apiclientv1.Client) int {
result, err := watchTestWorkflowLogs(execution.Id, execution.Signature, client)
func uiWatch(execution testkube.TestWorkflowExecution, serviceName string, serviceIndex int, client apiclientv1.Client) int {
var result *testkube.TestWorkflowResult
var err error

if serviceName == "" {
result, err = watchTestWorkflowLogs(execution.Id, execution.Signature, client)
} else {
found := false
if execution.Workflow != nil && execution.Workflow.Spec != nil {
_, found = execution.Workflow.Spec.Services[serviceName]
}

if !found {
ui.Failf("unknown service '%s' for test workflow execution %s", serviceName, execution.Id)
}

result, err = watchTestWorkflowServiceLogs(execution.Id, serviceName, serviceIndex, execution.Signature, client)
}
ui.ExitOnError("reading test workflow execution logs", err)

// Apply the result in the execution
Expand Down Expand Up @@ -283,15 +304,10 @@ func getTimestampLength(line string) int {
return 0
}

func watchTestWorkflowLogs(id string, signature []testkube.TestWorkflowSignature, client apiclientv1.Client) (*testkube.TestWorkflowResult, error) {
ui.Info("Getting logs from test workflow job", id)

notifications, err := client.GetTestWorkflowExecutionNotifications(id)
ui.ExitOnError("getting logs from executor", err)

func printTestWorkflowLogs(signature []testkube.TestWorkflowSignature,
notifications chan testkube.TestWorkflowExecutionNotification) (result *testkube.TestWorkflowResult) {
steps := flattenSignatures(signature)

var result *testkube.TestWorkflowResult
var isLineBeginning = true
for l := range notifications {
if l.Output != nil {
Expand All @@ -309,8 +325,59 @@ func watchTestWorkflowLogs(id string, signature []testkube.TestWorkflowSignature
}

ui.NL()
return result
}

func watchTestWorkflowLogs(id string, signature []testkube.TestWorkflowSignature, client apiclientv1.Client) (*testkube.TestWorkflowResult, error) {
ui.Info("Getting logs from test workflow job", id)

notifications, err := client.GetTestWorkflowExecutionNotifications(id)
if err != nil {
return nil, err
}

return printTestWorkflowLogs(signature, notifications), nil
}

func watchTestWorkflowServiceLogs(id, serviceName string, serviceIndex int,
signature []testkube.TestWorkflowSignature, client apiclientv1.Client) (*testkube.TestWorkflowResult, error) {
ui.Info("Getting logs from test workflow service job", fmt.Sprintf("%s-%s-%d", id, serviceName, serviceIndex))

var (
notifications chan testkube.TestWorkflowExecutionNotification
nErr error
)

spinner := ui.NewSpinner("Waiting for service logs")
for {
notifications, nErr = client.GetTestWorkflowExecutionServiceNotifications(id, serviceName, serviceIndex)
if nErr != nil {
execution, cErr := client.GetTestWorkflowExecution(id)
if cErr != nil {
spinner.Fail()
return nil, cErr
}

if execution.Result != nil {
if execution.Result.IsFinished() {
nErr = errors.New("test workflow execution is finished")
} else {
time.Sleep(serviceLogsCheckDelay)
continue
}
}
}

if nErr != nil {
spinner.Fail()
return nil, nErr
}

break
}

return result, err
spinner.Success()
return printTestWorkflowLogs(signature, notifications), nil
}

func printStatusHeader(i, n int, name string) {
Expand Down
10 changes: 9 additions & 1 deletion cmd/kubectl-testkube/commands/testworkflows/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ import (
)

func NewWatchTestWorkflowExecutionCmd() *cobra.Command {
var (
serviceName string
serviceIndex int
)

cmd := &cobra.Command{
Use: "testworkflowexecution <executionName>",
Aliases: []string{"testworkflowexecutions", "twe", "tw"},
Expand All @@ -31,7 +36,7 @@ func NewWatchTestWorkflowExecutionCmd() *cobra.Command {
ui.ExitOnError("render test workflow execution", err)

ui.NL()
exitCode := uiWatch(execution, client)
exitCode := uiWatch(execution, serviceName, serviceIndex, client)
ui.NL()

execution, err = client.GetTestWorkflowExecution(execution.Id)
Expand All @@ -43,5 +48,8 @@ func NewWatchTestWorkflowExecutionCmd() *cobra.Command {
},
}

cmd.Flags().StringVar(&serviceName, "service-name", "", "test workflow service name")
cmd.Flags().IntVar(&serviceIndex, "service-index", 0, "test workflow service index")

return cmd
}
2 changes: 2 additions & 0 deletions internal/app/api/v1/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,9 @@ func (s *TestkubeAPI) Init(server server.HTTPServer) {
testWorkflowExecutions.Post("/", s.ExecuteTestWorkflowHandler())
testWorkflowExecutions.Get("/:executionID", s.GetTestWorkflowExecutionHandler())
testWorkflowExecutions.Get("/:executionID/notifications", s.StreamTestWorkflowExecutionNotificationsHandler())
testWorkflowExecutions.Get("/:executionID/notifications/:serviceName/:serviceIndex<int>", s.StreamTestWorkflowExecutionServiceNotificationsHandler())
testWorkflowExecutions.Get("/:executionID/notifications/stream", s.StreamTestWorkflowExecutionNotificationsWebSocketHandler())
testWorkflowExecutions.Get("/:executionID/notifications/stream/:serviceName/:serviceIndex<int>", s.StreamTestWorkflowExecutionServiceNotificationsWebSocketHandler())
testWorkflowExecutions.Post("/:executionID/abort", s.AbortTestWorkflowExecutionHandler())
testWorkflowExecutions.Post("/:executionID/pause", s.PauseTestWorkflowExecutionHandler())
testWorkflowExecutions.Post("/:executionID/resume", s.ResumeTestWorkflowExecutionHandler())
Expand Down
Loading
Loading