Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add jobs synchronizer test #10

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/github-actions-manager/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func initModules(logger *zap.Logger, config *Config) ([]cmd.Module, error) {
runners := runners.NewSynchronizer(logger, &config.GitHub.Runners, target, registry)
modules = append(modules, runners)

jobs, err := jobs.NewSynchronizer(logger, &config.GitHub.Jobs, client, kv, registry)
jobs, err := jobs.NewSynchronizer(logger, &config.GitHub.Jobs, client, kv, registry, jobs.RealClock{})
if err != nil {
return nil, fmt.Errorf("cannot setup job sync: %w", err)
}
Expand Down
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@ require (
github.com/BurntSushi/toml v1.1.0
github.com/Masterminds/sprig/v3 v3.2.2
github.com/bradleyfalzon/ghinstallation/v2 v2.0.4
github.com/go-playground/assert/v2 v2.0.1
github.com/go-playground/validator/v10 v10.11.0
github.com/google/go-github/v45 v45.1.0
github.com/gorilla/mux v1.8.0
github.com/slack-go/slack v0.11.0
github.com/stretchr/testify v1.7.0
go.uber.org/zap v1.21.0
golang.org/x/oauth2 v0.0.0-20220622183110-fd043fe589d2
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/time v0.0.0-20220609170525-579cf78fd858
gopkg.in/h2non/gock.v1 v1.1.2
k8s.io/api v0.24.2
k8s.io/apimachinery v0.24.2
k8s.io/client-go v0.24.2
Expand All @@ -22,7 +25,9 @@ require (
require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB7
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 h1:2VTzZjLZBgl62/EtslCrtky5vbi9dd7HrQPQIx6wqiw=
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
Expand Down Expand Up @@ -293,6 +295,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8m
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32 h1:W6apQkHrMkS0Muv8G/TipAy/FJl/rCYT0+EuS8+Z0z4=
github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32/go.mod h1:9wM+0iRr9ahx58uYLpLIr5fm8diHn0JbqRycJi6w0Ms=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
Expand Down Expand Up @@ -738,6 +742,8 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/h2non/gock.v1 v1.1.2 h1:jBbHXgGBK/AoPVfJh5x4r/WxIrElvbLel8TCZkkZJoY=
gopkg.in/h2non/gock.v1 v1.1.2/go.mod h1:n7UGz/ckNChHiK05rDoiC4MYSunEC/lyaUm2WWaDva0=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
Expand Down
18 changes: 18 additions & 0 deletions pkg/github/jobs/clock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package jobs

import "time"

type Clock interface {
Now() time.Time
After(d time.Duration) <-chan time.Time
}

type RealClock struct{}

func (RealClock) Now() time.Time { return time.Now() }
func (RealClock) After(d time.Duration) <-chan time.Time { return time.After(d) }

type TestClock struct{}

func (TestClock) Now() time.Time { return time.Now() }
func (TestClock) After(d time.Duration) <-chan time.Time { return time.After(0) }
54 changes: 29 additions & 25 deletions pkg/github/jobs/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"net/http"
"strconv"
"strings"
"time"

gh "github.com/oursky/github-actions-manager/pkg/github"
"github.com/oursky/github-actions-manager/pkg/kv"
Expand All @@ -26,23 +25,32 @@ type Synchronizer struct {
github *github.Client
kv kv.Store

state *channels.Broadcaster[*State]
metrics *metrics
state *channels.Broadcaster[*State]
metrics *metrics
webhookRuns chan webhookObject[*github.WorkflowRun]
webhookJobs chan webhookObject[*github.WorkflowJob]
clock Clock
}

func NewSynchronizer(logger *zap.Logger, config *Config, client *http.Client, kv kv.Store, registry *prometheus.Registry) (*Synchronizer, error) {
func NewSynchronizer(logger *zap.Logger, config *Config, client *http.Client, kv kv.Store, registry *prometheus.Registry, clock Clock) (*Synchronizer, error) {
logger = logger.Named("jobs-sync")

server := newWebhookServer(logger, config.GetWebhookServerAddr(), config.WebhookSecret)

runs := make(chan webhookObject[*github.WorkflowRun])
jobs := make(chan webhookObject[*github.WorkflowJob])

return &Synchronizer{
logger: logger,
config: config,
server: server,
github: github.NewClient(client),
kv: kv,
state: channels.NewBroadcaster[*State](nil),
metrics: newMetrics(registry),
logger: logger,
config: config,
server: server,
github: github.NewClient(client),
kv: kv,
state: channels.NewBroadcaster[*State](nil),
metrics: newMetrics(registry),
webhookRuns: runs,
webhookJobs: jobs,
clock: clock,
}, nil
}

Expand All @@ -51,14 +59,11 @@ func (s *Synchronizer) Start(ctx context.Context, g *errgroup.Group) error {
return nil
}

runs := make(chan webhookObject[*github.WorkflowRun])
jobs := make(chan webhookObject[*github.WorkflowJob])

if err := s.server.Start(ctx, g, runs, jobs); err != nil {
if err := s.server.Start(ctx, g, s.webhookRuns, s.webhookJobs); err != nil {
return fmt.Errorf("jobs: %w", err)
}
g.Go(func() error {
s.run(ctx, runs, jobs)
s.run(ctx, s.webhookRuns, s.webhookJobs)
return nil
})
return nil
Expand All @@ -71,8 +76,7 @@ func (s *Synchronizer) State() *channels.Broadcaster[*State] {
func (s *Synchronizer) run(
ctx context.Context,
webhookRuns <-chan webhookObject[*github.WorkflowRun],
webhookJobs <-chan webhookObject[*github.WorkflowJob],
) {
webhookJobs <-chan webhookObject[*github.WorkflowJob]) {
runs := make(map[Key]cell[github.WorkflowRun])
jobs := make(map[Key]cell[github.WorkflowJob])

Expand All @@ -87,13 +91,13 @@ func (s *Synchronizer) run(

case run := <-webhookRuns:
runs[run.Key] = cell[github.WorkflowRun]{
UpdatedAt: time.Now(),
UpdatedAt: s.clock.Now(),
Object: run.Object,
}

case job := <-webhookJobs:
jobs[job.Key] = cell[github.WorkflowJob]{
UpdatedAt: time.Now(),
UpdatedAt: s.clock.Now(),
Object: job.Object,
}

Expand All @@ -110,10 +114,10 @@ func (s *Synchronizer) run(
}

runs[runKey] = cell[github.WorkflowRun]{
UpdatedAt: time.Now(),
UpdatedAt: s.clock.Now(),
Object: run,
}
case <-time.After(syncInterval):
case <-s.clock.After(syncInterval):
if len(runs) == 0 {
continue
}
Expand Down Expand Up @@ -161,7 +165,7 @@ func (s *Synchronizer) run(
}
}

retentionLimit := time.Now().Add(-s.config.GetRetentionPeriod())
retentionLimit := s.clock.Now().Add(-s.config.GetRetentionPeriod())

runRefs := make(map[Key]int)
for key, job := range jobs {
Expand Down Expand Up @@ -222,7 +226,7 @@ func (s *Synchronizer) loadState(
continue
}
runs[Key{RepoOwner: owner, RepoName: repo, ID: id}] = cell[github.WorkflowRun]{
UpdatedAt: time.Now(),
UpdatedAt: s.clock.Now(),
Object: wrun,
}

Expand All @@ -236,7 +240,7 @@ func (s *Synchronizer) loadState(
}
for _, job := range wjobs.Jobs {
jobs[Key{RepoOwner: owner, RepoName: repo, ID: job.GetID()}] = cell[github.WorkflowJob]{
UpdatedAt: time.Now(),
UpdatedAt: s.clock.Now(),
Object: job,
}
}
Expand Down
150 changes: 150 additions & 0 deletions pkg/github/jobs/synchronizer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package jobs
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use jobs_test to avoid using package internal functions.

Copy link
Contributor Author

@Ngkaokis Ngkaokis Jul 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if I alter it, I can't create a webhookObject for testing. Should I create another public function sth like NewWebhookObject()


import (
"context"
"net/http"
"strings"
"testing"
"time"

"github.com/go-playground/assert/v2"
"github.com/google/go-github/v45/github"
"github.com/oursky/github-actions-manager/pkg/kv"
"github.com/oursky/github-actions-manager/pkg/utils/tomltypes"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"gopkg.in/h2non/gock.v1"
)

func ptr[T any](v T) *T {
return &v
}
func TestRun(t *testing.T) {

logger, _ := zap.NewProduction()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can use NewNop to avoid polluting output.

sync_page_size := 5
webhook_server_addr := "127.0.0.1:8001"
config := &Config{
Disabled: false,
ReplayEnabled: true,
RetentionPeriod: &tomltypes.Duration{1 * time.Hour},
SyncInterval: &tomltypes.Duration{5 * time.Second},
SyncPageSize: &sync_page_size,
WebhookServerAddr: &webhook_server_addr,
WebhookSecret: "testing",
}

kv := kv.NewInMemoryStore()
registry := prometheus.NewRegistry()
client := &http.Client{Transport: &http.Transport{}}
gock.InterceptClient(client)
defer gock.Off()

testGithubWorkflowRun := &github.WorkflowRun{
ID: ptr(int64(0)),
Status: ptr("in_progress"),
Conclusion: ptr(""),
WorkflowID: ptr(int64(0)),
HeadCommit: &github.HeadCommit{},
HeadRepository: &github.Repository{},
}

testCommitMsg := testGithubWorkflowRun.GetHeadCommit().GetMessage()
testCommitMsgTitle, _, _ := strings.Cut(testCommitMsg, "\n")
testCommitURL := testGithubWorkflowRun.GetHeadRepository().GetHTMLURL() + "/commit/" + testGithubWorkflowRun.GetHeadCommit().GetID()

testWorkflowRun := &WorkflowRun{
Key: Key{ID: int64(0), RepoOwner: "tester", RepoName: "testing"},

Name: testGithubWorkflowRun.GetName(),
URL: testGithubWorkflowRun.GetHTMLURL(),
Status: "completed",
Conclusion: "success",

StartedAt: testGithubWorkflowRun.GetRunStartedAt().Time,
CommitMessageTitle: testCommitMsgTitle,
CommitURL: testCommitURL,
}

testGithubWorkflowJob := &github.WorkflowJob{
ID: ptr(int64(0)),
HTMLURL: ptr("testing"),
Status: ptr("in_progress"),
Conclusion: ptr(""),
}

var startedAt *time.Time
if gt := testGithubWorkflowJob.GetStartedAt(); !gt.IsZero() {
startedAt = &gt.Time
}

var completedAt *time.Time
if gt := testGithubWorkflowJob.GetCompletedAt(); !gt.IsZero() {
completedAt = &gt.Time
}

testWorkflowJob := &WorkflowJob{
Key: Key{ID: int64(0), RepoOwner: "tester", RepoName: "testing"},

Name: testGithubWorkflowJob.GetName(),
URL: testGithubWorkflowJob.GetHTMLURL(),
Status: "completed",
Conclusion: "success",

StartedAt: startedAt,
CompletedAt: completedAt,
RunnerID: testGithubWorkflowJob.RunnerID,
RunnerName: testGithubWorkflowJob.RunnerName,
RunnerLabels: testGithubWorkflowJob.Labels,
}

testWorkflowRun.Jobs = append(testWorkflowRun.Jobs, testWorkflowJob)

testWebhookJob := NewWebhookObject(
Key{ID: int64(0), RepoOwner: "tester", RepoName: "testing"},
testGithubWorkflowJob,
)

testWebhookRun := NewWebhookObject(
Key{ID: int64(0), RepoOwner: "tester", RepoName: "testing"},
testGithubWorkflowRun,
)

testUpdatedGithubWorkflowJob := &github.WorkflowJob{
ID: ptr(int64(0)),
HTMLURL: ptr("testing"),
Status: ptr("completed"),
Conclusion: ptr("success"),
}

testUpdatedGithubWorkflowRun := &github.WorkflowRun{
ID: ptr(int64(0)),
Status: ptr("completed"),
Conclusion: ptr("success"),
WorkflowID: ptr(int64(0)),
HeadCommit: &github.HeadCommit{},
HeadRepository: &github.Repository{},
}

gock.New("https://api.github.com/repos/(.*)/(.*)/actions/jobs/(.*)").
Persist().
Reply(200).
JSON(testUpdatedGithubWorkflowJob)

gock.New("https://api.github.com/repos/(.*)/(.*)/actions/runs/(.*)").
Persist().
Reply(200).
JSON(testUpdatedGithubWorkflowRun)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

g, ctx := errgroup.WithContext(ctx)
s, _ := NewSynchronizer(logger, config, client, kv, registry, TestClock{})
s.Start(ctx, g)
s.webhookRuns <- testWebhookRun
s.webhookJobs <- testWebhookJob
time.Sleep(1 * time.Second)
assert.Equal(t, testWorkflowRun, s.metrics.state.WorkflowRuns[0])

}
7 changes: 7 additions & 0 deletions pkg/github/jobs/webhook_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,10 @@ func (s *webhookServer) handle(
})
}
}

func NewWebhookObject[T any](key Key, object T) webhookObject[T] {
return webhookObject[T]{
key,
object,
}
}