Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve gh jsonschema, fix gh watch loop, allow watching all prs events #1185

Merged
merged 1 commit into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions internal/source/github_events/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,18 @@ type (

// OnMatchers defines allowed GitHub matcher criteria.
OnMatchers On `yaml:"on"`

// BeforeDuration is the duration used to decrease the initial time after used for filtering old events.
// It is particularly useful during testing, allowing you to set it to 48h to retrieve events older than the plugin's start time.
// If not specified, the plugin's start time is used as the initial value.
BeforeDuration time.Duration `yaml:"beforeDuration"`
}

// On defines allowed GitHub matcher criteria.
On struct {
PullRequests []PullRequest `yaml:"pullRequests,omitempty"`
PullRequests []PullRequest `yaml:"pullRequests"`
// EventsAPI watches for /events API
EventsAPI []EventsAPIMatcher `yaml:"events"`
EventsAPI []EventsAPIMatcher `yaml:"events,omitempty"`
}
PullRequest struct {
// Types patterns defines if we should watch only for pull requests with given state criteria.
Expand Down
8 changes: 6 additions & 2 deletions internal/source/github_events/event_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func (g *GitHubEvent) GetEvent() *github.Event {
}

type GitHubPullRequest struct {
RepoName string
*github.PullRequest
}

Expand All @@ -38,8 +39,11 @@ func (g *GitHubPullRequest) Type() string {

func (g *GitHubPullRequest) GetEvent() *github.Event {
return &github.Event{
Type: ptr.FromType(g.Type()),
Actor: g.User,
Type: ptr.FromType(g.Type()),
Actor: g.User,
Repo: &github.Repository{
Name: ptr.FromType(g.RepoName),
},
CreatedAt: g.CreatedAt,
}
}
Expand Down
53 changes: 27 additions & 26 deletions internal/source/github_events/github_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,26 +64,32 @@ func normalizeRepos(in []RepositoryConfig) (map[string]matchCriteria, map[string
lastProcessTime := map[string]time.Time{}

for _, repo := range in {
if len(repo.OnMatchers.PullRequests) == 0 && len(repo.OnMatchers.EventsAPI) == 0 {
continue
}

split := strings.Split(repo.Name, "/")
if len(split) != 2 {
return nil, nil, fmt.Errorf(`Wrong repository name. Expected pattern "owner/repository", got %q`, repo.Name)
}

var on On
existing := repos[repo.Name]
on.PullRequests = append(existing.Matchers.PullRequests, repo.OnMatchers.PullRequests...)
on.EventsAPI = append(existing.Matchers.EventsAPI, repo.OnMatchers.EventsAPI...)
if repo.OnMatchers.PullRequests != nil && len(repo.OnMatchers.PullRequests) == 0 {
// to make sure that we also emit events for:
// repositories:
// - name: owner/repo1
// on:
// pullRequests: []
existing.Matchers.PullRequests = append(existing.Matchers.PullRequests, PullRequest{})
}
existing.Matchers.PullRequests = append(existing.Matchers.PullRequests, repo.OnMatchers.PullRequests...)

if len(repo.OnMatchers.EventsAPI) > 0 {
existing.Matchers.EventsAPI = append(existing.Matchers.EventsAPI, repo.OnMatchers.EventsAPI...)
}

repos[repo.Name] = matchCriteria{
RepoOwner: split[0],
RepoName: split[1],
Matchers: on,
Matchers: existing.Matchers,
}
lastProcessTime[repo.Name] = time.Now()
lastProcessTime[repo.Name] = time.Now().Add(-repo.BeforeDuration)
}

return repos, lastProcessTime, nil
Expand All @@ -101,7 +107,7 @@ func (w *Watcher) AsyncConsumeEvents(ctx context.Context, stream *source.StreamO
return
case <-timer.C:
w.log.Debug("Checking for new GitHub events in all registered repositories...")
err := w.visitAllRepositories(ctx, func(repo matchCriteria, events []CommonEvent) error {
w.visitAllRepositories(ctx, func(repo matchCriteria, events []CommonEvent) error {
log := w.log.WithFields(logrus.Fields{
"repoName": repo.RepoName,
"repoOwner": repo.RepoOwner,
Expand All @@ -113,9 +119,6 @@ func (w *Watcher) AsyncConsumeEvents(ctx context.Context, stream *source.StreamO
w.lastProcessTime[repoKey(repo)] = time.Now()
return nil
})
if err != nil {
w.log.WithError(err).Errorf("while processing events, next retry in %d", w.refreshDuration)
}

timer.Reset(w.refreshDuration)
}
Expand Down Expand Up @@ -193,17 +196,14 @@ func (w *Watcher) emitMatchingEvent(ctx context.Context, stream *source.StreamOu
Message: msg,
}
}

// no need to stress CPU with additional JSON unmarshalling
continue
}
}
}

type repositoryEventsProcessor func(repo matchCriteria, events []CommonEvent) error

func (w *Watcher) visitAllRepositories(ctx context.Context, process repositoryEventsProcessor) error {
for _, repo := range w.repos {
func (w *Watcher) visitAllRepositories(ctx context.Context, process repositoryEventsProcessor) {
for name, repo := range w.repos {
var eventsToProcess []CommonEvent

repoEvents, err := w.listRepositoryEvents(ctx, repo)
Expand All @@ -220,16 +220,14 @@ func (w *Watcher) visitAllRepositories(ctx context.Context, process repositoryEv
eventsToProcess = append(eventsToProcess, prs...)

if len(eventsToProcess) == 0 {
return nil
continue
}

err = process(repo, eventsToProcess)
if err != nil {
return fmt.Errorf("while processing events: %w", err)
w.log.WithError(err).Errorf("Failed to process events for %s", name)
}
}

return nil
}

func (w *Watcher) listAllPullRequests(ctx context.Context, repo matchCriteria) ([]CommonEvent, error) {
Expand All @@ -255,9 +253,12 @@ func (w *Watcher) listAllPullRequests(ctx context.Context, repo matchCriteria) (
w.log.WithField("prNumber", e.GetNumber()).Debug("Ignoring old pull requests")
continue
}
eventsToProcess = append(eventsToProcess, &GitHubPullRequest{e})
eventsToProcess = append(eventsToProcess, &GitHubPullRequest{
RepoName: fmt.Sprintf("%s/%s", repo.RepoOwner, repo.RepoName),
PullRequest: e,
})
}
w.log.Infof("PRs %d", len(eventsToProcess))
w.log.Debug("Selected %d PRs to process", len(eventsToProcess))
return eventsToProcess, nil
}

Expand Down Expand Up @@ -312,7 +313,7 @@ func addOptions(s string, opts interface{}) (string, error) {
// This API is not built to serve real-time use cases. Depending on the time of day, event latency can be anywhere from 30s to 6h.
// source: https://docs.github.com/en/rest/activity/events?apiVersion=2022-11-28#list-repository-events
func (w *Watcher) listRepositoryEvents(ctx context.Context, repo matchCriteria) ([]CommonEvent, error) {
w.log.Debugf("Listing all %d last emitted github events", perPageItems)
w.log.Debug("Listing all emitted github events")

var events []*github.Event
opts := github.ListOptions{
Expand All @@ -336,7 +337,6 @@ func (w *Watcher) listRepositoryEvents(ctx context.Context, repo matchCriteria)

var eventsToProcess []CommonEvent
for _, e := range events {
fmt.Println(e.GetType())
if e == nil || e.GetCreatedAt().Before(w.lastProcessTime[repoKey(repo)]) {
w.log.WithField("eventType", e.GetType()).Debug("Ignoring old events")
continue
Expand All @@ -352,6 +352,7 @@ func (w *Watcher) listRepositoryEvents(ctx context.Context, repo matchCriteria)
eventsToProcess = append(eventsToProcess, &GitHubEvent{e})
}

w.log.Debug("Selected %d events to process", len(eventsToProcess))
return eventsToProcess, nil
}

Expand Down
44 changes: 41 additions & 3 deletions internal/source/github_events/jsonschema.json
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,24 @@
"type": "array",
"items": {
"type": "string",
"title": "Pull Request Type"
}
"title": "Pull Request Type",
"oneOf": [
{
"const": "open",
"title": "Open"
},
{
"const": "closed",
"title": "Closed"
},
{
"const": "merged",
"title": "Merged"
}
]
},
"uniqueItems": true,
"minItems": 0
},
"paths": {
"title": "File Patterns",
Expand Down Expand Up @@ -156,10 +172,32 @@
"items": {
"title": "Events API Matcher",
"type": "object",
"required": [
"type"
],
"properties": {
"type": {
"title": "Event Type",
"type": "string"
"type": "string",
"enum": [
"CommitCommentEvent",
"CreateEvent",
"DeleteEvent",
"ForkEvent",
"GollumEvent",
"IssueCommentEvent",
"IssuesEvent",
"MemberEvent",
"PublicEvent",
"PullRequestEvent",
"PullRequestReviewEvent",
"PullRequestReviewCommentEvent",
"PullRequestReviewThreadEvent",
"PushEvent",
"ReleaseEvent",
"SponsorshipEvent",
"WatchEvent"
]
},
"jsonPath": {
"title": "JSONPath Expression",
Expand Down
3 changes: 2 additions & 1 deletion internal/source/github_events/templates/templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,15 @@ type (
MessageMutatorOption func(message api.Message, payload any) (api.Message, error)
)

func pullRequestEventMessage(_ *github.Event, event any, opts ...MessageMutatorOption) (api.Message, error) {
func pullRequestEventMessage(gh *github.Event, event any, opts ...MessageMutatorOption) (api.Message, error) {
pr, ok := event.(*github.PullRequest)
if !ok {
return api.Message{}, fmt.Errorf("got unknown event type %T", event)
}

var fields api.TextFields

fields = append(fields, api.TextField{Key: "Repository", Value: gh.GetRepo().GetName()})
fields = append(fields, api.TextField{Key: "Author", Value: pr.GetUser().GetLogin()})
fields = append(fields, api.TextField{Key: "State", Value: pr.GetState()})
fields = append(fields, api.TextField{Key: "Merged", Value: strconv.FormatBool(!pr.GetMergedAt().IsZero())})
Expand Down
Loading